or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

admin.md advanced.md client-server.md credentials-security.md errors-status.md health.md index.md interceptors.md load-balancing.md metadata-context.md name-resolution.md observability.md reflection.md streaming.md testing.md xds.md
tile.json

docs/streaming.md

Streaming

gRPC supports four types of RPC patterns: unary, client streaming, server streaming, and bidirectional streaming. This document covers all streaming interfaces and patterns.

RPC Patterns

  1. Unary RPC: Client sends single request, receives single response
  2. Server Streaming RPC: Client sends single request, receives stream of responses
  3. Client Streaming RPC: Client sends stream of requests, receives single response
  4. Bidirectional Streaming RPC: Both client and server send streams of messages

Stream Descriptor

type StreamDesc struct {
    StreamName    string        // Method name excluding the service
    Handler       StreamHandler // Handler called for the method
    ServerStreams bool          // Indicates server can perform streaming sends
    ClientStreams bool          // Indicates client can perform streaming sends
}

type StreamHandler func(srv any, stream ServerStream) error

Client-Side Streaming Interfaces

ClientStream

Base interface for all client-side streams.

type ClientStream interface {
    // Header returns the header metadata received from the server
    // Blocks if the metadata is not ready to read
    Header() (metadata.MD, error)

    // Trailer returns the trailer metadata from the server
    // Must only be called after stream completion
    Trailer() metadata.MD

    // CloseSend closes the send direction of the stream
    CloseSend() error

    // Context returns the context for this stream
    Context() context.Context

    // SendMsg sends a message on the stream
    SendMsg(m any) error

    // RecvMsg receives a message from the stream
    // Returns io.EOF when stream completes successfully
    RecvMsg(m any) error
}

ServerStreamingClient

Client side of server streaming RPC (one request, many responses).

type ServerStreamingClient[Res any] interface {
    // Recv receives the next response message from the server
    // Returns io.EOF when stream completes with OK status
    Recv() (*Res, error)

    // ClientStream provides Context, Header, and Trailer
    ClientStream
}

Example:

// Start the server-streaming call with a single request.
stream, err := client.ListFeatures(ctx, &pb.Rectangle{...})
if err != nil {
    log.Fatalf("ListFeatures failed: %v", err)
}

// Read responses until the stream ends.
for {
    feature, recvErr := stream.Recv()
    if recvErr != nil {
        if recvErr == io.EOF {
            // io.EOF marks the normal end of the stream, not a failure.
            break
        }
        log.Fatalf("ListFeatures error: %v", recvErr)
    }
    log.Printf("Feature: %v", feature)
}

ClientStreamingClient

Client side of client streaming RPC (many requests, one response).

type ClientStreamingClient[Req any, Res any] interface {
    // Send sends a request message to the server
    // Can be called multiple times
    Send(*Req) error

    // CloseAndRecv closes the request stream and waits for response
    // Must be called once and only once after sending all requests
    CloseAndRecv() (*Res, error)

    // ClientStream provides Context, Header, and Trailer
    ClientStream
}

Example:

// Open the client-streaming call.
stream, err := client.RecordRoute(ctx)
if err != nil {
    log.Fatalf("RecordRoute failed: %v", err)
}

// Stream every point to the server, aborting on the first send failure.
for _, pt := range points {
    sendErr := stream.Send(pt)
    if sendErr != nil {
        log.Fatalf("Send error: %v", sendErr)
    }
}

// Close the request stream and wait for the server's single response.
summary, err := stream.CloseAndRecv()
if err != nil {
    log.Fatalf("CloseAndRecv error: %v", err)
}
log.Printf("Summary: %v", summary)

BidiStreamingClient

Client side of bidirectional streaming RPC (many requests, many responses).

type BidiStreamingClient[Req any, Res any] interface {
    // Send sends a request message to the server
    // Can be called multiple times
    Send(*Req) error

    // Recv receives a response message from the server
    // Can be called multiple times
    // Returns io.EOF when server closes stream with OK status
    Recv() (*Res, error)

    // ClientStream provides Context, Header, Trailer, and CloseSend
    ClientStream
}

