or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

auth.md · index.md · jsonrpc.md · mcp-capabilities.md · mcp-client.md · mcp-content.md · mcp-protocol.md · mcp-server.md · mcp-transports.md · oauthex.md
tile.json

docs/mcp-transports.md

MCP Transports

Transports provide pluggable communication mechanisms for MCP clients and servers. The SDK includes multiple built-in transport implementations for different scenarios.

Transport Interface

type Transport interface {
	Connect(ctx context.Context) (Connection, error)
}

All transports implement this interface to establish connections.

Connection Interface

type Connection interface {
	Read(context.Context) (jsonrpc.Message, error)
	Write(context.Context, jsonrpc.Message) error
	Close() error
	SessionID() string
}

Represents a bidirectional JSON-RPC connection between client and server.

Built-in Transports

StdioTransport

Communicates over stdin/stdout using newline-delimited JSON.

type StdioTransport struct{}

Methods:

func (t *StdioTransport) Connect(context.Context) (Connection, error)

Example:

// Server listening on stdin/stdout
server := mcp.NewServer(impl, nil)
if err := server.Run(ctx, &mcp.StdioTransport{}); err != nil {
	log.Fatal(err)
}

CommandTransport

Runs a command and communicates over its stdin/stdout.

type CommandTransport struct {
	Command           *exec.Cmd
	TerminateDuration time.Duration
}

Fields:

  • Command: Command to execute (e.g., exec.Command("myserver"))
  • TerminateDuration: Time to wait for graceful shutdown before SIGTERM (default: 5s)

Methods:

func (t *CommandTransport) Connect(ctx context.Context) (Connection, error)

Example:

// Client connecting to a server command
client := mcp.NewClient(impl, nil)
transport := &mcp.CommandTransport{
	Command:           exec.Command("python", "server.py"),
	TerminateDuration: 10 * time.Second,
}
session, err := client.Connect(ctx, transport, nil)
if err != nil {
	log.Fatal(err)
}
defer session.Close()

IOTransport

Communicates over custom ReadCloser and WriteCloser.

type IOTransport struct {
	Reader io.ReadCloser
	Writer io.WriteCloser
}

Fields:

  • Reader: Input stream for reading messages
  • Writer: Output stream for writing messages

Methods:

func (t *IOTransport) Connect(context.Context) (Connection, error)

Example:

// Connect over custom streams (e.g., network socket)
reader, writer := getCustomStreams()
transport := &mcp.IOTransport{
	Reader: reader,
	Writer: writer,
}
session, err := client.Connect(ctx, transport, nil)

InMemoryTransport

Provides in-memory communication for testing and local use.

type InMemoryTransport struct {
	// Has unexported fields
}

Functions:

func NewInMemoryTransports() (*InMemoryTransport, *InMemoryTransport)

Returns two connected InMemoryTransport objects for testing.

Methods:

func (t *InMemoryTransport) Connect(context.Context) (Connection, error)

Example:

// Create connected transports for testing
clientTransport, serverTransport := mcp.NewInMemoryTransports()

// Run server on one transport
go func() {
	server := mcp.NewServer(serverImpl, nil)
	if err := server.Run(ctx, serverTransport); err != nil {
		log.Printf("Server error: %v", err)
	}
}()

// Connect client on the other transport
client := mcp.NewClient(clientImpl, nil)
session, err := client.Connect(ctx, clientTransport, nil)
if err != nil {
	log.Fatal(err)
}

SSEClientTransport

Client transport for Server-Sent Events (SSE) connections (2024-11-05 spec).

type SSEClientTransport struct {
	Endpoint   string
	HTTPClient *http.Client
}

Fields:

  • Endpoint: SSE endpoint URL
  • HTTPClient: Optional HTTP client (uses http.DefaultClient if nil)

Methods:

func (t *SSEClientTransport) Connect(ctx context.Context) (Connection, error)

Example:

// Connect to SSE endpoint
transport := &mcp.SSEClientTransport{
	Endpoint:   "https://example.com/mcp",
	HTTPClient: &http.Client{Timeout: 30 * time.Second},
}
session, err := client.Connect(ctx, transport, nil)

SSEServerTransport

Server-side transport for SSE sessions created through hanging GET requests.

type SSEServerTransport struct {
	Endpoint string
	Response http.ResponseWriter
	// Has unexported fields
}

Fields:

  • Endpoint: Endpoint URL for client POST requests
  • Response: HTTP response writer for sending events

Methods:

func (t *SSEServerTransport) Connect(context.Context) (Connection, error)
func (t *SSEServerTransport) ServeHTTP(w http.ResponseWriter, req *http.Request)

SSEHandler

HTTP handler for managing SSE-based MCP sessions.

type SSEHandler struct {
	// Has unexported fields
}

Functions:

func NewSSEHandler(getServer func(*http.Request) *Server, opts *SSEOptions) *SSEHandler

Creates a new SSEHandler.

