Server-side streaming support with event-driven API for handling real-time data streams from gRPC services. gRPC-Web supports server streaming where the client sends a single request and receives multiple responses over time.
Event-driven interface for handling server-side streaming responses with support for data, error, status, metadata, and lifecycle events.
/**
* Stream interface for reading server responses with event-based API.
* Listeners are attached per event type with `on` and detached with
* `removeListener`; `cancel` stops the stream.
* NOTE(review): `removeListener` presumably needs the same function reference
* that was passed to `on` — confirm against the grpc-web implementation.
* @typeParam RESP - Type of each response message delivered to "data" listeners
*/
interface ClientReadableStream<RESP> {
/**
* Register event listener for data events
* @param eventType - Must be "data"
* @param callback - Function called with each response message that is received
* @returns This stream for chaining
*/
on(eventType: "data", callback: (response: RESP) => void): ClientReadableStream<RESP>;
/**
* Register event listener for error events
* @param eventType - Must be "error"
* @param callback - Function called with the RpcError when an error occurs
* @returns This stream for chaining
*/
on(eventType: "error", callback: (err: RpcError) => void): ClientReadableStream<RESP>;
/**
* Register event listener for status events
* @param eventType - Must be "status"
* @param callback - Function called with the Status when gRPC status is received
* @returns This stream for chaining
*/
on(eventType: "status", callback: (status: Status) => void): ClientReadableStream<RESP>;
/**
* Register event listener for metadata events
* @param eventType - Must be "metadata"
* @param callback - Function called with the Metadata when response metadata is received
* @returns This stream for chaining
*/
on(eventType: "metadata", callback: (metadata: Metadata) => void): ClientReadableStream<RESP>;
/**
* Register event listener for end events
* @param eventType - Must be "end"
* @param callback - Function called (with no arguments) when the stream ends normally
* @returns This stream for chaining
*/
on(eventType: "end", callback: () => void): ClientReadableStream<RESP>;
/**
* Remove specific event listener for data events
* @param eventType - Must be "data"
* @param callback - The callback function to remove
*/
removeListener(eventType: "data", callback: (response: RESP) => void): void;
/**
* Remove specific event listener for error events
* @param eventType - Must be "error"
* @param callback - The callback function to remove
*/
removeListener(eventType: "error", callback: (err: RpcError) => void): void;
/**
* Remove specific event listener for status events
* @param eventType - Must be "status"
* @param callback - The callback function to remove
*/
removeListener(eventType: "status", callback: (status: Status) => void): void;
/**
* Remove specific event listener for metadata events
* @param eventType - Must be "metadata"
* @param callback - The callback function to remove
*/
removeListener(eventType: "metadata", callback: (metadata: Metadata) => void): void;
/**
* Remove specific event listener for end events
* @param eventType - Must be "end"
* @param callback - The callback function to remove
*/
removeListener(eventType: "end", callback: () => void): void;
/**
* Cancel the stream and close the connection
*/
cancel(): void;
}
Usage Examples:
import { GrpcWebClientBase, MethodDescriptor, MethodType } from "grpc-web";
const client = new GrpcWebClientBase();

// Create method descriptor for streaming: method path, call type, request and
// response classes, then the request serializer and response deserializer.
const listOrdersMethod = new MethodDescriptor(
  '/ecommerce.OrderService/ListOrdersStream',
  MethodType.SERVER_STREAMING,
  ListOrdersRequest,
  OrderResponse,
  (req) => req.serializeBinary(),
  (bytes) => OrderResponse.deserializeBinary(bytes)
);

// Start server streaming call
const stream = client.serverStreaming(
  'https://api.example.com/ecommerce.OrderService/ListOrdersStream',
  new ListOrdersRequest().setUserId('user123'),
  { 'authorization': 'Bearer token' },
  listOrdersMethod
);

// Cancel stream after 30 seconds. The timer handle is kept so it can be
// cleared once the stream finishes on its own; otherwise the callback would
// still call cancel() and log "Stream cancelled" for a stream that is over.
const cancelTimer = setTimeout(() => {
  stream.cancel();
  console.log('Stream cancelled');
}, 30000);

// Handle streaming data
const orders: OrderResponse[] = [];
stream.on('data', (order) => {
  console.log('Received order:', order.toObject());
  orders.push(order);
});

stream.on('error', (error) => {
  clearTimeout(cancelTimer); // stream already failed; nothing left to cancel
  console.error('Stream error:', error.message);
  console.error('Error code:', error.code);
});

stream.on('status', (status) => {
  console.log('Stream status:', status.code, status.details);
});

stream.on('metadata', (metadata) => {
  console.log('Response metadata:', metadata);
});

stream.on('end', () => {
  clearTimeout(cancelTimer); // stream completed normally; nothing left to cancel
  console.log('Stream ended. Total orders received:', orders.length);
});
Server streaming follows a specific event flow:
client.serverStreaming() creates and returns the stream.
Collecting Stream Data:
// Accumulates every update delivered by the stream.
const results: ProductUpdate[] = [];
// Start from empty metadata so the 'end' handler never hands an unassigned
// value to processResults when no 'metadata' event was delivered.
let streamMetadata: Metadata = {};

