Interceptor system for implementing middleware functionality in both unary and streaming RPC calls, enabling cross-cutting concerns like authentication, logging, and request/response transformation.
Interceptor interface for unary RPC calls that return Promise-based responses.
/**
* Interceptor for RPC calls with Promise-based responses.
*
* An implementation receives the outgoing request together with an `invoker`
* and returns the promise for the response, so it can act both before the
* request is passed on and after the response (or rejection) comes back.
*/
interface UnaryInterceptor<REQ, RESP> {
/**
* Intercept a unary RPC call
* @param request - The RPC request object
* @param invoker - Function to invoke the next interceptor or actual RPC call;
* it accepts a request and resolves to the unary response
* @returns Promise resolving to the unary response
*/
intercept(
request: Request<REQ, RESP>,
invoker: (request: Request<REQ, RESP>) => Promise<UnaryResponse<REQ, RESP>>
): Promise<UnaryResponse<REQ, RESP>>;
}Interceptor interface for streaming RPC calls that return stream-based responses.
/**
* Interceptor for RPC calls with stream-based responses.
*
* Unlike the unary variant, `intercept` is synchronous: it returns the
* response stream itself rather than a promise.
*/
interface StreamInterceptor<REQ, RESP> {
/**
* Intercept a streaming RPC call
* @param request - The RPC request object
* @param invoker - Function to invoke the next interceptor or actual RPC call;
* it accepts a request and returns the response stream
* @returns ClientReadableStream for the response
*/
intercept(
request: Request<REQ, RESP>,
invoker: (request: Request<REQ, RESP>) => ClientReadableStream<RESP>
): ClientReadableStream<RESP>;
}Usage Examples:
import {
  GrpcWebClientBase,
  Request,
  StatusCode,
  UnaryInterceptor,
  UnaryResponse,
} from "grpc-web";
/**
 * Unary interceptor that attaches a bearer token to every request and, when
 * the call is rejected with UNAUTHENTICATED, refreshes the token and retries
 * the call exactly once.
 */
class AuthInterceptor implements UnaryInterceptor<any, any> {
  /**
   * @param getAuthToken - Returns the token to send with the next request
   */
  constructor(private getAuthToken: () => string) {}

  /**
   * Add the `authorization` metadata entry and forward the request.
   * @param request - The RPC request object
   * @param invoker - Function to invoke the next interceptor or actual RPC call
   * @returns Promise resolving to the unary response
   * @throws Whatever the invoker rejects with, unless the first attempt failed
   * with UNAUTHENTICATED and the retry with a refreshed token succeeded
   */
  async intercept(
    request: Request<any, any>,
    invoker: (request: Request<any, any>) => Promise<UnaryResponse<any, any>>
  ): Promise<UnaryResponse<any, any>> {
    // Add authentication token to request metadata
    const authToken = this.getAuthToken();
    const requestWithAuth = request.withMetadata('authorization', `Bearer ${authToken}`);

    try {
      return await invoker(requestWithAuth);
    } catch (error: unknown) {
      // The rejection value is untyped; read `code` defensively so that
      // non-object rejections are rethrown untouched instead of crashing here.
      const code = (error as { code?: unknown } | null | undefined)?.code;
      if (code !== StatusCode.UNAUTHENTICATED) {
        throw error;
      }

      // Try to refresh token and retry (once; a second failure propagates)
      const newToken = await this.refreshToken();
      const retryRequest = request.withMetadata('authorization', `Bearer ${newToken}`);
      return await invoker(retryRequest);
    }
  }

