Skip to content

Commit 25af9cf

Browse files
authored
Merge branch 'punkpeye:main' into main
2 parents a4eb600 + 4f990bc commit 25af9cf

File tree

7 files changed

+575
-23
lines changed

7 files changed

+575
-23
lines changed

README.md

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,14 @@ npm install mcp-proxy
2525
npx mcp-proxy --port 8080 --endpoint /sse tsx server.js
2626
```
2727

28-
This starts an SSE server and `stdio` server (`tsx server.js`). The SSE server listens on port 8080 and endpoint `/sse`, and forwards messages to the `stdio` server.
28+
This starts a server and `stdio` server (`tsx server.js`). The server listens on port 8080 and endpoint `/sse` by default, and forwards messages to the `stdio` server.
29+
30+
options:
31+
32+
- `--port`: Specify the port to listen on (default: 8080)
33+
- `--endpoint`: Specify the endpoint to listen on (default: `/sse` for SSE server, `/stream` for stream server)
34+
- `--server`: Specify the server type to use (default: `sse`)
35+
- `--debug`: Enable debug logging
2936

3037
### Node.js SDK
3138

@@ -71,6 +78,26 @@ const { close } = await startSSEServer({
7178
close();
7279
```
7380

81+
#### `startHTTPStreamServer`
82+
83+
Starts a proxy that listens on a `port` and `endpoint`, and sends messages to the attached server via `StreamableHTTPServerTransport`.
84+
85+
```ts
86+
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
87+
import { startHTTPStreamServer, InMemoryEventStore } from "mcp-proxy";
88+
89+
const { close } = await startHTTPStreamServer({
90+
port: 8080,
91+
endpoint: "/stream",
92+
createServer: async () => {
93+
return new Server();
94+
},
95+
eventStore: new InMemoryEventStore(), // optional you can provide your own event store
96+
});
97+
98+
close();
99+
```
100+
74101
#### `tapTransport`
75102

76103
Taps into a transport and logs events.

eslint.config.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
11
import perfectionist from "eslint-plugin-perfectionist";
22

3-
export default [perfectionist.configs["recommended-alphabetical"]];
3+
export default [
4+
perfectionist.configs["recommended-alphabetical"],
5+
{
6+
ignores: [
7+
'**/dist/'
8+
],
9+
},
10+
];

src/InMemoryEventStore.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* This is a copy of the InMemoryEventStore from the typescript-sdk
3+
* https://github.com/modelcontextprotocol/typescript-sdk/blob/main/src/inMemoryEventStore.ts
4+
*/
5+
6+
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
7+
import type { EventStore } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
8+
9+
/**
10+
* Simple in-memory implementation of the EventStore interface for resumability
11+
* This is primarily intended for examples and testing, not for production use
12+
* where a persistent storage solution would be more appropriate.
13+
*/
14+
export class InMemoryEventStore implements EventStore {
15+
private events: Map<string, { streamId: string; message: JSONRPCMessage }> =
16+
new Map();
17+
18+
/**
19+
* Generates a unique event ID for a given stream ID
20+
*/
21+
private generateEventId(streamId: string): string {
22+
return `${streamId}_${Date.now()}_${Math.random().toString(36).substring(2, 10)}`;
23+
}
24+
25+
/**
26+
* Extracts the stream ID from an event ID
27+
*/
28+
private getStreamIdFromEventId(eventId: string): string {
29+
const parts = eventId.split("_");
30+
return parts.length > 0 ? parts[0] : "";
31+
}
32+
33+
/**
34+
* Stores an event with a generated event ID
35+
* Implements EventStore.storeEvent
36+
*/
37+
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
38+
const eventId = this.generateEventId(streamId);
39+
this.events.set(eventId, { streamId, message });
40+
return eventId;
41+
}
42+
43+
/**
44+
* Replays events that occurred after a specific event ID
45+
* Implements EventStore.replayEventsAfter
46+
*/
47+
async replayEventsAfter(
48+
lastEventId: string,
49+
{
50+
send,
51+
}: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
52+
): Promise<string> {
53+
if (!lastEventId || !this.events.has(lastEventId)) {
54+
return "";
55+
}
56+
57+
// Extract the stream ID from the event ID
58+
const streamId = this.getStreamIdFromEventId(lastEventId);
59+
if (!streamId) {
60+
return "";
61+
}
62+
63+
let foundLastEvent = false;
64+
65+
// Sort events by eventId for chronological ordering
66+
const sortedEvents = [...this.events.entries()].sort((a, b) =>
67+
a[0].localeCompare(b[0])
68+
);
69+
70+
for (const [
71+
eventId,
72+
{ streamId: eventStreamId, message },
73+
] of sortedEvents) {
74+
// Only include events from the same stream
75+
if (eventStreamId !== streamId) {
76+
continue;
77+
}
78+
79+
// Start sending events after we find the lastEventId
80+
if (eventId === lastEventId) {
81+
foundLastEvent = true;
82+
continue;
83+
}
84+
85+
if (foundLastEvent) {
86+
await send(eventId, message);
87+
}
88+
}
89+
return streamId;
90+
}
91+
}

