-
Notifications
You must be signed in to change notification settings - Fork 1.4k
StreamableHttp client transport #573
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
refactoring after implementation of all the features: #595 |
timeout: timedelta = timedelta(seconds=30), | ||
sse_read_timeout: timedelta = timedelta(seconds=60 * 5), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
async with httpx.AsyncClient( | ||
headers=request_headers, | ||
timeout=httpx.Timeout(timeout.seconds, read=sse_read_timeout.seconds), | ||
follow_redirects=True, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are constructing AsyncClient everywhere in the codebase but with various different options. I think it makes sense if we just have factory function that creates an async cleint with correct default values that we want everywhere, like follow_redirects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added to follow ups
headers: dict[str, Any] | None = None, | ||
timeout: timedelta = timedelta(seconds=30), | ||
sse_read_timeout: timedelta = timedelta(seconds=60 * 5), | ||
): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should have a return value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe i missed this, but all the other transport implementation don't return a terminate_callback. Should we unify this so that the interface is the same?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streamable http is quite unique here as we have a specific DELETE request to close the session, none of the other transports have it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding return value in #595
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for termination, unlike other transports, streamable http does not close session when client disconnects, it requires explicit termination by DELETE request. Session has no idea what delete request is, hence needed to have a callback to transport.
This has it's drawbacks, like users need to know that they need to terminate the session. Alternative can be that we pass a parameter terminate_session_on_exit which defaults to true. In this way if someone wants to have benefits of resuming a long running session later, they can, they just need to set a parameter to False, something like
async with streamablehttp_client(url, terminate_session=False) as (
) as client: | ||
tg.start_soon(post_writer, client) | ||
try: | ||
yield read_stream, write_stream, terminate_session |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure i understand why we want terminate session as a callback. We are using a context manager, shouldn't we always be able to just terminate_session if it still exists after we yielded?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need this so that we can have a method on a client to delete/terminate the mcp-session
if content_type.startswith(CONTENT_TYPE_JSON): | ||
try: | ||
content = await response.aread() | ||
json_message = JSONRPCMessage.model_validate_json( | ||
content | ||
) | ||
await read_stream_writer.send(json_message) | ||
except Exception as exc: | ||
logger.error(f"Error parsing JSON response: {exc}") | ||
await read_stream_writer.send(exc) | ||
|
||
elif content_type.startswith(CONTENT_TYPE_SSE): | ||
# Parse SSE events from the response | ||
try: | ||
event_source = EventSource(response) | ||
async for sse in event_source.aiter_sse(): | ||
if sse.event == "message": | ||
try: | ||
await read_stream_writer.send( | ||
JSONRPCMessage.model_validate_json( | ||
sse.data | ||
) | ||
) | ||
except Exception as exc: | ||
logger.exception("Error parsing message") | ||
await read_stream_writer.send(exc) | ||
else: | ||
logger.warning(f"Unknown event: {sse.event}") | ||
|
||
except Exception as e: | ||
logger.exception("Error reading SSE stream:") | ||
await read_stream_writer.send(e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would feel more intuitive for me if these branches wouldn't be inlined but separate handler functions for non-streamed vs streamed responses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, sorry, refactoring on top of the stack
@ihrpr Thanks for this. When are we planning to release this? I see that this PR has been merged but not released yet. |
I am also curious when the maintainers think they'll make a release. If they think it may be a while, I'll gladly try working off of main |
the release is planned end of this week/early next week |
Adding support for Streamable Http client transport
Follow ups
Stacked on top of #561