Model Context Protocol SDK for building MCP servers and clients in Python
—
Transport layer implementations for various connection types including stdio, HTTP, WebSocket, and streaming protocols. The MCP SDK supports multiple transport mechanisms with authentication and security features for different deployment scenarios.
Standard input/output transport for local process communication, ideal for command-line tools and local server execution.
# The annotation is quoted because StdioServerParameters is declared further
# down in this file.
def stdio_client(server: "StdioServerParameters") -> AsyncContextManager:
    """Create a stdio transport client connection.

    Args:
        server: Stdio server parameters.

    Returns:
        Async context manager yielding ``(read_stream, write_stream)``.
    """
def stdio_server() -> AsyncContextManager:
    """Create a stdio server transport.

    Returns:
        Async context manager yielding ``(read_stream, write_stream)``.
    """
class StdioServerParameters:
def __init__(
self,
command: str | list[str],
args: list[str] | None = None,
env: dict[str, str] | None = None,
cwd: str | None = None,
):
"""
Parameters for stdio server connection.
Parameters:
- command: Server command to execute
- args: Additional command arguments
- env: Environment variables for server process
- cwd: Working directory for server process
"""Real-time bidirectional communication using WebSocket protocol for web applications and real-time scenarios.
def websocket_client(
url: str,
headers: dict[str, str] | None = None,
**kwargs
) -> AsyncContextManager:
"""
Create a WebSocket client connection.
Parameters:
- url: WebSocket server URL (ws:// or wss://)
- headers: HTTP headers for WebSocket handshake
- **kwargs: Additional connection options
Returns:
Async context manager yielding (read_stream, write_stream)
"""
def websocket_server(
    host: str = "127.0.0.1",
    port: int = 8000,
    **kwargs
) -> AsyncContextManager:
    """Create a WebSocket server.

    Args:
        host: Server host address.
        port: Server port number.
        **kwargs: Additional server options.

    Returns:
        Async context manager for server lifecycle.
    """


# HTTP-based transport using Server-Sent Events for web applications requiring
# unidirectional streaming.
def sse_client(
url: str,
headers: dict[str, str] | None = None,
**kwargs
) -> AsyncContextManager:
"""
Create a Server-Sent Events client connection.
Parameters:
- url: SSE endpoint URL
- headers: HTTP headers for connection
- **kwargs: Additional connection options
Returns:
Async context manager yielding (read_stream, write_stream)
"""
def sse_server(
    host: str = "127.0.0.1",
    port: int = 8000,
    **kwargs
) -> AsyncContextManager:
    """Create a Server-Sent Events server.

    Args:
        host: Server host address.
        port: Server port number.
        **kwargs: Additional server options.

    Returns:
        Async context manager for server lifecycle.
    """
