-
Notifications
You must be signed in to change notification settings - Fork 7
Open
Description
Issue Details
Problem Description
The MCP client could only connect to servers using HTTP/SSE (Server-Sent Events) transport, but many MCP servers use a simpler "Streamable HTTP" pattern:
- Only SSE transport supported: All HTTP URLs were treated as SSE endpoints, requiring
GET /sseinitialization - No URL-based transport detection: Client couldn't distinguish between SSE endpoints (
/sse,/events) and simple HTTP endpoints (/rpc,/mcp,/api) - Filter chain always waited for SSE endpoint: Even for simple HTTP servers, the filter chain would wait indefinitely for an SSE "endpoint" event that never comes
- Response processing broken for non-SSE: JSON-RPC responses in HTTP response body weren't processed correctly - only SSE event stream parsing worked
Root Cause
The MCP protocol supports two HTTP transport modes:
HTTP/SSE (Server-Sent Events):
Client Server
|-- GET /sse ----------------->| (establish SSE stream)
|<-- event: endpoint ----------| (receive POST URL)
|<-- event: message -----------| (receive responses via SSE)
|-- POST /message ------------>| (send requests to separate endpoint)
Streamable HTTP (Simple POST/Response):
Client Server
|-- POST /rpc ---------------->| (send JSON-RPC request)
|<-- HTTP 200 + JSON-RPC -----| (receive response in body)
The client only implemented the first pattern, making it incompatible with simpler MCP servers.
Technical Flow (Before Fix)
Client (connecting to http://server:8080/rpc)
|
|-- negotiateTransport("http://server:8080/rpc")
| returns TransportType::HttpSse ❌ Always returns SSE
|
|-- Create HttpSseFilterChainFactory
| waiting_for_sse_endpoint_ = true ❌ Waits for endpoint event
| http_filter_->setUseSseGet(true) ❌ Sends GET instead of POST
|
|-- Connection established
|-- GET /rpc HTTP/1.1 ----------------->| ❌ Wrong! Server expects POST
| Accept: text/event-stream |
| |
|<-- HTTP 404 Not Found ----------------| ❌ Server doesn't have SSE endpoint
| OR |
|<-- HTTP 200 + JSON-RPC --------------| ❌ Response ignored (waiting for SSE)
| |
|-- (waiting for SSE endpoint event...) | ❌ Hangs forever
Technical Flow (After Fix)
Client (connecting to http://server:8080/rpc)
|
|-- negotiateTransport("http://server:8080/rpc")
| Path "/rpc" doesn't contain "/sse" or "/events"
| returns TransportType::StreamableHttp ✅
|
|-- Create HttpSseFilterChainFactory
| use_sse = false ✅
| waiting_for_sse_endpoint_ = false ✅ No waiting
| http_filter_->setUseSseGet(false) ✅ Send POST directly
|
|-- Connection established
|-- POST /rpc HTTP/1.1 ---------------->| ✅ Correct!
| {"jsonrpc":"2.0","method":"init"} |
| |
|<-- HTTP 200 OK -----------------------|
| {"jsonrpc":"2.0","result":...} |
| |
|-- Response processed directly ✅ |
Client (connecting to http://server:8080/sse)
|
|-- negotiateTransport("http://server:8080/sse")
| Path "/sse" contains "/sse"
| returns TransportType::HttpSse ✅
|
|-- (SSE flow as before, unchanged)
How to Reproduce
Issue 1: Cannot Connect to Streamable HTTP Server
void reproduceStreamableHttpFailure() {
McpClient client;
// Server uses simple POST/response pattern (not SSE)
client.setUri("http://localhost:8080/rpc");
// Before fix: connect() succeeds but...
auto result = client.connect();
// Before fix: Client sends GET /rpc with SSE headers
// Server either returns 404 or returns JSON directly
// Client ignores response (waiting for SSE endpoint event)
// Try to send request
auto init_result = client.initializeProtocol();
// Before fix: Hangs forever waiting for SSE endpoint
// After fix: Sends POST directly, receives response
}Issue 2: Transport Negotiation Always Returns SSE
void reproduceTransportNegotiation() {
McpClient client;
// These should use different transports:
client.setUri("http://server:8080/sse"); // Should use HttpSse
client.setUri("http://server:8080/events"); // Should use HttpSse
client.setUri("http://server:8080/rpc"); // Should use StreamableHttp
client.setUri("http://server:8080/mcp"); // Should use StreamableHttp
client.setUri("http://server:8080/api"); // Should use StreamableHttp
// Before fix: ALL return TransportType::HttpSse
// After fix: Correctly distinguishes based on path
}Issue 3: Filter Chain Waits for Non-Existent Endpoint Event
void reproduceEndpointWaiting() {
// Create filter chain for client mode
HttpSseFilterChainFactory factory(dispatcher, callbacks,
false, // client mode
"/rpc", "localhost");
// Before fix: factory.use_sse_ defaults to true
// Filter sets waiting_for_sse_endpoint_ = true
// All requests are queued waiting for endpoint event
// Server returns JSON-RPC directly in HTTP response body
// But filter is looking for SSE "event: message" format
// Response is ignored or parsing fails
}Issue 4: Response Body Not Processed as JSON-RPC
void reproduceResponseProcessing() {
// HTTP response from Streamable HTTP server:
// HTTP/1.1 200 OK
// Content-Type: application/json
//
// {"jsonrpc":"2.0","result":{"capabilities":{}},"id":1}
// Before fix:
// - onBody() accumulates data in pending_json_data_
// - Only processes on end_stream
// - Doesn't add newline delimiter for JSON-RPC parser
// After fix:
// - onBody() processes each chunk immediately
// - Adds newline delimiter for JSON-RPC parsing
// - Works with chunked transfer encoding
}Minimal Reproduction Test
TEST(McpClient, StreamableHttpTransport) {
// Test transport negotiation
EXPECT_EQ(negotiateTransport("http://server/sse"), TransportType::HttpSse);
EXPECT_EQ(negotiateTransport("http://server/events"), TransportType::HttpSse);
EXPECT_EQ(negotiateTransport("http://server/rpc"), TransportType::StreamableHttp);
EXPECT_EQ(negotiateTransport("http://server/mcp"), TransportType::StreamableHttp);
EXPECT_EQ(negotiateTransport("http://server/"), TransportType::StreamableHttp);
}
TEST(HttpSseFilterChainFactory, StreamableHttpMode) {
// Create factory with use_sse=false
HttpSseFilterChainFactory factory(dispatcher, callbacks,
false, "/rpc", "localhost",
false /* use_sse */);
// Filter chain should NOT wait for endpoint event
// Filter chain should send POST requests directly
// Filter chain should process JSON-RPC in response body
}Key Changes in the Fix
| Component | Change |
|---|---|
| TransportType enum | Added StreamableHttp value |
| negotiateTransport() | Check URL path for /sse or /events to distinguish transport |
| createConnectionConfig() | Added case for StreamableHttp with HTTPS support |
| McpConnectionManager::connect() | Added connection flow for StreamableHttp |
| HttpSseFilterChainFactory | Added use_sse parameter (default true for backward compat) |
| HttpSseJsonRpcProtocolFilter | Added use_sse_ member to control behavior |
| HttpSseJsonRpcProtocolFilter | Don't call setUseSseGet(true) when use_sse=false |
| HttpSseJsonRpcProtocolFilter | Don't set waiting_for_sse_endpoint_ when use_sse=false |
| onBody() | Process JSON-RPC chunks immediately in non-SSE mode |
| createFilterChainFactory() | Pass use_sse=false for StreamableHttp transport |
Transport Negotiation Logic
TransportType McpClient::negotiateTransport(const std::string& uri) {
if (uri.find("stdio://") == 0) {
return TransportType::Stdio;
} else if (uri.find("ws://") == 0 || uri.find("wss://") == 0) {
return TransportType::WebSocket;
} else if (uri.find("http://") == 0 || uri.find("https://") == 0) {
// Extract path from URI
std::string path = extractPath(uri);
// Check for SSE-specific paths
if (path.find("/sse") != std::string::npos ||
path.find("/events") != std::string::npos) {
return TransportType::HttpSse;
}
// Default to Streamable HTTP for most HTTP endpoints
return TransportType::StreamableHttp;
}
return TransportType::StreamableHttp;
}Filter Chain Mode Selection
// HttpSseFilterChainFactory constructor
HttpSseFilterChainFactory(event::Dispatcher& dispatcher,
McpProtocolCallbacks& message_callbacks,
bool is_server = true,
const std::string& http_path = "/rpc",
const std::string& http_host = "localhost",
bool use_sse = true) // NEW PARAMETER
// In HttpSseJsonRpcProtocolFilter constructor
if (!is_server) {
http_filter_->setClientEndpoint(http_path, http_host);
// Only enable SSE GET mode if use_sse is true
if (use_sse) {
http_filter_->setUseSseGet(true);
}
}
// In initializeReadFilterCallbacks()
if (!is_server_ && use_sse_) {
waiting_for_sse_endpoint_ = true;
}
// For Streamable HTTP (use_sse_ = false), skip endpoint waitingResponse Body Processing
void HttpSseJsonRpcProtocolFilter::onBody(const OwnedBuffer& data, bool end_stream) {
if (!use_sse_) {
// Streamable HTTP mode: body contains JSON-RPC response
// Process each chunk immediately
OwnedBuffer temp_buffer;
temp_buffer.add(data);
// Add newline for JSON-RPC parsing (expects newline-delimited messages)
if (!data.empty() && data.back() != '\n') {
temp_buffer.add("\n", 1);
}
jsonrpc_filter_->onData(temp_buffer, end_stream);
} else {
// SSE mode: body contains SSE event stream
sse_filter_->onData(data, end_stream);
}
}Connection Manager Filter Factory Selection
std::shared_ptr<network::FilterChainFactory>
McpConnectionManager::createFilterChainFactory() {
if (config_.transport_type == TransportType::HttpSse) {
return std::make_shared<filter::HttpSseFilterChainFactory>(
dispatcher_, *this, is_server_, config_.http_path, config_.http_host,
true /* use_sse */);
} else if (config_.transport_type == TransportType::StreamableHttp) {
return std::make_shared<filter::HttpSseFilterChainFactory>(
dispatcher_, *this, is_server_, config_.http_path, config_.http_host,
false /* use_sse */);
}
// ...
}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels