StreamableHttp - Server transport with state management #553
Changes from 15 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
# MCP Simple StreamableHttp Server Example | ||
|
||
A simple MCP server example demonstrating the StreamableHttp transport, which enables HTTP-based communication with MCP servers using streaming. | ||
|
||
## Features | ||
|
||
- Uses the StreamableHTTP transport for server-client communication | ||
- Task management with anyio task groups | ||
- Ability to send multiple notifications over time to the client | ||
- Proper resource cleanup and lifespan management | ||
|
||
## Usage | ||
|
||
Start the server on the default port (3000) or on a custom port: | ||
|
||
```bash | ||
|
||
# Using custom port | ||
uv run mcp-simple-streamablehttp --port 3000 | ||
|
||
# Custom logging level | ||
uv run mcp-simple-streamablehttp --log-level DEBUG | ||
|
||
# Enable JSON responses instead of SSE streams | ||
uv run mcp-simple-streamablehttp --json-response | ||
``` | ||
|
||
The server exposes a tool named "start-notification-stream" that accepts three arguments: | ||
|
||
- `interval`: Time between notifications in seconds (e.g., 1.0) | ||
- `count`: Number of notifications to send (e.g., 5) | ||
- `caller`: Identifier string for the caller | ||
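
As a rough illustration of the argument contract listed above (not part of this PR), the sketch below checks the three arguments before they are used. The helper name `normalize_arguments` is hypothetical. It assumes `count` should be a whole number, since the handler in this PR passes it to `range()`.

```python
from typing import Any


def normalize_arguments(arguments: dict[str, Any]) -> tuple[float, int, str]:
    """Validate the three tool arguments and return (interval, count, caller).

    Hypothetical helper for illustration only; it is not part of the MCP SDK.
    """
    required = ("interval", "count", "caller")
    missing = [key for key in required if key not in arguments]
    if missing:
        raise ValueError(f"missing required arguments: {', '.join(missing)}")

    interval = arguments["interval"]
    count = arguments["count"]
    caller = arguments["caller"]

    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(interval, bool) or not isinstance(interval, (int, float)):
        raise ValueError("interval must be a number of seconds")
    if interval < 0:
        raise ValueError("interval must not be negative")
    if isinstance(count, bool) or not isinstance(count, (int, float)):
        raise ValueError("count must be a number")
    # A JSON "number" may arrive as 5 or 5.0; range() only accepts an int.
    if count < 0 or count != int(count):
        raise ValueError("count must be a non-negative whole number")
    if not isinstance(caller, str):
        raise ValueError("caller must be a string")

    return float(interval), int(count), caller
```

A tool handler could call such a helper first and report the `ValueError` message to the client instead of failing inside the notification loop.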
|
||
## Client | ||
|
||
You can connect to this server using an HTTP client. For now, only the TypeScript SDK has streamable HTTP client examples; alternatively, you can use [Inspector](https://github.com/modelcontextprotocol/inspector). |
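
For readers without a ready-made client, the sketch below builds the JSON-RPC body and headers that a `tools/call` request for `start-notification-stream` might carry. It only constructs the data and sends nothing. The endpoint combines the default port with the `/mcp` mount from this PR. The `mcp-session-id` header name and the helper names are assumptions, and a real client would first complete the MCP `initialize` handshake to obtain a session ID.

```python
import json
from typing import Optional

# Default port from this PR plus its "/mcp" mount; adjust if --port is changed.
MCP_ENDPOINT = "http://localhost:3000/mcp"


def build_tool_call(request_id: int, interval: float, count: int, caller: str) -> dict:
    """Return a JSON-RPC 2.0 request that calls the notification tool."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "start-notification-stream",
            "arguments": {"interval": interval, "count": count, "caller": caller},
        },
    }


def build_headers(session_id: Optional[str] = None) -> dict:
    """Return request headers; the session header is added once a session exists."""
    headers = {
        "Content-Type": "application/json",
        # Accept both plain JSON and SSE, since the server can answer with either.
        "Accept": "application/json, text/event-stream",
    }
    if session_id is not None:
        # Assumed header name; the PR reads it via MCP_SESSION_ID_HEADER.
        headers["mcp-session-id"] = session_id
    return headers


if __name__ == "__main__":
    body = json.dumps(build_tool_call(1, 1.0, 5, "readme-example"), indent=2)
    print(f"POST {MCP_ENDPOINT}")
    print(body)
```

The body and headers can then be passed to any HTTP client that supports streaming responses.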
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from .server import main | ||
|
||
if __name__ == "__main__": | ||
    main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
import contextlib | ||
import logging | ||
from http import HTTPStatus | ||
from uuid import uuid4 | ||
|
||
import anyio | ||
import click | ||
import mcp.types as types | ||
from mcp.server.lowlevel import Server | ||
from mcp.server.streamableHttp import ( | ||
    MCP_SESSION_ID_HEADER, | ||
    StreamableHTTPServerTransport, | ||
) | ||
Comment on lines +10 to +13: Let's call this |
||
from starlette.applications import Starlette | ||
from starlette.requests import Request | ||
from starlette.responses import Response | ||
from starlette.routing import Mount | ||
|
||
# Configure logging | ||
logger = logging.getLogger(__name__) | ||
|
||
# Global task group that will be initialized in the lifespan | ||
task_group = None | ||
|
||
|
||
@contextlib.asynccontextmanager | ||
async def lifespan(app): | ||
    """Application lifespan context manager for managing task group.""" | ||
    global task_group | ||
|
||
    async with anyio.create_task_group() as tg: | ||
        task_group = tg | ||
        logger.info("Application started, task group initialized!") | ||
        try: | ||
            yield | ||
        finally: | ||
            logger.info("Application shutting down, cleaning up resources...") | ||
            if task_group: | ||
                tg.cancel_scope.cancel() | ||
                task_group = None | ||
            logger.info("Resources cleaned up successfully.") | ||
|
||
|
||
@click.command() | ||
@click.option("--port", default=3000, help="Port to listen on for HTTP") | ||
@click.option( | ||
    "--log-level", | ||
    default="INFO", | ||
    help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)", | ||
) | ||
@click.option( | ||
    "--json-response", | ||
    is_flag=True, | ||
    default=False, | ||
    help="Enable JSON responses instead of SSE streams", | ||
) | ||
def main( | ||
    port: int, | ||
    log_level: str, | ||
    json_response: bool, | ||
) -> int: | ||
    # Configure logging | ||
    logging.basicConfig( | ||
        level=getattr(logging, log_level.upper()), | ||
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | ||
    ) | ||
|
||
    app = Server("mcp-streamable-http-demo") | ||
|
||
    @app.call_tool() | ||
Review comment: [nit, also could be a follow-up] Would be nice to use fastmcp here. I usually end up going:
server = FastMCP(...)
...
server._mcp_server # <- to access the lower level mcp interface
But perhaps we should just make it easier to get to
Reply: I think I was quite confused here; we have two folders for servers: fastmcp and servers. I was planning to add a fastmcp example as a follow-up. fastmcp might also help with |
||
    async def call_tool( | ||
        name: str, arguments: dict | ||
    ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: | ||
        ctx = app.request_context | ||
        interval = arguments.get("interval", 1.0) | ||
        count = arguments.get("count", 5) | ||
        caller = arguments.get("caller", "unknown") | ||
|
||
        # Send the specified number of notifications with the given interval | ||
        for i in range(count): | ||
            await ctx.session.send_log_message( | ||
                level="info", | ||
                data=f"Notification {i+1}/{count} from caller: {caller}", | ||
                logger="notification_stream", | ||
                related_request_id=ctx.request_id, | ||
            ) | ||
            if i < count - 1:  # Don't wait after the last notification | ||
                await anyio.sleep(interval) | ||
|
||
        return [ | ||
            types.TextContent( | ||
                type="text", | ||
                text=( | ||
                    f"Sent {count} notifications with {interval}s interval" | ||
                    f" for caller: {caller}" | ||
                ), | ||
            ) | ||
        ] | ||
|
||
    @app.list_tools() | ||
    async def list_tools() -> list[types.Tool]: | ||
        return [ | ||
            types.Tool( | ||
                name="start-notification-stream", | ||
                description=( | ||
                    "Sends a stream of notifications with configurable count" | ||
                    " and interval" | ||
                ), | ||
                inputSchema={ | ||
                    "type": "object", | ||
                    "required": ["interval", "count", "caller"], | ||
                    "properties": { | ||
                        "interval": { | ||
                            "type": "number", | ||
                            "description": "Interval between notifications in seconds", | ||
                        }, | ||
                        "count": { | ||
                            "type": "number", | ||
                            "description": "Number of notifications to send", | ||
                        }, | ||
                        "caller": { | ||
                            "type": "string", | ||
                            "description": ( | ||
                                "Identifier of the caller to include in notifications" | ||
                            ), | ||
                        }, | ||
                    }, | ||
                }, | ||
            ) | ||
        ] | ||
|
||
    # We need to store the server instances between requests | ||
    server_instances = {} | ||
    # Lock to prevent race conditions when creating new sessions | ||
    session_creation_lock = anyio.Lock() | ||
|
||
    # ASGI handler for streamable HTTP connections | ||
    async def handle_streamable_http(scope, receive, send): | ||
        request = Request(scope, receive) | ||
        request_mcp_session_id = request.headers.get(MCP_SESSION_ID_HEADER) | ||
        if ( | ||
            request_mcp_session_id is not None | ||
            and request_mcp_session_id in server_instances | ||
        ): | ||
            transport = server_instances[request_mcp_session_id] | ||
            logger.debug("Session already exists, handling request directly") | ||
            await transport.handle_request(scope, receive, send) | ||
        elif request_mcp_session_id is None: | ||
            # try to establish new session | ||
            logger.debug("Creating new transport") | ||
            # Use lock to prevent race conditions when creating new sessions | ||
            async with session_creation_lock: | ||
                new_session_id = uuid4().hex | ||
                http_transport = StreamableHTTPServerTransport( | ||
                    mcp_session_id=new_session_id, | ||
                    is_json_response_enabled=json_response, | ||
                ) | ||
                server_instances[http_transport.mcp_session_id] = http_transport | ||
                async with http_transport.connect() as streams: | ||
                    read_stream, write_stream = streams | ||
|
||
                    async def run_server(): | ||
                        await app.run( | ||
                            read_stream, | ||
                            write_stream, | ||
                            app.create_initialization_options(), | ||
                        ) | ||
|
||
                    if not task_group: | ||
                        raise RuntimeError("Task group is not initialized") | ||
|
||
                    task_group.start_soon(run_server) | ||
|
||
                    # Handle the HTTP request and return the response | ||
                    await http_transport.handle_request(scope, receive, send) | ||
        else: | ||
            response = Response( | ||
                "Bad Request: No valid session ID provided", | ||
                status_code=HTTPStatus.BAD_REQUEST, | ||
            ) | ||
            await response(scope, receive, send) | ||
Comment on lines +143 to +186: This feels awfully complex as a lowlevel API, particularly if every MCP Server using the lowlevel API would have to go and use something like this. I wonder if we can provide a more high-level abstraction for this that makes it more manageable.
Reply: Sure, we can make it happen; will add as a separate PR. |
||
|
||
    # Create an ASGI application using the transport | ||
    starlette_app = Starlette( | ||
        debug=True, | ||
        routes=[ | ||
            Mount("/mcp", app=handle_streamable_http), | ||
        ], | ||
        lifespan=lifespan, | ||
    ) | ||
|
||
    import uvicorn | ||
|
||
    uvicorn.run(starlette_app, host="0.0.0.0", port=port) | ||
|
||
    return 0 |
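
The three-way branch in `handle_streamable_http` can be hard to see among the transport setup. The sketch below isolates just that decision as a pure function, with no SDK or ASGI dependencies. The names `Route` and `route_request` are hypothetical and do not appear in this PR; the branch order is intended to mirror the handler above.

```python
from enum import Enum
from typing import Collection, Optional


class Route(Enum):
    """Possible outcomes for an incoming request (hypothetical names)."""

    EXISTING_SESSION = "existing"
    NEW_SESSION = "new"
    BAD_REQUEST = "bad-request"


def route_request(session_id: Optional[str], known_sessions: Collection[str]) -> Route:
    """Decide how a request should be handled from its session header value."""
    if session_id is not None and session_id in known_sessions:
        # Header present and recognised: reuse the stored transport.
        return Route.EXISTING_SESSION
    if session_id is None:
        # No header at all: the client is starting a new session.
        return Route.NEW_SESSION
    # Header present but unknown: the handler answers 400 Bad Request.
    return Route.BAD_REQUEST
```

Separating the decision from the side effects in this way would let the routing rules be unit-tested without starting a server, which may help with the complexity concern raised in the review.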
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
[project] | ||
name = "mcp-simple-streamablehttp" | ||
version = "0.1.0" | ||
description = "A simple MCP server exposing a StreamableHttp transport for testing" | ||
readme = "README.md" | ||
requires-python = ">=3.10" | ||
authors = [{ name = "Anthropic, PBC." }] | ||
keywords = ["mcp", "llm", "automation", "web", "fetch", "http", "streamable"] | ||
license = { text = "MIT" } | ||
dependencies = ["anyio>=4.5", "click>=8.1.0", "httpx>=0.27", "mcp", "starlette", "uvicorn"] | ||
|
||
[project.scripts] | ||
mcp-simple-streamablehttp = "mcp_simple_streamablehttp.server:main" | ||
|
||
[build-system] | ||
requires = ["hatchling"] | ||
build-backend = "hatchling.build" | ||
|
||
[tool.hatch.build.targets.wheel] | ||
packages = ["mcp_simple_streamablehttp"] | ||
|
||
[tool.pyright] | ||
include = ["mcp_simple_streamablehttp"] | ||
venvPath = "." | ||
venv = ".venv" | ||
|
||
[tool.ruff.lint] | ||
select = ["E", "F", "I"] | ||
ignore = [] | ||
|
||
[tool.ruff] | ||
line-length = 88 | ||
target-version = "py310" | ||
|
||
[tool.uv] | ||
dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"] |