CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-grpc-web

gRPC-Web Client Runtime Library for browser communication with gRPC services

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

streaming.mddocs/

Streaming Operations

Server-side streaming support with event-driven API for handling real-time data streams from gRPC services. gRPC-Web supports server streaming where the client sends a single request and receives multiple responses over time.

Capabilities

ClientReadableStream

Event-driven interface for handling server-side streaming responses with support for data, error, status, metadata, and lifecycle events.

/**
 * Stream interface for reading server responses with event-based API
 */
interface ClientReadableStream<RESP> {
  /**
   * Register event listener for data events
   * @param eventType - Must be "data"
   * @param callback - Function called when response data is received
   * @returns This stream for chaining
   */
  on(eventType: "data", callback: (response: RESP) => void): ClientReadableStream<RESP>;

  /**
   * Register event listener for error events
   * @param eventType - Must be "error"
   * @param callback - Function called when an error occurs
   * @returns This stream for chaining
   */
  on(eventType: "error", callback: (err: RpcError) => void): ClientReadableStream<RESP>;

  /**
   * Register event listener for status events
   * @param eventType - Must be "status"
   * @param callback - Function called when gRPC status is received
   * @returns This stream for chaining
   */
  on(eventType: "status", callback: (status: Status) => void): ClientReadableStream<RESP>;

  /**
   * Register event listener for metadata events
   * @param eventType - Must be "metadata"
   * @param callback - Function called when response metadata is received
   * @returns This stream for chaining
   */
  on(eventType: "metadata", callback: (metadata: Metadata) => void): ClientReadableStream<RESP>;

  /**
   * Register event listener for end events
   * @param eventType - Must be "end"
   * @param callback - Function called when the stream ends normally
   * @returns This stream for chaining
   */
  on(eventType: "end", callback: () => void): ClientReadableStream<RESP>;

  /**
   * Remove specific event listener for data events
   * @param eventType - Must be "data"
   * @param callback - The callback function to remove
   */
  removeListener(eventType: "data", callback: (response: RESP) => void): void;

  /**
   * Remove specific event listener for error events
   * @param eventType - Must be "error"
   * @param callback - The callback function to remove
   */
  removeListener(eventType: "error", callback: (err: RpcError) => void): void;

  /**
   * Remove specific event listener for status events
   * @param eventType - Must be "status"
   * @param callback - The callback function to remove
   */
  removeListener(eventType: "status", callback: (status: Status) => void): void;

  /**
   * Remove specific event listener for metadata events
   * @param eventType - Must be "metadata"
   * @param callback - The callback function to remove
   */
  removeListener(eventType: "metadata", callback: (metadata: Metadata) => void): void;

  /**
   * Remove specific event listener for end events
   * @param eventType - Must be "end"
   * @param callback - The callback function to remove
   */
  removeListener(eventType: "end", callback: () => void): void;

  /**
   * Cancel the stream and close the connection
   */
  cancel(): void;
}

Usage Examples:

import { GrpcWebClientBase, MethodDescriptor, MethodType } from "grpc-web";

const client = new GrpcWebClientBase();

// Create method descriptor for streaming
const listOrdersMethod = new MethodDescriptor(
  '/ecommerce.OrderService/ListOrdersStream',
  MethodType.SERVER_STREAMING,
  ListOrdersRequest,
  OrderResponse,
  (req) => req.serializeBinary(),
  (bytes) => OrderResponse.deserializeBinary(bytes)
);

// Start server streaming call
const stream = client.serverStreaming(
  'https://api.example.com/ecommerce.OrderService/ListOrdersStream',
  new ListOrdersRequest().setUserId('user123'),
  { 'authorization': 'Bearer token' },
  listOrdersMethod
);

// Handle streaming data
const orders: OrderResponse[] = [];

stream.on('data', (order) => {
  console.log('Received order:', order.toObject());
  orders.push(order);
});

stream.on('error', (error) => {
  console.error('Stream error:', error.message);
  console.error('Error code:', error.code);
});

stream.on('status', (status) => {
  console.log('Stream status:', status.code, status.details);
});

stream.on('metadata', (metadata) => {
  console.log('Response metadata:', metadata);
});

stream.on('end', () => {
  console.log('Stream ended. Total orders received:', orders.length);
});

// Cancel stream after 30 seconds
setTimeout(() => {
  stream.cancel();
  console.log('Stream cancelled');
}, 30000);

Event Flow and Lifecycle