Example:

stream, err := client.RouteChat(ctx)
if err != nil {
    log.Fatalf("RouteChat failed: %v", err)
}

// waitc is closed by the receive goroutine once the stream ends with io.EOF,
// so the sending side below can wait until every response has been read.
waitc := make(chan struct{})

// Receive goroutine
go func() {
    for {
        note, err := stream.Recv()
        if err == io.EOF {
            // Stream completed successfully; release the waiter.
            close(waitc)
            return
        }
        if err != nil {
            log.Fatalf("Recv error: %v", err)
        }
        log.Printf("Got message: %v", note)
    }
}()

// Send messages
for _, note := range notes {
    if err := stream.Send(note); err != nil {
        log.Fatalf("Send error: %v", err)
    }
}

// Close the send direction and check the result instead of dropping it.
if err := stream.CloseSend(); err != nil {
    log.Fatalf("CloseSend error: %v", err)
}
<-waitc

GenericClientStream

Generic wrapper implementing all client streaming interfaces.

type GenericClientStream[Req any, Res any] struct {
    ClientStream
}

func (x *GenericClientStream[Req, Res]) Send(m *Req) error
func (x *GenericClientStream[Req, Res]) Recv() (*Res, error)
func (x *GenericClientStream[Req, Res]) CloseAndRecv() (*Res, error)

Used in generated code to provide type-safe streaming.

Server-Side Streaming Interfaces

ServerStream

Base interface for all server-side streams.

type ServerStream interface {
    // SetHeader sets the header metadata
    // Can be called multiple times; all metadata will be merged
    SetHeader(metadata.MD) error

    // SendHeader sends the header metadata
    // Can only be called once
    SendHeader(metadata.MD) error

    // SetTrailer sets the trailer metadata
    SetTrailer(metadata.MD)

    // Context returns the context for this stream
    Context() context.Context

    // SendMsg sends a message on the stream
    SendMsg(m any) error

    // RecvMsg receives a message from the stream
    // Returns io.EOF when client calls CloseSend
    RecvMsg(m any) error
}

ServerStreamingServer

Server side of server streaming RPC (one request, many responses).

type ServerStreamingServer[Res any] interface {
    // Send sends a response message to the client
    // Can be called multiple times
    Send(*Res) error

    // ServerStream provides Context, SetHeader, SendHeader, and SetTrailer
    ServerStream
}

Example:

// ListFeatures walks s.features and sends each feature for which inRange
// reports true for its Location and rect. It returns the first Send error,
// or nil once every matching feature has been sent.
func (s *server) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
    for _, f := range s.features {
        if !inRange(f.Location, rect) {
            continue
        }
        if sendErr := stream.Send(f); sendErr != nil {
            return sendErr
        }
    }
    return nil
}

ClientStreamingServer

Server side of client streaming RPC (many requests, one response).

type ClientStreamingServer[Req any, Res any] interface {
    // Recv receives the next request message from the client
    // Returns io.EOF when client calls CloseAndRecv
    Recv() (*Req, error)

    // SendAndClose sends a single response and closes the stream
    // Must be called once and only once
    SendAndClose(*Res) error

    // ServerStream provides Context, SetHeader, SendHeader, and SetTrailer
    ServerStream
}

Example:

// RecordRoute counts the messages streamed by the client. When Recv returns
// io.EOF it replies with a single RouteSummary carrying the message count and
// the elapsed time in whole seconds. Any other Recv error is returned as-is.
func (s *server) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
    var pointCount int32
    startTime := time.Now()

    for {
        // Only the number of points matters here. Binding the message to a
        // named variable that is never read would not compile in Go.
        _, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.RouteSummary{
                PointCount:  pointCount,
                ElapsedTime: int32(time.Since(startTime).Seconds()),
            })
        }
        if err != nil {
            return err
        }
        pointCount++
    }
}

