Skip to content

StreamableHttp transport - session management #345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions src/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,77 @@ describe("StreamableHTTPClientTransport", () => {
expect(lastCall[1].headers.get("mcp-session-id")).toBe("test-session-id");
});

it("should terminate session with DELETE request", async () => {
// First, simulate getting a session ID
const message: JSONRPCMessage = {
jsonrpc: "2.0",
method: "initialize",
params: {
clientInfo: { name: "test-client", version: "1.0" },
protocolVersion: "2025-03-26"
},
id: "init-id"
};

(global.fetch as jest.Mock).mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ "content-type": "text/event-stream", "mcp-session-id": "test-session-id" }),
});

await transport.send(message);
expect(transport.sessionId).toBe("test-session-id");

// Now terminate the session
(global.fetch as jest.Mock).mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers()
});

await transport.terminateSession();

// Verify the DELETE request was sent with the session ID
const calls = (global.fetch as jest.Mock).mock.calls;
const lastCall = calls[calls.length - 1];
expect(lastCall[1].method).toBe("DELETE");
expect(lastCall[1].headers.get("mcp-session-id")).toBe("test-session-id");

// The session ID should be cleared after successful termination
expect(transport.sessionId).toBeUndefined();
});

it("should handle 405 response when server doesn't support session termination", async () => {
// First, simulate getting a session ID
const message: JSONRPCMessage = {
jsonrpc: "2.0",
method: "initialize",
params: {
clientInfo: { name: "test-client", version: "1.0" },
protocolVersion: "2025-03-26"
},
id: "init-id"
};

(global.fetch as jest.Mock).mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ "content-type": "text/event-stream", "mcp-session-id": "test-session-id" }),
});

await transport.send(message);

// Now terminate the session, but server responds with 405
(global.fetch as jest.Mock).mockResolvedValueOnce({
ok: false,
status: 405,
statusText: "Method Not Allowed",
headers: new Headers()
});

await expect(transport.terminateSession()).resolves.not.toThrow();
});