src/bin/mcp-proxy.ts

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import { setTimeout } from "node:timers";
99
import { StdioClientTransport } from "../StdioClientTransport.js";
1010
import util from "node:util";
1111
import { startSSEServer } from "../startSSEServer.js";
12+
import { startHTTPStreamServer } from "../startHTTPStreamServer.js";
1213
import { proxyServer } from "../proxyServer.js";
14+
import { InMemoryEventStore } from "../InMemoryEventStore.js";
1315

1416
util.inspect.defaultOptions.depth = 8;
1517

@@ -40,14 +42,19 @@ const argv = await yargs(hideBin(process.argv))
4042
},
4143
endpoint: {
4244
type: "string",
43-
describe: "The endpoint to listen on for SSE",
44-
default: "/sse",
45+
describe: "The endpoint to listen on",
4546
},
4647
port: {
4748
type: "number",
48-
describe: "The port to listen on for SSE",
49+
describe: "The port to listen on",
4950
default: 8080,
5051
},
52+
server: {
53+
type: "string",
54+
describe: "The server type to use (sse or stream)",
55+
choices: ["sse", "stream"],
56+
default: "sse",
57+
},
5158
})
5259
.help()
5360
.parseAsync();
@@ -76,7 +83,7 @@ const proxy = async () => {
7683
},
7784
{
7885
capabilities: {},
79-
},
86+
}
8087
);
8188

8289
await connect(client);
@@ -88,25 +95,36 @@ const proxy = async () => {
8895

8996
const serverCapabilities = client.getServerCapabilities() as {};
9097

91-
console.info("starting the SSE server on port %d", argv.port);
98+
console.info("starting the %s server on port %d", argv.server, argv.port);
9299

93-
await startSSEServer({
94-
createServer: async () => {
95-
const server = new Server(serverVersion, {
96-
capabilities: serverCapabilities,
97-
});
100+
const createServer = async () => {
101+
const server = new Server(serverVersion, {
102+
capabilities: serverCapabilities,
103+
});
98104

99-
proxyServer({
100-
server,
101-
client,
102-
serverCapabilities,
103-
});
105+
proxyServer({
106+
server,
107+
client,
108+
serverCapabilities,
109+
});
104110

105-
return server;
106-
},
107-
port: argv.port,
108-
endpoint: argv.endpoint as `/${string}`,
109-
});
111+
return server;
112+
};
113+
114+
if (argv.server === "sse") {
115+
await startSSEServer({
116+
createServer,
117+
port: argv.port,
118+
endpoint: argv.endpoint || ("/sse" as `/${string}`),
119+
});
120+
} else {
121+
await startHTTPStreamServer({
122+
createServer,
123+
port: argv.port,
124+
endpoint: argv.endpoint || ("/stream" as `/${string}`),
125+
eventStore: new InMemoryEventStore(),
126+
});
127+
}
110128
};
111129