  /**
   * Request a fresh token from the refresh endpoint.
   * NOTE(review): the response status and body shape are not validated here.
   * @returns Promise resolving to the `token` field of the JSON response
   */
  private async refreshToken(): Promise<string> {
    const response = await fetch('/api/auth/refresh', { method: 'POST' });
    const data = await response.json();
    return data.token;
  }
}
// Usage with client
// localStorage.getItem returns `string | null`; fall back to an empty token so
// the callback satisfies the `() => string` contract of AuthInterceptor.
const authInterceptor = new AuthInterceptor(() => localStorage.getItem('authToken') ?? '');
const client = new GrpcWebClientBase({
unaryInterceptors: [authInterceptor]
});class LoggingInterceptor implements UnaryInterceptor<any, any> {
/**
 * Log the start, request metadata, outcome and elapsed time of a unary call.
 * @param request - The RPC request object
 * @param invoker - Function to invoke the next interceptor or actual RPC call
 * @returns Promise resolving to the unmodified unary response
 * @throws Rethrows any failure after logging it
 */
async intercept(
  request: Request<any, any>,
  invoker: (request: Request<any, any>) => Promise<UnaryResponse<any, any>>
): Promise<UnaryResponse<any, any>> {
  const rpcName = request.getMethodDescriptor().getName();
  const begunAt = Date.now();
  // Milliseconds elapsed since the call was handed to this interceptor.
  const elapsedMs = (): number => Date.now() - begunAt;

  console.log(`[gRPC] Starting call: ${rpcName}`);
  console.log(`[gRPC] Request metadata:`, request.getMetadata());

  try {
    const result = await invoker(request);
    console.log(`[gRPC] Call completed: ${rpcName} (${elapsedMs()}ms)`);
    console.log(`[gRPC] Response metadata:`, result.getMetadata());
    return result;
  } catch (failure) {
    console.error(`[gRPC] Call failed: ${rpcName} (${elapsedMs()}ms)`, failure);
    throw failure;
  }
}
}class RetryInterceptor implements UnaryInterceptor<any, any> {
/**
* @param maxRetries - Number of additional attempts made after the first one fails (default 3)
* @param baseDelay - Delay in milliseconds before the first retry; it doubles with every further attempt (default 1000)
*/
constructor(
private maxRetries: number = 3,
private baseDelay: number = 1000
) {}
/**
 * Invoke the call and retry it with exponential backoff when it fails with a
 * retryable error.
 * @param request - The RPC request object
 * @param invoker - Function to invoke the next interceptor or actual RPC call
 * @returns Promise resolving to the first successful unary response
 * @throws The error of a non-retryable failure immediately, or the error of
 * the last attempt once all retries are used up
 */
async intercept(
  request: Request<any, any>,
  invoker: (request: Request<any, any>) => Promise<UnaryResponse<any, any>>
): Promise<UnaryResponse<any, any>> {
  // A negative or NaN retry budget must still allow the initial attempt;
  // otherwise the loop would be skipped and `undefined` would be thrown.
  const maxRetries = this.maxRetries > 0 ? this.maxRetries : 0;
  let lastError: unknown;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await invoker(request);
    } catch (error: unknown) {
      lastError = error;

      // The rejection value is untyped; read `code` defensively.
      const code = (error as { code?: StatusCode } | null | undefined)?.code;

      // Don't retry for these error types
      if (code !== undefined && this.isNonRetryableError(code)) {
        throw error;
      }

      if (attempt < maxRetries) {
        const delay = this.baseDelay * Math.pow(2, attempt);
        console.log(`Retrying RPC call in ${delay}ms (attempt ${attempt + 1}/${this.maxRetries})`);
        await this.delay(delay);
      }
    }
  }

  throw lastError;
}
/**
 * Tell whether a status code marks a failure that a retry cannot fix.
 * @param code - Status code of the failed call
 * @returns true for INVALID_ARGUMENT, NOT_FOUND, PERMISSION_DENIED and UNAUTHENTICATED
 */
private isNonRetryableError(code: StatusCode): boolean {
  switch (code) {
    case StatusCode.INVALID_ARGUMENT:
    case StatusCode.NOT_FOUND:
    case StatusCode.PERMISSION_DENIED:
    case StatusCode.UNAUTHENTICATED:
      return true;
    default:
      return false;
  }
}
/**
 * Wait for the given time.
 * @param ms - Time to wait in milliseconds
 * @returns Promise that resolves once the timer fires
 */
private delay(ms: number): Promise<void> {
  return new Promise<void>((wake) => {
    setTimeout(wake, ms);
  });
}
}class CacheInterceptor implements UnaryInterceptor<any, any> {
// Cached responses keyed by the string built in getCacheKey; `expiry` is an
// epoch-millisecond timestamp that is compared against Date.now().
private cache = new Map<string, { response: UnaryResponse<any, any>, expiry: number }>();
/**
* @param ttlMs - How long a cached response stays valid, in milliseconds
*/
constructor(private ttlMs: number = 60000) {} // 1 minute default TTL
/**
 * Serve the response from the cache when a fresh entry exists; otherwise
 * invoke the call and cache a successful response.
 * @param request - The RPC request object
 * @param invoker - Function to invoke the next interceptor or actual RPC call
 * @returns Promise resolving to the cached or freshly fetched unary response
 */
async intercept(
  request: Request<any, any>,
  invoker: (request: Request<any, any>) => Promise<UnaryResponse<any, any>>
): Promise<UnaryResponse<any, any>> {
  const cacheKey = this.getCacheKey(request);

  // Check cache
  const cached = this.cache.get(cacheKey);
  if (cached) {
    if (cached.expiry > Date.now()) {
      console.log(`[Cache] Cache hit for ${request.getMethodDescriptor().getName()}`);
      return cached.response;
    }
    // Drop the expired entry right away so it does not linger in the map
    // when the call below fails or returns a non-OK status.
    this.cache.delete(cacheKey);
  }

  // Invoke actual call
  const response = await invoker(request);

  // Cache successful responses (a missing status is treated as success)
  const status = response.getStatus();
  if (!status || status.code === StatusCode.OK) {
    this.cache.set(cacheKey, {
      response,
      // Start the TTL when the response arrives, not when the request was
      // issued, so slow calls do not produce entries that are already stale.
      expiry: Date.now() + this.ttlMs
    });
    console.log(`[Cache] Cached response for ${request.getMethodDescriptor().getName()}`);
  }

  return response;
}
/**
 * Build the cache key for a request from its method name and request message.
 * @param request - The RPC request object
 * @returns Key of the form `<method name>:<JSON of the request message>`
 */
private getCacheKey(request: Request<any, any>): string {
  const rpcName = request.getMethodDescriptor().getName();
  // Simple serialization - in production, use a proper hash
  const payload = JSON.stringify(request.getRequestMessage());
  return `${rpcName}:${payload}`;
}
/**
* Remove every cached response, regardless of its expiry time.
*/
clearCache(): void {
this.cache.clear();
}
}class StreamLoggingInterceptor implements StreamInterceptor<any, any> {
/**
 * Start the stream and return a wrapper around it that logs every received
 * message, the end of the stream, errors and cancellation.
 * @param request - The RPC request object
 * @param invoker - Function to invoke the next interceptor or actual RPC call
 * @returns ClientReadableStream that forwards to the original stream
 */
intercept(
  request: Request<any, any>,
  invoker: (request: Request<any, any>) => ClientReadableStream<any>
): ClientReadableStream<any> {
  type Listener = (...args: any[]) => void;

  const methodName = request.getMethodDescriptor().getName();
  const startTime = Date.now();
  let messageCount = 0;

  console.log(`[gRPC Stream] Starting stream: ${methodName}`);

  const originalStream = invoker(request);

  // Event type -> caller's callback -> logging wrappers registered on its
  // behalf. Needed so removeListener can detach the wrapper that was actually
  // attached to the original stream instead of the caller's callback.
  const wrappersByEvent = new Map<string, Map<Listener, Listener[]>>();

  // Remember that `wrapped` stands in for `callback` on `eventType`.
  const track = (eventType: string, callback: Listener, wrapped: Listener): void => {
    let byCallback = wrappersByEvent.get(eventType);
    if (!byCallback) {
      byCallback = new Map<Listener, Listener[]>();
      wrappersByEvent.set(eventType, byCallback);
    }
    const wrappers = byCallback.get(callback);
    if (wrappers) {
      wrappers.push(wrapped);
    } else {
      byCallback.set(callback, [wrapped]);
    }
  };

  // Forget and return the most recent wrapper registered for `callback`.
  const untrack = (eventType: string, callback: Listener): Listener | undefined => {
    const byCallback = wrappersByEvent.get(eventType);
    const wrappers = byCallback?.get(callback);
    if (!byCallback || !wrappers) {
      return undefined;
    }
    const wrapped = wrappers.pop();
    if (wrappers.length === 0) {
      byCallback.delete(callback);
    }
    return wrapped;
  };

  // Build the logging wrapper for an event, or undefined for events that are
  // passed through without logging.
  const wrap = (eventType: string, callback: Listener): Listener | undefined => {
    switch (eventType) {
      case 'data':
        return (data: any) => {
          messageCount++;
          console.log(`[gRPC Stream] Message ${messageCount} received from ${methodName}`);
          callback(data);
        };
      case 'end':
        return () => {
          const duration = Date.now() - startTime;
          console.log(`[gRPC Stream] Stream ended: ${methodName} (${duration}ms, ${messageCount} messages)`);
          callback();
        };
      case 'error':
        return (error: any) => {
          const duration = Date.now() - startTime;
          console.error(`[gRPC Stream] Stream error: ${methodName} (${duration}ms, ${messageCount} messages)`, error);
          callback(error);
        };
      default:
        return undefined;
    }
  };

  // Wrap the stream to add logging
  return new (class implements ClientReadableStream<any> {
    on(eventType: string, callback: Listener): ClientReadableStream<any> {
      const wrapped = wrap(eventType, callback);
      if (wrapped) {
        track(eventType, callback, wrapped);
        originalStream.on(eventType as any, wrapped);
      } else {
        originalStream.on(eventType as any, callback);
      }
      return this;
    }

    removeListener(eventType: string, callback: Listener): void {
      // Pass-through events were registered with the callback itself.
      const registered = untrack(eventType, callback) ?? callback;
      originalStream.removeListener(eventType as any, registered);
    }

    cancel(): void {
      console.log(`[gRPC Stream] Stream cancelled: ${methodName}`);
      originalStream.cancel();
    }
  })();
}
}// Create client with multiple interceptors
const client = new GrpcWebClientBase({
unaryInterceptors: [
new LoggingInterceptor(), // logs start, outcome, duration and metadata of each call
new AuthInterceptor(() => getAuthToken()), // adds the bearer token; refreshes it on UNAUTHENTICATED
new RetryInterceptor(3, 1000), // up to 3 retries, backoff starting at 1000 ms
new CacheInterceptor(300000) // 5 minute cache
],
streamInterceptors: [
new StreamLoggingInterceptor() // logs stream messages, end, errors and cancellation
]
});
// grpc-web wraps the RPC invoker with each interceptor in array order, so the
// LAST interceptor in the array is the outermost one and handles the request first:
// 1. CacheInterceptor (checks/updates cache)
// 2. RetryInterceptor (handles retries)
// 3. AuthInterceptor (adds auth headers)
// 4. LoggingInterceptor (logs start)
// 5. Actual RPC call
// Responses then flow back in array order (LoggingInterceptor first, CacheInterceptor last).
Conditional Interceptor:
/**
 * Unary interceptor that delegates to another interceptor only for requests
 * matching a predicate; all other requests go straight to the invoker.
 */