Parameters:

  • getServer: Function that returns a Server for the request
  • opts: Options (reserved for future use)

Methods:

func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

Example:

// Create SSE HTTP handler
server := mcp.NewServer(impl, nil)
handler := mcp.NewSSEHandler(func(req *http.Request) *mcp.Server {
	return server
}, nil)

// Serve over HTTP
http.Handle("/mcp", handler)
log.Fatal(http.ListenAndServe(":8080", nil))

type SSEOptions struct{}

Options for SSEHandler (reserved for future use).

StreamableHTTPHandler

HTTP handler for streamable HTTP transport (2025-03-26 spec) with support for server-sent events.

type StreamableHTTPHandler struct {
	// Has unexported fields
}

Functions:

func NewStreamableHTTPHandler(getServer func(*http.Request) *Server, opts *StreamableHTTPOptions) *StreamableHTTPHandler

Creates a new StreamableHTTPHandler.

Parameters:

  • getServer: Function that returns a Server for the request
  • opts: Options including event store

Methods:

func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

Example:

// Create streamable HTTP handler with event store
server := mcp.NewServer(impl, nil)
handler := mcp.NewStreamableHTTPHandler(
	func(req *http.Request) *mcp.Server {
		return server
	},
	&mcp.StreamableHTTPOptions{
		EventStore: myEventStore,
	},
)

http.Handle("/mcp", handler)
log.Fatal(http.ListenAndServe(":8080", nil))

type StreamableHTTPOptions struct {
	EventStore EventStore
}

Options for StreamableHTTPHandler.

Fields:

  • EventStore: Optional storage for server-sent events

LoggingTransport

Wrapper transport that logs all JSON-RPC messages.

type LoggingTransport struct {
	Transport Transport
	Writer    io.Writer
}

Fields:

  • Transport: Underlying transport to wrap
  • Writer: Where to write log messages (e.g., os.Stderr)

Methods:

func (t *LoggingTransport) Connect(ctx context.Context) (Connection, error)

Example:

// Wrap any transport with logging
baseTransport := &mcp.StdioTransport{}
loggingTransport := &mcp.LoggingTransport{
	Transport: baseTransport,
	Writer:    os.Stderr,
}
server := mcp.NewServer(impl, nil)
if err := server.Run(ctx, loggingTransport); err != nil {
	log.Fatal(err)
}

Event Storage

EventStore Interface

type EventStore interface {
	Open(ctx context.Context, sessionID, streamID string) error
	Append(ctx context.Context, sessionID, streamID string, data []byte) error
	After(ctx context.Context, sessionID, streamID string, index int) iter.Seq2[[]byte, error]
	Close(ctx context.Context, sessionID, streamID string) error
}

Interface for storing and replaying server-sent events in streamable HTTP transport.

Methods:

  • Open: Initialize storage for a new event stream
  • Append: Add event data to the stream
  • After: Returns iterator over events after given index
  • Close: Clean up storage for stream

Event Type

type Event struct {
	Name string
	ID   string
	Data []byte
}

Represents a server-sent event.

Methods:

func (e Event) Empty() bool

Reports whether the event is empty.

Errors

var ErrEventsPurged = errors.New("data purged")

Error returned by EventStore.After if events are no longer available.

var ErrConnectionClosed = errors.New("connection closed")

Returned when attempting to send a message to a closed connection.

Custom Transport Implementation

To implement a custom transport:

  1. Implement the Transport interface
  2. Create a Connection that handles JSON-RPC messages
  3. Ensure proper error handling and cleanup

Example:

// CustomTransport is an example Transport that connects to a backend
// identified by endpoint.
type CustomTransport struct {
	endpoint string
}

// Connect dials the backend and returns a Connection that exchanges
// newline-delimited JSON-RPC messages over the resulting net.Conn.
func (t *CustomTransport) Connect(ctx context.Context) (mcp.Connection, error) {
	// Establish connection to your backend. dialCustomProtocol stands in for
	// your own dialer.
	// NOTE(review): ctx is not passed to the dialer here, so cancellation
	// cannot interrupt the dial — thread it through if your dialer supports it.
	conn, err := dialCustomProtocol(t.endpoint)
	if err != nil {
		return nil, err
	}

	// The decoder and encoder must be created here; leaving them nil makes
	// the first Read or Write dereference a nil pointer.
	return &customConnection{
		conn:    conn,
		decoder: json.NewDecoder(conn),
		encoder: json.NewEncoder(conn),
	}, nil
}

// customConnection adapts a net.Conn to the Connection interface.
type customConnection struct {
	conn    net.Conn
	decoder *json.Decoder // reads one JSON value per message from conn
	encoder *json.Encoder // writes one JSON value plus newline per message
	mu      sync.Mutex    // serializes writes so messages are not interleaved
}