112130
const main = async () => {

src/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1-
export { tapTransport } from "./tapTransport.js";
1+
export { InMemoryEventStore } from "./InMemoryEventStore.js";
22
export { proxyServer } from "./proxyServer.js";
3+
export { startHTTPStreamServer } from "./startHTTPStreamServer.js";
34
export { startSSEServer } from "./startSSEServer.js";
5+
export { tapTransport } from "./tapTransport.js";

src/startHTTPStreamServer.test.ts

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
2+
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
3+
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
4+
import { it, expect, vi } from "vitest";
5+
import { startHTTPStreamServer } from "./startHTTPStreamServer.js";
6+
import { getRandomPort } from "get-port-please";
7+
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
8+
import { EventSource } from "eventsource";
9+
import { setTimeout as delay } from "node:timers/promises";
10+
import { proxyServer } from "./proxyServer.js";
11+
12+
if (!("EventSource" in global)) {
13+
// @ts-expect-error - figure out how to use --experimental-eventsource with vitest
14+
global.EventSource = EventSource;
15+
}
16+
17+
it("proxies messages between HTTP stream and stdio servers", async () => {
18+
const stdioTransport = new StdioClientTransport({
19+
command: "tsx",
20+
args: ["src/simple-stdio-server.ts"],
21+
});
22+
23+
const stdioClient = new Client(
24+
{
25+
name: "mcp-proxy",
26+
version: "1.0.0",
27+
},
28+
{
29+
capabilities: {},
30+
}
31+
);
32+
33+
await stdioClient.connect(stdioTransport);
34+
35+
const serverVersion = stdioClient.getServerVersion() as {
36+
name: string;
37+
version: string;
38+
};
39+
40+
const serverCapabilities = stdioClient.getServerCapabilities() as {};
41+
42+
const port = await getRandomPort();
43+
44+
const onConnect = vi.fn();
45+
const onClose = vi.fn();
46+
47+
await startHTTPStreamServer({
48+
createServer: async () => {
49+
const mcpServer = new Server(serverVersion, {
50+
capabilities: serverCapabilities,
51+
});
52+
53+
await proxyServer({
54+
server: mcpServer,
55+
client: stdioClient,
56+
serverCapabilities,
57+
});
58+
59+
return mcpServer;
60+
},
61+
port,
62+
endpoint: "/stream",
63+
onConnect,
64+
onClose,
65+
});
66+
67+
const streamClient = new Client(
68+
{
69+
name: "stream-client",
70+
version: "1.0.0",
71+
},
72+
{
73+
capabilities: {},
74+
}
75+
);
76+
77+
const transport = new StreamableHTTPClientTransport(
78+
new URL(`http://localhost:${port}/stream`)
79+
);
80+
81+
await streamClient.connect(transport);
82+
83+
const result = await streamClient.listResources();
84+
expect(result).toEqual({
85+
resources: [
86+
{
87+
uri: "file:///example.txt",
88+
name: "Example Resource",
89+
},
90+
],
91+
});
92+
93+
expect(
94+
await streamClient.readResource({ uri: result.resources[0].uri }, {})
95+
).toEqual({
96+
contents: [
97+
{
98+
uri: "file:///example.txt",
99+
mimeType: "text/plain",
100+
text: "This is the content of the example resource.",
101+
},
102+
],
103+
});
104+
expect(await streamClient.subscribeResource({ uri: "xyz" })).toEqual({});
105+
expect(await streamClient.unsubscribeResource({ uri: "xyz" })).toEqual({});
106+
expect(await streamClient.listResourceTemplates()).toEqual({
107+
resourceTemplates: [
108+
{
109+
uriTemplate: `file://{filename}`,
110+
name: "Example resource template",
111+
description: "Specify the filename to retrieve",
112+
},
113+
],
114+
});
115+
116+
expect(onConnect).toHaveBeenCalled();
117+
expect(onClose).not.toHaveBeenCalled();
118+
119+
// the transport no requires the function terminateSession to be called but the client does not implement it
120+
// so we need to call it manually
121+
await transport.terminateSession();
122+
await streamClient.close();
123+
124+
await delay(1000);
125+
126+
expect(onClose).toHaveBeenCalled();
127+
});

0 commit comments

Comments
 (0)