Skip to content

Commit 35e1c5f

Browse files
authored
Merge pull request #1657 from nasonawa/mcp_streamhttp_recordparse_sse_events
added Record Parser to parse each sse event
2 parents ac78650 + f9e5a90 commit 35e1c5f

File tree

1 file changed

+8
-5
lines changed

1 file changed

+8
-5
lines changed

mcp/runtime/src/main/java/io/quarkiverse/langchain4j/mcp/runtime/http/QuarkusStreamableHttpMcpTransport.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.vertx.core.http.HttpClient;
3535
import io.vertx.core.http.HttpMethod;
3636
import io.vertx.core.http.RequestOptions;
37+
import io.vertx.core.parsetools.RecordParser;
3738

3839
public class QuarkusStreamableHttpMcpTransport implements McpTransport {
3940

@@ -143,14 +144,16 @@ private Uni<JsonNode> execute(McpClientMessage request, Long id) {
143144
this.mcpSessionId.set(mcpSessionId);
144145
}
145146

147+
RecordParser sseEventparser = RecordParser.newDelimited("\n\n", bodyBuffer -> {
148+
String responseString = bodyBuffer.toString();
149+
SseEvent<String> sseEvent = parseSseEvent(responseString);
150+
sseSubscriber.accept(sseEvent);
151+
});
152+
146153
String contentType = response.result().getHeader("Content-Type");
147154
if (id != null && contentType != null && contentType.contains("text/event-stream")) {
148155
// the server has started a SSE channel
149-
response.result().handler(bodyBuffer -> {
150-
String responseString = bodyBuffer.toString();
151-
SseEvent<String> sseEvent = parseSseEvent(responseString);
152-
sseSubscriber.accept(sseEvent);
153-
});
156+
response.result().handler(sseEventparser);
154157
} else {
155158
// the server has sent a single regular response
156159
if (id == null) {

0 commit comments

Comments
 (0)