|
|
|
@ -1,33 +1,35 @@
|
|
|
|
|
package xyz.wbsite.achat.core.message;
|
|
|
|
|
|
|
|
|
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
|
|
|
|
import org.springframework.http.codec.ServerSentEvent;
|
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
|
import reactor.core.publisher.FluxSink;
|
|
|
|
|
import xyz.wbsite.achat.core.base.Event;
|
|
|
|
|
import xyz.wbsite.achat.core.event.CompleteEvent;
|
|
|
|
|
import xyz.wbsite.achat.core.event.PartialEvent;
|
|
|
|
|
import xyz.wbsite.achat.core.prompt.MessagePrompt;
|
|
|
|
|
import xyz.wbsite.achat.core.service.MessageGenerator;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 对话SSE发射器
|
|
|
|
|
* 对话响应式SSE处理器
|
|
|
|
|
* 只负责SSE通信,不包含业务逻辑
|
|
|
|
|
*
|
|
|
|
|
* @author wangbing
|
|
|
|
|
* @version 0.0.1
|
|
|
|
|
* @since 1.8
|
|
|
|
|
*/
|
|
|
|
|
public class MessageSseEmitter extends SseEmitter {
|
|
|
|
|
public class MessageSseEmitter {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 用户消息
|
|
|
|
|
*/
|
|
|
|
|
private MessagePrompt messagePrompt;
|
|
|
|
|
private final MessagePrompt messagePrompt;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 是否完成
|
|
|
|
|
*/
|
|
|
|
|
private boolean complete;
|
|
|
|
|
private final AtomicBoolean complete = new AtomicBoolean(false);
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* AI回答
|
|
|
|
@ -37,58 +39,39 @@ public class MessageSseEmitter extends SseEmitter {
|
|
|
|
|
/**
|
|
|
|
|
* 消息处理器
|
|
|
|
|
*/
|
|
|
|
|
private MessageGenerator messageGenerator;
|
|
|
|
|
private final MessageGenerator messageGenerator; // 可能为null,使用时需要检查
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Flux接收器
|
|
|
|
|
*/
|
|
|
|
|
private FluxSink<ServerSentEvent<?>> sink;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 构造函数
|
|
|
|
|
*
|
|
|
|
|
* @param message 消息对象
|
|
|
|
|
* @param processor 消息处理器
|
|
|
|
|
*/
|
|
|
|
|
public MessageSseEmitter(MessagePrompt message, MessageGenerator processor) {
|
|
|
|
|
super(0L);
|
|
|
|
|
if (message == null) {
|
|
|
|
|
throw new IllegalArgumentException("MessagePrompt cannot be null");
|
|
|
|
|
}
|
|
|
|
|
this.messagePrompt = message;
|
|
|
|
|
this.messageGenerator = processor;
|
|
|
|
|
// TaskUtil.taskAsync(task);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 消息处理任务
|
|
|
|
|
*/
|
|
|
|
|
// public Runnable task = () -> {
|
|
|
|
|
// try {
|
|
|
|
|
// // 检查会话是否存在
|
|
|
|
|
// if (!messageProcessor.checkSessionExists(message.getChatId(), uid)) {
|
|
|
|
|
// this.sendMessage(createPartialMessage("当前会话不存在,请刷新后再试!"));
|
|
|
|
|
// return;
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// // 更新新会话的标题(如果为空)
|
|
|
|
|
// messageProcessor.updateSessionTitleIfEmpty(message.getChatId(), message.getText(), uid);
|
|
|
|
|
//
|
|
|
|
|
// // 保存本次用户消息
|
|
|
|
|
// messageProcessor.saveUserMessage(message.getChatId(), message.getText(), uid);
|
|
|
|
|
//
|
|
|
|
|
// // 根据是否有附件选择不同的处理方式
|
|
|
|
|
// TokenStream tokenStream;
|
|
|
|
|
// if (this.hasAttachment()) {
|
|
|
|
|
// String attachment = messageProcessor.parseAttachment(message.getAttachments(), uid);
|
|
|
|
|
// tokenStream = messageProcessor.createAssistantStreamWithAttachment(
|
|
|
|
|
// message.getChatId(), message.getText(), attachment, uid);
|
|
|
|
|
// } else {
|
|
|
|
|
// tokenStream = messageProcessor.createAssistantStream(
|
|
|
|
|
// message.getChatId(), message.getText(), uid);
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// // 设置流回调
|
|
|
|
|
// tokenStream
|
|
|
|
|
// .onPartialResponse(this::onPartialResponse)
|
|
|
|
|
// .onCompleteResponse(this::onCompleteResponse)
|
|
|
|
|
// .onError(this::onError)
|
|
|
|
|
// .start();
|
|
|
|
|
// } catch (Exception e) {
|
|
|
|
|
// onError(e);
|
|
|
|
|
// }
|
|
|
|
|
// };
|
|
|
|
|
this.messageGenerator = processor; // 可能为null,使用时需要检查
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 创建响应式流
|
|
|
|
|
*/
|
|
|
|
|
public Flux<ServerSentEvent<?>> createFlux() {
|
|
|
|
|
return Flux.create(emitter -> {
|
|
|
|
|
this.sink = emitter;
|
|
|
|
|
// 设置取消回调
|
|
|
|
|
emitter.onCancel(() -> complete.set(true));
|
|
|
|
|
// 设置释放回调(当sink完成或取消时调用)
|
|
|
|
|
emitter.onDispose(() -> complete.set(true));
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 错误处理
|
|
|
|
@ -103,7 +86,7 @@ public class MessageSseEmitter extends SseEmitter {
|
|
|
|
|
* 部分响应处理
|
|
|
|
|
*/
|
|
|
|
|
public void onPartialResponse(String msg) {
|
|
|
|
|
if (complete) {
|
|
|
|
|
if (complete.get()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
this.sendMessage(createPartialMessage(msg));
|
|
|
|
@ -114,7 +97,7 @@ public class MessageSseEmitter extends SseEmitter {
|
|
|
|
|
* 完成响应处理
|
|
|
|
|
*/
|
|
|
|
|
public void onCompleteResponse(Object chatResponse) {
|
|
|
|
|
if (this.complete) {
|
|
|
|
|
if (this.complete.get()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// 推送结束
|
|
|
|
@ -127,6 +110,9 @@ public class MessageSseEmitter extends SseEmitter {
|
|
|
|
|
* 创建部分消息事件
|
|
|
|
|
*/
|
|
|
|
|
private Event createPartialMessage(String partial) {
|
|
|
|
|
if (messagePrompt == null) {
|
|
|
|
|
throw new IllegalStateException("MessagePrompt is null");
|
|
|
|
|
}
|
|
|
|
|
return new PartialEvent(messagePrompt.getSid(), partial);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -134,29 +120,34 @@ public class MessageSseEmitter extends SseEmitter {
|
|
|
|
|
* 创建完成消息事件
|
|
|
|
|
*/
|
|
|
|
|
private Event createCompleteMessage() {
|
|
|
|
|
if (messagePrompt == null) {
|
|
|
|
|
throw new IllegalStateException("MessagePrompt is null");
|
|
|
|
|
}
|
|
|
|
|
return new CompleteEvent(messagePrompt.getSid());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 重写send方法,处理异常
|
|
|
|
|
* 发送消息
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
public void send(SseEventBuilder builder) throws IOException {
|
|
|
|
|
private void sendMessage(Event message) {
|
|
|
|
|
if (sink != null && !complete.get()) {
|
|
|
|
|
try {
|
|
|
|
|
super.send(builder);
|
|
|
|
|
sink.next(ServerSentEvent.builder(message).build());
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
complete = true;
|
|
|
|
|
complete.set(true);
|
|
|
|
|
if (sink != null && !sink.isCancelled()) {
|
|
|
|
|
sink.error(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 发送消息
|
|
|
|
|
* 完成响应
|
|
|
|
|
*/
|
|
|
|
|
private void sendMessage(Event message) {
|
|
|
|
|
try {
|
|
|
|
|
this.send(message);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
complete = true;
|
|
|
|
|
public void complete() {
|
|
|
|
|
if (!complete.getAndSet(true) && sink != null && !sink.isCancelled()) {
|
|
|
|
|
sink.complete();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -164,13 +155,16 @@ public class MessageSseEmitter extends SseEmitter {
|
|
|
|
|
* 获取完成状态
|
|
|
|
|
*/
|
|
|
|
|
public boolean isComplete() {
|
|
|
|
|
return complete;
|
|
|
|
|
return complete.get();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 设置完成状态
|
|
|
|
|
*/
|
|
|
|
|
public void setComplete(boolean complete) {
|
|
|
|
|
this.complete = complete;
|
|
|
|
|
this.complete.set(complete);
|
|
|
|
|
if (complete && sink != null && !sink.isCancelled()) {
|
|
|
|
|
sink.complete();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|