BidiStreamingServer

Server side of bidirectional streaming RPC (many requests, many responses).

type BidiStreamingServer[Req any, Res any] interface {
    // Recv receives the next request message from the client
    // Returns io.EOF when client closes stream
    Recv() (*Req, error)

    // Send sends a response message to the client
    // Can be called multiple times
    Send(*Res) error

    // ServerStream provides Context, SetHeader, SendHeader, and SetTrailer
    ServerStream
}

Example:

// RouteChat reads notes from the client until Recv returns io.EOF. For each
// note it sends back every note returned by s.getPreviousNotes for that
// note's Location. It returns nil on io.EOF, otherwise the first Recv or
// Send error encountered.
func (s *server) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
    for {
        incoming, recvErr := stream.Recv()
        switch {
        case recvErr == io.EOF:
            return nil
        case recvErr != nil:
            return recvErr
        }

        // Process and send response
        earlier := s.getPreviousNotes(incoming.Location)
        for _, prev := range earlier {
            if sendErr := stream.Send(prev); sendErr != nil {
                return sendErr
            }
        }
    }
}

GenericServerStream

Generic wrapper implementing all server streaming interfaces.

type GenericServerStream[Req any, Res any] struct {
    ServerStream
}

func (x *GenericServerStream[Req, Res]) Send(m *Res) error
func (x *GenericServerStream[Req, Res]) Recv() (*Req, error)
func (x *GenericServerStream[Req, Res]) SendAndClose(m *Res) error

Used in generated code to provide type-safe streaming.

Stream Creation

Client-Side

// Create a new client stream
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)

// Wrapper function
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error)

Example:

// Full method path of the stream to open.
const fullMethod = "/mypackage.MyService/MyStream"

// Describe a stream on which both client and server may send multiple messages.
desc := &grpc.StreamDesc{
    StreamName:    "MyStream",
    ClientStreams: true,
    ServerStreams: true,
}

// Open the stream on the connection and abort if creation fails.
stream, err := conn.NewStream(ctx, desc, fullMethod)
if err != nil {
    log.Fatalf("Failed to create stream: %v", err)
}

Streaming Interceptors

Client-Side

type StreamClientInterceptor func(
    ctx context.Context,
    desc *StreamDesc,
    cc *ClientConn,
    method string,
    streamer Streamer,
    opts ...CallOption,
) (ClientStream, error)

type Streamer func(
    ctx context.Context,
    desc *StreamDesc,
    cc *ClientConn,
    method string,
    opts ...CallOption,
) (ClientStream, error)

Example:

// streamClientInterceptor logs the start of each client stream, creates the
// stream through streamer, and returns it wrapped in a wrappedStream that
// carries the method name. A streamer error is returned with a nil stream.
func streamClientInterceptor(
    ctx context.Context,
    desc *grpc.StreamDesc,
    cc *grpc.ClientConn,
    method string,
    streamer grpc.Streamer,
    opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
    log.Printf("Starting stream: %s", method)

    inner, err := streamer(ctx, desc, cc, method, opts...)
    if err != nil {
        return nil, err
    }

    wrapped := &wrappedStream{ClientStream: inner, method: method}
    return wrapped, nil
}

// wrappedStream embeds a grpc.ClientStream and logs every SendMsg and
// RecvMsg call together with the method name it was created for.
type wrappedStream struct {
    grpc.ClientStream
    method string
}

// SendMsg logs the outgoing message, then forwards it to the embedded stream.
func (w *wrappedStream) SendMsg(m any) error {
    log.Printf("Send to %s: %v", w.method, m)
    return w.ClientStream.SendMsg(m)
}

// RecvMsg logs the receive attempt, then delegates to the embedded stream.
func (w *wrappedStream) RecvMsg(m any) error {
    log.Printf("Recv from %s", w.method)
    return w.ClientStream.RecvMsg(m)
}

