Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/stupid-plums-wear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"agents": patch
---

fix: add session ID, auth, and header support to SSE transport

The SSE transport now properly forwards session IDs, authentication info, and request headers to MCP message handlers, achieving feature parity with StreamableHTTP transport. This allows MCP servers using SSE to access critical request context for authentication and session management.
44 changes: 40 additions & 4 deletions packages/agents/src/mcp/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import type { Server } from "@modelcontextprotocol/sdk/server/index.js";
import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import type { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
import type {
JSONRPCMessage,
MessageExtraInfo
} from "@modelcontextprotocol/sdk/types.js";
import {
JSONRPCMessageSchema,
isJSONRPCError,
isJSONRPCResponse,
type ElicitResult
} from "@modelcontextprotocol/sdk/types.js";
import type { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js";
import type { Connection, ConnectionContext } from "../";
import { Agent } from "../index";
import type { BaseTransportType, MaybePromise, ServeOptions } from "./types";
Expand Down Expand Up @@ -85,11 +89,34 @@ export abstract class McpAgent<
return websockets[0];
}

/** Build MessageExtraInfo from extracted headers and auth info */
private buildMessageExtraInfo(extraInfo?: {
headers: Record<string, string>;
auth?: AuthInfo;
}): MessageExtraInfo | undefined {
if (!extraInfo) return;

const headers = { ...extraInfo.headers };

// Remove internal headers that are not part of the original request
delete headers["content-type"];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These headers are part of the original request, they should not be removed

delete headers["content-length"];
delete headers["upgrade"];

return {
authInfo: extraInfo.auth,
requestInfo: { headers }
};
}

/** Returns a new transport matching the type of the Agent. */
private initTransport() {
switch (this.getTransportType()) {
case "sse": {
return new McpSSETransport(() => this.getWebSocket());
return new McpSSETransport(
() => this.getWebSocket(),
this.getSessionId()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the StreamableHTTP we use the pattern here to get this, so we can avoid drilling fields through the constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Will do 🫡

Originally I was worried about breaking existing uses of this class, but it seems to be internal only and limited to this part of the codebase.

);
}
case "streamable-http": {
return new StreamableHTTPServerTransport({});
Expand Down Expand Up @@ -188,14 +215,20 @@ export abstract class McpAgent<
/** Handles MCP Messages for the legacy SSE transport. */
async onSSEMcpMessage(
_sessionId: string,
messageBody: unknown
messageBody: unknown,
extraInfo?: { headers: Record<string, string>; auth?: AuthInfo }
): Promise<Error | null> {
// Since we address the DO via both the protocol and the session id,
// this should never happen, but let's enforce it just in case
if (this.getTransportType() !== "sse") {
return new Error("Internal Server Error: Expected SSE transport");
}

// Ensure transport is initialized before processing messages
if (!this._transport) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be needed, onStart is called every time a request is routed and the DO is initialized. Were you facing issues with this._transport not being initialized?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I did this just to make sure it was properly initialized. I will remove this :)

await this.onStart();
}

try {
let parsedMessage: JSONRPCMessage;
try {
Expand All @@ -210,7 +243,10 @@ export abstract class McpAgent<
return null; // Message was handled by elicitation system
}

this._transport?.onmessage?.(parsedMessage);
// Build extra info with auth and request headers, matching StreamableHTTP behavior
const extra = this.buildMessageExtraInfo(extraInfo);

this._transport?.onmessage?.(parsedMessage, extra);
return null;
} catch (error) {
console.error("Error forwarding message to SSE:", error);
Expand Down
8 changes: 5 additions & 3 deletions packages/agents/src/mcp/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import type { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js";
import { getCurrentAgent, type Connection } from "..";
import type { McpAgent } from ".";
import type { RequestWithAuth } from "./types";
import { MessageType } from "../ai-types";
import { MCP_HTTP_METHOD_HEADER, MCP_MESSAGE_HEADER } from "./utils";

Expand All @@ -20,12 +21,13 @@ export class McpSSETransport implements Transport {
// Set by the server in `server.connect(transport)`
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void;

private _getWebSocket: () => WebSocket | null;
private _started = false;
constructor(getWebSocket: () => WebSocket | null) {
constructor(getWebSocket: () => WebSocket | null, sessionId?: string) {
this._getWebSocket = getWebSocket;
this.sessionId = sessionId;
}

async start() {
Expand Down Expand Up @@ -220,7 +222,7 @@ export class StreamableHTTPServerTransport implements Transport {
* Handles POST requests containing JSON-RPC messages
*/
async handlePostRequest(
req: Request & { auth?: AuthInfo },
req: RequestWithAuth,
parsedBody: unknown
): Promise<void> {
const authInfo: AuthInfo | undefined = req.auth;
Expand Down
8 changes: 8 additions & 0 deletions packages/agents/src/mcp/types.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import type { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js";

export type MaybePromise<T> = T | Promise<T>;
export type MaybeConnectionTag = { role: string } | undefined;

export type BaseTransportType = "sse" | "streamable-http";
export type TransportType = BaseTransportType | "auto";

/**
* Extended Request type that includes optional authentication info.
* Used throughout MCP transport handlers to pass auth context from middleware.
*/
export type RequestWithAuth = Request & { auth?: AuthInfo };

export interface CORSOptions {
origin?: string;
methods?: string;
Expand Down
7 changes: 5 additions & 2 deletions packages/agents/src/mcp/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
} from "@modelcontextprotocol/sdk/types.js";
import type { McpAgent } from ".";
import { getAgentByName } from "..";
import type { CORSOptions } from "./types";
import type { CORSOptions, RequestWithAuth } from "./types";
import { MessageType } from "../ai-types";

/**
Expand Down Expand Up @@ -682,7 +682,10 @@ export const createLegacySseHandler = (
});

const messageBody = await request.json();
const error = await agent.onSSEMcpMessage(sessionId, messageBody);
const error = await agent.onSSEMcpMessage(sessionId, messageBody, {
headers: Object.fromEntries(request.headers.entries()),
auth: (request as RequestWithAuth).auth
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this normally runs inside your Worker fetch handler, how did the request get the auth property?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, for the purposes of the Cloudflare users we're aiding, we actually only needed the headers (and not auth explicitly) so our tests weren't covering the auth header. I added it in with the hopes of meeting parity with StreamableHTTP but I decided to instead reduce the scope of this PR to just include header and session management information.

});

if (error) {
return new Response(error.message, {
Expand Down
100 changes: 100 additions & 0 deletions packages/agents/src/tests/mcp/transports/streamable-http.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -583,4 +583,104 @@ describe("Streamable HTTP Transport", () => {
expect(tools.some((t) => t.name === "temp-echo")).toBe(false);
});
});

describe("Header and Auth Handling", () => {
it("should pass custom headers to transport via requestInfo", async () => {
const ctx = createExecutionContext();
const sessionId = await initializeStreamableHTTPServer(ctx);

// Send request with custom headers
const request = new Request(baseUrl, {
body: JSON.stringify(TEST_MESSAGES.toolsList),
headers: {
Accept: "application/json, text/event-stream",
"Content-Type": "application/json",
"mcp-session-id": sessionId,
"x-user-id": "test-user-123",
"x-request-id": "req-456",
"x-custom-header": "custom-value"
},
method: "POST"
});

// Spy on the transport to capture the extraInfo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't seem like this is testing anything new here. What am I missing?

// Note: In actual implementation, the headers are available to the MCP server
// through MessageExtraInfo.requestInfo.headers
const response = await worker.fetch(request, env, ctx);

expect(response.status).toBe(200);

// The test verifies that custom headers would be available in the transport
// through the MessageExtraInfo structure, filtered of internal headers
const sseText = await readSSEEvent(response);
const result = parseSSEData(sseText);
expectValidToolsList(result);
});

it("should extract auth info from RequestWithAuth", async () => {
const ctx = createExecutionContext();
const sessionId = await initializeStreamableHTTPServer(ctx);

// Create a request with auth info attached (simulating middleware)
const request = new Request(baseUrl, {
body: JSON.stringify(TEST_MESSAGES.toolsList),
headers: {
Accept: "application/json, text/event-stream",
"Content-Type": "application/json",
"mcp-session-id": sessionId
},
method: "POST"
}) as any; // Cast to any to add auth property

// Simulate auth middleware adding auth info
request.auth = {
userId: "test-user",
scopes: ["read", "write"],
metadata: {
role: "admin"
}
};

const response = await worker.fetch(request, env, ctx);

expect(response.status).toBe(200);

// The auth info would be available to the MCP server through
// MessageExtraInfo.authInfo
const sseText = await readSSEEvent(response);
const result = parseSSEData(sseText);
expectValidToolsList(result);
});

it("should filter internal MCP headers from requestInfo", async () => {
const ctx = createExecutionContext();
const sessionId = await initializeStreamableHTTPServer(ctx);

// Send request with custom headers
// Internal headers (cf-mcp-method, cf-mcp-message) are added by the transport layer
// and should be filtered before passing to the MCP server
const request = new Request(baseUrl, {
body: JSON.stringify(TEST_MESSAGES.toolsList),
headers: {
Accept: "application/json, text/event-stream",
"Content-Type": "application/json",
"mcp-session-id": sessionId,
"x-custom-header": "should-be-preserved",
"x-another-header": "also-preserved"
},
method: "POST"
});

const response = await worker.fetch(request, env, ctx);

expect(response.status).toBe(200);

// The test verifies that internal headers (cf-mcp-method, cf-mcp-message, upgrade)
// are filtered out before being passed to the MCP server
// Only custom headers like x-custom-header should be preserved in requestInfo.headers
const sseText = await readSSEEvent(response);
const result = parseSSEData(sseText);
expectValidToolsList(result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a closer look it seems like the 3 tests are doing the same. They are either adding custom headers to the request or setting an auth property.
Then the test checks that the request was successful and that the tool list is correct but there are no assertions relevant to either the custom headers or the auth field.

One thing that can be done is update the MCP behind the worker to do/return differently in these scenarios.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to create a tool in the workers.ts test utility that actually returns the requestInfo it receives to make it more thorough of a test.

});
});
});
Loading