Server streaming follows a specific event flow:

  1. Stream Creation: client.serverStreaming() creates and returns the stream
  2. Metadata Events: Response headers arrive as 'metadata' events (optional)
  3. Data Events: Each streamed response arrives as a 'data' event
  4. Status Events: gRPC status information arrives as 'status' events
  5. End Events: Stream completion is signaled with an 'end' event
  6. Error Events: Any errors during streaming trigger 'error' events

Advanced Streaming Patterns

Collecting Stream Data:

const results: ProductUpdate[] = [];
let streamMetadata: Metadata;

const stream = client.serverStreaming(url, request, metadata, methodDescriptor);

stream.on('metadata', (meta) => {
  streamMetadata = meta;
  console.log('Stream headers:', meta);
});

stream.on('data', (update) => {
  results.push(update);
  console.log(`Received update ${results.length}:`, update.toObject());
});

stream.on('end', () => {
  console.log(`Stream completed with ${results.length} updates`);
  processResults(results, streamMetadata);
});

Error Handling with Retry:

function createStreamWithRetry(maxRetries = 3): Promise<ProductUpdate[]> {
  return new Promise((resolve, reject) => {
    let retryCount = 0;
    const results: ProductUpdate[] = [];

    function attemptStream() {
      const stream = client.serverStreaming(url, request, metadata, methodDescriptor);

      stream.on('data', (update) => {
        results.push(update);
      });

      stream.on('end', () => {
        resolve(results);
      });

      stream.on('error', (error) => {
        if (retryCount < maxRetries && error.code === StatusCode.UNAVAILABLE) {
          retryCount++;
          console.log(`Retrying stream (attempt ${retryCount}/${maxRetries})`);
          setTimeout(attemptStream, 1000 * retryCount); // Exponential backoff
        } else {
          reject(error);
        }
      });
    }

    attemptStream();
  });
}

Stream Cancellation:

// Cancel based on condition
const stream = client.serverStreaming(url, request, metadata, methodDescriptor);
let messageCount = 0;

stream.on('data', (message) => {
  messageCount++;
  
  // Cancel after receiving 100 messages
  if (messageCount >= 100) {
    stream.cancel();
    console.log('Stream cancelled after 100 messages');
  }
});

// Cancel with AbortController pattern
const controller = new AbortController();
const stream = client.serverStreaming(url, request, metadata, methodDescriptor);

controller.signal.addEventListener('abort', () => {
  stream.cancel();
});

// Cancel after timeout
setTimeout(() => controller.abort(), 10000);

Cleanup and Resource Management:

class StreamManager {
  private activeStreams = new Set<ClientReadableStream<any>>();

  createStream<T>(
    client: GrpcWebClientBase,
    url: string,
    request: any,
    metadata: Metadata,
    methodDescriptor: MethodDescriptor<any, T>
  ): ClientReadableStream<T> {
    const stream = client.serverStreaming(url, request, metadata, methodDescriptor);
    
    this.activeStreams.add(stream);
    
    // Auto-cleanup on end or error
    const cleanup = () => this.activeStreams.delete(stream);
    stream.on('end', cleanup);
    stream.on('error', cleanup);
    
    return stream;
  }

  cancelAllStreams(): void {
    for (const stream of this.activeStreams) {
      stream.cancel();
    }
    this.activeStreams.clear();
  }
}

Event Listener Management

Proper event listener management prevents memory leaks:

const stream = client.serverStreaming(url, request, metadata, methodDescriptor);

// Named functions for cleanup
const dataHandler = (data) => console.log('Data:', data);
const errorHandler = (error) => console.error('Error:', error);
const endHandler = () => console.log('Stream ended');

// Register listeners
stream.on('data', dataHandler);
stream.on('error', errorHandler);
stream.on('end', endHandler);

// Clean up when done
function cleanup() {
  stream.removeListener('data', dataHandler);
  stream.removeListener('error', errorHandler);
  stream.removeListener('end', endHandler);
}

// Cleanup on completion
stream.on('end', cleanup);
stream.on('error', cleanup);

Streaming vs Unary Trade-offs

Use Server Streaming When:

  • You need real-time updates or live data feeds
  • Large result sets that benefit from progressive loading
  • Long-running operations with periodic status updates
  • Event streams or notification systems

Use Unary Calls When:

  • Simple request-response interactions
  • Small to medium result sets that can be returned at once
  • Operations that complete quickly
  • Simpler error handling requirements

docs

client-management.md

error-handling.md

index.md

interceptors.md

method-descriptors.md

streaming.md

tile.json