const stream = client.serverStreaming(url, request, metadata, methodDescriptor);

// Remember the response metadata for use once the stream has finished.
stream.on('metadata', (meta) => {
  streamMetadata = meta;
  console.log('Stream headers:', meta);
});

stream.on('data', (update) => {
  results.push(update);
  console.log(`Received update ${results.length}:`, update.toObject());
});

// Process everything collected once the stream ends.
stream.on('end', () => {
  console.log(`Stream completed with ${results.length} updates`);
  processResults(results, streamMetadata);
});
Error Handling with Retry:
/**
 * Open the server stream and collect its updates, retrying with exponential
 * backoff when the call fails with StatusCode.UNAVAILABLE.
 *
 * @param maxRetries - Maximum number of retries after the first attempt (default 3)
 * @returns Promise that resolves with the updates of the attempt that reached
 *   'end', or rejects with the stream error when the error is not UNAVAILABLE
 *   or the retries are exhausted
 */
function createStreamWithRetry(maxRetries = 3): Promise<ProductUpdate[]> {
  return new Promise((resolve, reject) => {
    let retryCount = 0;

    function attemptStream(): void {
      // Fresh buffer per attempt: a retry re-sends the same request, so
      // updates received before the failure would otherwise be collected twice.
      // NOTE(review): assumes the server replays the stream from the start — confirm.
      const results: ProductUpdate[] = [];
      const stream = client.serverStreaming(url, request, metadata, methodDescriptor);

      stream.on('data', (update) => {
        results.push(update);
      });

      stream.on('end', () => {
        resolve(results);
      });

      stream.on('error', (error) => {
        if (retryCount < maxRetries && error.code === StatusCode.UNAVAILABLE) {
          retryCount++;
          console.log(`Retrying stream (attempt ${retryCount}/${maxRetries})`);
          // Exponential backoff: 1s, 2s, 4s, ... (doubles on every retry)
          setTimeout(attemptStream, 1000 * 2 ** (retryCount - 1));
        } else {
          reject(error);
        }
      });
    }

    attemptStream();
  });
}
Stream Cancellation:
// Cancel based on condition
const stream = client.serverStreaming(url, request, metadata, methodDescriptor);
let messageCount = 0;
stream.on('data', (message) => {
  messageCount++;
  // Cancel after receiving 100 messages
  if (messageCount >= 100) {
    stream.cancel();
    console.log('Stream cancelled after 100 messages');
  }
});

// Cancel with AbortController pattern
const controller = new AbortController();
// Uses its own name: `stream` is already declared earlier in this snippet and
// a second `const stream` in the same scope is a redeclaration error.
const abortableStream = client.serverStreaming(url, request, metadata, methodDescriptor);
controller.signal.addEventListener('abort', () => {
  abortableStream.cancel();
});
// Cancel after timeout
setTimeout(() => controller.abort(), 10000);
Cleanup and Resource Management:
/**
 * Keeps track of the streams it opens so they can all be cancelled together.
 */
class StreamManager {
  private activeStreams = new Set<ClientReadableStream<any>>();

  /**
   * Start a server-streaming call and track the stream until it ends or errors.
   * @param client - Client used to issue the call
   * @param url - Method URL passed to `client.serverStreaming`
   * @param request - Request message for the call
   * @param metadata - Metadata sent with the call
   * @param methodDescriptor - Descriptor of the streaming method
   * @returns The stream created by `client.serverStreaming`
   */
  createStream<T>(
    client: GrpcWebClientBase,
    url: string,
    request: any,
    metadata: Metadata,
    methodDescriptor: MethodDescriptor<any, T>
  ): ClientReadableStream<T> {
    const tracked = client.serverStreaming(url, request, metadata, methodDescriptor);

    // Drops the stream from the active set; shared by the 'end' and 'error' listeners.
    const untrack = (): void => {
      this.activeStreams.delete(tracked);
    };

    this.activeStreams.add(tracked);
    tracked.on('end', untrack);
    tracked.on('error', untrack);
    return tracked;
  }

  /**
   * Cancel every tracked stream, then empty the active set.
   */
  cancelAllStreams(): void {
    this.activeStreams.forEach((open) => {
      open.cancel();
    });
    this.activeStreams.clear();
  }
}
Proper event listener management prevents memory leaks:
const stream = client.serverStreaming(url, request, metadata, methodDescriptor);

// Named functions for cleanup. The parameters are annotated because a
// standalone arrow function has no contextual type, so an untyped parameter
// is an implicit `any` and fails to compile under `strict`.
const dataHandler = (data: unknown) => console.log('Data:', data);
const errorHandler = (error: unknown) => console.error('Error:', error);
const endHandler = () => console.log('Stream ended');

// Register listeners
stream.on('data', dataHandler);
stream.on('error', errorHandler);
stream.on('end', endHandler);

// Clean up when done: detach the three handlers registered above.
function cleanup() {
  stream.removeListener('data', dataHandler);
  stream.removeListener('error', errorHandler);
  stream.removeListener('end', endHandler);
}

// Cleanup on completion
stream.on('end', cleanup);
stream.on('error', cleanup);
Use Server Streaming When:
Use Unary Calls When: