Skip to content

Commit 93589e5

Browse files
authored
Fix: Add Session ID and Header Support to SSE Transport (#660) (#661)
* feat: add header and auth parity to SSE transport in MCP - Add MessageExtraInfo support to SSE transport matching StreamableHTTP behavior - Pass request headers and auth info through SSE message handling - Add RequestWithAuth type for consistent auth info passing - Filter internal headers (content-type, content-length, upgrade) before passing to MCP server - Add comprehensive tests for header and auth handling in both transports - Ensure transport initialization before processing SSE messages This ensures both SSE and StreamableHTTP transports have consistent header and authentication handling capabilities, allowing MCP servers to access request context regardless of transport type. * chore: add changeset for SSE transport fixes * fix: remove transport initialization check * refactor: update McpSSETransport to use options object for constructor parameters * fix: revert auth inclusion on SSE transport * feat: implement header and auth echo tool for SSE and Streamable HTTP transport tests * refactor: simplify McpSSETransport initialization by using getCurrentAgent util * refactor: replace buildMessageExtraInfo with direct MessageExtraInfo construction in onSSEMcpMessage * fix: ts check errors * patch: remove header deletions * fix: typechecks
1 parent 603b825 commit 93589e5

File tree

7 files changed

+253
-10
lines changed

7 files changed

+253
-10
lines changed

.changeset/stupid-plums-wear.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"agents": patch
3+
---
4+
5+
fix: add session ID and header support to SSE transport
6+
7+
The SSE transport now properly forwards session IDs and request headers to MCP message handlers, achieving closer header parity with StreamableHTTP transport. This allows MCP servers using SSE to access request headers for session management.

packages/agents/src/mcp/index.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import type { Server } from "@modelcontextprotocol/sdk/server/index.js";
22
import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
33
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
4-
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
4+
import type {
5+
JSONRPCMessage,
6+
MessageExtraInfo
7+
} from "@modelcontextprotocol/sdk/types.js";
58
import {
69
JSONRPCMessageSchema,
710
isJSONRPCError,
@@ -77,7 +80,7 @@ export abstract class McpAgent<
7780
}
7881

7982
/** Get the unique WebSocket. SSE transport only. */
80-
private getWebSocket() {
83+
getWebSocket() {
8184
const websockets = Array.from(this.getConnections());
8285
if (websockets.length === 0) {
8386
return null;
@@ -89,7 +92,7 @@ export abstract class McpAgent<
8992
private initTransport() {
9093
switch (this.getTransportType()) {
9194
case "sse": {
92-
return new McpSSETransport(() => this.getWebSocket());
95+
return new McpSSETransport();
9396
}
9497
case "streamable-http": {
9598
return new StreamableHTTPServerTransport({});
@@ -188,7 +191,8 @@ export abstract class McpAgent<
188191
/** Handles MCP Messages for the legacy SSE transport. */
189192
async onSSEMcpMessage(
190193
_sessionId: string,
191-
messageBody: unknown
194+
messageBody: unknown,
195+
extraInfo?: MessageExtraInfo
192196
): Promise<Error | null> {
193197
// Since we address the DO via both the protocol and the session id,
194198
// this should never happen, but let's enforce it just in case
@@ -210,7 +214,7 @@ export abstract class McpAgent<
210214
return null; // Message was handled by elicitation system
211215
}
212216

213-
this._transport?.onmessage?.(parsedMessage);
217+
this._transport?.onmessage?.(parsedMessage, extraInfo);
214218
return null;
215219
} catch (error) {
216220
console.error("Error forwarding message to SSE:", error);

packages/agents/src/mcp/transport.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,21 @@ import { MessageType } from "../ai-types";
1616
import { MCP_HTTP_METHOD_HEADER, MCP_MESSAGE_HEADER } from "./utils";
1717

1818
export class McpSSETransport implements Transport {
19-
sessionId?: string;
19+
sessionId: string;
2020
// Set by the server in `server.connect(transport)`
2121
onclose?: () => void;
2222
onerror?: (error: Error) => void;
23-
onmessage?: (message: JSONRPCMessage) => void;
23+
onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void;
2424

2525
private _getWebSocket: () => WebSocket | null;
2626
private _started = false;
27-
constructor(getWebSocket: () => WebSocket | null) {
28-
this._getWebSocket = getWebSocket;
27+
constructor() {
28+
const { agent } = getCurrentAgent<McpAgent>();
29+
if (!agent)
30+
throw new Error("McpAgent was not found in Transport constructor");
31+
32+
this.sessionId = agent.getSessionId();
33+
this._getWebSocket = () => agent.getWebSocket();
2934
}
3035

3136
async start() {

packages/agents/src/mcp/utils.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
JSONRPCMessageSchema,
33
type JSONRPCMessage,
4+
type MessageExtraInfo,
45
InitializeRequestSchema,
56
isJSONRPCResponse,
67
isJSONRPCNotification
@@ -682,7 +683,19 @@ export const createLegacySseHandler = (
682683
});
683684

684685
const messageBody = await request.json();
685-
const error = await agent.onSSEMcpMessage(sessionId, messageBody);
686+
687+
// Build MessageExtraInfo with filtered headers
688+
const headers = Object.fromEntries(request.headers.entries());
689+
690+
const extraInfo: MessageExtraInfo = {
691+
requestInfo: { headers }
692+
};
693+
694+
const error = await agent.onSSEMcpMessage(
695+
sessionId,
696+
messageBody,
697+
extraInfo
698+
);
686699

687700
if (error) {
688701
return new Response(error.message, {

packages/agents/src/tests/mcp/transports/sse.test.ts

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import { createExecutionContext, env } from "cloudflare:test";
22
import { describe, expect, it } from "vitest";
3+
import type {
4+
CallToolResult,
5+
JSONRPCResponse
6+
} from "@modelcontextprotocol/sdk/types.js";
37
import worker, { type Env } from "../../worker";
48
import { establishSSEConnection } from "../../shared/test-utils";
59

@@ -117,4 +121,77 @@ describe("SSE Transport", () => {
117121
expect(response.headers.get("Content-Type")).toBe("text/event-stream");
118122
});
119123
});
124+
125+
describe("Header and Auth Handling", () => {
126+
it("should pass headers and session ID to transport via requestInfo", async () => {
127+
const ctx = createExecutionContext();
128+
const { sessionId, reader } = await establishSSEConnection(ctx);
129+
130+
// Send request with custom headers using the echoRequestInfo tool
131+
const request = new Request(`${baseUrl}/message?sessionId=${sessionId}`, {
132+
method: "POST",
133+
body: JSON.stringify({
134+
id: "echo-headers-1",
135+
jsonrpc: "2.0",
136+
method: "tools/call",
137+
params: {
138+
name: "echoRequestInfo",
139+
arguments: {}
140+
}
141+
}),
142+
headers: {
143+
"Content-Type": "application/json",
144+
"x-user-id": "test-user-123",
145+
"x-request-id": "req-456",
146+
"x-custom-header": "custom-value"
147+
}
148+
});
149+
150+
const response = await worker.fetch(request, env, ctx);
151+
expect(response.status).toBe(202); // SSE returns 202 Accepted
152+
153+
// Read the response from the SSE stream
154+
const { value } = await reader.read();
155+
const event = new TextDecoder().decode(value);
156+
const lines = event.split("\n");
157+
expect(lines[0]).toEqual("event: message");
158+
159+
// Parse the JSON response from the data line
160+
const dataLine = lines.find((line) => line.startsWith("data:"));
161+
const parsed = JSON.parse(
162+
dataLine!.replace("data: ", "")
163+
) as JSONRPCResponse;
164+
expect(parsed.id).toBe("echo-headers-1");
165+
166+
// Extract the echoed request info
167+
const result = parsed.result as CallToolResult;
168+
const textContent = result.content?.[0];
169+
if (!textContent || textContent.type !== "text") {
170+
throw new Error("Expected text content in tool result");
171+
}
172+
const echoedData = JSON.parse(textContent.text);
173+
174+
// Verify custom headers were passed through
175+
expect(echoedData.hasRequestInfo).toBe(true);
176+
expect(echoedData.headers["x-user-id"]).toBe("test-user-123");
177+
expect(echoedData.headers["x-request-id"]).toBe("req-456");
178+
expect(echoedData.headers["x-custom-header"]).toBe("custom-value");
179+
180+
// Verify that certain internal headers that the transport adds are NOT exposed
181+
// The transport filters cf-mcp-method, cf-mcp-message, and upgrade headers
182+
expect(echoedData.headers["cf-mcp-method"]).toBeUndefined();
183+
expect(echoedData.headers["cf-mcp-message"]).toBeUndefined();
184+
expect(echoedData.headers.upgrade).toBeUndefined();
185+
186+
// Verify standard headers are also present
187+
expect(echoedData.headers["content-type"]).toBe("application/json");
188+
189+
// Check what properties are available in extra
190+
expect(echoedData.availableExtraKeys).toBeDefined();
191+
192+
// Verify sessionId is passed through extra data
193+
expect(echoedData.sessionId).toBeDefined();
194+
expect(echoedData.sessionId).toBe(sessionId);
195+
});
196+
});
120197
});

packages/agents/src/tests/mcp/transports/streamable-http.test.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,4 +596,72 @@ describe("Streamable HTTP Transport", () => {
596596
expect(tools.some((t) => t.name === "temp-echo")).toBe(false);
597597
});
598598
});
599+
600+
describe("Header and Auth Handling", () => {
601+
it("should pass custom headers to transport via requestInfo", async () => {
602+
const ctx = createExecutionContext();
603+
const sessionId = await initializeStreamableHTTPServer(ctx);
604+
605+
// Send request with custom headers using the echoRequestInfo tool
606+
const echoMessage: JSONRPCMessage = {
607+
id: "echo-headers-1",
608+
jsonrpc: "2.0",
609+
method: "tools/call",
610+
params: {
611+
name: "echoRequestInfo",
612+
arguments: {}
613+
}
614+
};
615+
616+
const request = new Request(baseUrl, {
617+
body: JSON.stringify(echoMessage),
618+
headers: {
619+
Accept: "application/json, text/event-stream",
620+
"Content-Type": "application/json",
621+
"mcp-session-id": sessionId,
622+
"x-user-id": "test-user-123",
623+
"x-request-id": "req-456",
624+
"x-custom-header": "custom-value"
625+
},
626+
method: "POST"
627+
});
628+
629+
const response = await worker.fetch(request, env, ctx);
630+
expect(response.status).toBe(200);
631+
632+
// Parse the SSE response
633+
const sseText = await readSSEEvent(response);
634+
const parsed = parseSSEData(sseText) as JSONRPCResponse;
635+
expect(parsed.id).toBe("echo-headers-1");
636+
637+
// Extract the echoed request info
638+
const result = parsed.result as CallToolResult;
639+
const firstContent = result.content?.[0];
640+
const contentText =
641+
firstContent?.type === "text" ? firstContent.text : undefined;
642+
const echoedData = JSON.parse(
643+
typeof contentText === "string" ? contentText : "{}"
644+
);
645+
646+
// Verify custom headers were passed through
647+
expect(echoedData.hasRequestInfo).toBe(true);
648+
expect(echoedData.headers["x-user-id"]).toBe("test-user-123");
649+
expect(echoedData.headers["x-request-id"]).toBe("req-456");
650+
expect(echoedData.headers["x-custom-header"]).toBe("custom-value");
651+
652+
// Verify that certain internal headers that the transport adds are NOT exposed
653+
// The transport adds cf-mcp-method and cf-mcp-message internally but should filter them
654+
expect(echoedData.headers["cf-mcp-method"]).toBeUndefined();
655+
expect(echoedData.headers["cf-mcp-message"]).toBeUndefined();
656+
expect(echoedData.headers.upgrade).toBeUndefined();
657+
658+
// Verify standard headers are also present
659+
expect(echoedData.headers.accept).toContain("text/event-stream");
660+
expect(echoedData.headers["content-type"]).toBe("application/json");
661+
662+
// Verify sessionId is passed through extra data
663+
expect(echoedData.sessionId).toBeDefined();
664+
expect(echoedData.sessionId).toBe(sessionId);
665+
});
666+
});
599667
});

packages/agents/src/tests/worker.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
11
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
2+
import type { RequestHandlerExtra } from "@modelcontextprotocol/sdk/shared/protocol.js";
3+
import type {
4+
CallToolResult,
5+
IsomorphicHeaders,
6+
ServerNotification,
7+
ServerRequest
8+
} from "@modelcontextprotocol/sdk/types.js";
29
import { z } from "zod";
310
import { McpAgent } from "../mcp/index.ts";
411
import {
@@ -38,6 +45,19 @@ type Props = {
3845
testValue: string;
3946
};
4047

48+
type ToolExtraInfo = RequestHandlerExtra<ServerRequest, ServerNotification>;
49+
50+
type EchoResponseData = {
51+
headers: IsomorphicHeaders;
52+
authInfo: ToolExtraInfo["authInfo"] | null;
53+
hasRequestInfo: boolean;
54+
hasAuthInfo: boolean;
55+
requestId: ToolExtraInfo["requestId"];
56+
sessionId: string | null;
57+
availableExtraKeys: string[];
58+
[key: string]: unknown;
59+
};
60+
4161
export class TestMcpAgent extends McpAgent<Env, State, Props> {
4262
observability = undefined;
4363
private tempToolHandle?: { remove: () => void };
@@ -181,6 +201,55 @@ export class TestMcpAgent extends McpAgent<Env, State, Props> {
181201
};
182202
}
183203
);
204+
205+
// Echo request info for testing header and auth passthrough
206+
this.server.tool(
207+
"echoRequestInfo",
208+
"Echo back request headers and auth info",
209+
{},
210+
async (_args, extra: ToolExtraInfo): Promise<CallToolResult> => {
211+
// Extract headers from requestInfo, auth from authInfo
212+
const headers: IsomorphicHeaders = extra.requestInfo?.headers ?? {};
213+
const authInfo = extra.authInfo ?? null;
214+
215+
// Track non-function properties available in extra
216+
const extraRecord = extra as Record<string, unknown>;
217+
const extraKeys = Object.keys(extraRecord).filter(
218+
(key) => typeof extraRecord[key] !== "function"
219+
);
220+
221+
// Build response object with all available data
222+
const responseData: EchoResponseData = {
223+
headers,
224+
authInfo,
225+
hasRequestInfo: !!extra.requestInfo,
226+
hasAuthInfo: !!extra.authInfo,
227+
requestId: extra.requestId,
228+
// Include any sessionId if it exists
229+
sessionId: extra.sessionId ?? null,
230+
// List all available properties in extra
231+
availableExtraKeys: extraKeys
232+
};
233+
234+
// Add any other properties from extra that aren't already included
235+
extraKeys.forEach((key) => {
236+
if (
237+
!["requestInfo", "authInfo", "requestId", "sessionId"].includes(key)
238+
) {
239+
responseData[`extra_${key}`] = extraRecord[key];
240+
}
241+
});
242+
243+
return {
244+
content: [
245+
{
246+
type: "text",
247+
text: JSON.stringify(responseData, null, 2)
248+
}
249+
]
250+
};
251+
}
252+
);
184253
}
185254
}
186255

0 commit comments

Comments
 (0)