Skip to content

StreamableHttp -- resumability support for servers #587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 57 commits into from
May 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
2b95598
initial streamable http server
ihrpr Apr 20, 2025
3d790f8
add request validation and tests
ihrpr Apr 20, 2025
27bc01e
session management
ihrpr Apr 20, 2025
3c4cf10
terminations of a session
ihrpr Apr 20, 2025
bce74b3
fix cleaning up
ihrpr Apr 20, 2025
2011579
add happy path test
ihrpr Apr 20, 2025
2cebf08
tests
ihrpr Apr 20, 2025
6c9c320
json mode
ihrpr Apr 20, 2025
ede8cde
clean up
ihrpr Apr 21, 2025
2a3bed8
fix example server
ihrpr Apr 21, 2025
0456b1b
return 405 for get stream
ihrpr Apr 21, 2025
97ca48d
speed up tests
ihrpr Apr 21, 2025
f738cbf
stateless implemetation
ihrpr Apr 21, 2025
92d4287
format
ihrpr Apr 21, 2025
aa9f6e5
uv lock
ihrpr Apr 21, 2025
2fba7f3
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr Apr 21, 2025
45723ea
simplify readme
ihrpr Apr 21, 2025
6b7a616
clean up
ihrpr Apr 21, 2025
b1be691
get sse
ihrpr Apr 22, 2025
201ec99
uv lock
ihrpr Apr 22, 2025
46ec72d
clean up
ihrpr Apr 22, 2025
1902abb
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr Apr 22, 2025
da1df74
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
ihrpr Apr 22, 2025
c2be5af
streamable http client
ihrpr Apr 23, 2025
9b096dc
add comments to server example where we use related_request_id
ihrpr Apr 23, 2025
bbe79c2
Merge branch 'main' into ihrpr/streamablehttp-server
ihrpr Apr 23, 2025
a0a9c5b
small fixes
ihrpr Apr 23, 2025
a5ac2e0
use mta field for related_request_id
ihrpr Apr 23, 2025
2e615f3
unrelated test and format
ihrpr Apr 23, 2025
110526d
clean up
ihrpr Apr 23, 2025
7ffd5ba
terminate session
ihrpr Apr 23, 2025
029ec56
format
ihrpr Apr 23, 2025
cae32e2
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr Apr 25, 2025
58745c7
remove useless sleep
ihrpr Apr 25, 2025
1387929
rename require_initialization to standalone_mode
ihrpr Apr 25, 2025
bccff75
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
ihrpr Apr 25, 2025
dd007d7
Merge branch 'ihrpr/get-sse' into ihrpr/client
ihrpr Apr 25, 2025
6482120
remove redundant check for initialize and session
ihrpr Apr 25, 2025
9a6da2e
ruff check
ihrpr Apr 25, 2025
b957fad
Merge branch 'ihrpr/get-sse' into ihrpr/client
ihrpr Apr 25, 2025
3f5fd7e
support for resumability - server
ihrpr Apr 25, 2025
b193242
format
ihrpr Apr 25, 2025
6110435
remove print
ihrpr Apr 25, 2025
e087283
rename files to follow python naming
ihrpr Apr 25, 2025
08247c4
update to use time delta in client
ihrpr Apr 25, 2025
0484dfb
refactor
ihrpr Apr 25, 2025
88ff2ba
Merge branch 'ihrpr/client' into ihrpr/resumability-server
ihrpr Apr 25, 2025
5757f6c
small fixes
ihrpr Apr 25, 2025
ee28ad8
improve event store example implementation
ihrpr Apr 25, 2025
5dbddeb
refactor _create_event_data
ihrpr Apr 25, 2025
ff70bd6
Merge branch 'main' into ihrpr/streamablehttp-server
ihrpr May 2, 2025
179fbc8
Merge branch 'ihrpr/streamablehttp-server' into ihrpr/streamablehttp-…
ihrpr May 2, 2025
a979864
Merge branch 'ihrpr/streamablehttp-stateless' into ihrpr/get-sse
ihrpr May 2, 2025
11b7dd9
Merge branch 'ihrpr/get-sse' into ihrpr/client
ihrpr May 2, 2025
67a899c
Merge branch 'ihrpr/client' into ihrpr/resumability-server
ihrpr May 2, 2025
2dda87e
Merge branch 'main' into ihrpr/resumability-server
ihrpr May 2, 2025
2697f14
apply suggested changes
ihrpr May 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion examples/servers/simple-streamablehttp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ A simple MCP server example demonstrating the StreamableHttp transport, which en
- Task management with anyio task groups
- Ability to send multiple notifications over time to the client
- Proper resource cleanup and lifespan management
- Resumability support via InMemoryEventStore

## Usage

Expand All @@ -32,6 +33,23 @@ The server exposes a tool named "start-notification-stream" that accepts three a
- `count`: Number of notifications to send (e.g., 5)
- `caller`: Identifier string for the caller

## Resumability Support

This server includes resumability support through the InMemoryEventStore. This enables clients to:

- Reconnect to the server after a disconnection
- Resume event streaming from where they left off using the Last-Event-ID header


The server will:
- Generate unique event IDs for each SSE message
- Store events in memory for later replay
- Replay missed events when a client reconnects with a Last-Event-ID header