class SseServerParameters:
def __init__(
self,
url: str,
headers: dict[str, str] | None = None,
**kwargs
):
"""
Parameters for Server-Sent Events connection.
Parameters:
- url: SSE endpoint URL
- headers: HTTP headers
- **kwargs: Additional options
"""Efficient HTTP streaming connections for high-throughput data transfer scenarios.
def streamablehttp_client(
url: str,
headers: dict[str, str] | None = None,
**kwargs
) -> AsyncContextManager:
"""
Create a streaming HTTP client connection.
Parameters:
- url: HTTP endpoint URL
- headers: HTTP headers for connection
- **kwargs: Additional connection options
Returns:
Async context manager yielding (read_stream, write_stream)
"""
class StreamableHttpParameters:
def __init__(
self,
url: str,
headers: dict[str, str] | None = None,
method: str = "POST",
**kwargs
):
"""
Parameters for streaming HTTP connection.
Parameters:
- url: HTTP endpoint URL
- headers: HTTP headers
- method: HTTP method
- **kwargs: Additional options
"""Security settings and utilities for protecting transport connections.
class TransportSecuritySettings:
def __init__(
self,
dns_rebinding_protection: bool = True,
allowed_origins: list[str] | None = None,
**kwargs
):
"""
Transport security configuration.
Parameters:
- dns_rebinding_protection: Enable DNS rebinding protection
- allowed_origins: List of allowed origins for CORS
- **kwargs: Additional security options
"""import asyncio
from mcp import ClientSession, stdio_client, StdioServerParameters
async def stdio_example():
    """Connect to a local stdio server and print the names of its tools."""
    # Client connecting to stdio server.
    # The executable goes in ``command`` and its arguments in ``args`` rather
    # than packing the whole argv into ``command``.
    server_params = StdioServerParameters(
        command="python",
        args=["my_server.py"],
        env={"DEBUG": "1"},
        cwd="/path/to/server",
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print(f"Tools: {[t.name for t in tools.tools]}")
# Server using stdio transport
async def stdio_server_example():
    """Define a FastMCP app with one ``hello`` tool and run it over stdio."""
    from mcp.server import FastMCP

    app = FastMCP("stdio-server")

    @app.tool()
    async def hello() -> str:
        return "Hello from stdio server!"

    # Run with stdio transport
    # NOTE(review): this call is not awaited, whereas the SSE and WebSocket
    # examples await their runners -- confirm run_stdio() is synchronous and
    # safe to call from inside a coroutine.
    app.run_stdio()
# Entry point of the stdio example.
asyncio.run(stdio_example())

import asyncio
from mcp import ClientSession, websocket_client
async def websocket_example():
    """Connect to a WebSocket server and print the names of its resources."""
    # Client connecting to WebSocket server
    async with websocket_client("ws://localhost:8000/mcp") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            resources = await session.list_resources()
            print(f"Resources: {[r.name for r in resources.resources]}")
# Server using WebSocket transport
async def websocket_server_example():
    """Define a FastMCP app with one resource and serve it over WebSocket."""
    from mcp.server import FastMCP

    app = FastMCP("websocket-server")

    @app.resource("ws://data")
    async def get_data() -> str:
        return "WebSocket server data"

    # Run with WebSocket transport.
    # "0.0.0.0" listens on every network interface; use "127.0.0.1" when the
    # server should only be reachable from the local machine.
    await app.run_websocket(host="0.0.0.0", port=8000)
# Entry point of the WebSocket example.
asyncio.run(websocket_example())

import asyncio
from mcp import ClientSession, sse_client
async def sse_example():
    """Connect to an SSE server with a bearer header and print its prompts."""
    # Client connecting to SSE server
    headers = {"Authorization": "Bearer token123"}
    async with sse_client("http://localhost:8080/mcp", headers=headers) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            prompts = await session.list_prompts()
            print(f"Prompts: {[p.name for p in prompts.prompts]}")
# Server using SSE transport
async def sse_server_example():
    """Define a FastMCP app with one prompt and serve it over SSE."""
    from mcp.server import FastMCP

    app = FastMCP("sse-server")

    @app.prompt()
    async def generate_report(data_type: str) -> str:
        return f"Report for {data_type}: [generated content]"

    # Run with SSE transport
    await app.run_sse(host="localhost", port=8080)
# Entry point of the SSE example.
asyncio.run(sse_example())

import asyncio
from mcp import ClientSession, sse_client
from mcp.client.auth import ClientAuth
async def authenticated_transport():
    """Call a tool over SSE using headers obtained from ``ClientAuth``."""
    # Client with authentication
    auth = ClientAuth(token="bearer_token_here")
    headers = await auth.get_headers()
    async with sse_client("https://api.example.com/mcp", headers=headers) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Use authenticated session
            result = await session.call_tool("secure_tool", {"param": "value"})
            print(f"Secure result: {result}")
# Entry point of the authenticated-transport example.
asyncio.run(authenticated_transport())

import asyncio
from mcp.server import FastMCP

# Single app instance shared by every transport in the multi-transport example.
app = FastMCP("multi-transport-server")
@app.tool()
async def universal_tool(message: str) -> str:
    """Tool available on all transports.

    Args:
        message: Text to echo back.

    Returns:
        The message prefixed with ``"Processed: "``.
    """
    return f"Processed: {message}"
@app.resource("config://server")
async def server_config() -> str:
    """Server configuration accessible via all transports.

    Returns:
        Two ``key=value`` lines separated by a newline.
    """
    return "transport=multi\nstatus=active"
async def run_multi_transport():
    """Run server on multiple transports simultaneously.

    Starts the SSE runner on port 8080 and the WebSocket runner on port 8081
    as concurrent tasks, prints the advertised endpoints, then waits for both
    tasks to finish.
    """
    # Relies on the module-level ``import asyncio``; the function-local
    # re-import was redundant.
    # Create tasks for different transports
    tasks = [
        asyncio.create_task(app.run_sse(port=8080)),
        asyncio.create_task(app.run_websocket(port=8081)),
    ]
    # Also support stdio in separate process
    print("Server available on:")
    print("- SSE: http://localhost:8080/mcp")
    print("- WebSocket: ws://localhost:8081/mcp")
    print("- Stdio: python server.py")
    await asyncio.gather(*tasks)
if __name__ == "__main__":
    # Choose the transport from the first command-line argument; stdio is the
    # default when no argument is given.
    import sys

    transport = sys.argv[1] if len(sys.argv) > 1 else "stdio"
    if transport == "stdio":
        app.run_stdio()
    elif transport == "sse":
        asyncio.run(app.run_sse(port=8080))
    elif transport == "websocket":
        asyncio.run(app.run_websocket(port=8081))
    elif transport == "multi":
        asyncio.run(run_multi_transport())
    else:
        # An unrecognised name previously fell through every branch and the
        # script exited without starting anything or reporting why.
        sys.exit(
            f"Unknown transport {transport!r}; "
            "expected one of: stdio, sse, websocket, multi"
        )


import asyncio
from mcp import ClientSession, streamablehttp_client
from mcp.client.streamable_http import StreamableHttpParameters
async def custom_transport():
    """Send ten ``stream_processor`` tool calls over a streaming HTTP session."""
    # Custom HTTP streaming transport
    params = StreamableHttpParameters(
        url="https://api.example.com/mcp/stream",
        headers={
            "Content-Type": "application/json",
            "User-Agent": "MCP-Client/1.0",
            "X-API-Key": "secret_key",
        },
        method="POST",
    )
    # Only ``url`` and ``headers`` are forwarded to the client; ``method`` set
    # above is not passed along.
    # NOTE(review): some SDK releases yield a third item (a session-id getter)
    # from streamablehttp_client -- confirm this two-name unpacking against
    # the installed version.
    async with streamablehttp_client(params.url, headers=params.headers) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Use streaming connection
            for i in range(10):
                result = await session.call_tool("stream_processor", {"batch": i})
                print(f"Batch {i} result: {result}")
# Entry point of the streaming HTTP example.
asyncio.run(custom_transport())

# Install with Tessl CLI:
#     npx tessl i tessl/pypi-mcp