package xyz.wbsite.mcp.basic.controller; import cn.hutool.json.JSONUtil; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import xyz.wbsite.mcp.basic.model.McpResponse; import java.io.IOException; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; /** * SSE广播器类 * 用于管理和广播SSE事件给所有已连接的客户端 * * @author wangbing */ @Service public class SseBroadcaster { private static final Logger log = LoggerFactory.getLogger(SseBroadcaster.class); private final List emitters = new CopyOnWriteArrayList<>(); /** * 注册新的SSE发射器 * * @param emitter sse发射器 */ public void registerEmitter(SseEmitter emitter) { // Add the emitter to the list emitters.add(emitter); log.info("New SSE emitter registered, total emitters: {}", emitters.size()); // Set up completion handlers to remove the emitter when the connection is closed emitter.onCompletion(() -> { emitters.remove(emitter); log.info("SSE emitter completed, remaining emitters: {}", emitters.size()); }); emitter.onTimeout(() -> { emitters.remove(emitter); log.info("SSE emitter timed out, remaining emitters: {}", emitters.size()); }); emitter.onError((e) -> { emitters.remove(emitter); log.error("Error on SSE emitter: {}", e.getMessage(), e); }); } /** * 将初始端点事件发送到特定发射器 * * @param emitter sse发射器 */ public void sendEndpointEvent(SseEmitter emitter) { try { emitter.send(SseEmitter.event() .name("endpoint") .data("/mcp/post")); log.info("Sent endpoint event to SSE emitter"); } catch (IOException e) { log.error("Failed to send endpoint event: {}", e.getMessage(), e); emitter.completeWithError(e); } } /** * 由POST端点处理程序调用,通过SSE发回响应 * * @param response 响应. */ public void broadcastResponse(McpResponse response) { if (emitters.isEmpty()) { log.warn("No active SSE emitters to broadcast response (ID: {})\n", response.getId()); return; } // Responses are sent as events of type "message" String jsonResponse = JSONUtil.toJsonStr(response); for (SseEmitter emitter : emitters) { try { emitter.send(SseEmitter.event() .name("message") .data(jsonResponse)); log.debug("Successfully sent SSE message for Response ID: {}", response.getId()); } catch (IOException e) { log.error("Failed to send SSE message (Response ID: {}): {}", response.getId(), e.getMessage(), e); // 考虑移除有故障的发射器 emitter.completeWithError(e); } } } /** * Sends a simple notification (like log messages, tool updates) over SSE. * This isn't used in the simple weather example but shows how non-response * messages would be sent. * * @param notification An object representing the notification. * @param eventName The SSE event name (e.g., "notifications/message"). */ public void broadcastNotification(Object notification, String eventName) { if (emitters.isEmpty()) { log.warn("No active SSE emitters to broadcast notification ({})\n", eventName); return; } String jsonNotification = JSONUtil.toJsonStr(notification); for (SseEmitter emitter : emitters) { try { emitter.send(SseEmitter.event() .name(eventName) .data(jsonNotification)); log.debug("Successfully sent SSE notification ({})\n", eventName); } catch (IOException e) { log.error("Failed to send SSE notification ({}): {}", eventName, e.getMessage(), e); emitter.completeWithError(e); } } } }