it("should handle 404 response when session expires", async () => {
const message: JSONRPCMessage = {
jsonrpc: "2.0",
Expand Down
48 changes: 46 additions & 2 deletions src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Transport } from "../shared/transport.js";
import { isJSONRPCNotification, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
import { isInitializedNotification, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
import { EventSourceParserStream } from "eventsource-parser/stream";

Expand Down Expand Up @@ -420,7 +420,7 @@ export class StreamableHTTPClientTransport implements Transport {
if (response.status === 202) {
// if the accepted notification is initialized, we start the SSE stream
// if it's supported by the server
if (isJSONRPCNotification(message) && message.method === "notifications/initialized") {
if (isInitializedNotification(message)) {
// Start without a lastEventId since this is a fresh connection
this._startOrAuthSse({ resumptionToken: undefined }).catch(err => this.onerror?.(err));
}
Expand Down Expand Up @@ -467,4 +467,48 @@ export class StreamableHTTPClientTransport implements Transport {
get sessionId(): string | undefined {
return this._sessionId;
}

/**
* Terminates the current session by sending a DELETE request to the server.
*
* Clients that no longer need a particular session
* (e.g., because the user is leaving the client application) SHOULD send an
* HTTP DELETE to the MCP endpoint with the Mcp-Session-Id header to explicitly
* terminate the session.
*
* The server MAY respond with HTTP 405 Method Not Allowed, indicating that
* the server does not allow clients to terminate sessions.
*/
async terminateSession(): Promise<void> {
if (!this._sessionId) {
return; // No session to terminate
}

try {
const headers = await this._commonHeaders();

const init = {
...this._requestInit,
method: "DELETE",
headers,
signal: this._abortController?.signal,
};

const response = await fetch(this._url, init);

// We specifically handle 405 as a valid response according to the spec,
// meaning the server does not support explicit session termination
if (!response.ok && response.status !== 405) {
throw new StreamableHTTPError(
response.status,
`Failed to terminate session: ${response.statusText}`
);
}

this._sessionId = undefined;
} catch (error) {
this.onerror?.(error as Error);
throw error;
}
}
}
48 changes: 47 additions & 1 deletion src/examples/client/simpleStreamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ function printHelp(): void {
console.log('\nAvailable commands:');
console.log(' connect [url] - Connect to MCP server (default: http://localhost:3000/mcp)');
console.log(' disconnect - Disconnect from server');
console.log(' terminate-session - Terminate the current session');
console.log(' reconnect - Reconnect to the server');
console.log(' list-tools - List available tools');
console.log(' call-tool <name> [args] - Call a tool with optional JSON arguments');
Expand Down Expand Up @@ -76,6 +77,10 @@ function commandLoop(): void {
await disconnect();
break;

case 'terminate-session':
await terminateSession();
break;

case 'reconnect':
await reconnect();
break;
Expand Down Expand Up @@ -249,6 +254,36 @@ async function disconnect(): Promise<void> {
}
}

async function terminateSession(): Promise<void> {
if (!client || !transport) {
console.log('Not connected.');
return;
}

try {
console.log('Terminating session with ID:', transport.sessionId);
await transport.terminateSession();
console.log('Session terminated successfully');

// Check if sessionId was cleared after termination
if (!transport.sessionId) {
console.log('Session ID has been cleared');
sessionId = undefined;

// Also close the transport and clear client objects
await transport.close();
console.log('Transport closed after session termination');
client = null;
transport = null;
} else {
console.log('Server responded with 405 Method Not Allowed (session termination not supported)');
console.log('Session ID is still active:', transport.sessionId);
}
} catch (error) {
console.error('Error terminating session:', error);
}
}

async function reconnect(): Promise<void> {
if (client) {
await disconnect();
Expand Down Expand Up @@ -411,13 +446,24 @@ async function listResources(): Promise<void> {
async function cleanup(): Promise<void> {
if (client && transport) {
try {
// First try to terminate the session gracefully
if (transport.sessionId) {
try {
console.log('Terminating session before exit...');
await transport.terminateSession();
console.log('Session terminated successfully');
} catch (error) {
console.error('Error terminating session:', error);
}
}

// Then close the transport
await transport.close();
} catch (error) {
console.error('Error closing transport:', error);
}
}


process.stdin.setRawMode(false);
readline.close();
console.log('\nGoodbye!');
Expand Down
23 changes: 7 additions & 16 deletions src/examples/server/jsonResponseStreamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { randomUUID } from 'node:crypto';
import { McpServer } from '../../server/mcp.js';
import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js';
import { z } from 'zod';
import { CallToolResult } from '../../types.js';
import { CallToolResult, isInitializeRequest } from '../../types.js';

// Create an MCP server with implementation details
const server = new McpServer({
Expand Down Expand Up @@ -95,18 +95,17 @@ app.post('/mcp', async (req: Request, res: Response) => {
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
enableJsonResponse: true, // Enable JSON response mode
onsessioninitialized: (sessionId) => {
// Store the transport by session ID when session is initialized
// This avoids race conditions where requests might come in before the session is stored
console.log(`Session initialized with ID: ${sessionId}`);
transports[sessionId] = transport;
}
});

// Connect the transport to the MCP server BEFORE handling the request
await server.connect(transport);

// After handling the request, if we get a session ID back, store the transport
await transport.handleRequest(req, res, req.body);

// Store the transport by session ID for future requests
if (transport.sessionId) {
transports[transport.sessionId] = transport;
}
return; // Already handled
} else {
// Invalid request - no session ID or not initialization request
Expand Down Expand Up @@ -145,14 +144,6 @@ app.get('/mcp', async (req: Request, res: Response) => {
res.status(405).set('Allow', 'POST').send('Method Not Allowed');
});

// Helper function to detect initialize requests
function isInitializeRequest(body: unknown): boolean {
if (Array.isArray(body)) {
return body.some(msg => typeof msg === 'object' && msg !== null && 'method' in msg && msg.method === 'initialize');
}
return typeof body === 'object' && body !== null && 'method' in body && body.method === 'initialize';
}

// Start the server
const PORT = 3000;
app.listen(PORT, () => {
Expand Down
62 changes: 48 additions & 14 deletions src/examples/server/simpleStreamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { randomUUID } from 'node:crypto';
import { McpServer } from '../../server/mcp.js';
import { EventStore, StreamableHTTPServerTransport } from '../../server/streamableHttp.js';
import { z } from 'zod';
import { CallToolResult, GetPromptResult, JSONRPCMessage, ReadResourceResult } from '../../types.js';
import { CallToolResult, GetPromptResult, isInitializeRequest, JSONRPCMessage, ReadResourceResult } from '../../types.js';

// Create a simple in-memory EventStore for resumability
class InMemoryEventStore implements EventStore {
Expand Down Expand Up @@ -36,7 +36,7 @@ class InMemoryEventStore implements EventStore {
* Replays events that occurred after a specific event ID
* Implements EventStore.replayEventsAfter
*/
async replayEventsAfter(lastEventId: string,
async replayEventsAfter(lastEventId: string,
{ send }: { send: (eventId: string, message: JSONRPCMessage) => Promise<void> }
): Promise<string> {
if (!lastEventId || !this.events.has(lastEventId)) {
Expand Down Expand Up @@ -247,19 +247,28 @@ app.post('/mcp', async (req: Request, res: Response) => {
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
eventStore, // Enable resumability
onsessioninitialized: (sessionId) => {
// Store the transport by session ID when session is initialized
// This avoids race conditions where requests might come in before the session is stored
console.log(`Session initialized with ID: ${sessionId}`);
transports[sessionId] = transport;
}
});

// Set up onclose handler to clean up transport when closed
transport.onclose = () => {
const sid = transport.sessionId;
if (sid && transports[sid]) {
console.log(`Transport closed for session ${sid}, removing from transports map`);
delete transports[sid];
}
};

// Connect the transport to the MCP server BEFORE handling the request
// so responses can flow back through the same transport
await server.connect(transport);

// After handling the request, if we get a session ID back, store the transport
await transport.handleRequest(req, res, req.body);

// Store the transport by session ID for future requests
if (transport.sessionId) {
transports[transport.sessionId] = transport;
}
return; // Already handled
} else {
// Invalid request - no session ID or not initialization request
Expand Down Expand Up @@ -312,13 +321,26 @@ app.get('/mcp', async (req: Request, res: Response) => {
await transport.handleRequest(req, res);
});

// Helper function to detect initialize requests
function isInitializeRequest(body: unknown): boolean {
if (Array.isArray(body)) {
return body.some(msg => typeof msg === 'object' && msg !== null && 'method' in msg && msg.method === 'initialize');
// Handle DELETE requests for session termination (according to MCP spec)
app.delete('/mcp', async (req: Request, res: Response) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;
if (!sessionId || !transports[sessionId]) {
res.status(400).send('Invalid or missing session ID');
return;
}
return typeof body === 'object' && body !== null && 'method' in body && body.method === 'initialize';
}

console.log(`Received session termination request for session ${sessionId}`);

try {
const transport = transports[sessionId];
await transport.handleRequest(req, res);
} catch (error) {
console.error('Error handling session termination:', error);
if (!res.headersSent) {
res.status(500).send('Error processing session termination');
}
}
});

// Start the server
const PORT = 3000;
Expand Down Expand Up @@ -351,6 +373,18 @@ app.listen(PORT, () => {
// Handle server shutdown
process.on('SIGINT', async () => {
console.log('Shutting down server...');

// Close all active transports to properly clean up resources
for (const sessionId in transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
await transports[sessionId].close();
delete transports[sessionId];
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
await server.close();
console.log('Server shutdown complete');
process.exit(0);
});
Loading