Add streamable HTTP python example #9

1 change: 1 addition & 0 deletions README.md
@@ -12,6 +12,7 @@ List of modules:
| [Server Client MCP/SSE in Bedrock Converse Client w/ pgVector RAG](./modules/spring-ai-java-bedrock-mcp-rag/) | Spring AI & Java | A Spring AI dog adoption agent built on Bedrock using PostgreSQL with pgvector for RAG, and an MCP Server for managing adoption appointments. |
| [Server MCP/SSE on ECS](./modules/spring-ai-mcp-server-ecs/) | Spring AI & Kotlin | Very basic Spring AI MCP Server over SSE running on ECS. |
| [MCP/SSE Server - FastAPI Client with Anthropic Bedrock](./modules/anthropic-bedrock-python-ecs-mcp/) | Python | An MCP SSE server with a FastAPI client that leverages Anthropic Bedrock. The sample runs on ECS Fargate with public access through an Application Load Balancer. |
| [MCP/StreamableHTTP Server - Client w/ Anthropic Bedrock](./modules/lambda-streamable-http-mcp/) | Python | An MCP server using Streamable HTTP on AWS Lambda, secured with IAM auth behind a Function URL. |

## Security

65 changes: 65 additions & 0 deletions modules/lambda-streamable-http-mcp/README.md
@@ -0,0 +1,65 @@
# Serverless MCP with AWS Lambda

This project demonstrates how to deploy a Model Context Protocol (MCP) server as a serverless function using AWS Lambda with Function URLs. It uses the Streamable HTTP transport and AWS IAM authentication for security.

## Architecture
- Lambda Function with AWS Lambda Web Adapter
- AWS IAM Authentication

## Prerequisites
- AWS CLI configured with appropriate permissions
- Python 3.11+
- boto3
- An existing Lambda layer containing MCP and its dependencies
- Lambda Web Adapter layer ARN

## Step 1: Prepare Lambda Layer

Ensure you have created a Lambda layer containing MCP and its dependencies. You should have the ARN of this layer ready for the deployment script.
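If you still need to build the layer, a minimal local sketch looks like this (the `layer/` directory and `mcp-layer.zip` names are illustrative; Lambda layers require the packages to live under a top-level `python/` directory):

```shell
# Install MCP and its dependencies into the layout Lambda layers expect
mkdir -p layer/python
pip install mcp -t layer/python

# Zip the layer contents; the archive's top-level directory must be "python/"
cd layer && python3 -m zipfile -c ../mcp-layer.zip python && cd ..
```

Publish the archive with `aws lambda publish-layer-version --layer-name mcp-deps --zip-file fileb://mcp-layer.zip --compatible-runtimes python3.12` and note the returned `LayerVersionArn` for the deployment script.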

## Step 2: Deploy Lambda Function

1. Update `deployment.py` with your MCP layer ARN and the correct Lambda Web Adapter layer ARN for your region.
```python
# Update these lines with your layer ARNs
lambda_web_adapter_layer = "arn:aws:lambda:us-east-1:XXXXXXXX:layer"
```

2. Deploy the function:
```bash
python deployment.py
```

3. Save the Function URL from the output.

## Step 3: Test with MCP Client

Within `client.py`:
```python
# Replace with your Lambda Function URL
FUNCTION_URL = os.getenv('FUNCTION_URL')
```

Set `FUNCTION_URL` to your Lambda Function URL (for example in a `.env` file, loaded via `python-dotenv`) and run:
```bash
python client.py
```
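Under the hood, the client POSTs plain JSON-RPC 2.0 messages to the Function URL. A sketch of the two payloads it sends (the `greet` tool name and its `name` argument are illustrative):

```python
import json

# tools/list: ask the server which tools it exposes
list_tools = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}

# tools/call: invoke one tool with arguments
call_tool = {
    "jsonrpc": "2.0",
    "id": 2,
    "method": "tools/call",
    "params": {"name": "greet", "arguments": {"name": "Bob"}},
}

# The client serializes the message and signs the HTTP POST with SigV4
print(json.dumps(call_tool))
```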

## Security
- Function URL uses AWS IAM authentication
- All requests must be signed with SigV4
- Lambda execution role has minimal required permissions

## Key Features
- Serverless MCP implementation
- Streamable HTTP protocol support
- AWS IAM authentication
- Stateless operation

## Troubleshooting
- Check Lambda CloudWatch logs for execution issues
- Verify IAM permissions for both function and client
- Ensure AWS credentials are properly configured
- Check Layer ARNs are correct in deployment script

This setup provides a secure, scalable, and cost-effective way to deploy MCP servers using AWS Lambda.
145 changes: 145 additions & 0 deletions modules/lambda-streamable-http-mcp/app/client.py
@@ -0,0 +1,145 @@
from mcp.shared.message import SessionMessage
from mcp.types import JSONRPCMessage, JSONRPCRequest
import asyncio
import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
import json
import httpx
from anthropic import AnthropicBedrock
from dotenv import load_dotenv
import os

load_dotenv()

FUNCTION_URL = os.getenv('FUNCTION_URL')

class LambdaMCPClient:
def __init__(self, func_url: str):
self.func_url = func_url
self.session = boto3.Session()
self.credentials = self.session.get_credentials()
        self.region = self.session.region_name or 'us-east-1'  # fall back if no region is configured
self.anthropic = AnthropicBedrock()
self.headers = {
"Accept": "application/json, text/event-stream",
"Content-Type": "application/json"
}


async def send_to_lambda(self, message: JSONRPCMessage):
request = AWSRequest(
method='POST',
url=self.func_url,
data=message.model_dump_json(),
headers=self.headers
)
SigV4Auth(self.credentials, "lambda", self.region).add_auth(request)

async with httpx.AsyncClient() as client:
response = await client.post(
request.url,
content=request.data,
headers=dict(request.headers)
)
lambda_response = response.json()
if 'body' in lambda_response:
return json.loads(lambda_response['body'])
else:
raise ValueError(f"Unexpected response format: {lambda_response}")

async def list_tools(self):
"""Get available tools from the MCP server"""
list_tools_msg = JSONRPCMessage(
JSONRPCRequest(
jsonrpc="2.0",
method="tools/list",
id=1
)
)
response = await self.send_to_lambda(list_tools_msg)
return response['result']['tools']

async def call_tool(self, name: str, arguments: dict):
"""Call a specific tool with arguments"""
call_tool_msg = JSONRPCMessage(
JSONRPCRequest(
jsonrpc="2.0",
method="tools/call",
params={
"name": name,
"arguments": arguments
},
id=2
)
)
response = await self.send_to_lambda(call_tool_msg)
return response['result']

async def process_query(self, query: str) -> str:
"""Process a query using Claude and available tools"""
messages = [{"role": "user", "content": query}]
final_text = []
tools = await self.list_tools()

available_tools = [
{
"name": tool['name'],
"description": tool['description'],
"input_schema": tool['inputSchema']
}
for tool in tools
]

        while True:
            # Get response from Claude
            response = self.anthropic.messages.create(
                model="us.anthropic.claude-3-5-sonnet-20241022-v2:0",
                max_tokens=1000,
                messages=messages,
                tools=available_tools,
            )

            used_tool = False
            for content in response.content:
                if content.type == "text":
                    final_text.append(content.text)
                elif content.type == "tool_use":
                    used_tool = True
                    tool_name = content.name
                    tool_args = content.input

                    # Call the tool on the MCP server
                    result = await self.call_tool(tool_name, tool_args)
                    final_text.append(f"[Tool Result: {result}]")

                    # Record the tool exchange in the conversation; guard
                    # against responses whose first block is not text
                    assistant_text = next(
                        (c.text for c in response.content if c.type == "text"),
                        f"Calling tool {tool_name}",
                    )
                    messages.append({"role": "assistant", "content": assistant_text})
                    messages.append({"role": "user", "content": f"Tool result: {result}"})

            # Keep looping only while Claude keeps requesting tool calls
            if not used_tool:
                break

return "\n".join(final_text)


async def main():
    if not FUNCTION_URL:
        raise SystemExit("FUNCTION_URL is not set; add it to your environment or a .env file")
    client = LambdaMCPClient(FUNCTION_URL)

# List available tools
tools = await client.list_tools()
print("Available tools:", tools)

# Test the model with a query
query = "Please greet someone named Bob using the greeting tool"
result = await client.process_query(query)
print("\nQuery result:", result)

if __name__ == "__main__":
asyncio.run(main())
132 changes: 132 additions & 0 deletions modules/lambda-streamable-http-mcp/app/deployment.py
@@ -0,0 +1,132 @@
import boto3
import json
import io
import zipfile

def create_lambda_function():
lambda_client = boto3.client('lambda')
iam = boto3.client('iam')

function_name = 'mcp-lambda'
role_name = 'mcp-lambda-role'
lambda_handler = 'server.lambda_handler'

# Create IAM role for Lambda
assume_role_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole"
}
]
}

try:
role = iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(assume_role_policy)
)
except iam.exceptions.EntityAlreadyExistsException:
role = iam.get_role(RoleName=role_name)

# Attach necessary policies
iam.attach_role_policy(
RoleName=role_name,
PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
)

# Create custom policy for Bedrock
bedrock_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"bedrock:InvokeModelWithResponseStream",
"bedrock:InvokeModel"
],
"Resource": "*"
}
]
}

try:
policy = iam.create_policy(
PolicyName='mcp-lambda-bedrock-policy',
PolicyDocument=json.dumps(bedrock_policy)
)
iam.attach_role_policy(
RoleName=role_name,
PolicyArn=policy['Policy']['Arn']
)
except iam.exceptions.EntityAlreadyExistsException:
pass

# Create deployment package
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
zip_file.write('server.py')

# Wait for role to be ready
import time
time.sleep(10)

# Lambda Web Adapter Layer ARN (replace with the correct ARN for your region)
lambda_web_adapter_layer = "YOUR_LAMBDA_LAYER_HERE"

try:
# Create Lambda function
response = lambda_client.create_function(
FunctionName=function_name,
Runtime='python3.12',
Role=role['Role']['Arn'],
Handler=lambda_handler,
Code={'ZipFile': zip_buffer.getvalue()},
MemorySize=1024,
Timeout=900,
Environment={
'Variables': {
'AWS_LWA_INVOKE_MODE': 'RESPONSE_STREAM',
'PORT': '8080'
}
},
Layers=[lambda_web_adapter_layer]
)
except lambda_client.exceptions.ResourceConflictException:
response = lambda_client.update_function_code(
FunctionName=function_name,
ZipFile=zip_buffer.getvalue()
)
lambda_client.update_function_configuration(
FunctionName=function_name,
Environment={
'Variables': {
'AWS_LWA_INVOKE_MODE': 'RESPONSE_STREAM',
'PORT': '8080'
}
},
Layers=[lambda_web_adapter_layer]
)

# Create or update function URL
try:
url_config = lambda_client.create_function_url_config(
FunctionName=function_name,
AuthType='AWS_IAM',
InvokeMode='RESPONSE_STREAM'
)
except lambda_client.exceptions.ResourceConflictException:
url_config = lambda_client.update_function_url_config(
FunctionName=function_name,
AuthType='AWS_IAM',
InvokeMode='RESPONSE_STREAM'
)

print(f"Function URL: {url_config['FunctionUrl']}")
return url_config['FunctionUrl']

if __name__ == "__main__":
function_url = create_lambda_function()
print(f"\nDeployment complete. Function URL: {function_url}")