Server-Side

type StreamServerInterceptor func(
    srv any,
    ss ServerStream,
    info *StreamServerInfo,
    handler StreamHandler,
) error

type StreamServerInfo struct {
    FullMethod     string
    IsClientStream bool
    IsServerStream bool
}

Example:

// streamServerInterceptor logs the start and completion of each server
// stream. It runs handler with ss wrapped in a wrappedServerStream and
// returns the handler's error unchanged.
func streamServerInterceptor(srv any, ss grpc.ServerStream,
    info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {

    log.Printf("Stream started: %s", info.FullMethod)
    err := handler(srv, &wrappedServerStream{ServerStream: ss, method: info.FullMethod})
    log.Printf("Stream completed: %s, error: %v", info.FullMethod, err)
    return err
}

// wrappedServerStream embeds a grpc.ServerStream and logs every SendMsg and
// RecvMsg call together with the method name it was created for.
type wrappedServerStream struct {
    grpc.ServerStream
    method string
}

// SendMsg logs the outgoing message, then forwards it to the embedded stream.
func (w *wrappedServerStream) SendMsg(m any) error {
    log.Printf("Send on %s: %v", w.method, m)
    return w.ServerStream.SendMsg(m)
}

// RecvMsg logs the receive attempt, then delegates to the embedded stream.
func (w *wrappedServerStream) RecvMsg(m any) error {
    log.Printf("Recv on %s", w.method)
    return w.ServerStream.RecvMsg(m)
}

Stream Lifecycle

Client-Side Stream Lifecycle

  1. Creation: Call NewStream or generated client method
  2. Sending: Call Send/SendMsg (multiple times for streaming)
  3. Receiving: Call Recv/RecvMsg (multiple times for streaming)
  4. Half-close: Call CloseSend (for bidirectional streaming)
  5. Completion: Receive io.EOF or error
  6. Cleanup: Stream resources automatically cleaned up

Server-Side Stream Lifecycle

  1. Invocation: Handler called by gRPC framework
  2. Receiving: Call Recv to read client messages
  3. Sending: Call Send to write response messages
  4. Completion: Return from handler (nil for success, error for failure)
  5. Cleanup: Framework sends status and trailers

Resource Management

Client-Side

To prevent resource leaks, ensure one of the following:

  1. Call Close() on the ClientConn
  2. Cancel the context provided to the stream
  3. Call RecvMsg until a non-nil error is returned
  4. Receive a non-nil, non-io.EOF error from Header or SendMsg

Server-Side

Resources are automatically managed. Return from the handler to complete the stream.

Deprecated Interfaces

// Deprecated: Use ClientStream and ServerStream instead
type Stream interface {
    Context() context.Context
    SendMsg(m any) error
    RecvMsg(m any) error
}

Error Handling

Apart from io.EOF, which signals successful completion of the stream, errors returned by stream methods are compatible with the status package:

_, err := stream.Recv()
switch {
case err == nil:
    // No error: a message was received.
case err == io.EOF:
    // Stream ended successfully
default:
    // Get status
    if st, ok := status.FromError(err); ok {
        log.Printf("Stream error: code=%s, msg=%s",
            st.Code(), st.Message())
    }
}

Best Practices

General

  1. Always check for io.EOF: Indicates successful stream completion
  2. Handle errors immediately: Don't continue after receiving an error
  3. Use context for cancellation: Provide cancelable contexts for timeouts
  4. Close streams properly: Call CloseSend on client bidirectional streams

Client-Side

  1. Separate goroutines: Use separate goroutines for concurrent send/recv
  2. Wait for completion: Use channels to coordinate goroutine completion
  3. Handle metadata: Read headers and trailers for additional information
  4. Set call options: Configure per-stream behavior with call options

Server-Side

  1. Return errors properly: Use status.Error for proper error codes
  2. Set metadata early: Call SetHeader before first Send or return
  3. Check context: Monitor stream.Context() for cancellation
  4. Avoid blocking: Don't block indefinitely in stream handlers
  5. SendAndClose once: Only call SendAndClose once in client streaming

Performance

  1. Batch messages: Send multiple small messages as one larger message
  2. Use buffering: Leverage gRPC's internal buffering
  3. Consider message size: Large messages can cause memory pressure
  4. Monitor flow control: Be aware of window sizes and backpressure

Complete Example

// Server implementation

// BidiStream answers every request received on the stream with one response
// built from process(req.Data). It sets header metadata up front, sets
// trailer metadata when Recv returns io.EOF, and then returns nil. Context
// termination and stream errors are returned with their own status codes.
func (s *server) BidiStream(stream pb.Service_BidiStreamServer) error {
    ctx := stream.Context()

    // Set initial metadata.
    // NOTE(review): SetHeader only sets the metadata; per the grpc-go docs it
    // is transmitted with the first response or an explicit SendHeader.
    if err := stream.SetHeader(metadata.Pairs("initial", "header")); err != nil {
        return status.Errorf(codes.Internal, "failed to set header: %v", err)
    }

    // Process messages
    for {
        // Stop if the context has ended. FromContextError reports Canceled or
        // DeadlineExceeded as appropriate, rather than always Canceled.
        if err := ctx.Err(); err != nil {
            return status.FromContextError(err).Err()
        }

        // Receive message
        req, err := stream.Recv()
        if err == io.EOF {
            stream.SetTrailer(metadata.Pairs("final", "trailer"))
            return nil
        }
        if err != nil {
            // Return the stream error unchanged; re-wrapping it as Internal
            // would hide the real status code (e.g. Canceled).
            return err
        }

        // Process and send response
        resp := &pb.Response{Data: process(req.Data)}
        if err := stream.Send(resp); err != nil {
            // Same as above: keep the original status code.
            return err
        }
    }
}

// Client usage

// runClient opens a bidirectional stream with a 10-second timeout, sends ten
// requests, and logs the header, every response, and the trailer. It returns
// the first error from stream creation, sending, CloseSend, the header read,
// or receiving, and nil when the stream completes with io.EOF.
func runClient(client pb.ServiceClient) error {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    stream, err := client.BidiStream(ctx)
    if err != nil {
        return fmt.Errorf("failed to create stream: %w", err)
    }

    // Coordinate send and receive
    waitc := make(chan struct{})
    errc := make(chan error, 1) // buffered so the goroutine never blocks on report

    // Receive goroutine
    go func() {
        defer close(waitc)

        // Read header. Header blocks until the server's header metadata is
        // available. Reading it here, concurrently with the sends below,
        // avoids blocking before any request is sent while the server is
        // itself waiting for the first request.
        header, err := stream.Header()
        if err != nil {
            errc <- fmt.Errorf("failed to get header: %w", err)
            return
        }
        log.Printf("Header: %v", header)

        for {
            resp, err := stream.Recv()
            if err == io.EOF {
                return
            }
            if err != nil {
                errc <- fmt.Errorf("recv error: %w", err)
                return
            }
            log.Printf("Received: %v", resp)
        }
    }()

    // Send messages
    for i := 0; i < 10; i++ {
        if err := stream.Send(&pb.Request{Data: fmt.Sprintf("msg-%d", i)}); err != nil {
            // The deferred cancel ends the stream, which unblocks the
            // receive goroutine.
            return fmt.Errorf("send error: %w", err)
        }
    }

    // Close send side
    if err := stream.CloseSend(); err != nil {
        return fmt.Errorf("close send error: %w", err)
    }

    // Wait for receive goroutine
    <-waitc

    // Check for errors
    select {
    case err := <-errc:
        return err
    default:
    }

    // Read trailer (only valid once the stream has completed)
    trailer := stream.Trailer()
    log.Printf("Trailer: %v", trailer)

    return nil
}