// Read returns the next JSON-RPC message received on the connection.
// It returns ctx.Err() if ctx is already done when called; a read that is
// already blocked is only interrupted by Close.
func (c *customConnection) Read(ctx context.Context) (jsonrpc.Message, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	// jsonrpc.Message is an interface, so encoding/json cannot decode into it
	// directly. Capture the raw JSON value and let the jsonrpc package pick
	// the concrete message type.
	var raw json.RawMessage
	if err := c.decoder.Decode(&raw); err != nil {
		return nil, err
	}
	return jsonrpc.DecodeMessage(raw)
}

// Write sends msg on the connection in JSON-RPC wire format.
// It returns ctx.Err() if ctx is already done when called.
func (c *customConnection) Write(ctx context.Context, msg jsonrpc.Message) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	// Use the jsonrpc package's encoder so the message carries the proper
	// JSON-RPC envelope rather than the Go struct's default JSON form.
	data, err := jsonrpc.EncodeMessage(msg)
	if err != nil {
		return err
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	return c.encoder.Encode(json.RawMessage(data))
}

// Close closes the underlying network connection.
func (c *customConnection) Close() error {
	return c.conn.Close()
}

// SessionID returns a fixed placeholder identifier for this connection.
func (c *customConnection) SessionID() string {
	return "custom-session-id"
}

Transport Selection Guide

| Transport | Use Case | Pros | Cons |
| --- | --- | --- | --- |
| StdioTransport | CLI tools, subprocess | Simple, universal | No network support |
| CommandTransport | Launching server processes | Auto-lifecycle | Process overhead |
| IOTransport | Custom I/O needs | Flexible | Manual stream management |
| InMemoryTransport | Testing, embedded servers | Fast, no I/O | Not for production |
| SSEClientTransport | Browser-friendly servers | HTTP-based, firewall-friendly | Higher overhead |
| SSEServerTransport | Web applications | Standard HTTP | Unidirectional events |
| StreamableHTTPHandler | Production web servers | Full-duplex HTTP, event replay | More complex |
| LoggingTransport | Debugging | Inspect all messages | Performance impact |

Advanced Transport Patterns

Transport with Authentication

// Pass credentials to the launched server process through its environment:
// the child inherits the current environment plus MCP_API_KEY.
// NOTE(review): "secret" is a placeholder — load the real key from a secure
// source instead of hard-coding it.
cmd := exec.Command("server")
cmd.Env = append(os.Environ(), "MCP_API_KEY=secret")
transport := &mcp.CommandTransport{Command: cmd}

Multiplexing Multiple Servers

// Route to different servers based on request
handler := mcp.NewSSEHandler(func(req *http.Request) *mcp.Server {
	switch req.Header.Get("X-Service") {
	case "tools":
		return toolsServer
	case "resources":
		return resourcesServer
	default:
		return defaultServer
	}
}, nil)

Custom Session IDs

// Generate custom session IDs
server := mcp.NewServer(impl, &mcp.ServerOptions{
	GetSessionID: func() string {
		return fmt.Sprintf("session-%s", uuid.New())
	},
})

Graceful Shutdown

// CommandTransport with custom termination timeout
transport := &mcp.CommandTransport{
	Command:           exec.Command("server"),
	TerminateDuration: 30 * time.Second, // Wait 30s before SIGTERM
}

Connection Recovery

// connectWithRetry calls client.Connect up to maxRetries times, waiting
// 1s, 2s, 3s, ... between attempts (linear backoff).
//
// It returns the first successfully established session. If every attempt
// fails, the returned error wraps the last connection error. If ctx is done
// while waiting between attempts, it returns promptly with an error wrapping
// ctx.Err(). maxRetries must be positive.
//
// NOTE(review): this reuses the same transport value for every attempt; some
// transports may not support a second Connect (for example, an *exec.Cmd can
// only be started once) — confirm for the transport you pass in.
func connectWithRetry(ctx context.Context, client *mcp.Client, transport mcp.Transport, maxRetries int) (*mcp.ClientSession, error) {
	if maxRetries <= 0 {
		// Without this guard the loop never runs and a nil error gets wrapped.
		return nil, fmt.Errorf("maxRetries must be positive, got %d", maxRetries)
	}

	var err error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		var session *mcp.ClientSession
		session, err = client.Connect(ctx, transport, nil)
		if err == nil {
			return session, nil
		}
		log.Printf("Connection attempt %d failed: %v", attempt, err)

		// No point in backing off after the final attempt.
		if attempt == maxRetries {
			break
		}

		// Wait before the next attempt, but give up immediately if the
		// caller cancels or the deadline passes.
		backoff := time.NewTimer(time.Duration(attempt) * time.Second)
		select {
		case <-ctx.Done():
			backoff.Stop()
			return nil, fmt.Errorf("connection retry canceled after %d attempts (last error: %v): %w", attempt, err, ctx.Err())
		case <-backoff.C:
		}
	}

	return nil, fmt.Errorf("failed after %d retries: %w", maxRetries, err)
}