gRPC supports four types of RPC patterns: unary, client streaming, server streaming, and bidirectional streaming. This document covers all streaming interfaces and patterns.
type StreamDesc struct {
StreamName string // Method name excluding the service
Handler StreamHandler // Handler called for the method
ServerStreams bool // Indicates server can perform streaming sends
ClientStreams bool // Indicates client can perform streaming sends
}
type StreamHandler func(srv any, stream ServerStream) error
Base interface for all client-side streams.
type ClientStream interface {
// Header returns the header metadata received from the server
// Blocks if the metadata is not ready to read
Header() (metadata.MD, error)
// Trailer returns the trailer metadata from the server
// Must only be called after stream completion
Trailer() metadata.MD
// CloseSend closes the send direction of the stream
CloseSend() error
// Context returns the context for this stream
Context() context.Context
// SendMsg sends a message on the stream
SendMsg(m any) error
// RecvMsg receives a message from the stream
// Returns io.EOF when stream completes successfully
RecvMsg(m any) error
}
Client side of server streaming RPC (one request, many responses).
type ServerStreamingClient[Res any] interface {
// Recv receives the next response message from the server
// Returns io.EOF when stream completes with OK status
Recv() (*Res, error)
// ClientStream provides Context, Header, and Trailer
ClientStream
}
Example:
stream, err := client.ListFeatures(ctx, &pb.Rectangle{...})
if err != nil {
log.Fatalf("ListFeatures failed: %v", err)
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("ListFeatures error: %v", err)
}
log.Printf("Feature: %v", feature)
}
Client side of client streaming RPC (many requests, one response).
type ClientStreamingClient[Req any, Res any] interface {
// Send sends a request message to the server
// Can be called multiple times
Send(*Req) error
// CloseAndRecv closes the request stream and waits for response
// Must be called once and only once after sending all requests
CloseAndRecv() (*Res, error)
// ClientStream provides Context, Header, and Trailer
ClientStream
}
Example:
stream, err := client.RecordRoute(ctx)
if err != nil {
log.Fatalf("RecordRoute failed: %v", err)
}
for _, point := range points {
if err := stream.Send(point); err != nil {
log.Fatalf("Send error: %v", err)
}
}
summary, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("CloseAndRecv error: %v", err)
}
log.Printf("Summary: %v", summary)
Client side of bidirectional streaming RPC (many requests, many responses).
type BidiStreamingClient[Req any, Res any] interface {
// Send sends a request message to the server
// Can be called multiple times
Send(*Req) error
// Recv receives a response message from the server
// Can be called multiple times
// Returns io.EOF when server closes stream with OK status
Recv() (*Res, error)
// ClientStream provides Context, Header, Trailer, and CloseSend
ClientStream
}
Example:
stream, err := client.RouteChat(ctx)
if err != nil {
	log.Fatalf("RouteChat failed: %v", err)
}
// waitc is closed by the receive goroutine once the server ends the stream,
// letting the sender below block until every response has been read.
waitc := make(chan struct{})
// Receive goroutine
go func() {
	for {
		note, err := stream.Recv()
		if err == io.EOF {
			// Server finished with OK status; release the waiter.
			close(waitc)
			return
		}
		if err != nil {
			log.Fatalf("Recv error: %v", err)
		}
		log.Printf("Got message: %v", note)
	}
}()
// Send messages
for _, note := range notes {
	if err := stream.Send(note); err != nil {
		log.Fatalf("Send error: %v", err)
	}
}
// Half-close the stream so the server's Recv observes io.EOF; a failure here
// is reported instead of being silently dropped.
if err := stream.CloseSend(); err != nil {
	log.Fatalf("CloseSend error: %v", err)
}
<-waitc
Generic wrapper implementing all client streaming interfaces.
type GenericClientStream[Req any, Res any] struct {
ClientStream
}
func (x *GenericClientStream[Req, Res]) Send(m *Req) error
func (x *GenericClientStream[Req, Res]) Recv() (*Res, error)
func (x *GenericClientStream[Req, Res]) CloseAndRecv() (*Res, error)
Used in generated code to provide type-safe streaming.
Base interface for all server-side streams.
type ServerStream interface {
// SetHeader sets the header metadata
// Can be called multiple times; all metadata will be merged
SetHeader(metadata.MD) error
// SendHeader sends the header metadata
// Can only be called once
SendHeader(metadata.MD) error
// SetTrailer sets the trailer metadata
SetTrailer(metadata.MD)
// Context returns the context for this stream
Context() context.Context
// SendMsg sends a message on the stream
SendMsg(m any) error
// RecvMsg receives a message from the stream
// Returns io.EOF when client calls CloseSend
RecvMsg(m any) error
}
Server side of server streaming RPC (one request, many responses).
type ServerStreamingServer[Res any] interface {
// Send sends a response message to the client
// Can be called multiple times
Send(*Res) error
// ServerStream provides Context, SetHeader, SendHeader, and SetTrailer
ServerStream
}
Example:
func (s *server) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.features {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
Server side of client streaming RPC (many requests, one response).
type ClientStreamingServer[Req any, Res any] interface {
// Recv receives the next request message from the client
// Returns io.EOF when client calls CloseAndRecv
Recv() (*Req, error)
// SendAndClose sends a single response and closes the stream
// Must be called once and only once
SendAndClose(*Res) error
// ServerStream provides Context, SetHeader, SendHeader, and SetTrailer
ServerStream
}
Example:
func (s *server) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount int32
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
}
}Server side of bidirectional streaming RPC (many requests, many responses).
type BidiStreamingServer[Req any, Res any] interface {
// Recv receives the next request message from the client
// Returns io.EOF when client closes stream
Recv() (*Req, error)
// Send sends a response message to the client
// Can be called multiple times
Send(*Res) error
// ServerStream provides Context, SetHeader, SendHeader, and SetTrailer
ServerStream
}
Example:
func (s *server) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
note, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// Process and send response
for _, prevNote := range s.getPreviousNotes(note.Location) {
if err := stream.Send(prevNote); err != nil {
return err
}
}
}
}
Generic wrapper implementing all server streaming interfaces.
type GenericServerStream[Req any, Res any] struct {
ServerStream
}
func (x *GenericServerStream[Req, Res]) Send(m *Res) error
func (x *GenericServerStream[Req, Res]) Recv() (*Req, error)
func (x *GenericServerStream[Req, Res]) SendAndClose(m *Res) error
Used in generated code to provide type-safe streaming.
// Create a new client stream
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
// Wrapper function
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error)
Example:
desc := &grpc.StreamDesc{
StreamName: "MyStream",
ServerStreams: true,
ClientStreams: true,
}
stream, err := conn.NewStream(ctx, desc, "/mypackage.MyService/MyStream")
if err != nil {
log.Fatalf("Failed to create stream: %v", err)
}
type StreamClientInterceptor func(
ctx context.Context,
desc *StreamDesc,
cc *ClientConn,
method string,
streamer Streamer,
opts ...CallOption,
) (ClientStream, error)
type Streamer func(
ctx context.Context,
desc *StreamDesc,
cc *ClientConn,
method string,
opts ...CallOption,
) (ClientStream, error)
Example:
func streamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc,
cc *grpc.ClientConn, method string, streamer grpc.Streamer,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Printf("Starting stream: %s", method)
s, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return &wrappedStream{ClientStream: s, method: method}, nil
}
type wrappedStream struct {
grpc.ClientStream
method string
}
func (w *wrappedStream) SendMsg(m interface{}) error {
log.Printf("Send to %s: %v", w.method, m)
return w.ClientStream.SendMsg(m)
}
func (w *wrappedStream) RecvMsg(m interface{}) error {
log.Printf("Recv from %s", w.method)
return w.ClientStream.RecvMsg(m)
}
type StreamServerInterceptor func(
srv any,
ss ServerStream,
info *StreamServerInfo,
handler StreamHandler,
) error
type StreamServerInfo struct {
FullMethod string
IsClientStream bool
IsServerStream bool
}
Example:
func streamServerInterceptor(srv interface{}, ss grpc.ServerStream,
info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
log.Printf("Stream started: %s", info.FullMethod)
err := handler(srv, &wrappedServerStream{ServerStream: ss, method: info.FullMethod})
log.Printf("Stream completed: %s, error: %v", info.FullMethod, err)
return err
}
type wrappedServerStream struct {
grpc.ServerStream
method string
}
func (w *wrappedServerStream) SendMsg(m interface{}) error {
log.Printf("Send on %s: %v", w.method, m)
return w.ServerStream.SendMsg(m)
}
func (w *wrappedServerStream) RecvMsg(m interface{}) error {
log.Printf("Recv on %s", w.method)
return w.ServerStream.RecvMsg(m)
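}
Client stream lifecycle:
- Create the stream with NewStream or generated client method
- Send messages with Send/SendMsg (multiple times for streaming)
- Receive messages with Recv/RecvMsg (multiple times for streaming)
- Close the send direction with CloseSend (for bidirectional streaming)
- The stream is finished when Recv returns io.EOF or error
Server stream lifecycle:
- Call Recv to read client messages
- Call Send to write response messages
To prevent resource leaks on client streams, ensure one of the following:
- Call Close() on the ClientConn
- Cancel the context provided
- Call RecvMsg until a non-nil error is returned
- Receive a non-nil, non-io.EOF error from Header or SendMsg
On the server side, resources are automatically managed. Return from the handler to complete the stream.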
// Deprecated: Use ClientStream and ServerStream instead
type Stream interface {
Context() context.Context
SendMsg(m any) error
RecvMsg(m any) error
}
All errors from stream methods are compatible with the status package:
_, err := stream.Recv()
if err != nil {
if err == io.EOF {
// Stream ended successfully
} else {
// Get status
st, ok := status.FromError(err)
if ok {
log.Printf("Stream error: code=%s, msg=%s",
st.Code(), st.Message())
}
}
}
Best practices:
- Call CloseSend on client bidirectional streams
- Use status.Error for proper error codes
- Call SetHeader before first Send or return
- Check stream.Context() for cancellation
- Call SendAndClose once in client streaming
// Server implementation
// BidiStream answers every request received on the stream with one processed
// response. It registers header metadata up front, stops as soon as the
// stream context ends, and attaches trailer metadata when the client finishes
// sending (Recv reports io.EOF).
//
// Errors coming back from Recv and Send are returned unchanged: stream errors
// are already compatible with the status package, and re-wrapping them as
// codes.Internal would hide the real code (for example Canceled or
// DeadlineExceeded) from the caller.
func (s *server) BidiStream(stream pb.Service_BidiStreamServer) error {
	ctx := stream.Context()
	// Set initial metadata
	if err := stream.SetHeader(metadata.Pairs("initial", "header")); err != nil {
		return status.Errorf(codes.Internal, "failed to set header: %v", err)
	}
	// Process messages
	for {
		// Check whether the context has ended before blocking in Recv.
		select {
		case <-ctx.Done():
			// Map the context error to its matching status code so that a
			// deadline expiry is not misreported as a cancellation.
			return status.FromContextError(ctx.Err()).Err()
		default:
		}
		// Receive message
		req, err := stream.Recv()
		if err == io.EOF {
			stream.SetTrailer(metadata.Pairs("final", "trailer"))
			return nil
		}
		if err != nil {
			return err
		}
		// Process and send response
		resp := &pb.Response{Data: process(req.Data)}
		if err := stream.Send(resp); err != nil {
			return err
		}
	}
}
// Client usage
//
// runClient opens a bidirectional stream with a 10-second deadline, sends ten
// requests while a separate goroutine logs the header and every response,
// half-closes the stream, waits for the server to finish and finally logs the
// trailer. It returns the first stream error encountered, or nil.
func runClient(client pb.ServiceClient) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := client.BidiStream(ctx)
	if err != nil {
		return fmt.Errorf("failed to create stream: %w", err)
	}
	// recvErr carries the receive goroutine's single result. The buffer lets
	// the goroutine finish even when this function has already returned
	// because of a send failure.
	recvErr := make(chan error, 1)
	// Receive goroutine
	go func() {
		// Header blocks until the header metadata is ready. A server that only
		// calls SetHeader transmits it with its first Send (or when it
		// returns), which happens after it has received a request. Reading the
		// header here, concurrently with the sends below, avoids stalling
		// before any request has been written.
		header, err := stream.Header()
		if err != nil {
			recvErr <- fmt.Errorf("failed to get header: %w", err)
			return
		}
		log.Printf("Header: %v", header)
		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				// Server completed the stream with OK status.
				recvErr <- nil
				return
			}
			if err != nil {
				recvErr <- fmt.Errorf("recv error: %w", err)
				return
			}
			log.Printf("Received: %v", resp)
		}
	}()
	// Send messages
	for i := 0; i < 10; i++ {
		if err := stream.Send(&pb.Request{Data: fmt.Sprintf("msg-%d", i)}); err != nil {
			// The deferred cancel tears the stream down, which also unblocks
			// the receive goroutine.
			return fmt.Errorf("send error: %w", err)
		}
	}
	// Close send side
	if err := stream.CloseSend(); err != nil {
		return fmt.Errorf("close send error: %w", err)
	}
	// Wait for the receive goroutine and surface its error, if any.
	if err := <-recvErr; err != nil {
		return err
	}
	// The trailer is only valid once the stream has completed, which the
	// successful receive result above guarantees.
	trailer := stream.Trailer()
	log.Printf("Trailer: %v", trailer)
	return nil
}