Skip to content

Improve MCP Client: Cannot Connect to Streamable HTTP Servers (Non-SSE) #182

@bettercallsaulj

Description

@bettercallsaulj

Issue Details

Problem Description

The MCP client could only connect to servers using HTTP/SSE (Server-Sent Events) transport, but many MCP servers use a simpler "Streamable HTTP" pattern:

  1. Only SSE transport supported: All HTTP URLs were treated as SSE endpoints, requiring GET /sse initialization
  2. No URL-based transport detection: Client couldn't distinguish between SSE endpoints (/sse, /events) and simple HTTP endpoints (/rpc, /mcp, /api)
  3. Filter chain always waited for SSE endpoint: Even for simple HTTP servers, the filter chain would wait indefinitely for an SSE "endpoint" event that never comes
  4. Response processing broken for non-SSE: JSON-RPC responses in HTTP response body weren't processed correctly - only SSE event stream parsing worked

Root Cause

The MCP protocol supports two HTTP transport modes:

HTTP/SSE (Server-Sent Events):

Client                          Server
  |-- GET /sse ----------------->|  (establish SSE stream)
  |<-- event: endpoint ----------|  (receive POST URL)
  |<-- event: message -----------|  (receive responses via SSE)
  |-- POST /message ------------>|  (send requests to separate endpoint)

Streamable HTTP (Simple POST/Response):

Client                          Server
  |-- POST /rpc ---------------->|  (send JSON-RPC request)
  |<-- HTTP 200 + JSON-RPC -----|  (receive response in body)

The client only implemented the first pattern, making it incompatible with simpler MCP servers.

Technical Flow (Before Fix)

