Streaming Operations

grpc-web exposes server-side streaming through an event-driven API for handling real-time data streams from gRPC services. In a server-streaming call, the client sends a single request and receives multiple responses over time.

Capabilities

ClientReadableStream

Event-driven interface for handling server-side streaming responses with support for data, error, status, metadata, and lifecycle events.

/**
 * Stream interface for reading server responses with event-based API
 */
interface ClientReadableStream<RESP> {
  /**
   * Register event listener for data events
   * @param eventType - Must be "data"
   * @param callback - Function called when response data is received
   * @returns This stream for chaining
   */
  on(eventType: "data", callback: (response: RESP) => void): ClientReadableStream<RESP>;

  /**
   * Register event listener for error events
   * @param eventType - Must be "error"
   * @param callback - Function called when an error occurs
   * @returns This stream for chaining
   */
  on(eventType: "error", callback: (err: RpcError) => void): ClientReadableStream<RESP>;

  /**
   * Register event listener for status events
   * @param eventType - Must be "status"
   * @param callback - Function called when gRPC status is received
   * @returns This stream for chaining
   */
  on(eventType: "status", callback: (status: Status) => void): ClientReadableStream<RESP>;

  /**
   * Register event listener for metadata events
   * @param eventType - Must be "metadata"
   * @param callback - Function called when response metadata is received
   * @returns This stream for chaining
   */
  on(eventType: "metadata", callback: (metadata: Metadata) => void): ClientReadableStream<RESP>;

  /**
   * Register event listener for end events
   * @param eventType - Must be "end"
   * @param callback - Function called when the stream ends normally
   * @returns This stream for chaining
   */
  on(eventType: "end", callback: () => void): ClientReadableStream<RESP>;

  /**
   * Remove specific event listener for data events
   * @param eventType - Must be "data"
   * @param callback - The callback function to remove
   */
  removeListener(eventType: "data", callback: (response: RESP) => void): void;

  /**
   * Remove specific event listener for error events
   * @param eventType - Must be "error"
   * @param callback - The callback function to remove
   */
  removeListener(eventType: "error", callback: (err: RpcError) => void): void;

  /**
   * Remove specific event listener for status events
   * @param eventType - Must be "status"
   * @param callback - The callback function to remove
   */
  removeListener(eventType: "status", callback: (status: Status) => void): void;

  /**
   * Remove specific event listener for metadata events
   * @param eventType - Must be "metadata"
   * @param callback - The callback function to remove
   */
  removeListener(eventType: "metadata", callback: (metadata: Metadata) => void): void;

  /**
   * Remove specific event listener for end events
   * @param eventType - Must be "end"
   * @param callback - The callback function to remove
   */
  removeListener(eventType: "end", callback: () => void): void;

  /**
   * Cancel the stream and close the connection
   */
  cancel(): void;
}

Usage Examples:

import { GrpcWebClientBase, MethodDescriptor, MethodType } from "grpc-web";

const client = new GrpcWebClientBase();

// Create method descriptor for streaming
const listOrdersMethod = new MethodDescriptor(
  '/ecommerce.OrderService/ListOrdersStream',
  MethodType.SERVER_STREAMING,
  ListOrdersRequest,
  OrderResponse,
  (req) => req.serializeBinary(),
  (bytes) => OrderResponse.deserializeBinary(bytes)
);

// Start server streaming call
const stream = client.serverStreaming(
  'https://api.example.com/ecommerce.OrderService/ListOrdersStream',
  new ListOrdersRequest().setUserId('user123'),
  { 'authorization': 'Bearer token' },
  listOrdersMethod
);

// Handle streaming data
const orders: OrderResponse[] = [];

stream.on('data', (order) => {
  console.log('Received order:', order.toObject());
  orders.push(order);
});

stream.on('error', (error) => {
  console.error('Stream error:', error.message);
  console.error('Error code:', error.code);
});

stream.on('status', (status) => {
  console.log('Stream status:', status.code, status.details);
});

stream.on('metadata', (metadata) => {
  console.log('Response metadata:', metadata);
});

stream.on('end', () => {
  console.log('Stream ended. Total orders received:', orders.length);
});

// Cancel stream after 30 seconds
setTimeout(() => {
  stream.cancel();
  console.log('Stream cancelled');
}, 30000);

Event Flow and Lifecycle

Server streaming follows a specific event flow:

  1. Stream Creation: client.serverStreaming() creates and returns the stream
  2. Metadata Events: Response headers arrive as 'metadata' events (optional)
  3. Data Events: Each streamed response arrives as a 'data' event
  4. Status Events: gRPC status information arrives as 'status' events
  5. End Events: Stream completion is signaled with an 'end' event
  6. Error Events: Not a sequential step; an error at any point during streaming triggers an 'error' event
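
The ordering above can be illustrated without a server. The sketch below is not grpc-web code: `FakeStream` and its `emit` helper are hypothetical stand-ins that mimic only the `on` method, so the sequence for a successful call can be replayed and logged.

```typescript
// Hypothetical in-memory stand-in for a server stream (not part of grpc-web).
type StreamEvent = "metadata" | "data" | "status" | "end" | "error";
type Listener = (payload?: unknown) => void;

class FakeStream {
  private listeners: Record<string, Listener[]> = {};

  // Register a listener and return the stream for chaining.
  on(eventType: StreamEvent, callback: Listener): this {
    const list = this.listeners[eventType] || [];
    list.push(callback);
    this.listeners[eventType] = list;
    return this;
  }

  // Test helper: deliver an event to every listener registered for it.
  emit(eventType: StreamEvent, payload?: unknown): void {
    (this.listeners[eventType] || []).forEach((callback) => callback(payload));
  }
}

// Record the order in which events are observed.
const observed: string[] = [];
const fakeStream = new FakeStream();

fakeStream
  .on("metadata", () => { observed.push("metadata"); })
  .on("data", (payload) => { observed.push(`data:${String(payload)}`); })
  .on("status", () => { observed.push("status"); })
  .on("end", () => { observed.push("end"); })
  .on("error", () => { observed.push("error"); });

// Replay a successful call with two responses, following steps 2 to 5.
fakeStream.emit("metadata", { "content-type": "application/grpc-web" });
fakeStream.emit("data", "order-1");
fakeStream.emit("data", "order-2");
fakeStream.emit("status", { code: 0, details: "" });
fakeStream.emit("end");

console.log(observed.join(" -> "));
// metadata -> data:order-1 -> data:order-2 -> status -> end
```

Because the fake emits whatever it is told to, it only illustrates the documented sequence; it does not verify how a real stream behaves on failure.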

Advanced Streaming Patterns

Collecting Stream Data:

const results: ProductUpdate[] = [];
let streamMetadata: Metadata;

const stream = client.serverStreaming(url, request, metadata, methodDescriptor);

stream.on('metadata', (meta) => {
  streamMetadata = meta;
  console.log('Stream headers:', meta);
});

stream.on('data', (update) => {
  results.push(update);
  console.log(`Received update ${results.length}:`, update.toObject());
});

stream.on('end', () => {
  console.log(`Stream completed with ${results.length} updates`);
  processResults(results, streamMetadata);
});
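
The same collection pattern can be wrapped in a Promise so callers can `await` the full result set. This is a sketch under assumptions: `collectStream` and `makeFakeStream` are hypothetical helpers, `CollectableStream` is a local structural subset of the interface documented above, and the fake stream exists only so the example runs without a server.

```typescript
// Local structural subset of the stream interface, declared here so the sketch is self-contained.
interface CollectableStream<T> {
  on(eventType: "data", callback: (response: T) => void): unknown;
  on(eventType: "error", callback: (err: Error) => void): unknown;
  on(eventType: "end", callback: () => void): unknown;
}

// Resolve with every message once 'end' fires; reject on the first 'error'.
function collectStream<T>(stream: CollectableStream<T>): Promise<T[]> {
  return new Promise<T[]>((resolve, reject) => {
    const items: T[] = [];
    stream.on("data", (item) => { items.push(item); });
    stream.on("error", (err) => reject(err));
    stream.on("end", () => resolve(items));
  });
}

// Hypothetical in-memory stream used only to exercise collectStream.
function makeFakeStream() {
  const handlers: Record<string, Array<(...args: any[]) => void>> = {};
  return {
    on(eventType: string, callback: (...args: any[]) => void): void {
      (handlers[eventType] = handlers[eventType] || []).push(callback);
    },
    emit(eventType: string, ...args: unknown[]): void {
      (handlers[eventType] || []).forEach((cb) => cb(...args));
    },
  };
}

const fakeUpdates = makeFakeStream();
const collected = collectStream<string>(fakeUpdates);

// Listeners are registered synchronously inside the Promise executor,
// so events emitted here are seen by collectStream.
fakeUpdates.emit("data", "update-1");
fakeUpdates.emit("data", "update-2");
fakeUpdates.emit("end");

collected.then((items) => console.log(`Collected ${items.length} updates`));
```

A Promise settles only once, so this wrapper suits finite streams; for long-lived feeds the event callbacks remain the better fit.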

Error Handling with Retry:

import { StatusCode } from "grpc-web";

function createStreamWithRetry(maxRetries = 3): Promise<ProductUpdate[]> {
  return new Promise((resolve, reject) => {
    let retryCount = 0;
    const results: ProductUpdate[] = [];

    function attemptStream() {
      // Assumes a retried call restarts from the beginning, so drop
      // partial results from the failed attempt to avoid duplicates
      results.length = 0;
      const stream = client.serverStreaming(url, request, metadata, methodDescriptor);

      stream.on('data', (update) => {
        results.push(update);
      });

      stream.on('end', () => {
        resolve(results);
      });

      stream.on('error', (error) => {
        // Retry only transient failures; reject everything else immediately
        if (retryCount < maxRetries && error.code === StatusCode.UNAVAILABLE) {
          retryCount++;
          console.log(`Retrying stream (attempt ${retryCount}/${maxRetries})`);
          // Exponential backoff: 1s, 2s, 4s, ...
          setTimeout(attemptStream, 1000 * 2 ** (retryCount - 1));
        } else {
          reject(error);
        }
      });
    }

    attemptStream();
  });
}
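
For reference, an exponential schedule doubles the wait before each successive retry, usually up to a cap. A minimal sketch of that calculation follows; `backoffDelayMs`, `baseMs`, and `maxMs` are hypothetical names, not part of grpc-web.

```typescript
// Delay before retry number `attempt` (1-based): base, 2x base, 4x base, ... capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 30000): number {
  const uncapped = baseMs * Math.pow(2, attempt - 1);
  return Math.min(uncapped, maxMs);
}

const delays = [1, 2, 3, 4, 5, 6].map((attempt) => backoffDelayMs(attempt));
console.log(delays.join(", ")); // 1000, 2000, 4000, 8000, 16000, 30000
```

In the retry example, the delay passed to `setTimeout` could be computed as `backoffDelayMs(retryCount)`.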

Stream Cancellation:

// Cancel based on condition
const stream = client.serverStreaming(url, request, metadata, methodDescriptor);
let messageCount = 0;

stream.on('data', (message) => {
  messageCount++;
  
  // Cancel after receiving 100 messages
  if (messageCount >= 100) {
    stream.cancel();
    console.log('Stream cancelled after 100 messages');
  }
});

// Cancel with AbortController pattern
// (uses a distinct variable name so both patterns can share one scope)
const controller = new AbortController();
const abortableStream = client.serverStreaming(url, request, metadata, methodDescriptor);

controller.signal.addEventListener('abort', () => {
  abortableStream.cancel();
}, { once: true });

// Cancel after timeout
setTimeout(() => controller.abort(), 10000);
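
The AbortController pattern can be packaged as a small helper that also covers two edge cases: a signal that is already aborted, and removing the listener once the stream has finished. This is a sketch under assumptions: `cancelOnAbort` and the `Cancellable` interface are hypothetical names, and a global `AbortController` (browsers, recent Node.js) is assumed.

```typescript
// Anything with a cancel() method, such as the stream interface documented above.
interface Cancellable {
  cancel(): void;
}

// Cancel `stream` when `signal` aborts; returns a function that unlinks the two.
function cancelOnAbort(stream: Cancellable, signal: AbortSignal): () => void {
  if (signal.aborted) {
    // The signal fired before we were called, so cancel right away.
    stream.cancel();
    return () => {};
  }
  const onAbort = () => stream.cancel();
  signal.addEventListener("abort", onAbort, { once: true });
  return () => signal.removeEventListener("abort", onAbort);
}

// Hypothetical stand-in for a stream that counts cancel() calls.
let cancelCalls = 0;
const countingStream: Cancellable = { cancel: () => { cancelCalls++; } };

const abortController = new AbortController();
cancelOnAbort(countingStream, abortController.signal);
abortController.abort();

console.log(`cancel() calls: ${cancelCalls}`); // cancel() calls: 1
```

Calling the returned function from the 'end' and 'error' handlers keeps a long-lived signal from holding a reference to a finished stream.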

Cleanup and Resource Management:

class StreamManager {
  private activeStreams = new Set<ClientReadableStream<any>>();

  createStream<T>(
    client: GrpcWebClientBase,
    url: string,
    request: any,
    metadata: Metadata,
    methodDescriptor: MethodDescriptor<any, T>
  ): ClientReadableStream<T> {
    const stream = client.serverStreaming(url, request, metadata, methodDescriptor);
    
    this.activeStreams.add(stream);
    
    // Auto-cleanup on end or error
    const cleanup = () => this.activeStreams.delete(stream);
    stream.on('end', cleanup);
    stream.on('error', cleanup);
    
    return stream;
  }

  cancelAllStreams(): void {
    for (const stream of this.activeStreams) {
      stream.cancel();
    }
    this.activeStreams.clear();
  }
}

Event Listener Management

Proper event listener management prevents memory leaks:

const stream = client.serverStreaming(url, request, metadata, methodDescriptor);

// Named functions for cleanup
const dataHandler = (data) => console.log('Data:', data);
const errorHandler = (error) => console.error('Error:', error);
const endHandler = () => console.log('Stream ended');

// Register listeners
stream.on('data', dataHandler);
stream.on('error', errorHandler);
stream.on('end', endHandler);

// Clean up when done
function cleanup() {
  stream.removeListener('data', dataHandler);
  stream.removeListener('error', errorHandler);
  stream.removeListener('end', endHandler);
}

// Cleanup on completion
stream.on('end', cleanup);
stream.on('error', cleanup);
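
The register-then-remove bookkeeping above can be bundled so that one call registers the handlers and returns its own undo function. A minimal sketch: `attachHandlers`, `StreamHandlers`, `ListenableStream`, and `RecordingStream` are hypothetical names, and the recording stream is only a stand-in so the example runs without a server.

```typescript
// Local structural subset of the stream interface documented above.
interface ListenableStream<T> {
  on(eventType: "data", callback: (response: T) => void): unknown;
  on(eventType: "error", callback: (err: Error) => void): unknown;
  on(eventType: "end", callback: () => void): unknown;
  removeListener(eventType: "data", callback: (response: T) => void): void;
  removeListener(eventType: "error", callback: (err: Error) => void): void;
  removeListener(eventType: "end", callback: () => void): void;
}

interface StreamHandlers<T> {
  onData: (response: T) => void;
  onError: (err: Error) => void;
  onEnd: () => void;
}

// Register all three handlers and return a function that removes them again.
function attachHandlers<T>(stream: ListenableStream<T>, handlers: StreamHandlers<T>): () => void {
  stream.on("data", handlers.onData);
  stream.on("error", handlers.onError);
  stream.on("end", handlers.onEnd);
  return () => {
    stream.removeListener("data", handlers.onData);
    stream.removeListener("error", handlers.onError);
    stream.removeListener("end", handlers.onEnd);
  };
}

// Hypothetical stand-in that tracks which listeners are registered.
class RecordingStream {
  private registered: Array<{ eventType: string; callback: (...args: any[]) => void }> = [];

  on(eventType: string, callback: (...args: any[]) => void): void {
    this.registered.push({ eventType, callback });
  }

  removeListener(eventType: string, callback: (...args: any[]) => void): void {
    this.registered = this.registered.filter(
      (entry) => !(entry.eventType === eventType && entry.callback === callback)
    );
  }

  emit(eventType: string, ...args: unknown[]): void {
    this.registered
      .filter((entry) => entry.eventType === eventType)
      .forEach((entry) => entry.callback(...args));
  }

  listenerCount(): number {
    return this.registered.length;
  }
}

const recording = new RecordingStream();
const received: string[] = [];
const detach = attachHandlers<string>(recording, {
  onData: (item) => { received.push(item); },
  onError: (err) => { console.error("Error:", err.message); },
  onEnd: () => { console.log("Stream ended"); },
});

recording.emit("data", "first");
detach();
recording.emit("data", "ignored after detach");

console.log(`${received.join(", ")} | listeners left: ${recording.listenerCount()}`);
// first | listeners left: 0
```

Keeping the handler references inside one object means the same functions are passed to both `on` and `removeListener`, which is what makes removal work.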

Streaming vs Unary Trade-offs

Use Server Streaming When:

  • You need real-time updates or live data feeds
  • Result sets are large and benefit from progressive loading
  • Operations are long-running and report periodic status updates
  • You are building event streams or notification systems

Use Unary Calls When:

  • Interactions are simple request-response exchanges
  • Result sets are small to medium and can be returned at once
  • Operations complete quickly
  • Error handling needs to stay simple