class ConditionalInterceptor implements UnaryInterceptor<any, any> {
  /**
   * @param condition - Predicate deciding whether the wrapped interceptor applies
   * @param interceptor - Interceptor to run for matching requests
   */
  constructor(
    private condition: (request: Request<any, any>) => boolean,
    private interceptor: UnaryInterceptor<any, any>
  ) {}

  /**
   * Route the request through the wrapped interceptor or directly to the invoker.
   * @param request - The RPC request object
   * @param invoker - Function to invoke the next interceptor or actual RPC call
   * @returns Promise resolving to the unary response
   */
  async intercept(
    request: Request<any, any>,
    invoker: (request: Request<any, any>) => Promise<UnaryResponse<any, any>>
  ): Promise<UnaryResponse<any, any>> {
    const pending = this.condition(request)
      ? this.interceptor.intercept(request, invoker)
      : invoker(request);
    return await pending;
  }
}
// Usage: Only apply auth to certain methods.
// The predicate selects requests whose method descriptor name contains 'Secure';
// every other request is passed to the invoker without the auth interceptor.
const conditionalAuth = new ConditionalInterceptor(
(request) => request.getMethodDescriptor().getName().includes('Secure'),
new AuthInterceptor(() => getAuthToken())
);Circuit Breaker Interceptor:
/**
 * Unary interceptor implementing a circuit breaker.
 *
 * - CLOSED: calls pass through; consecutive failures are counted.
 * - OPEN: calls are rejected immediately with UNAVAILABLE until `timeoutMs`
 *   has passed since the last failure.
 * - HALF_OPEN: calls pass through again; a success closes the circuit, a
 *   failure re-opens it.
 */
class CircuitBreakerInterceptor implements UnaryInterceptor<any, any> {
  // Number of failures since the last successful call.
  private failures = 0;
  // Date.now() timestamp of the most recent failure.
  private lastFailureTime = 0;
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';

  /**
   * @param failureThreshold - Consecutive failures after which the circuit opens
   * @param timeoutMs - Time in milliseconds the circuit stays open before a trial call is allowed
   */
  constructor(
    private failureThreshold: number = 5,
    private timeoutMs: number = 60000
  ) {}

  /**
   * Forward the call unless the circuit is open.
   * @param request - The RPC request object
   * @param invoker - Function to invoke the next interceptor or actual RPC call
   * @returns Promise resolving to the unary response
   * @throws RpcError with UNAVAILABLE while the circuit is open; otherwise
   * whatever the invoker rejects with
   */
  async intercept(
    request: Request<any, any>,
    invoker: (request: Request<any, any>) => Promise<UnaryResponse<any, any>>
  ): Promise<UnaryResponse<any, any>> {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailureTime > this.timeoutMs) {
        this.state = 'HALF_OPEN';
      } else {
        throw new RpcError(StatusCode.UNAVAILABLE, 'Circuit breaker is OPEN', {});
      }
    }

    let response: UnaryResponse<any, any>;
    try {
      response = await invoker(request);
    } catch (error) {
      this.failures++;
      this.lastFailureTime = Date.now();
      // A failed trial call in HALF_OPEN re-opens the circuit right away.
      if (this.state === 'HALF_OPEN' || this.failures >= this.failureThreshold) {
        this.state = 'OPEN';
        console.log('Circuit breaker opened due to failures');
      }
      throw error;
    }

    // Success - reset circuit breaker. Resetting in every state makes the
    // threshold count consecutive failures; without it, isolated failures
    // would accumulate forever and eventually open a healthy circuit.
    this.failures = 0;
    this.state = 'CLOSED';
    return response;
  }
}