Client (connecting to http://server:8080/rpc)
  |
  |-- negotiateTransport("http://server:8080/rpc")
  |   returns TransportType::HttpSse  ❌ Always returns SSE
  |
  |-- Create HttpSseFilterChainFactory
  |   waiting_for_sse_endpoint_ = true  ❌ Waits for endpoint event
  |   http_filter_->setUseSseGet(true)  ❌ Sends GET instead of POST
  |
  |-- Connection established
  |-- GET /rpc HTTP/1.1 ----------------->|  ❌ Wrong! Server expects POST
  |   Accept: text/event-stream           |
  |                                       |
  |<-- HTTP 404 Not Found ----------------|  ❌ Server doesn't have SSE endpoint
  |   OR                                  |
  |<-- HTTP 200 + JSON-RPC --------------|  ❌ Response ignored (waiting for SSE)
  |                                       |
  |-- (waiting for SSE endpoint event...) |  ❌ Hangs forever

Technical Flow (After Fix)

Client (connecting to http://server:8080/rpc)
  |
  |-- negotiateTransport("http://server:8080/rpc")
  |   Path "/rpc" doesn't contain "/sse" or "/events"
  |   returns TransportType::StreamableHttp  ✅
  |
  |-- Create HttpSseFilterChainFactory
  |   use_sse = false  ✅
  |   waiting_for_sse_endpoint_ = false  ✅ No waiting
  |   http_filter_->setUseSseGet(false)  ✅ Send POST directly
  |
  |-- Connection established
  |-- POST /rpc HTTP/1.1 ---------------->|  ✅ Correct!
  |   {"jsonrpc":"2.0","method":"init"}   |
  |                                       |
  |<-- HTTP 200 OK -----------------------|
  |   {"jsonrpc":"2.0","result":...}      |
  |                                       |
  |-- Response processed directly  ✅     |

Client (connecting to http://server:8080/sse)
  |
  |-- negotiateTransport("http://server:8080/sse")
  |   Path "/sse" contains "/sse"
  |   returns TransportType::HttpSse  ✅
  |
  |-- (SSE flow as before, unchanged)

How to Reproduce

Issue 1: Cannot Connect to Streamable HTTP Server

void reproduceStreamableHttpFailure() {
  McpClient client;

  // Server uses simple POST/response pattern (not SSE)
  client.setUri("http://localhost:8080/rpc");

  // Before fix: connect() succeeds but...
  auto result = client.connect();

  // Before fix: Client sends GET /rpc with SSE headers
  // Server either returns 404 or returns JSON directly
  // Client ignores response (waiting for SSE endpoint event)

  // Try to send request
  auto init_result = client.initializeProtocol();

  // Before fix: Hangs forever waiting for SSE endpoint
  // After fix: Sends POST directly, receives response
}

Issue 2: Transport Negotiation Always Returns SSE

void reproduceTransportNegotiation() {
  McpClient client;

  // These should use different transports:
  client.setUri("http://server:8080/sse");     // Should use HttpSse
  client.setUri("http://server:8080/events");  // Should use HttpSse
  client.setUri("http://server:8080/rpc");     // Should use StreamableHttp
  client.setUri("http://server:8080/mcp");     // Should use StreamableHttp
  client.setUri("http://server:8080/api");     // Should use StreamableHttp

  // Before fix: ALL return TransportType::HttpSse
  // After fix: Correctly distinguishes based on path
}

Issue 3: Filter Chain Waits for Non-Existent Endpoint Event

void reproduceEndpointWaiting() {
  // Create filter chain for client mode
  HttpSseFilterChainFactory factory(dispatcher, callbacks,
                                    false,  // client mode
                                    "/rpc", "localhost");

  // Before fix: factory.use_sse_ defaults to true
  // Filter sets waiting_for_sse_endpoint_ = true
  // All requests are queued waiting for endpoint event

  // Server returns JSON-RPC directly in HTTP response body
  // But filter is looking for SSE "event: message" format
  // Response is ignored or parsing fails
}

Issue 4: Response Body Not Processed as JSON-RPC

void reproduceResponseProcessing() {
  // HTTP response from Streamable HTTP server:
  // HTTP/1.1 200 OK
  // Content-Type: application/json
  //
  // {"jsonrpc":"2.0","result":{"capabilities":{}},"id":1}

  // Before fix:
  // - onBody() accumulates data in pending_json_data_
  // - Only processes on end_stream
  // - Doesn't add newline delimiter for JSON-RPC parser

  // After fix:
  // - onBody() processes each chunk immediately
  // - Adds newline delimiter for JSON-RPC parsing
  // - Works with chunked transfer encoding
}

Minimal Reproduction Test

TEST(McpClient, StreamableHttpTransport) {
  // Test transport negotiation
  EXPECT_EQ(negotiateTransport("http://server/sse"), TransportType::HttpSse);
  EXPECT_EQ(negotiateTransport("http://server/events"), TransportType::HttpSse);
  EXPECT_EQ(negotiateTransport("http://server/rpc"), TransportType::StreamableHttp);
  EXPECT_EQ(negotiateTransport("http://server/mcp"), TransportType::StreamableHttp);
  EXPECT_EQ(negotiateTransport("http://server/"), TransportType::StreamableHttp);
}

TEST(HttpSseFilterChainFactory, StreamableHttpMode) {
  // Create factory with use_sse=false
  HttpSseFilterChainFactory factory(dispatcher, callbacks,
                                    false, "/rpc", "localhost",
                                    false /* use_sse */);

  // Filter chain should NOT wait for endpoint event
  // Filter chain should send POST requests directly
  // Filter chain should process JSON-RPC in response body
}

Key Changes in the Fix

Component Change
TransportType enum Added StreamableHttp value
negotiateTransport() Check URL path for /sse or /events to distinguish transport
createConnectionConfig() Added case for StreamableHttp with HTTPS support
McpConnectionManager::connect() Added connection flow for StreamableHttp
HttpSseFilterChainFactory Added use_sse parameter (default true for backward compat)
HttpSseJsonRpcProtocolFilter Added use_sse_ member to control behavior
HttpSseJsonRpcProtocolFilter Don't call setUseSseGet(true) when use_sse=false
HttpSseJsonRpcProtocolFilter Don't set waiting_for_sse_endpoint_ when use_sse=false
onBody() Process JSON-RPC chunks immediately in non-SSE mode
createFilterChainFactory() Pass use_sse=false for StreamableHttp transport

Transport Negotiation Logic

TransportType McpClient::negotiateTransport(const std::string& uri) {
  if (uri.find("stdio://") == 0) {
    return TransportType::Stdio;
  } else if (uri.find("ws://") == 0 || uri.find("wss://") == 0) {
    return TransportType::WebSocket;
  } else if (uri.find("http://") == 0 || uri.find("https://") == 0) {
    // Extract path from URI
    std::string path = extractPath(uri);

    // Check for SSE-specific paths
    if (path.find("/sse") != std::string::npos ||
        path.find("/events") != std::string::npos) {
      return TransportType::HttpSse;
    }

    // Default to Streamable HTTP for most HTTP endpoints
    return TransportType::StreamableHttp;
  }
  return TransportType::StreamableHttp;
}

Filter Chain Mode Selection

// HttpSseFilterChainFactory constructor
HttpSseFilterChainFactory(event::Dispatcher& dispatcher,
                          McpProtocolCallbacks& message_callbacks,
                          bool is_server = true,
                          const std::string& http_path = "/rpc",
                          const std::string& http_host = "localhost",
                          bool use_sse = true)  // NEW PARAMETER

// In HttpSseJsonRpcProtocolFilter constructor
if (!is_server) {
  http_filter_->setClientEndpoint(http_path, http_host);
  // Only enable SSE GET mode if use_sse is true
  if (use_sse) {
    http_filter_->setUseSseGet(true);
  }
}

// In initializeReadFilterCallbacks()
if (!is_server_ && use_sse_) {
  waiting_for_sse_endpoint_ = true;
}
// For Streamable HTTP (use_sse_ = false), skip endpoint waiting

Response Body Processing

void HttpSseJsonRpcProtocolFilter::onBody(const OwnedBuffer& data, bool end_stream) {
  if (!use_sse_) {
    // Streamable HTTP mode: body contains JSON-RPC response
    // Process each chunk immediately
    OwnedBuffer temp_buffer;
    temp_buffer.add(data);
    // Add newline for JSON-RPC parsing (expects newline-delimited messages)
    if (!data.empty() && data.back() != '\n') {
      temp_buffer.add("\n", 1);
    }
    jsonrpc_filter_->onData(temp_buffer, end_stream);
  } else {
    // SSE mode: body contains SSE event stream
    sse_filter_->onData(data, end_stream);
  }
}

Connection Manager Filter Factory Selection

std::shared_ptr<network::FilterChainFactory>
McpConnectionManager::createFilterChainFactory() {
  if (config_.transport_type == TransportType::HttpSse) {
    return std::make_shared<filter::HttpSseFilterChainFactory>(
        dispatcher_, *this, is_server_, config_.http_path, config_.http_host,
        true /* use_sse */);

  } else if (config_.transport_type == TransportType::StreamableHttp) {
    return std::make_shared<filter::HttpSseFilterChainFactory>(
        dispatcher_, *this, is_server_, config_.http_path, config_.http_host,
        false /* use_sse */);
  }
  // ...
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions