Transports provide pluggable communication mechanisms for MCP clients and servers. The SDK includes multiple built-in transport implementations for different scenarios.
type Transport interface {
Connect(ctx context.Context) (Connection, error)
}All transports implement this interface to establish connections.
type Connection interface {
Read(context.Context) (jsonrpc.Message, error)
Write(context.Context, jsonrpc.Message) error
Close() error
SessionID() string
}Represents a bidirectional JSON-RPC connection between client and server.
Communicates over stdin/stdout using newline-delimited JSON.
type StdioTransport struct{}Methods:
func (t *StdioTransport) Connect(context.Context) (Connection, error)Example:
// Server listening on stdin/stdout
server := mcp.NewServer(impl, nil)
if err := server.Run(ctx, &mcp.StdioTransport{}); err != nil {
log.Fatal(err)
}Runs a command and communicates over its stdin/stdout.
type CommandTransport struct {
Command *exec.Cmd
TerminateDuration time.Duration
}Fields:
Command: Command to execute (e.g., exec.Command("myserver"))TerminateDuration: Time to wait for graceful shutdown before SIGTERM (default: 5s)Methods:
func (t *CommandTransport) Connect(ctx context.Context) (Connection, error)Example:
// Client connecting to a server command
client := mcp.NewClient(impl, nil)
transport := &mcp.CommandTransport{
Command: exec.Command("python", "server.py"),
TerminateDuration: 10 * time.Second,
}
session, err := client.Connect(ctx, transport, nil)
if err != nil {
log.Fatal(err)
}
defer session.Close()Communicates over custom ReadCloser and WriteCloser.
type IOTransport struct {
Reader io.ReadCloser
Writer io.WriteCloser
}Fields:
Reader: Input stream for reading messagesWriter: Output stream for writing messagesMethods:
func (t *IOTransport) Connect(context.Context) (Connection, error)Example:
// Connect over custom streams (e.g., network socket)
reader, writer := getCustomStreams()
transport := &mcp.IOTransport{
Reader: reader,
Writer: writer,
}
session, err := client.Connect(ctx, transport, nil)Provides in-memory communication for testing and local use.
type InMemoryTransport struct {
// Has unexported fields
}Functions:
func NewInMemoryTransports() (*InMemoryTransport, *InMemoryTransport)Returns two connected InMemoryTransport objects for testing.
Methods:
func (t *InMemoryTransport) Connect(context.Context) (Connection, error)Example:
// Create connected transports for testing
clientTransport, serverTransport := mcp.NewInMemoryTransports()
// Run server on one transport
go func() {
server := mcp.NewServer(serverImpl, nil)
if err := server.Run(ctx, serverTransport); err != nil {
log.Printf("Server error: %v", err)
}
}()
// Connect client on the other transport
client := mcp.NewClient(clientImpl, nil)
session, err := client.Connect(ctx, clientTransport, nil)
if err != nil {
log.Fatal(err)
}Client transport for Server-Sent Events (SSE) connections (2024-11-05 spec).
type SSEClientTransport struct {
Endpoint string
HTTPClient *http.Client
}Fields:
Endpoint: SSE endpoint URLHTTPClient: Optional HTTP client (uses http.DefaultClient if nil)Methods:
func (t *SSEClientTransport) Connect(ctx context.Context) (Connection, error)Example:
// Connect to SSE endpoint
transport := &mcp.SSEClientTransport{
Endpoint: "https://example.com/mcp",
HTTPClient: &http.Client{Timeout: 30 * time.Second},
}
session, err := client.Connect(ctx, transport, nil)Server-side transport for SSE sessions created through hanging GET requests.
type SSEServerTransport struct {
Endpoint string
Response http.ResponseWriter
// Has unexported fields
}Fields:
Endpoint: Endpoint URL for client POST requestsResponse: HTTP response writer for sending eventsMethods:
func (t *SSEServerTransport) Connect(context.Context) (Connection, error)
func (t *SSEServerTransport) ServeHTTP(w http.ResponseWriter, req *http.Request)HTTP handler for managing SSE-based MCP sessions.
type SSEHandler struct {
// Has unexported fields
}Functions:
func NewSSEHandler(getServer func(*http.Request) *Server, opts *SSEOptions) *SSEHandlerCreates a new SSEHandler.
Parameters:
getServer: Function that returns a Server for the requestopts: Options (reserved for future use)Methods:
func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)Example:
// Create SSE HTTP handler
server := mcp.NewServer(impl, nil)
handler := mcp.NewSSEHandler(func(req *http.Request) *mcp.Server {
return server
}, nil)
// Serve over HTTP
http.Handle("/mcp", handler)
log.Fatal(http.ListenAndServe(":8080", nil))type SSEOptions struct{}Options for SSEHandler (reserved for future use).
HTTP handler for streamable HTTP transport (2025-03-26 spec) with support for server-sent events.
type StreamableHTTPHandler struct {
// Has unexported fields
}Functions:
func NewStreamableHTTPHandler(getServer func(*http.Request) *Server, opts *StreamableHTTPOptions) *StreamableHTTPHandlerCreates a new StreamableHTTPHandler.
Parameters:
getServer: Function that returns a Server for the requestopts: Options including event storeMethods:
func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)Example:
// Create streamable HTTP handler with event store
server := mcp.NewServer(impl, nil)
handler := mcp.NewStreamableHTTPHandler(
func(req *http.Request) *mcp.Server {
return server
},
&mcp.StreamableHTTPOptions{
EventStore: myEventStore,
},
)
http.Handle("/mcp", handler)
log.Fatal(http.ListenAndServe(":8080", nil))type StreamableHTTPOptions struct {
EventStore EventStore
}Options for StreamableHTTPHandler.
Fields:
EventStore: Optional storage for server-sent eventsWrapper transport that logs all JSON-RPC messages.
type LoggingTransport struct {
Transport Transport
Writer io.Writer
}Fields:
Transport: Underlying transport to wrapWriter: Where to write log messages (e.g., os.Stderr)Methods:
func (t *LoggingTransport) Connect(ctx context.Context) (Connection, error)Example:
// Wrap any transport with logging
baseTransport := &mcp.StdioTransport{}
loggingTransport := &mcp.LoggingTransport{
Transport: baseTransport,
Writer: os.Stderr,
}
server := mcp.NewServer(impl, nil)
if err := server.Run(ctx, loggingTransport); err != nil {
log.Fatal(err)
}type EventStore interface {
Open(ctx context.Context, sessionID, streamID string) error
Append(ctx context.Context, sessionID, streamID string, data []byte) error
After(ctx context.Context, sessionID, streamID string, index int) iter.Seq2[[]byte, error]
Close(ctx context.Context, sessionID, streamID string) error
}Interface for storing and replaying server-sent events in streamable HTTP transport.
Methods:
Open: Initialize storage for a new event streamAppend: Add event data to the streamAfter: Returns iterator over events after given indexClose: Clean up storage for streamtype Event struct {
Name string
ID string
Data []byte
}Represents a server-sent event.
Methods:
func (e Event) Empty() boolReports whether the event is empty.
var ErrEventsPurged = errors.New("data purged")Error returned by EventStore.After if events are no longer available.
var ErrConnectionClosed = errors.New("connection closed")Returned when attempting to send a message to a closed connection.
To implement a custom transport:
Transport interfaceConnection that handles JSON-RPC messagesExample:
type CustomTransport struct {
endpoint string
}
func (t *CustomTransport) Connect(ctx context.Context) (mcp.Connection, error) {
// Establish connection to your backend
conn, err := dialCustomProtocol(t.endpoint)
if err != nil {
return nil, err
}
return &customConnection{conn: conn}, nil
}
type customConnection struct {
conn net.Conn
decoder *json.Decoder
encoder *json.Encoder
mu sync.Mutex
}
func (c *customConnection) Read(ctx context.Context) (jsonrpc.Message, error) {
var msg jsonrpc.Message
if err := c.decoder.Decode(&msg); err != nil {
return nil, err
}
return msg, nil
}
func (c *customConnection) Write(ctx context.Context, msg jsonrpc.Message) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.encoder.Encode(msg)
}
func (c *customConnection) Close() error {
return c.conn.Close()
}
func (c *customConnection) SessionID() string {
return "custom-session-id"
}| Transport | Use Case | Pros | Cons |
|---|---|---|---|
| StdioTransport | CLI tools, subprocess | Simple, universal | No network support |
| CommandTransport | Launching server processes | Auto-lifecycle | Process overhead |
| IOTransport | Custom I/O needs | Flexible | Manual stream management |
| InMemoryTransport | Testing, embedded servers | Fast, no I/O | Not for production |
| SSEClientTransport | Browser-friendly servers | HTTP-based, firewall-friendly | Higher overhead |
| SSEServerTransport | Web applications | Standard HTTP | Unidirectional events |
| StreamableHTTPHandler | Production web servers | Full-duplex HTTP, event replay | More complex |
| LoggingTransport | Debugging | Inspect all messages | Performance impact |
// Wrap CommandTransport to add environment variables
cmd := exec.Command("server")
cmd.Env = append(os.Environ(), "MCP_API_KEY=secret")
transport := &mcp.CommandTransport{Command: cmd}// Route to different servers based on request
handler := mcp.NewSSEHandler(func(req *http.Request) *mcp.Server {
switch req.Header.Get("X-Service") {
case "tools":
return toolsServer
case "resources":
return resourcesServer
default:
return defaultServer
}
}, nil)// Generate custom session IDs
server := mcp.NewServer(impl, &mcp.ServerOptions{
GetSessionID: func() string {
return fmt.Sprintf("session-%s", uuid.New())
},
})// CommandTransport with custom termination timeout
transport := &mcp.CommandTransport{
Command: exec.Command("server"),
TerminateDuration: 30 * time.Second, // Wait 30s before SIGTERM
}// Implement automatic reconnection
func connectWithRetry(ctx context.Context, client *mcp.Client, transport mcp.Transport, maxRetries int) (*mcp.ClientSession, error) {
var session *mcp.ClientSession
var err error
for i := 0; i < maxRetries; i++ {
session, err = client.Connect(ctx, transport, nil)
if err == nil {
return session, nil
}
log.Printf("Connection attempt %d failed: %v", i+1, err)
time.Sleep(time.Second * time.Duration(i+1))
}
return nil, fmt.Errorf("failed after %d retries: %w", maxRetries, err)
}