or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

admin.md advanced.md client-server.md credentials-security.md errors-status.md health.md index.md interceptors.md load-balancing.md metadata-context.md name-resolution.md observability.md reflection.md streaming.md testing.md xds.md
tile.json

docs/interceptors.md

Interceptors

This document covers interceptors in gRPC-Go, including unary and streaming interceptors for both client and server, chaining, and common patterns.

Overview

Interceptors provide hooks to intercept RPC execution on both client and server sides. They enable cross-cutting concerns like logging, authentication, metrics, tracing, and error handling without modifying business logic.

Client-Side Interceptors

Unary Client Interceptor

Unary interceptors intercept unary RPC execution on the client.

// UnaryClientInterceptor intercepts the execution of a unary RPC on the
// client.
//
// ctx is the RPC context and method is the RPC name. req and reply are the
// request and response messages, cc is the ClientConn on which the RPC was
// invoked, invoker is the handler that completes the RPC, and opts holds all
// applicable call options (defaults plus per-call). The returned error is
// expected to be compatible with the status package.
type UnaryClientInterceptor func(
    ctx context.Context,
    method string,
    req, reply any,
    cc *ClientConn,
    invoker UnaryInvoker,
    opts ...CallOption,
) error

// UnaryInvoker completes the RPC. A UnaryClientInterceptor receives one as
// its invoker argument and calls it with the (possibly modified) context,
// method, messages, connection, and call options to let the call proceed.
type UnaryInvoker func(
    ctx context.Context,
    method string,
    req, reply any,
    cc *ClientConn,
    opts ...CallOption,
) error

Parameters:

  • ctx: The RPC context
  • method: The RPC name (format: "/package.service/Method")
  • req, reply: Request and response messages
  • cc: The ClientConn on which the RPC was invoked
  • invoker: Handler that completes the RPC; the interceptor must call it for the RPC to proceed (returning an error without calling it rejects the call)
  • opts: All applicable call options (defaults + per-call)

Returns: Error compatible with the status package

Example:

import (
    "context"
    "log"
    "time"
    "google.golang.org/grpc"
    "google.golang.org/grpc/status"
)

// loggingInterceptor is a unary client interceptor that logs the request
// before the call and, after the call, either the reply or the status code
// and error, together with the elapsed time. The invoker's error is returned
// unchanged.
func loggingInterceptor(
    ctx context.Context,
    method string,
    req, reply any,
    cc *grpc.ClientConn,
    invoker grpc.UnaryInvoker,
    opts ...grpc.CallOption,
) error {
    began := time.Now()

    // Record the outgoing request.
    log.Printf("Call %s: request=%v", method, req)

    // Hand off to the invoker, which performs the actual RPC.
    callErr := invoker(ctx, method, req, reply, cc, opts...)
    elapsed := time.Since(began)

    // Success path: log the reply and finish.
    if callErr == nil {
        log.Printf("Call %s: duration=%v reply=%v",
            method, elapsed, reply)
        return nil
    }

    // Failure path: include the status code derived from the error.
    log.Printf("Call %s: code=%s duration=%v error=%v",
        method, status.Convert(callErr).Code(), elapsed, callErr)
    return callErr
}

// Use interceptor: install loggingInterceptor as the connection's single
// unary client interceptor when creating the client.
// NOTE(review): insecure is not in the import list shown for this example;
// presumably google.golang.org/grpc/credentials/insecure — confirm. The
// returned err is not checked in this fragment.
conn, err := grpc.NewClient("localhost:50051",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithUnaryInterceptor(loggingInterceptor))

Stream Client Interceptor

Stream interceptors intercept stream creation on the client.

// StreamClientInterceptor intercepts the creation of a ClientStream on the
// client.
//
// ctx is the RPC context, desc describes the streaming RPC, cc is the
// ClientConn, method is the RPC name, streamer is the handler that creates
// the ClientStream, and opts are the call options. The interceptor returns
// the stream the caller will use, which may be a wrapper around the one the
// streamer produced.
type StreamClientInterceptor func(
    ctx context.Context,
    desc *StreamDesc,
    cc *ClientConn,
    method string,
    streamer Streamer,
    opts ...CallOption,
) (ClientStream, error)

// Streamer creates a ClientStream. A StreamClientInterceptor receives one as
// its streamer argument and calls it to open the underlying stream.
type Streamer func(
    ctx context.Context,
    desc *StreamDesc,
    cc *ClientConn,
    method string,
    opts ...CallOption,
) (ClientStream, error)

// StreamDesc describes a streaming RPC: its name, its handler, and which
// side(s) may send a stream of messages.
type StreamDesc struct {
    StreamName    string        // Method name excluding service
    Handler       StreamHandler // Handler for the method
    ServerStreams bool          // Server can perform streaming sends
    ClientStreams bool          // Client can perform streaming sends
}

Example:

import (
    "context"
    "log"
    "google.golang.org/grpc"
)

// streamLoggingInterceptor is a stream client interceptor that logs stream
// creation and returns the new stream wrapped in a loggingClientStream so
// that subsequent sends and receives are logged too. If the streamer fails,
// the failure is logged and returned with a nil stream.
func streamLoggingInterceptor(
    ctx context.Context,
    desc *grpc.StreamDesc,
    cc *grpc.ClientConn,
    method string,
    streamer grpc.Streamer,
    opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
    log.Printf("Opening stream: %s (client:%v server:%v)",
        method, desc.ClientStreams, desc.ServerStreams)

    // Delegate stream creation to the streamer.
    inner, openErr := streamer(ctx, desc, cc, method, opts...)
    if openErr != nil {
        log.Printf("Failed to open stream %s: %v", method, openErr)
        return nil, openErr
    }

    // Hand back a wrapper so SendMsg/RecvMsg go through the logging methods.
    logged := &loggingClientStream{method: method, ClientStream: inner}
    return logged, nil
}

// loggingClientStream wraps a grpc.ClientStream to intercept stream
// operations. The embedded ClientStream supplies every method that is not
// overridden; method is the RPC name used in log lines.
type loggingClientStream struct {
    grpc.ClientStream
    method string
}

// SendMsg logs the outgoing message, forwards it to the wrapped stream, and
// logs any error the wrapped stream reports. The wrapped stream's result is
// returned unchanged.
func (s *loggingClientStream) SendMsg(m any) error {
    log.Printf("Stream %s: sending message: %v", s.method, m)
    if sendErr := s.ClientStream.SendMsg(m); sendErr != nil {
        log.Printf("Stream %s: send error: %v", s.method, sendErr)
        return sendErr
    }
    return nil
}

// RecvMsg receives into m via the wrapped stream and logs the outcome:
// the received message on success, or the error otherwise. Every non-nil
// error is logged the same way. The wrapped stream's result is returned
// unchanged.
func (s *loggingClientStream) RecvMsg(m any) error {
    if recvErr := s.ClientStream.RecvMsg(m); recvErr != nil {
        log.Printf("Stream %s: recv error: %v", s.method, recvErr)
        return recvErr
    }
    log.Printf("Stream %s: received message: %v", s.method, m)
    return nil
}

// Use interceptor: install streamLoggingInterceptor as the connection's
// single stream client interceptor when creating the client.
// NOTE(review): insecure is not in the import list shown for this example;
// presumably google.golang.org/grpc/credentials/insecure — confirm. The
// returned err is not checked in this fragment.
conn, err := grpc.NewClient("localhost:50051",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithStreamInterceptor(streamLoggingInterceptor))

Client Interceptor Chaining

Chain multiple interceptors to compose functionality:

// WithChainUnaryInterceptor returns a DialOption that chains multiple unary
// client interceptors.
// Execution order: first to last; the first interceptor is the outermost and
// the last is the innermost (closest to the RPC).
func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

// WithChainStreamInterceptor returns a DialOption that chains multiple stream
// client interceptors.
// Execution order: first to last; the first interceptor is the outermost and
// the last is the innermost (closest to stream creation).
func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption

Example:

import "google.golang.org/grpc"

// Chain client interceptors: the unary chain and the stream chain are
// configured independently on the same connection.
// NOTE(review): streamAuthInterceptor and streamMetricsInterceptor must be
// client-side (grpc.StreamClientInterceptor) values here; elsewhere in this
// document the same names are used for server-side stream interceptors —
// confirm which implementations are intended.
conn, err := grpc.NewClient("localhost:50051",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(
        authInterceptor,
        loggingInterceptor,
        metricsInterceptor,
    ),
    grpc.WithChainStreamInterceptor(
        streamAuthInterceptor,
        streamLoggingInterceptor,
        streamMetricsInterceptor,
    ))

// Execution order for unary (the first interceptor listed runs first):
// authInterceptor -> loggingInterceptor -> metricsInterceptor -> RPC

Single Interceptor Options

// WithUnaryInterceptor returns a DialOption that sets a single unary client
// interceptor.
// Only one unary interceptor can be set this way (use
// WithChainUnaryInterceptor for multiple).
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

// WithStreamInterceptor returns a DialOption that sets a single stream client
// interceptor.
// Only one stream interceptor can be set this way (use
// WithChainStreamInterceptor for multiple).
func WithStreamInterceptor(f StreamClientInterceptor) DialOption

Server-Side Interceptors

Unary Server Interceptor

Unary interceptors intercept unary RPC execution on the server.

// UnaryServerInterceptor intercepts the execution of a unary RPC on the
// server.
//
// ctx is the RPC context, req is the request message, info carries
// information about the RPC, and handler completes the normal execution of
// the RPC. The interceptor returns the response and error for the call.
type UnaryServerInterceptor func(
    ctx context.Context,
    req any,
    info *UnaryServerInfo,
    handler UnaryHandler,
) (resp any, err error)

// UnaryHandler completes the normal execution of a unary RPC. A
// UnaryServerInterceptor receives one as its handler argument and calls it
// to run the rest of the chain and, ultimately, the service method.
type UnaryHandler func(ctx context.Context, req any) (any, error)

// UnaryServerInfo contains information about a unary RPC on the server side,
// as passed to a UnaryServerInterceptor.
type UnaryServerInfo struct {
    Server     any    // Service implementation (read-only)
    FullMethod string // Full RPC method: /package.service/method
}

Example:

import (
    "context"
    "log"
    "time"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// serverLoggingInterceptor is a unary server interceptor that logs the
// incoming request and, once the handler returns, the elapsed time plus the
// status code and error on failure. The handler's response and error are
// returned unchanged.
func serverLoggingInterceptor(
    ctx context.Context,
    req any,
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (any, error) {
    began := time.Now()

    // Record the incoming request.
    log.Printf("Method %s: request=%v", info.FullMethod, req)

    // Run the rest of the chain / the service method.
    resp, callErr := handler(ctx, req)
    elapsed := time.Since(began)

    // Success path: only the duration is logged.
    if callErr == nil {
        log.Printf("Method %s: duration=%v", info.FullMethod, elapsed)
        return resp, nil
    }

    // Failure path: include the status code derived from the error.
    log.Printf("Method %s: code=%s duration=%v error=%v",
        info.FullMethod, status.Convert(callErr).Code(), elapsed, callErr)
    return resp, callErr
}

// Use interceptor: install serverLoggingInterceptor as the server's single
// unary interceptor when constructing the server.
server := grpc.NewServer(
    grpc.UnaryInterceptor(serverLoggingInterceptor))

Stream Server Interceptor

Stream interceptors intercept streaming RPC execution on the server.

// StreamServerInterceptor intercepts the execution of a streaming RPC on the
// server.
//
// srv is the service implementation, ss is the server side of the stream,
// info carries information about the streaming RPC, and handler completes
// the execution of the RPC. The interceptor returns the error for the call.
type StreamServerInterceptor func(
    srv any,
    ss ServerStream,
    info *StreamServerInfo,
    handler StreamHandler,
) error

// StreamHandler completes the execution of a streaming RPC. A
// StreamServerInterceptor receives one as its handler argument and calls it
// with the service implementation and the (possibly wrapped) stream.
type StreamHandler func(srv any, stream ServerStream) error

// StreamServerInfo contains information about a streaming RPC on the server
// side, as passed to a StreamServerInterceptor.
type StreamServerInfo struct {
    FullMethod     string // Full RPC method: /package.service/method
    IsClientStream bool   // RPC is client streaming
    IsServerStream bool   // RPC is server streaming
}

Example:

import (
    "context"
    "log"
    "google.golang.org/grpc"
)

// streamServerLoggingInterceptor is a stream server interceptor that logs
// the start of a streaming RPC, runs the handler against a
// loggingServerStream wrapper so individual sends and receives are logged,
// and logs how the stream finished. The handler's error is returned
// unchanged.
func streamServerLoggingInterceptor(
    srv any,
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler,
) error {
    log.Printf("Stream method %s: client:%v server:%v",
        info.FullMethod, info.IsClientStream, info.IsServerStream)

    // The handler only ever sees the logging wrapper, never ss directly.
    logged := &loggingServerStream{method: info.FullMethod, ServerStream: ss}
    streamErr := handler(srv, logged)

    if streamErr == nil {
        log.Printf("Stream method %s: completed successfully", info.FullMethod)
        return nil
    }

    log.Printf("Stream method %s: completed with error: %v",
        info.FullMethod, streamErr)
    return streamErr
}

// loggingServerStream wraps a grpc.ServerStream to intercept stream
// operations. The embedded ServerStream supplies every method that is not
// overridden; method is the full RPC method name used in log lines.
type loggingServerStream struct {
    grpc.ServerStream
    method string
}

// SendMsg logs that a message is being sent (without its contents) and then
// forwards it to the wrapped stream, returning that stream's result.
func (s *loggingServerStream) SendMsg(m any) error {
    log.Printf("Stream %s: sending message", s.method)
    sendErr := s.ServerStream.SendMsg(m)
    return sendErr
}

// RecvMsg receives into m via the wrapped stream and logs whether a message
// arrived or an error occurred. The wrapped stream's result is returned
// unchanged.
func (s *loggingServerStream) RecvMsg(m any) error {
    if recvErr := s.ServerStream.RecvMsg(m); recvErr != nil {
        log.Printf("Stream %s: recv error: %v", s.method, recvErr)
        return recvErr
    }
    log.Printf("Stream %s: received message", s.method)
    return nil
}

// Use interceptor: install streamServerLoggingInterceptor as the server's
// single stream interceptor when constructing the server.
server := grpc.NewServer(
    grpc.StreamInterceptor(streamServerLoggingInterceptor))

Server Interceptor Chaining

Chain multiple server interceptors:

// ChainUnaryInterceptor returns a ServerOption that chains multiple unary
// server interceptors.
// Execution order: first to last; the first interceptor is the outermost and
// the last is the innermost (closest to the handler).
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

// ChainStreamInterceptor returns a ServerOption that chains multiple stream
// server interceptors.
// Execution order: first to last; the first interceptor is the outermost and
// the last is the innermost (closest to the handler).
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption

Example:

import "google.golang.org/grpc"

// Chain server interceptors. Every entry must be a server-side interceptor:
// grpc.UnaryServerInterceptor values in the unary chain and
// grpc.StreamServerInterceptor values in the stream chain. Client
// interceptors such as loggingInterceptor or streamLoggingInterceptor have
// different signatures and cannot be used here.
//
// metricsInterceptor stands for any unary server metrics interceptor, for
// example the method value m.unaryMetricsInterceptor of a *Metrics.
server := grpc.NewServer(
    grpc.ChainUnaryInterceptor(
        serverAuthInterceptor,
        validationInterceptor,
        serverLoggingInterceptor,
        metricsInterceptor,
    ),
    grpc.ChainStreamInterceptor(
        streamAuthInterceptor,
        streamServerLoggingInterceptor,
        streamMetricsInterceptor,
    ))

// Execution order for unary (the first interceptor listed runs first):
// serverAuthInterceptor -> validationInterceptor -> serverLoggingInterceptor -> metricsInterceptor -> handler

Single Interceptor Options

// UnaryInterceptor returns a ServerOption that sets a single unary server
// interceptor.
// Only one unary interceptor can be set this way (use ChainUnaryInterceptor
// for multiple).
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption

// StreamInterceptor returns a ServerOption that sets a single stream server
// interceptor.
// Only one stream interceptor can be set this way (use
// ChainStreamInterceptor for multiple).
func StreamInterceptor(i StreamServerInterceptor) ServerOption

Common Interceptor Patterns

Authentication Interceptor

import (
    "context"
    "strings"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
)

// clientAuthInterceptor (client side) builds a unary client interceptor that
// appends an "authorization" metadata entry carrying the given bearer token
// to every outgoing call before invoking the RPC.
func clientAuthInterceptor(token string) grpc.UnaryClientInterceptor {
    // The header value is the same for every call, so build it once.
    bearer := "Bearer " + token

    return func(
        ctx context.Context,
        method string,
        req, reply any,
        cc *grpc.ClientConn,
        invoker grpc.UnaryInvoker,
        opts ...grpc.CallOption,
    ) error {
        // Attach the credential to the outgoing metadata, then continue.
        authed := metadata.AppendToOutgoingContext(ctx, "authorization", bearer)
        return invoker(authed, method, req, reply, cc, opts...)
    }
}

// serverAuthInterceptor (server side) rejects unary RPCs that do not carry a
// valid token in the "authorization" metadata entry. It returns
// codes.Unauthenticated when the metadata, the header, or a valid token is
// missing; otherwise it passes the call on to handler.
func serverAuthInterceptor(
    ctx context.Context,
    req any,
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (any, error) {
    // No incoming metadata at all means no credentials.
    md, found := metadata.FromIncomingContext(ctx)
    if !found {
        return nil, status.Error(codes.Unauthenticated, "missing metadata")
    }

    // Only the first authorization value is considered.
    headers := md.Get("authorization")
    if len(headers) == 0 {
        return nil, status.Error(codes.Unauthenticated, "missing authorization")
    }

    // Strip the scheme prefix (if present) and check the remaining token.
    if token := strings.TrimPrefix(headers[0], "Bearer "); !validateToken(token) {
        return nil, status.Error(codes.Unauthenticated, "invalid token")
    }

    // Authenticated: continue down the chain.
    return handler(ctx, req)
}

// validateToken reports whether token is acceptable. This placeholder
// accepts exactly one hard-coded value; replace it with real token
// validation.
func validateToken(token string) bool {
    const accepted = "valid-token"
    return token == accepted
}

Metrics Interceptor

import (
    "context"
    "sync"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/status"
)

// Metrics accumulates request counts and cumulative handler durations for
// unary RPCs, keyed by full method name. Counts are additionally kept per
// "<method>:<status code>" key. The zero value is ready to use: the maps are
// allocated on first use. A Metrics must not be copied after first use
// because it contains a mutex.
type Metrics struct {
    requestCount     map[string]int
    requestDurations map[string]time.Duration

    // mu guards both maps. The server invokes interceptors from many
    // goroutines at once, and unsynchronized map writes crash the process.
    mu sync.Mutex
}

// unaryMetricsInterceptor is a unary server interceptor that times the
// handler and records the call in m. The handler's response and error are
// returned unchanged.
func (m *Metrics) unaryMetricsInterceptor(
    ctx context.Context,
    req any,
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (any, error) {
    start := time.Now()

    // Call handler (outside the lock so RPCs are not serialized).
    resp, err := handler(ctx, req)

    duration := time.Since(start)
    // Key for the per-status-code counter, e.g. "/pkg.Svc/Method:OK".
    metricKey := info.FullMethod + ":" + status.Code(err).String()

    // Record metrics under the lock; keep the critical section minimal.
    m.mu.Lock()
    if m.requestCount == nil {
        m.requestCount = make(map[string]int)
    }
    if m.requestDurations == nil {
        m.requestDurations = make(map[string]time.Duration)
    }
    m.requestCount[info.FullMethod]++
    m.requestDurations[info.FullMethod] += duration
    m.requestCount[metricKey]++
    m.mu.Unlock()

    return resp, err
}

Error Handling Interceptor

import (
    "context"
    "log"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// errorHandlingInterceptor is a unary server interceptor that hides
// unclassified handler errors from clients. Errors that are not status
// errors, and status errors with code Unknown, are logged and replaced by a
// generic codes.Internal error with a nil response. All other results are
// passed through unchanged. It acts only on returned errors; it does not
// recover from panics.
func errorHandlingInterceptor(
    ctx context.Context,
    req any,
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (any, error) {
    resp, err := handler(ctx, req)
    if err == nil {
        return resp, nil
    }

    st, isStatus := status.FromError(err)
    switch {
    case !isStatus:
        log.Printf("Non-status error in %s: %v", info.FullMethod, err)
    case st.Code() == codes.Unknown:
        log.Printf("Unknown error in %s: %v", info.FullMethod, err)
    default:
        // A classified status error: let the client see it as-is.
        return resp, err
    }

    // Both logged cases collapse to the same opaque Internal error.
    return nil, status.Error(codes.Internal, "internal server error")
}

Panic Recovery Interceptor

import (
    "context"
    "log"
    "runtime/debug"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// panicRecoveryInterceptor is a unary server interceptor that recovers from
// a panic raised while running handler. The panic value and stack trace are
// logged and the call fails with a generic codes.Internal error. Named
// results are used so the deferred function can overwrite err.
func panicRecoveryInterceptor(
    ctx context.Context,
    req any,
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (resp any, err error) {
    defer func() {
        recovered := recover()
        if recovered == nil {
            return
        }
        log.Printf("Panic recovered in %s: %v\n%s",
            info.FullMethod, recovered, debug.Stack())
        err = status.Error(codes.Internal, "internal server error")
    }()

    resp, err = handler(ctx, req)
    return resp, err
}

Request Validation Interceptor

import (
    "context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// Validator is implemented by request messages that can check their own
// contents. Validate returns a non-nil error describing the problem when the
// message is invalid.
type Validator interface {
    Validate() error
}

// validationInterceptor is a unary server interceptor that validates the
// request before the handler runs. Requests that implement Validator and
// fail validation are rejected with codes.InvalidArgument carrying the
// validation message. Requests that do not implement Validator are passed
// through untouched.
func validationInterceptor(
    ctx context.Context,
    req any,
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (any, error) {
    validator, canValidate := req.(Validator)
    if !canValidate {
        // Nothing to check for this message type.
        return handler(ctx, req)
    }

    if invalid := validator.Validate(); invalid != nil {
        return nil, status.Error(codes.InvalidArgument, invalid.Error())
    }

    return handler(ctx, req)
}

// Validate implements Validator for the proto-generated MyRequest message.
// It requires a non-empty Name and an Age that is not negative (zero is
// accepted), returning an error that describes the first violation found.
func (r *MyRequest) Validate() error {
    if r.Name == "" {
        return errors.New("name is required")
    }
    if r.Age < 0 {
        // Zero passes this check, so the message says "non-negative".
        return errors.New("age must be non-negative")
    }
    return nil
}

Tracing Interceptor

import (
    "context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
)

// traceIDKey is the context key under which tracingInterceptor stores the
// trace ID. An unexported struct type cannot collide with keys defined by
// other packages, unlike a plain string key. Handlers read the value with
// ctx.Value(traceIDKey{}).
type traceIDKey struct{}

// tracingInterceptor is a unary server interceptor that associates a trace
// ID with each RPC. It takes the first "trace-id" value from the incoming
// metadata, or generates a new ID when none is present. The ID is stored in
// the context passed to handler, and the start and outcome of the call are
// logged with it. The handler's response and error are returned unchanged.
func tracingInterceptor(
    ctx context.Context,
    req any,
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (any, error) {
    // Extract trace ID from metadata
    var traceID string
    if md, ok := metadata.FromIncomingContext(ctx); ok {
        if ids := md.Get("trace-id"); len(ids) > 0 {
            traceID = ids[0]
        }
    }

    // Generate trace ID if not present
    if traceID == "" {
        traceID = generateTraceID()
    }

    // Add to context under a typed key (never a bare string) so the value
    // cannot clash with other packages' context values.
    ctx = context.WithValue(ctx, traceIDKey{}, traceID)

    // Log with trace ID
    log.Printf("[%s] Method %s: starting", traceID, info.FullMethod)

    // Call handler
    resp, err := handler(ctx, req)

    // Log completion
    if err != nil {
        log.Printf("[%s] Method %s: failed: %v", traceID, info.FullMethod, err)
    } else {
        log.Printf("[%s] Method %s: completed", traceID, info.FullMethod)
    }

    return resp, err
}

// generateTraceID returns a new trace ID: the string form of a freshly
// created UUID.
// NOTE(review): uuid is not in the import list shown for this example;
// presumably github.com/google/uuid — confirm.
func generateTraceID() string {
    // Generate unique trace ID
    return uuid.New().String()
}

Rate Limiting Interceptor

import (
    "context"
    "sync"
    "time"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// RateLimiter limits how many requests each client may make within a time
// window, using a per-client list of recent request timestamps.
type RateLimiter struct {
    requests map[string][]time.Time // timestamps of admitted requests, keyed by client ID
    limit    int                    // maximum number of requests allowed within window
    window   time.Duration          // how long an admitted request counts against the limit
    mu       sync.Mutex             // held while requests is read and updated
}

// NewRateLimiter returns a RateLimiter that admits at most limit requests
// per client within any span of length window. The per-client request map
// starts out empty.
func NewRateLimiter(limit int, window time.Duration) *RateLimiter {
    rl := new(RateLimiter)
    rl.requests = map[string][]time.Time{}
    rl.limit = limit
    rl.window = window
    return rl
}

// Interceptor is a unary server interceptor that enforces the per-client
// rate limit. Requests over the limit are rejected with
// codes.ResourceExhausted; admitted requests are passed on to handler.
//
// The limiter's mutex is held only while the bookkeeping is updated and is
// released before handler runs. Holding it across the handler would force
// every RPC on the server to execute one at a time.
func (rl *RateLimiter) Interceptor(
    ctx context.Context,
    req any,
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (any, error) {
    // Extract client identifier (could be from metadata, peer info, etc.)
    clientID := getClientID(ctx)

    if !rl.allow(clientID, time.Now()) {
        return nil, status.Error(codes.ResourceExhausted, "rate limit exceeded")
    }

    // The lock is no longer held here, so handlers run concurrently.
    return handler(ctx, req)
}

// allow reports whether clientID may make a request at time now, recording
// the request when it is admitted. Timestamps that have fallen outside the
// window are discarded on every call.
func (rl *RateLimiter) allow(clientID string, now time.Time) bool {
    rl.mu.Lock()
    defer rl.mu.Unlock()

    // Drop timestamps outside the window, reusing the existing backing
    // array (the write position never overtakes the read position).
    previous := rl.requests[clientID]
    recent := previous[:0]
    for _, t := range previous {
        if now.Sub(t) < rl.window {
            recent = append(recent, t)
        }
    }

    // Check limit. The pruned slice is stored even on rejection so the map
    // never refers to entries that were overwritten by the in-place filter.
    if len(recent) >= rl.limit {
        rl.requests[clientID] = recent
        return false
    }

    // Add current request
    rl.requests[clientID] = append(recent, now)
    return true
}

// getClientID returns the identifier used to rate-limit the caller. This
// placeholder ignores ctx and returns a fixed value; a real implementation
// would derive the ID from the context (e.g. auth token or peer info).
func getClientID(ctx context.Context) string {
    const placeholderID = "client-id"
    return placeholderID
}

Stream Interceptor Patterns

Stream Authentication

// streamAuthInterceptor is a stream server interceptor that rejects streams
// whose incoming metadata does not carry a valid token in the
// "authorization" entry. It returns codes.Unauthenticated when the
// metadata, the header, or a valid token is missing; otherwise it runs
// handler on the original stream.
func streamAuthInterceptor(
    srv any,
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler,
) error {
    // Credentials travel in the metadata of the stream's context.
    md, found := metadata.FromIncomingContext(ss.Context())
    if !found {
        return status.Error(codes.Unauthenticated, "missing metadata")
    }

    // Only the first authorization value is considered.
    headers := md.Get("authorization")
    if len(headers) == 0 {
        return status.Error(codes.Unauthenticated, "missing authorization")
    }

    // Strip the scheme prefix (if present) and check the remaining token.
    if token := strings.TrimPrefix(headers[0], "Bearer "); !validateToken(token) {
        return status.Error(codes.Unauthenticated, "invalid token")
    }

    return handler(srv, ss)
}

Stream Metrics

// streamMetricsInterceptor is a stream server interceptor that counts the
// messages successfully sent and received on a stream and logs them with
// the stream's total duration once the handler returns. The handler's error
// is returned unchanged.
func streamMetricsInterceptor(
    srv any,
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler,
) error {
    began := time.Now()
    var sent, received int

    // The wrapper reports each successful operation through these hooks.
    counting := &metricsServerStream{ServerStream: ss}
    counting.onSend = func() { sent++ }
    counting.onRecv = func() { received++ }

    // Run the handler against the counting wrapper.
    streamErr := handler(srv, counting)

    // Summarize the stream.
    elapsed := time.Since(began)
    log.Printf("Stream %s: duration=%v sent=%d recv=%d",
        info.FullMethod, elapsed, sent, received)

    return streamErr
}

// metricsServerStream wraps a grpc.ServerStream and reports successful
// operations through callbacks. The embedded ServerStream supplies every
// method that is not overridden.
type metricsServerStream struct {
    grpc.ServerStream
    onSend func() // called after each SendMsg that returns nil
    onRecv func() // called after each RecvMsg that returns nil
}

// SendMsg forwards m to the wrapped stream and invokes onSend only when the
// send succeeded. The wrapped stream's result is returned unchanged.
func (s *metricsServerStream) SendMsg(m any) error {
    if sendErr := s.ServerStream.SendMsg(m); sendErr != nil {
        return sendErr
    }
    s.onSend()
    return nil
}

// RecvMsg receives into m via the wrapped stream and invokes onRecv only
// when the receive succeeded. The wrapped stream's result is returned
// unchanged.
func (s *metricsServerStream) RecvMsg(m any) error {
    if recvErr := s.ServerStream.RecvMsg(m); recvErr != nil {
        return recvErr
    }
    s.onRecv()
    return nil
}

Best Practices

Interceptor Design

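  1. Call the next handler: Always call invoker (client) or handler (server), unless the interceptor deliberately rejects the RPC (e.g., failed authentication or an exceeded rate limit)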
  2. Error handling: Return errors compatible with status package
  3. Context propagation: Pass context through to next handler
  4. Minimal overhead: Keep interceptors lightweight
  5. Composability: Design interceptors to work together

Chaining Order

Choose order carefully as it affects execution:

// Common order for client interceptors:
// 1. Retry/circuit breaker (outermost)
// 2. Authentication
// 3. Tracing
// 4. Logging
// 5. Metrics (innermost)

// Common order for server interceptors:
// 1. Panic recovery (outermost)
// 2. Rate limiting
// 3. Authentication
// 4. Validation
// 5. Tracing
// 6. Logging
// 7. Metrics (innermost)

Stream Wrapping

When wrapping streams, embed the original stream interface:

// wrappedStream is a template for a server stream wrapper: it embeds the
// original stream so that only the methods of interest need overriding.
type wrappedStream struct {
    grpc.ServerStream  // Embed to inherit other methods
    // Add custom fields
}

// Override only methods you need to intercept.
// SendMsg here shows the pattern: run custom logic, then delegate to the
// embedded stream and return its result.
func (s *wrappedStream) SendMsg(m any) error {
    // Custom logic
    return s.ServerStream.SendMsg(m)
}

Performance

  1. Avoid blocking: Don't perform blocking operations in interceptors
  2. Minimize allocations: Reuse objects where possible
  3. Async logging: Use async logging to avoid blocking RPC
  4. Sampling: Sample metrics/traces for high-throughput services

Testing

import (
    "testing"
    "context"
    "google.golang.org/grpc"
)

// TestInterceptor exercises a unary client interceptor in isolation: it
// passes a fake invoker and a nil ClientConn, then checks that the
// interceptor returned no error and actually called the invoker.
func TestInterceptor(t *testing.T) {
    // Fake invoker that only records that it ran.
    var invoked bool
    fakeInvoker := func(
        _ context.Context,
        _ string,
        _, _ any,
        _ *grpc.ClientConn,
        _ ...grpc.CallOption,
    ) error {
        invoked = true
        return nil
    }

    // Drive the interceptor under test.
    got := myInterceptor(
        context.Background(),
        "/test.Service/Method",
        &TestRequest{},
        &TestResponse{},
        nil,
        fakeInvoker,
    )

    if got != nil {
        t.Errorf("unexpected error: %v", got)
    }
    if !invoked {
        t.Error("invoker was not called")
    }
}

Context Values

Store request-scoped data in context:

// contextKey is a dedicated type for this package's context keys, so they
// are distinct from keys of any other type (including plain strings).
type contextKey string

// requestIDKey is the context key under which the request ID is stored.
const requestIDKey contextKey = "request-id"

// requestIDInterceptor is a unary server interceptor that generates a
// request ID for each RPC and stores it in the context under requestIDKey
// before calling handler, whose result is returned unchanged.
func requestIDInterceptor(
    ctx context.Context,
    req any,
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (any, error) {
    // Derive a context carrying the freshly generated ID and hand it on.
    withID := context.WithValue(ctx, requestIDKey, generateRequestID())
    return handler(withID, req)
}

// Access in handler: MyMethod reads the request ID that was stored in the
// context under requestIDKey and logs it when present. The type assertion
// guards against the value being absent or not a string.
// NOTE(review): the rest of the method is elided in this example; a real
// implementation must still return a response and an error.
func (s *server) MyMethod(ctx context.Context, req *pb.Request) (*pb.Response, error) {
    if id, ok := ctx.Value(requestIDKey).(string); ok {
        log.Printf("Request ID: %s", id)
    }
    // ...
}