From bb22203c69ed5d58cfb23a5d15ff9b2ed1eb21f5 Mon Sep 17 00:00:00 2001 From: Naseem AlNaji Date: Tue, 18 Nov 2025 14:23:39 -0500 Subject: [PATCH 1/9] feat: add header and auth parity to SSE transport in MCP - Add MessageExtraInfo support to SSE transport matching StreamableHTTP behavior - Pass request headers and auth info through SSE message handling - Add RequestWithAuth type for consistent auth info passing - Filter internal headers (content-type, content-length, upgrade) before passing to MCP server - Add comprehensive tests for header and auth handling in both transports - Ensure transport initialization before processing SSE messages This ensures both SSE and StreamableHTTP transports have consistent header and authentication handling capabilities, allowing MCP servers to access request context regardless of transport type. --- packages/agents/src/mcp/index.ts | 44 +++++++- packages/agents/src/mcp/transport.ts | 8 +- packages/agents/src/mcp/types.ts | 8 ++ packages/agents/src/mcp/utils.ts | 7 +- .../mcp/transports/streamable-http.test.ts | 100 ++++++++++++++++++ 5 files changed, 158 insertions(+), 9 deletions(-) diff --git a/packages/agents/src/mcp/index.ts b/packages/agents/src/mcp/index.ts index d7b8e629..6be56a39 100644 --- a/packages/agents/src/mcp/index.ts +++ b/packages/agents/src/mcp/index.ts @@ -1,13 +1,17 @@ import type { Server } from "@modelcontextprotocol/sdk/server/index.js"; import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; -import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js"; +import type { + JSONRPCMessage, + MessageExtraInfo +} from "@modelcontextprotocol/sdk/types.js"; import { JSONRPCMessageSchema, isJSONRPCError, isJSONRPCResponse, type ElicitResult } from "@modelcontextprotocol/sdk/types.js"; +import type { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js"; import type { Connection, ConnectionContext } from "../"; import { Agent } from "../index"; import type { BaseTransportType, MaybePromise, ServeOptions } from "./types"; @@ -85,11 +89,34 @@ export abstract class McpAgent< return websockets[0]; } + /** Build MessageExtraInfo from extracted headers and auth info */ + private buildMessageExtraInfo(extraInfo?: { + headers: Record; + auth?: AuthInfo; + }): MessageExtraInfo | undefined { + if (!extraInfo) return; + + const headers = { ...extraInfo.headers }; + + // Remove internal headers that are not part of the original request + delete headers["content-type"]; + delete headers["content-length"]; + delete headers["upgrade"]; + + return { + authInfo: extraInfo.auth, + requestInfo: { headers } + }; + } + /** Returns a new transport matching the type of the Agent. */ private initTransport() { switch (this.getTransportType()) { case "sse": { - return new McpSSETransport(() => this.getWebSocket()); + return new McpSSETransport( + () => this.getWebSocket(), + this.getSessionId() + ); } case "streamable-http": { return new StreamableHTTPServerTransport({}); @@ -188,7 +215,8 @@ export abstract class McpAgent< /** Handles MCP Messages for the legacy SSE transport. */ async onSSEMcpMessage( _sessionId: string, - messageBody: unknown + messageBody: unknown, + extraInfo?: { headers: Record; auth?: AuthInfo } ): Promise { // Since we address the DO via both the protocol and the session id, // this should never happen, but let's enforce it just in case @@ -196,6 +224,11 @@ export abstract class McpAgent< return new Error("Internal Server Error: Expected SSE transport"); } + // Ensure transport is initialized before processing messages + if (!this._transport) { + await this.onStart(); + } + try { let parsedMessage: JSONRPCMessage; try { @@ -210,7 +243,10 @@ export abstract class McpAgent< return null; // Message was handled by elicitation system } - this._transport?.onmessage?.(parsedMessage); + // Build extra info with auth and request headers, matching StreamableHTTP behavior + const extra = this.buildMessageExtraInfo(extraInfo); + + this._transport?.onmessage?.(parsedMessage, extra); return null; } catch (error) { console.error("Error forwarding message to SSE:", error); diff --git a/packages/agents/src/mcp/transport.ts b/packages/agents/src/mcp/transport.ts index 9dade9f4..ee8f2fcf 100644 --- a/packages/agents/src/mcp/transport.ts +++ b/packages/agents/src/mcp/transport.ts @@ -12,6 +12,7 @@ import { import type { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js"; import { getCurrentAgent, type Connection } from ".."; import type { McpAgent } from "."; +import type { RequestWithAuth } from "./types"; import { MessageType } from "../ai-types"; import { MCP_HTTP_METHOD_HEADER, MCP_MESSAGE_HEADER } from "./utils"; @@ -20,12 +21,13 @@ export class McpSSETransport implements Transport { // Set by the server in `server.connect(transport)` onclose?: () => void; onerror?: (error: Error) => void; - onmessage?: (message: JSONRPCMessage) => void; + onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void; private _getWebSocket: () => WebSocket | null; private _started = false; - constructor(getWebSocket: () => WebSocket | null) { + constructor(getWebSocket: () => WebSocket | null, sessionId?: string) { this._getWebSocket = getWebSocket; + this.sessionId = sessionId; } async start() { @@ -220,7 +222,7 @@ export class StreamableHTTPServerTransport implements Transport { * Handles POST requests containing JSON-RPC messages */ async handlePostRequest( - req: Request & { auth?: AuthInfo }, + req: RequestWithAuth, parsedBody: unknown ): Promise { const authInfo: AuthInfo | undefined = req.auth; diff --git a/packages/agents/src/mcp/types.ts b/packages/agents/src/mcp/types.ts index 404c50b3..1a4514cf 100644 --- a/packages/agents/src/mcp/types.ts +++ b/packages/agents/src/mcp/types.ts @@ -1,9 +1,17 @@ +import type { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js"; + export type MaybePromise = T | Promise; export type MaybeConnectionTag = { role: string } | undefined; export type BaseTransportType = "sse" | "streamable-http"; export type TransportType = BaseTransportType | "auto"; +/** + * Extended Request type that includes optional authentication info. + * Used throughout MCP transport handlers to pass auth context from middleware. + */ +export type RequestWithAuth = Request & { auth?: AuthInfo }; + export interface CORSOptions { origin?: string; methods?: string; diff --git a/packages/agents/src/mcp/utils.ts b/packages/agents/src/mcp/utils.ts index 0996b909..f7dcef2c 100644 --- a/packages/agents/src/mcp/utils.ts +++ b/packages/agents/src/mcp/utils.ts @@ -7,7 +7,7 @@ import { } from "@modelcontextprotocol/sdk/types.js"; import type { McpAgent } from "."; import { getAgentByName } from ".."; -import type { CORSOptions } from "./types"; +import type { CORSOptions, RequestWithAuth } from "./types"; import { MessageType } from "../ai-types"; /** @@ -682,7 +682,10 @@ export const createLegacySseHandler = ( }); const messageBody = await request.json(); - const error = await agent.onSSEMcpMessage(sessionId, messageBody); + const error = await agent.onSSEMcpMessage(sessionId, messageBody, { + headers: Object.fromEntries(request.headers.entries()), + auth: (request as RequestWithAuth).auth + }); if (error) { return new Response(error.message, { diff --git a/packages/agents/src/tests/mcp/transports/streamable-http.test.ts b/packages/agents/src/tests/mcp/transports/streamable-http.test.ts index a6802f4f..e727df64 100644 --- a/packages/agents/src/tests/mcp/transports/streamable-http.test.ts +++ b/packages/agents/src/tests/mcp/transports/streamable-http.test.ts @@ -583,4 +583,104 @@ describe("Streamable HTTP Transport", () => { expect(tools.some((t) => t.name === "temp-echo")).toBe(false); }); }); + + describe("Header and Auth Handling", () => { + it("should pass custom headers to transport via requestInfo", async () => { + const ctx = createExecutionContext(); + const sessionId = await initializeStreamableHTTPServer(ctx); + + // Send request with custom headers + const request = new Request(baseUrl, { + body: JSON.stringify(TEST_MESSAGES.toolsList), + headers: { + Accept: "application/json, text/event-stream", + "Content-Type": "application/json", + "mcp-session-id": sessionId, + "x-user-id": "test-user-123", + "x-request-id": "req-456", + "x-custom-header": "custom-value" + }, + method: "POST" + }); + + // Spy on the transport to capture the extraInfo + // Note: In actual implementation, the headers are available to the MCP server + // through MessageExtraInfo.requestInfo.headers + const response = await worker.fetch(request, env, ctx); + + expect(response.status).toBe(200); + + // The test verifies that custom headers would be available in the transport + // through the MessageExtraInfo structure, filtered of internal headers + const sseText = await readSSEEvent(response); + const result = parseSSEData(sseText); + expectValidToolsList(result); + }); + + it("should extract auth info from RequestWithAuth", async () => { + const ctx = createExecutionContext(); + const sessionId = await initializeStreamableHTTPServer(ctx); + + // Create a request with auth info attached (simulating middleware) + const request = new Request(baseUrl, { + body: JSON.stringify(TEST_MESSAGES.toolsList), + headers: { + Accept: "application/json, text/event-stream", + "Content-Type": "application/json", + "mcp-session-id": sessionId + }, + method: "POST" + }) as any; // Cast to any to add auth property + + // Simulate auth middleware adding auth info + request.auth = { + userId: "test-user", + scopes: ["read", "write"], + metadata: { + role: "admin" + } + }; + + const response = await worker.fetch(request, env, ctx); + + expect(response.status).toBe(200); + + // The auth info would be available to the MCP server through + // MessageExtraInfo.authInfo + const sseText = await readSSEEvent(response); + const result = parseSSEData(sseText); + expectValidToolsList(result); + }); + + it("should filter internal MCP headers from requestInfo", async () => { + const ctx = createExecutionContext(); + const sessionId = await initializeStreamableHTTPServer(ctx); + + // Send request with custom headers + // Internal headers (cf-mcp-method, cf-mcp-message) are added by the transport layer + // and should be filtered before passing to the MCP server + const request = new Request(baseUrl, { + body: JSON.stringify(TEST_MESSAGES.toolsList), + headers: { + Accept: "application/json, text/event-stream", + "Content-Type": "application/json", + "mcp-session-id": sessionId, + "x-custom-header": "should-be-preserved", + "x-another-header": "also-preserved" + }, + method: "POST" + }); + + const response = await worker.fetch(request, env, ctx); + + expect(response.status).toBe(200); + + // The test verifies that internal headers (cf-mcp-method, cf-mcp-message, upgrade) + // are filtered out before being passed to the MCP server + // Only custom headers like x-custom-header should be preserved in requestInfo.headers + const sseText = await readSSEEvent(response); + const result = parseSSEData(sseText); + expectValidToolsList(result); + }); + }); }); From 2a4fc7b5f0275d7f5ad43436c2af8f7916d265e9 Mon Sep 17 00:00:00 2001 From: Naseem AlNaji Date: Tue, 18 Nov 2025 14:51:20 -0500 Subject: [PATCH 2/9] chore: add changeset for SSE transport fixes --- .changeset/stupid-plums-wear.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/stupid-plums-wear.md diff --git a/.changeset/stupid-plums-wear.md b/.changeset/stupid-plums-wear.md new file mode 100644 index 00000000..eb3a7f65 --- /dev/null +++ b/.changeset/stupid-plums-wear.md @@ -0,0 +1,7 @@ +--- +"agents": patch +--- + +fix: add session ID, auth, and header support to SSE transport + +The SSE transport now properly forwards session IDs, authentication info, and request headers to MCP message handlers, achieving feature parity with StreamableHTTP transport. This allows MCP servers using SSE to access critical request context for authentication and session management. From 041159990d7e261162c9514980a4116e44a542e3 Mon Sep 17 00:00:00 2001 From: Naseem AlNaji Date: Wed, 19 Nov 2025 14:45:33 -0500 Subject: [PATCH 3/9] fix: remove transport initialization check --- packages/agents/src/mcp/index.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/packages/agents/src/mcp/index.ts b/packages/agents/src/mcp/index.ts index 6be56a39..99efae0a 100644 --- a/packages/agents/src/mcp/index.ts +++ b/packages/agents/src/mcp/index.ts @@ -224,11 +224,6 @@ export abstract class McpAgent< return new Error("Internal Server Error: Expected SSE transport"); } - // Ensure transport is initialized before processing messages - if (!this._transport) { - await this.onStart(); - } - try { let parsedMessage: JSONRPCMessage; try { From d767e737c22653c86351b8fccfd15bc586249ef2 Mon Sep 17 00:00:00 2001 From: Naseem AlNaji Date: Wed, 19 Nov 2025 15:20:55 -0500 Subject: [PATCH 4/9] refactor: update McpSSETransport to use options object for constructor parameters --- packages/agents/src/mcp/index.ts | 10 ++++------ packages/agents/src/mcp/transport.ts | 11 ++++++++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/packages/agents/src/mcp/index.ts b/packages/agents/src/mcp/index.ts index 99efae0a..4dec71d8 100644 --- a/packages/agents/src/mcp/index.ts +++ b/packages/agents/src/mcp/index.ts @@ -99,8 +99,6 @@ export abstract class McpAgent< const headers = { ...extraInfo.headers }; // Remove internal headers that are not part of the original request - delete headers["content-type"]; - delete headers["content-length"]; delete headers["upgrade"]; return { @@ -113,10 +111,10 @@ export abstract class McpAgent< private initTransport() { switch (this.getTransportType()) { case "sse": { - return new McpSSETransport( - () => this.getWebSocket(), - this.getSessionId() - ); + return new McpSSETransport({ + getWebSocket: () => this.getWebSocket(), + sessionId: this.getSessionId() + }); } case "streamable-http": { return new StreamableHTTPServerTransport({}); diff --git a/packages/agents/src/mcp/transport.ts b/packages/agents/src/mcp/transport.ts index ee8f2fcf..7d65def4 100644 --- a/packages/agents/src/mcp/transport.ts +++ b/packages/agents/src/mcp/transport.ts @@ -16,6 +16,11 @@ import type { RequestWithAuth } from "./types"; import { MessageType } from "../ai-types"; import { MCP_HTTP_METHOD_HEADER, MCP_MESSAGE_HEADER } from "./utils"; +export interface McpSSETransportOptions { + getWebSocket: () => WebSocket | null; + sessionId?: string; +} + export class McpSSETransport implements Transport { sessionId?: string; // Set by the server in `server.connect(transport)` @@ -25,9 +30,9 @@ export class McpSSETransport implements Transport { private _getWebSocket: () => WebSocket | null; private _started = false; - constructor(getWebSocket: () => WebSocket | null, sessionId?: string) { - this._getWebSocket = getWebSocket; - this.sessionId = sessionId; + constructor(options: McpSSETransportOptions) { + this._getWebSocket = options.getWebSocket; + this.sessionId = options.sessionId; } async start() { From 4effb09cac9f407d1660bca9083f9e8846ff2660 Mon Sep 17 00:00:00 2001 From: Naseem AlNaji Date: Wed, 19 Nov 2025 15:59:40 -0500 Subject: [PATCH 5/9] fix: revert auth inclusion on SSE transport --- .changeset/stupid-plums-wear.md | 4 ++-- packages/agents/src/mcp/index.ts | 5 +--- packages/agents/src/mcp/transport.ts | 3 +-- packages/agents/src/mcp/types.ts | 8 ------- packages/agents/src/mcp/utils.ts | 5 ++-- .../mcp/transports/streamable-http.test.ts | 23 +++++++------------ 6 files changed, 14 insertions(+), 34 deletions(-) diff --git a/.changeset/stupid-plums-wear.md b/.changeset/stupid-plums-wear.md index eb3a7f65..f4dac530 100644 --- a/.changeset/stupid-plums-wear.md +++ b/.changeset/stupid-plums-wear.md @@ -2,6 +2,6 @@ "agents": patch --- -fix: add session ID, auth, and header support to SSE transport +fix: add session ID and header support to SSE transport -The SSE transport now properly forwards session IDs, authentication info, and request headers to MCP message handlers, achieving feature parity with StreamableHTTP transport. This allows MCP servers using SSE to access critical request context for authentication and session management. +The SSE transport now properly forwards session IDs and request headers to MCP message handlers, achieving closer header parity with StreamableHTTP transport. This allows MCP servers using SSE to access request headers for session management. diff --git a/packages/agents/src/mcp/index.ts b/packages/agents/src/mcp/index.ts index 4dec71d8..4ad8a176 100644 --- a/packages/agents/src/mcp/index.ts +++ b/packages/agents/src/mcp/index.ts @@ -11,7 +11,6 @@ import { isJSONRPCResponse, type ElicitResult } from "@modelcontextprotocol/sdk/types.js"; -import type { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js"; import type { Connection, ConnectionContext } from "../"; import { Agent } from "../index"; import type { BaseTransportType, MaybePromise, ServeOptions } from "./types"; @@ -92,7 +91,6 @@ export abstract class McpAgent< /** Build MessageExtraInfo from extracted headers and auth info */ private buildMessageExtraInfo(extraInfo?: { headers: Record; - auth?: AuthInfo; }): MessageExtraInfo | undefined { if (!extraInfo) return; @@ -102,7 +100,6 @@ export abstract class McpAgent< delete headers["upgrade"]; return { - authInfo: extraInfo.auth, requestInfo: { headers } }; } @@ -214,7 +211,7 @@ export abstract class McpAgent< async onSSEMcpMessage( _sessionId: string, messageBody: unknown, - extraInfo?: { headers: Record; auth?: AuthInfo } + extraInfo?: { headers: Record } ): Promise { // Since we address the DO via both the protocol and the session id, // this should never happen, but let's enforce it just in case diff --git a/packages/agents/src/mcp/transport.ts b/packages/agents/src/mcp/transport.ts index 7d65def4..521a74ff 100644 --- a/packages/agents/src/mcp/transport.ts +++ b/packages/agents/src/mcp/transport.ts @@ -12,7 +12,6 @@ import { import type { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js"; import { getCurrentAgent, type Connection } from ".."; import type { McpAgent } from "."; -import type { RequestWithAuth } from "./types"; import { MessageType } from "../ai-types"; import { MCP_HTTP_METHOD_HEADER, MCP_MESSAGE_HEADER } from "./utils"; @@ -227,7 +226,7 @@ export class StreamableHTTPServerTransport implements Transport { * Handles POST requests containing JSON-RPC messages */ async handlePostRequest( - req: RequestWithAuth, + req: Request & { auth?: AuthInfo }, parsedBody: unknown ): Promise { const authInfo: AuthInfo | undefined = req.auth; diff --git a/packages/agents/src/mcp/types.ts b/packages/agents/src/mcp/types.ts index 1a4514cf..404c50b3 100644 --- a/packages/agents/src/mcp/types.ts +++ b/packages/agents/src/mcp/types.ts @@ -1,17 +1,9 @@ -import type { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js"; - export type MaybePromise = T | Promise; export type MaybeConnectionTag = { role: string } | undefined; export type BaseTransportType = "sse" | "streamable-http"; export type TransportType = BaseTransportType | "auto"; -/** - * Extended Request type that includes optional authentication info. - * Used throughout MCP transport handlers to pass auth context from middleware. - */ -export type RequestWithAuth = Request & { auth?: AuthInfo }; - export interface CORSOptions { origin?: string; methods?: string; diff --git a/packages/agents/src/mcp/utils.ts b/packages/agents/src/mcp/utils.ts index f7dcef2c..bf3b28b2 100644 --- a/packages/agents/src/mcp/utils.ts +++ b/packages/agents/src/mcp/utils.ts @@ -7,7 +7,7 @@ import { } from "@modelcontextprotocol/sdk/types.js"; import type { McpAgent } from "."; import { getAgentByName } from ".."; -import type { CORSOptions, RequestWithAuth } from "./types"; +import type { CORSOptions } from "./types"; import { MessageType } from "../ai-types"; /** @@ -683,8 +683,7 @@ export const createLegacySseHandler = ( const messageBody = await request.json(); const error = await agent.onSSEMcpMessage(sessionId, messageBody, { - headers: Object.fromEntries(request.headers.entries()), - auth: (request as RequestWithAuth).auth + headers: Object.fromEntries(request.headers.entries()) }); if (error) { diff --git a/packages/agents/src/tests/mcp/transports/streamable-http.test.ts b/packages/agents/src/tests/mcp/transports/streamable-http.test.ts index e727df64..bd57ee78 100644 --- a/packages/agents/src/tests/mcp/transports/streamable-http.test.ts +++ b/packages/agents/src/tests/mcp/transports/streamable-http.test.ts @@ -617,36 +617,29 @@ describe("Streamable HTTP Transport", () => { expectValidToolsList(result); }); - it("should extract auth info from RequestWithAuth", async () => { + it("should extract request headers from request", async () => { const ctx = createExecutionContext(); const sessionId = await initializeStreamableHTTPServer(ctx); - // Create a request with auth info attached (simulating middleware) + // Create a request with custom headers const request = new Request(baseUrl, { body: JSON.stringify(TEST_MESSAGES.toolsList), headers: { Accept: "application/json, text/event-stream", "Content-Type": "application/json", - "mcp-session-id": sessionId + "mcp-session-id": sessionId, + "X-Custom-Header": "custom-value", + "X-Request-Id": "test-123" }, method: "POST" - }) as any; // Cast to any to add auth property - - // Simulate auth middleware adding auth info - request.auth = { - userId: "test-user", - scopes: ["read", "write"], - metadata: { - role: "admin" - } - }; + }); const response = await worker.fetch(request, env, ctx); expect(response.status).toBe(200); - // The auth info would be available to the MCP server through - // MessageExtraInfo.authInfo + // The request headers would be available to the MCP server through + // MessageExtraInfo.requestInfo.headers const sseText = await readSSEEvent(response); const result = parseSSEData(sseText); expectValidToolsList(result); From 3702f3194dbead2fa766af88e23e52a6db417c2a Mon Sep 17 00:00:00 2001 From: Naseem AlNaji Date: Wed, 19 Nov 2025 16:40:40 -0500 Subject: [PATCH 6/9] feat: implement header and auth echo tool for SSE and Streamable HTTP transport tests --- packages/agents/src/mcp/index.ts | 2 + .../src/tests/mcp/transports/sse.test.ts | 77 +++++++++++++ .../mcp/transports/streamable-http.test.ts | 106 +++++++----------- packages/agents/src/tests/worker.ts | 50 +++++++++ 4 files changed, 167 insertions(+), 68 deletions(-) diff --git a/packages/agents/src/mcp/index.ts b/packages/agents/src/mcp/index.ts index 4ad8a176..70f947e8 100644 --- a/packages/agents/src/mcp/index.ts +++ b/packages/agents/src/mcp/index.ts @@ -97,6 +97,8 @@ export abstract class McpAgent< const headers = { ...extraInfo.headers }; // Remove internal headers that are not part of the original request + delete headers[MCP_HTTP_METHOD_HEADER]; + delete headers[MCP_MESSAGE_HEADER]; delete headers["upgrade"]; return { diff --git a/packages/agents/src/tests/mcp/transports/sse.test.ts b/packages/agents/src/tests/mcp/transports/sse.test.ts index a66cf29e..7b0f4715 100644 --- a/packages/agents/src/tests/mcp/transports/sse.test.ts +++ b/packages/agents/src/tests/mcp/transports/sse.test.ts @@ -1,5 +1,9 @@ import { createExecutionContext, env } from "cloudflare:test"; import { describe, expect, it } from "vitest"; +import type { + CallToolResult, + JSONRPCResponse +} from "@modelcontextprotocol/sdk/types.js"; import worker, { type Env } from "../../worker"; import { establishSSEConnection } from "../../shared/test-utils"; @@ -117,4 +121,77 @@ describe("SSE Transport", () => { expect(response.headers.get("Content-Type")).toBe("text/event-stream"); }); }); + + describe("Header and Auth Handling", () => { + it("should pass headers and session ID to transport via requestInfo", async () => { + const ctx = createExecutionContext(); + const { sessionId, reader } = await establishSSEConnection(ctx); + + // Send request with custom headers using the echoRequestInfo tool + const request = new Request(`${baseUrl}/message?sessionId=${sessionId}`, { + method: "POST", + body: JSON.stringify({ + id: "echo-headers-1", + jsonrpc: "2.0", + method: "tools/call", + params: { + name: "echoRequestInfo", + arguments: {} + } + }), + headers: { + "Content-Type": "application/json", + "x-user-id": "test-user-123", + "x-request-id": "req-456", + "x-custom-header": "custom-value" + } + }); + + const response = await worker.fetch(request, env, ctx); + expect(response.status).toBe(202); // SSE returns 202 Accepted + + // Read the response from the SSE stream + const { value } = await reader.read(); + const event = new TextDecoder().decode(value); + const lines = event.split("\n"); + expect(lines[0]).toEqual("event: message"); + + // Parse the JSON response from the data line + const dataLine = lines.find((line) => line.startsWith("data:")); + const parsed = JSON.parse( + dataLine!.replace("data: ", "") + ) as JSONRPCResponse; + expect(parsed.id).toBe("echo-headers-1"); + + // Extract the echoed request info + const result = parsed.result as CallToolResult; + const textContent = result.content?.[0]; + if (!textContent || textContent.type !== "text") { + throw new Error("Expected text content in tool result"); + } + const echoedData = JSON.parse(textContent.text); + + // Verify custom headers were passed through + expect(echoedData.hasRequestInfo).toBe(true); + expect(echoedData.headers["x-user-id"]).toBe("test-user-123"); + expect(echoedData.headers["x-request-id"]).toBe("req-456"); + expect(echoedData.headers["x-custom-header"]).toBe("custom-value"); + + // Verify that certain internal headers that the transport adds are NOT exposed + // The transport filters cf-mcp-method, cf-mcp-message, and upgrade headers + expect(echoedData.headers["cf-mcp-method"]).toBeUndefined(); + expect(echoedData.headers["cf-mcp-message"]).toBeUndefined(); + expect(echoedData.headers["upgrade"]).toBeUndefined(); + + // Verify standard headers are also present + expect(echoedData.headers["content-type"]).toBe("application/json"); + + // Check what properties are available in extra + expect(echoedData.availableExtraKeys).toBeDefined(); + + // Verify sessionId is passed through extra data + expect(echoedData.sessionId).toBeDefined(); + expect(echoedData.sessionId).toBe(sessionId); + }); + }); }); diff --git a/packages/agents/src/tests/mcp/transports/streamable-http.test.ts b/packages/agents/src/tests/mcp/transports/streamable-http.test.ts index bd57ee78..63f80214 100644 --- a/packages/agents/src/tests/mcp/transports/streamable-http.test.ts +++ b/packages/agents/src/tests/mcp/transports/streamable-http.test.ts @@ -589,9 +589,19 @@ describe("Streamable HTTP Transport", () => { const ctx = createExecutionContext(); const sessionId = await initializeStreamableHTTPServer(ctx); - // Send request with custom headers + // Send request with custom headers using the echoRequestInfo tool + const echoMessage: JSONRPCMessage = { + id: "echo-headers-1", + jsonrpc: "2.0", + method: "tools/call", + params: { + name: "echoRequestInfo", + arguments: {} + } + }; + const request = new Request(baseUrl, { - body: JSON.stringify(TEST_MESSAGES.toolsList), + body: JSON.stringify(echoMessage), headers: { Accept: "application/json, text/event-stream", "Content-Type": "application/json", @@ -603,77 +613,37 @@ describe("Streamable HTTP Transport", () => { method: "POST" }); - // Spy on the transport to capture the extraInfo - // Note: In actual implementation, the headers are available to the MCP server - // through MessageExtraInfo.requestInfo.headers const response = await worker.fetch(request, env, ctx); - expect(response.status).toBe(200); - // The test verifies that custom headers would be available in the transport - // through the MessageExtraInfo structure, filtered of internal headers + // Parse the SSE response const sseText = await readSSEEvent(response); - const result = parseSSEData(sseText); - expectValidToolsList(result); - }); - - it("should extract request headers from request", async () => { - const ctx = createExecutionContext(); - const sessionId = await initializeStreamableHTTPServer(ctx); - - // Create a request with custom headers - const request = new Request(baseUrl, { - body: JSON.stringify(TEST_MESSAGES.toolsList), - headers: { - Accept: "application/json, text/event-stream", - "Content-Type": "application/json", - "mcp-session-id": sessionId, - "X-Custom-Header": "custom-value", - "X-Request-Id": "test-123" - }, - method: "POST" - }); - - const response = await worker.fetch(request, env, ctx); - - expect(response.status).toBe(200); - - // The request headers would be available to the MCP server through - // MessageExtraInfo.requestInfo.headers - const sseText = await readSSEEvent(response); - const result = parseSSEData(sseText); - expectValidToolsList(result); - }); - - it("should filter internal MCP headers from requestInfo", async () => { - const ctx = createExecutionContext(); - const sessionId = await initializeStreamableHTTPServer(ctx); - - // Send request with custom headers - // Internal headers (cf-mcp-method, cf-mcp-message) are added by the transport layer - // and should be filtered before passing to the MCP server - const request = new Request(baseUrl, { - body: JSON.stringify(TEST_MESSAGES.toolsList), - headers: { - Accept: "application/json, text/event-stream", - "Content-Type": "application/json", - "mcp-session-id": sessionId, - "x-custom-header": "should-be-preserved", - "x-another-header": "also-preserved" - }, - method: "POST" - }); - - const response = await worker.fetch(request, env, ctx); - - expect(response.status).toBe(200); + const parsed = parseSSEData(sseText) as JSONRPCResponse; + expect(parsed.id).toBe("echo-headers-1"); - // The test verifies that internal headers (cf-mcp-method, cf-mcp-message, upgrade) - // are filtered out before being passed to the MCP server - // Only custom headers like x-custom-header should be preserved in requestInfo.headers - const sseText = await readSSEEvent(response); - const result = parseSSEData(sseText); - expectValidToolsList(result); + // Extract the echoed request info + const result = parsed.result as CallToolResult; + const echoedData = JSON.parse(result.content?.[0]?.text || "{}"); + + // Verify custom headers were passed through + expect(echoedData.hasRequestInfo).toBe(true); + expect(echoedData.headers["x-user-id"]).toBe("test-user-123"); + expect(echoedData.headers["x-request-id"]).toBe("req-456"); + expect(echoedData.headers["x-custom-header"]).toBe("custom-value"); + + // Verify that certain internal headers that the transport adds are NOT exposed + // The transport adds cf-mcp-method and cf-mcp-message internally but should filter them + expect(echoedData.headers["cf-mcp-method"]).toBeUndefined(); + expect(echoedData.headers["cf-mcp-message"]).toBeUndefined(); + expect(echoedData.headers["upgrade"]).toBeUndefined(); + + // Verify standard headers are also present + expect(echoedData.headers["accept"]).toContain("text/event-stream"); + expect(echoedData.headers["content-type"]).toBe("application/json"); + + // Verify sessionId is passed through extra data + expect(echoedData.sessionId).toBeDefined(); + expect(echoedData.sessionId).toBe(sessionId); }); }); }); diff --git a/packages/agents/src/tests/worker.ts b/packages/agents/src/tests/worker.ts index 73167c9b..1a86c719 100644 --- a/packages/agents/src/tests/worker.ts +++ b/packages/agents/src/tests/worker.ts @@ -127,6 +127,56 @@ export class TestMcpAgent extends McpAgent { return { content: [{ type: "text", text: "nothing to remove" }] }; } ); + + // Echo request info for testing header and auth passthrough + this.server.tool( + "echoRequestInfo", + "Echo back request headers and auth info", + {}, + async (_args, extra): Promise => { + // Extract headers from requestInfo, auth from authInfo + const headers = extra?.requestInfo?.headers || {}; + const authInfo = extra?.authInfo || null; + + // Get all properties from extra, excluding functions + const extraKeys = extra + ? Object.keys(extra).filter( + (key) => typeof extra[key as keyof typeof extra] !== "function" + ) + : []; + + // Build response object with all available data + const responseData: Record = { + headers: headers, + authInfo: authInfo, + hasRequestInfo: !!extra?.requestInfo, + hasAuthInfo: !!extra?.authInfo, + requestId: extra?.requestId, + // Include any sessionId if it exists + sessionId: (extra as any)?.sessionId || null, + // List all available properties in extra + availableExtraKeys: extraKeys + }; + + // Add any other properties from extra that aren't already included + extraKeys.forEach((key) => { + if ( + !["requestInfo", "authInfo", "requestId", "sessionId"].includes(key) + ) { + responseData[`extra_${key}`] = (extra as any)[key]; + } + }); + + return { + content: [ + { + type: "text", + text: JSON.stringify(responseData, null, 2) + } + ] + }; + } + ); } } From 3497dc4f9ba7b296f3e9cd42407f40203c34b42d Mon Sep 17 00:00:00 2001 From: Naseem AlNaji Date: Thu, 20 Nov 2025 15:45:55 -0500 Subject: [PATCH 7/9] refactor: simplify McpSSETransport initialization by using getCurrentAgent util --- packages/agents/src/mcp/index.ts | 7 ++----- packages/agents/src/mcp/transport.ts | 17 ++++++++--------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/packages/agents/src/mcp/index.ts b/packages/agents/src/mcp/index.ts index 2719483a..d7af1a50 100644 --- a/packages/agents/src/mcp/index.ts +++ b/packages/agents/src/mcp/index.ts @@ -80,7 +80,7 @@ export abstract class McpAgent< } /** Get the unique WebSocket. SSE transport only. */ - private getWebSocket() { + getWebSocket() { const websockets = Array.from(this.getConnections()); if (websockets.length === 0) { return null; @@ -110,10 +110,7 @@ export abstract class McpAgent< private initTransport() { switch (this.getTransportType()) { case "sse": { - return new McpSSETransport({ - getWebSocket: () => this.getWebSocket(), - sessionId: this.getSessionId() - }); + return new McpSSETransport(); } case "streamable-http": { return new StreamableHTTPServerTransport({}); diff --git a/packages/agents/src/mcp/transport.ts b/packages/agents/src/mcp/transport.ts index 521a74ff..243044d3 100644 --- a/packages/agents/src/mcp/transport.ts +++ b/packages/agents/src/mcp/transport.ts @@ -15,13 +15,8 @@ import type { McpAgent } from "."; import { MessageType } from "../ai-types"; import { MCP_HTTP_METHOD_HEADER, MCP_MESSAGE_HEADER } from "./utils"; -export interface McpSSETransportOptions { - getWebSocket: () => WebSocket | null; - sessionId?: string; -} - export class McpSSETransport implements Transport { - sessionId?: string; + sessionId: string; // Set by the server in `server.connect(transport)` onclose?: () => void; onerror?: (error: Error) => void; @@ -29,9 +24,13 @@ export class McpSSETransport implements Transport { private _getWebSocket: () => WebSocket | null; private _started = false; - constructor(options: McpSSETransportOptions) { - this._getWebSocket = options.getWebSocket; - this.sessionId = options.sessionId; + constructor() { + const { agent } = getCurrentAgent(); + if (!agent) + throw new Error("McpAgent was not found in Transport constructor"); + + this.sessionId = agent.getSessionId(); + this._getWebSocket = () => agent.getWebSocket(); } async start() { From 65c36f5faf3ef8cb795d0f9e5c757bc592134c72 Mon Sep 17 00:00:00 2001 From: Naseem AlNaji Date: Thu, 20 Nov 2025 16:05:51 -0500 Subject: [PATCH 8/9] refactor: replace buildMessageExtraInfo with direct MessageExtraInfo construction in onSSEMcpMessage --- packages/agents/src/mcp/index.ts | 25 ++----------------------- packages/agents/src/mcp/utils.ts | 22 +++++++++++++++++++--- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/packages/agents/src/mcp/index.ts b/packages/agents/src/mcp/index.ts index d7af1a50..082b9314 100644 --- a/packages/agents/src/mcp/index.ts +++ b/packages/agents/src/mcp/index.ts @@ -88,24 +88,6 @@ export abstract class McpAgent< return websockets[0]; } - /** Build MessageExtraInfo from extracted headers and auth info */ - private buildMessageExtraInfo(extraInfo?: { - headers: Record; - }): MessageExtraInfo | undefined { - if (!extraInfo) return; - - const headers = { ...extraInfo.headers }; - - // Remove internal headers that are not part of the original request - delete headers[MCP_HTTP_METHOD_HEADER]; - delete headers[MCP_MESSAGE_HEADER]; - delete headers["upgrade"]; - - return { - requestInfo: { headers } - }; - } - /** Returns a new transport matching the type of the Agent. */ private initTransport() { switch (this.getTransportType()) { @@ -210,7 +192,7 @@ export abstract class McpAgent< async onSSEMcpMessage( _sessionId: string, messageBody: unknown, - extraInfo?: { headers: Record } + extraInfo?: MessageExtraInfo ): Promise { // Since we address the DO via both the protocol and the session id, // this should never happen, but let's enforce it just in case @@ -232,10 +214,7 @@ export abstract class McpAgent< return null; // Message was handled by elicitation system } - // Build extra info with auth and request headers, matching StreamableHTTP behavior - const extra = this.buildMessageExtraInfo(extraInfo); - - this._transport?.onmessage?.(parsedMessage, extra); + this._transport?.onmessage?.(parsedMessage, extraInfo); return null; } catch (error) { console.error("Error forwarding message to SSE:", error); diff --git a/packages/agents/src/mcp/utils.ts b/packages/agents/src/mcp/utils.ts index bf3b28b2..2d3205d7 100644 --- a/packages/agents/src/mcp/utils.ts +++ b/packages/agents/src/mcp/utils.ts @@ -1,6 +1,7 @@ import { JSONRPCMessageSchema, type JSONRPCMessage, + type MessageExtraInfo, InitializeRequestSchema, isJSONRPCResponse, isJSONRPCNotification @@ -682,9 +683,24 @@ export const createLegacySseHandler = ( }); const messageBody = await request.json(); - const error = await agent.onSSEMcpMessage(sessionId, messageBody, { - headers: Object.fromEntries(request.headers.entries()) - }); + + // Build MessageExtraInfo with filtered headers + const headers = Object.fromEntries(request.headers.entries()); + + // Remove internal headers that are not part of the original request + delete headers[MCP_HTTP_METHOD_HEADER]; + delete headers[MCP_MESSAGE_HEADER]; + delete headers["upgrade"]; + + const extraInfo: MessageExtraInfo = { + requestInfo: { headers } + }; + + const error = await agent.onSSEMcpMessage( + sessionId, + messageBody, + extraInfo + ); if (error) { return new Response(error.message, { From 3f1e4f40b5b2047f18358693ce76bf2e7af68eaf Mon Sep 17 00:00:00 2001 From: Naseem AlNaji Date: Fri, 21 Nov 2025 14:01:14 -0500 Subject: [PATCH 9/9] fix: ts check errors --- packages/agents/src/mcp/utils.ts | 2 +- .../src/tests/mcp/transports/sse.test.ts | 2 +- .../mcp/transports/streamable-http.test.ts | 9 ++-- packages/agents/src/tests/worker.ts | 54 ++++++++++++------- 4 files changed, 44 insertions(+), 23 deletions(-) diff --git a/packages/agents/src/mcp/utils.ts b/packages/agents/src/mcp/utils.ts index 2d3205d7..5fd40b2d 100644 --- a/packages/agents/src/mcp/utils.ts +++ b/packages/agents/src/mcp/utils.ts @@ -690,7 +690,7 @@ export const createLegacySseHandler = ( // Remove internal headers that are not part of the original request delete headers[MCP_HTTP_METHOD_HEADER]; delete headers[MCP_MESSAGE_HEADER]; - delete headers["upgrade"]; + delete headers.upgrade; const extraInfo: MessageExtraInfo = { requestInfo: { headers } diff --git a/packages/agents/src/tests/mcp/transports/sse.test.ts b/packages/agents/src/tests/mcp/transports/sse.test.ts index 7b0f4715..bca5d24e 100644 --- a/packages/agents/src/tests/mcp/transports/sse.test.ts +++ b/packages/agents/src/tests/mcp/transports/sse.test.ts @@ -181,7 +181,7 @@ describe("SSE Transport", () => { // The transport filters cf-mcp-method, cf-mcp-message, and upgrade headers expect(echoedData.headers["cf-mcp-method"]).toBeUndefined(); expect(echoedData.headers["cf-mcp-message"]).toBeUndefined(); - expect(echoedData.headers["upgrade"]).toBeUndefined(); + expect(echoedData.headers.upgrade).toBeUndefined(); // Verify standard headers are also present expect(echoedData.headers["content-type"]).toBe("application/json"); diff --git a/packages/agents/src/tests/mcp/transports/streamable-http.test.ts b/packages/agents/src/tests/mcp/transports/streamable-http.test.ts index 63f80214..d48ee8d0 100644 --- a/packages/agents/src/tests/mcp/transports/streamable-http.test.ts +++ b/packages/agents/src/tests/mcp/transports/streamable-http.test.ts @@ -623,7 +623,10 @@ describe("Streamable HTTP Transport", () => { // Extract the echoed request info const result = parsed.result as CallToolResult; - const echoedData = JSON.parse(result.content?.[0]?.text || "{}"); + const contentText = result.content?.[0]?.text; + const echoedData = JSON.parse( + typeof contentText === "string" ? contentText : "{}" + ); // Verify custom headers were passed through expect(echoedData.hasRequestInfo).toBe(true); @@ -635,10 +638,10 @@ describe("Streamable HTTP Transport", () => { // The transport adds cf-mcp-method and cf-mcp-message internally but should filter them expect(echoedData.headers["cf-mcp-method"]).toBeUndefined(); expect(echoedData.headers["cf-mcp-message"]).toBeUndefined(); - expect(echoedData.headers["upgrade"]).toBeUndefined(); + expect(echoedData.headers.upgrade).toBeUndefined(); // Verify standard headers are also present - expect(echoedData.headers["accept"]).toContain("text/event-stream"); + expect(echoedData.headers.accept).toContain("text/event-stream"); expect(echoedData.headers["content-type"]).toBe("application/json"); // Verify sessionId is passed through extra data diff --git a/packages/agents/src/tests/worker.ts b/packages/agents/src/tests/worker.ts index e0ad4b2d..b1709ce3 100644 --- a/packages/agents/src/tests/worker.ts +++ b/packages/agents/src/tests/worker.ts @@ -1,5 +1,11 @@ import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; -import type { CallToolResult } from "@modelcontextprotocol/sdk/types.js"; +import type { RequestHandlerExtra } from "@modelcontextprotocol/sdk/shared/protocol.js"; +import type { + CallToolResult, + IsomorphicHeaders, + ServerNotification, + ServerRequest +} from "@modelcontextprotocol/sdk/types.js"; import { z } from "zod"; import { McpAgent } from "../mcp/index.ts"; import { @@ -39,6 +45,19 @@ type Props = { testValue: string; }; +type ToolExtraInfo = RequestHandlerExtra; + +type EchoResponseData = { + headers: IsomorphicHeaders; + authInfo: ToolExtraInfo["authInfo"] | null; + hasRequestInfo: boolean; + hasAuthInfo: boolean; + requestId: ToolExtraInfo["requestId"]; + sessionId: string | null; + availableExtraKeys: string[]; + [key: string]: unknown; +}; + export class TestMcpAgent extends McpAgent { observability = undefined; private tempToolHandle?: { remove: () => void }; @@ -175,27 +194,26 @@ export class TestMcpAgent extends McpAgent { "echoRequestInfo", "Echo back request headers and auth info", {}, - async (_args, extra): Promise => { + async (_args, extra: ToolExtraInfo): Promise => { // Extract headers from requestInfo, auth from authInfo - const headers = extra?.requestInfo?.headers || {}; - const authInfo = extra?.authInfo || null; + const headers: IsomorphicHeaders = extra.requestInfo?.headers ?? {}; + const authInfo = extra.authInfo ?? null; - // Get all properties from extra, excluding functions - const extraKeys = extra - ? Object.keys(extra).filter( - (key) => typeof extra[key as keyof typeof extra] !== "function" - ) - : []; + // Track non-function properties available in extra + const extraRecord = extra as Record; + const extraKeys = Object.keys(extraRecord).filter( + (key) => typeof extraRecord[key] !== "function" + ); // Build response object with all available data - const responseData: Record = { - headers: headers, - authInfo: authInfo, - hasRequestInfo: !!extra?.requestInfo, - hasAuthInfo: !!extra?.authInfo, - requestId: extra?.requestId, + const responseData: EchoResponseData = { + headers, + authInfo, + hasRequestInfo: !!extra.requestInfo, + hasAuthInfo: !!extra.authInfo, + requestId: extra.requestId, // Include any sessionId if it exists - sessionId: (extra as any)?.sessionId || null, + sessionId: extra.sessionId ?? null, // List all available properties in extra availableExtraKeys: extraKeys }; @@ -205,7 +223,7 @@ export class TestMcpAgent extends McpAgent { if ( !["requestInfo", "authInfo", "requestId", "sessionId"].includes(key) ) { - responseData[`extra_${key}`] = (extra as any)[key]; + responseData[`extra_${key}`] = extraRecord[key]; } });