Note: The InMemoryEventStore is designed for demonstration purposes only. For production use, consider implementing a persistent storage solution.



## Client

You can connect to this server using an HTTP client, for now only Typescript SDK has streamable HTTP client examples or you can use (Inspector)[https://github.com/modelcontextprotocol/inspector]
You can connect to this server using an HTTP client, for now only Typescript SDK has streamable HTTP client examples or you can use [Inspector](https://github.com/modelcontextprotocol/inspector)
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""
In-memory event store for demonstrating resumability functionality.

This is a simple implementation intended for examples and testing,
not for production use where a persistent storage solution would be more appropriate.
"""

import logging
from collections import deque
from dataclasses import dataclass
from uuid import uuid4

from mcp.server.streamable_http import (
EventCallback,
EventId,
EventMessage,
EventStore,
StreamId,
)
from mcp.types import JSONRPCMessage

logger = logging.getLogger(__name__)


@dataclass
class EventEntry:
"""
Represents an event entry in the event store.
"""

event_id: EventId
stream_id: StreamId
message: JSONRPCMessage


class InMemoryEventStore(EventStore):
"""
Simple in-memory implementation of the EventStore interface for resumability.
This is primarily intended for examples and testing, not for production use
where a persistent storage solution would be more appropriate.

This implementation keeps only the last N events per stream for memory efficiency.
"""

def __init__(self, max_events_per_stream: int = 100):
"""Initialize the event store.

Args:
max_events_per_stream: Maximum number of events to keep per stream
"""
self.max_events_per_stream = max_events_per_stream
# for maintaining last N events per stream
self.streams: dict[StreamId, deque[EventEntry]] = {}
# event_id -> EventEntry for quick lookup
self.event_index: dict[EventId, EventEntry] = {}

async def store_event(
self, stream_id: StreamId, message: JSONRPCMessage
) -> EventId:
"""Stores an event with a generated event ID."""
event_id = str(uuid4())
event_entry = EventEntry(
event_id=event_id, stream_id=stream_id, message=message
)

# Get or create deque for this stream
if stream_id not in self.streams:
self.streams[stream_id] = deque(maxlen=self.max_events_per_stream)

# If deque is full, the oldest event will be automatically removed
# We need to remove it from the event_index as well
if len(self.streams[stream_id]) == self.max_events_per_stream:
oldest_event = self.streams[stream_id][0]
self.event_index.pop(oldest_event.event_id, None)

# Add new event
self.streams[stream_id].append(event_entry)
self.event_index[event_id] = event_entry

return event_id

async def replay_events_after(
self,
last_event_id: EventId,
send_callback: EventCallback,
) -> StreamId | None:
"""Replays events that occurred after the specified event ID."""
if last_event_id not in self.event_index:
logger.warning(f"Event ID {last_event_id} not found in store")
return None

# Get the stream and find events after the last one
last_event = self.event_index[last_event_id]
stream_id = last_event.stream_id
stream_events = self.streams.get(last_event.stream_id, deque())

# Events in deque are already in chronological order
found_last = False
for event in stream_events:
if found_last:
await send_callback(EventMessage(event.message, event.event_id))
elif event.event_id == last_event_id:
found_last = True

return stream_id
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,24 @@
from starlette.responses import Response
from starlette.routing import Mount

from .event_store import InMemoryEventStore

# Configure logging
logger = logging.getLogger(__name__)

# Global task group that will be initialized in the lifespan
task_group = None

# Event store for resumability
# The InMemoryEventStore enables resumability support for StreamableHTTP transport.
# It stores SSE events with unique IDs, allowing clients to:
# 1. Receive event IDs for each SSE message
# 2. Resume streams by sending Last-Event-ID in GET requests
# 3. Replay missed events after reconnection
# Note: This in-memory implementation is for demonstration ONLY.
# For production, use a persistent storage solution.
event_store = InMemoryEventStore()


@contextlib.asynccontextmanager
async def lifespan(app):
Expand Down Expand Up @@ -79,9 +91,14 @@ async def call_tool(

# Send the specified number of notifications with the given interval
for i in range(count):
# Include more detailed message for resumability demonstration
notification_msg = (
f"[{i+1}/{count}] Event from '{caller}' - "
f"Use Last-Event-ID to resume if disconnected"
)
await ctx.session.send_log_message(
level="info",
data=f"Notification {i+1}/{count} from caller: {caller}",
data=notification_msg,
logger="notification_stream",
# Associates this notification with the original request
# Ensures notifications are sent to the correct response stream
Expand All @@ -90,6 +107,7 @@ async def call_tool(
# - nowhere (if GET request isn't supported)
related_request_id=ctx.request_id,
)
logger.debug(f"Sent notification {i+1}/{count} for caller: {caller}")
if i < count - 1: # Don't wait after the last notification
await anyio.sleep(interval)

Expand Down Expand Up @@ -163,8 +181,10 @@ async def handle_streamable_http(scope, receive, send):
http_transport = StreamableHTTPServerTransport(
mcp_session_id=new_session_id,
is_json_response_enabled=json_response,
event_store=event_store, # Enable resumability
)
server_instances[http_transport.mcp_session_id] = http_transport
logger.info(f"Created new transport with session ID: {new_session_id}")
async with http_transport.connect() as streams:
read_stream, write_stream = streams

Expand Down
Loading
Loading