
Interceptors and Middleware

Interceptor system for implementing middleware functionality in both unary and streaming RPC calls, enabling cross-cutting concerns like authentication, logging, and request/response transformation.

Capabilities

UnaryInterceptor

Interceptor interface for unary RPC calls that return Promise-based responses.

/**
 * Interceptor for RPC calls with Promise-based responses
 */
interface UnaryInterceptor<REQ, RESP> {
  /**
   * Intercept a unary RPC call
   * @param request - The RPC request object
   * @param invoker - Function to invoke the next interceptor or actual RPC call
   * @returns Promise resolving to the unary response
   */
  intercept(
    request: Request<REQ, RESP>,
    invoker: (request: Request<REQ, RESP>) => Promise<UnaryResponse<REQ, RESP>>
  ): Promise<UnaryResponse<REQ, RESP>>;
}
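
To make the contract concrete, the following is a minimal sketch of a unary interceptor that adds one metadata entry and then delegates to the invoker. It deliberately avoids importing grpc-web: `SketchRequest`, `SketchResponse`, `SketchInvoker`, and the `x-trace` key are hypothetical stand-ins, not the library's types.

```typescript
// Stand-in types for illustration only; the real Request and UnaryResponse
// interfaces come from grpc-web and expose more methods.
interface SketchRequest {
  method: string;
  metadata: Record<string, string>;
}
interface SketchResponse {
  body: string;
}
type SketchInvoker = (request: SketchRequest) => Promise<SketchResponse>;

interface SketchUnaryInterceptor {
  intercept(request: SketchRequest, invoker: SketchInvoker): Promise<SketchResponse>;
}

// Adds one metadata entry, then hands the request to the next link in the chain.
const traceInterceptor: SketchUnaryInterceptor = {
  intercept(request, invoker) {
    const tagged: SketchRequest = {
      ...request,
      metadata: { ...request.metadata, "x-trace": "demo" },
    };
    return invoker(tagged);
  },
};

// Fake terminal invoker standing in for the actual RPC call.
const seenMetadata: Record<string, string>[] = [];
const fakeRpc: SketchInvoker = (request) => {
  seenMetadata.push(request.metadata);
  return Promise.resolve({ body: `handled ${request.method}` });
};

const pending = traceInterceptor.intercept({ method: "Echo", metadata: {} }, fakeRpc);
pending.then((response) => console.log(response.body)); // logs "handled Echo"
```

The interceptor never performs the call itself. It decides what request to pass on and what to do with the promise the invoker returns.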

StreamInterceptor

Interceptor interface for streaming RPC calls that return stream-based responses.

/**
 * Interceptor for RPC calls with stream-based responses
 */
interface StreamInterceptor<REQ, RESP> {
  /**
   * Intercept a streaming RPC call
   * @param request - The RPC request object
   * @param invoker - Function to invoke the next interceptor or actual RPC call
   * @returns ClientReadableStream for the response
   */
  intercept(
    request: Request<REQ, RESP>,
    invoker: (request: Request<REQ, RESP>) => ClientReadableStream<RESP>
  ): ClientReadableStream<RESP>;
}
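
A stream interceptor returns a stream rather than a promise, so it usually works by wrapping the stream the invoker hands back. The sketch below counts messages by wrapping `data` listeners. `SketchStream`, `fakeStream`, and `countingInterceptor` are hypothetical stand-ins written for this example (the real `ClientReadableStream` supports more event types), and the fake stream replays its messages synchronously to keep the example short.

```typescript
// Stand-in for illustration; grpc-web's ClientReadableStream has more event types.
type DataListener<T> = (message: T) => void;
interface SketchStream<T> {
  on(eventType: "data", listener: DataListener<T>): SketchStream<T>;
  cancel(): void;
}
type StreamInvoker<T> = (method: string) => SketchStream<T>;

// Counts messages by wrapping 'data' listeners; cancel() is delegated unchanged.
function countingInterceptor<T>(onCount: (n: number) => void) {
  return {
    intercept(method: string, invoker: StreamInvoker<T>): SketchStream<T> {
      const inner = invoker(method);
      let count = 0;
      const outer: SketchStream<T> = {
        on(eventType, listener) {
          inner.on(eventType, (message) => {
            count += 1;
            onCount(count);
            listener(message);
          });
          return outer;
        },
        cancel: () => inner.cancel(),
      };
      return outer;
    },
  };
}

// Fake stream that replays a fixed list of messages to each new listener.
function fakeStream<T>(messages: T[]): SketchStream<T> {
  const stream: SketchStream<T> = {
    on(_eventType, listener) {
      messages.forEach((message) => listener(message));
      return stream;
    },
    cancel() {},
  };
  return stream;
}

const counts: number[] = [];
const received: string[] = [];
countingInterceptor<string>((n) => counts.push(n))
  .intercept("ListItems", () => fakeStream(["a", "b", "c"]))
  .on("data", (message) => received.push(message));

console.log(counts, received); // [ 1, 2, 3 ] [ 'a', 'b', 'c' ]
```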

Usage Examples:

Authentication Interceptor

import { GrpcWebClientBase, RpcError, StatusCode, UnaryInterceptor, Request, UnaryResponse } from "grpc-web";

class AuthInterceptor implements UnaryInterceptor<any, any> {
  constructor(private getAuthToken: () => string) {}

  async intercept(
    request: Request<any, any>,
    invoker: (request: Request<any, any>) => Promise<UnaryResponse<any, any>>
  ): Promise<UnaryResponse<any, any>> {
    // Add authentication token to request metadata
    const authToken = this.getAuthToken();
    const requestWithAuth = request.withMetadata('authorization', `Bearer ${authToken}`);
    
    try {
      return await invoker(requestWithAuth);
    } catch (error) {
      // Caught values are untyped; narrow to RpcError before reading the status code
      if ((error as RpcError).code === StatusCode.UNAUTHENTICATED) {
        // Try to refresh token and retry
        const newToken = await this.refreshToken();
        const retryRequest = request.withMetadata('authorization', `Bearer ${newToken}`);
        return await invoker(retryRequest);
      }
      throw error;
    }
  }

  private async refreshToken(): Promise<string> {
    // Implementation to refresh authentication token
    const response = await fetch('/api/auth/refresh', { method: 'POST' });
    const data = await response.json();
    return data.token;
  }
}

// Usage with client
// localStorage.getItem returns string | null, so fall back to an empty token
const authInterceptor = new AuthInterceptor(() => localStorage.getItem('authToken') ?? '');

const client = new GrpcWebClientBase({
  unaryInterceptors: [authInterceptor]
});

Logging Interceptor

class LoggingInterceptor implements UnaryInterceptor<any, any> {
  async intercept(
    request: Request<any, any>,
    invoker: (request: Request<any, any>) => Promise<UnaryResponse<any, any>>
  ): Promise<UnaryResponse<any, any>> {
    const methodName = request.getMethodDescriptor().getName();
    const startTime = Date.now();
    
    console.log(`[gRPC] Starting call: ${methodName}`);
    console.log(`[gRPC] Request metadata:`, request.getMetadata());
    
    try {
      const response = await invoker(request);
      const duration = Date.now() - startTime;
      
      console.log(`[gRPC] Call completed: ${methodName} (${duration}ms)`);
      console.log(`[gRPC] Response metadata:`, response.getMetadata());
      
      return response;
    } catch (error) {
      const duration = Date.now() - startTime;
      console.error(`[gRPC] Call failed: ${methodName} (${duration}ms)`, error);
      throw error;
    }
  }
}

Retry Interceptor

class RetryInterceptor implements UnaryInterceptor<any, any> {
  constructor(
    private maxRetries: number = 3,
    private baseDelay: number = 1000
  ) {}

  async intercept(
    request: Request<any, any>,
    invoker: (request: Request<any, any>) => Promise<UnaryResponse<any, any>>
  ): Promise<UnaryResponse<any, any>> {
    // Typed as unknown so it can be rethrown after the loop without a definite-assignment error
    let lastError: unknown;
    
    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      try {
        return await invoker(request);
      } catch (error) {
        lastError = error;
        
        // Don't retry for these error types
        if (this.isNonRetryableError((error as RpcError).code)) {
          throw error;
        }
        
        if (attempt < this.maxRetries) {
          const delay = this.baseDelay * Math.pow(2, attempt);
          console.log(`Retrying RPC call in ${delay}ms (attempt ${attempt + 1}/${this.maxRetries})`);
          await this.delay(delay);
        }
      }
    }
    
    throw lastError;
  }

  private isNonRetryableError(code: StatusCode): boolean {
    return code === StatusCode.INVALID_ARGUMENT ||
           code === StatusCode.NOT_FOUND ||
           code === StatusCode.PERMISSION_DENIED ||
           code === StatusCode.UNAUTHENTICATED;
  }

  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
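
The wait before each retry follows `baseDelay * 2^attempt`, so the defaults above wait 1000 ms, 2000 ms, and 4000 ms before giving up. The helper below is a small sketch that tabulates that schedule. `backoffDelays` and its optional `capMs` parameter are hypothetical additions for illustration and are not part of the interceptor above.

```typescript
// Delay before each retry, mirroring baseDelay * 2^attempt from RetryInterceptor.
// capMs is an assumed extra: an upper bound so late retries do not wait unboundedly.
function backoffDelays(
  maxRetries: number,
  baseDelayMs: number,
  capMs: number = Infinity
): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    delays.push(Math.min(capMs, baseDelayMs * Math.pow(2, attempt)));
  }
  return delays;
}

const defaults = backoffDelays(3, 1000); // [1000, 2000, 4000]
const worstCaseWaitMs = defaults.reduce((sum, delay) => sum + delay, 0); // 7000
const capped = backoffDelays(5, 1000, 5000); // [1000, 2000, 4000, 5000, 5000]

console.log(defaults, worstCaseWaitMs, capped);
```

With the defaults, a call that keeps failing is attempted four times and spends 7 seconds waiting in total, on top of the time taken by the calls themselves.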

Caching Interceptor

class CacheInterceptor implements UnaryInterceptor<any, any> {
  private cache = new Map<string, { response: UnaryResponse<any, any>, expiry: number }>();
  
  constructor(private ttlMs: number = 60000) {} // 1 minute default TTL

  async intercept(
    request: Request<any, any>,
    invoker: (request: Request<any, any>) => Promise<UnaryResponse<any, any>>
  ): Promise<UnaryResponse<any, any>> {
    const cacheKey = this.getCacheKey(request);
    const now = Date.now();
    
    // Check cache
    const cached = this.cache.get(cacheKey);
    if (cached && cached.expiry > now) {
      console.log(`[Cache] Cache hit for ${request.getMethodDescriptor().getName()}`);
      return cached.response;
    }
    
    // Invoke actual call
    const response = await invoker(request);
    
    // Cache successful responses
    if (response.getStatus()?.code === StatusCode.OK || !response.getStatus()) {
      this.cache.set(cacheKey, {
        response,
        expiry: now + this.ttlMs
      });
      console.log(`[Cache] Cached response for ${request.getMethodDescriptor().getName()}`);
    }
    
    return response;
  }

  private getCacheKey(request: Request<any, any>): string {
    const method = request.getMethodDescriptor().getName();
    const message = request.getRequestMessage();
    // Simple serialization - in production, use a proper hash
    return `${method}:${JSON.stringify(message)}`;
  }

  clearCache(): void {
    this.cache.clear();
  }
}
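
The expiry check is easier to reason about, and to test, when the clock is injectable. The following is a sketch of the same TTL logic pulled into a small standalone store. `TtlCache` and its `now` parameter are hypothetical names for this example, and the lazy eviction of expired entries is an addition that the interceptor above does not perform.

```typescript
// Minimal TTL store with an injectable clock so expiry can be tested without waiting.
class TtlCache<V> {
  private readonly entries = new Map<string, { value: V; expiry: number }>();
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiry <= this.now()) {
      // Evict lazily so stale entries do not accumulate.
      this.entries.delete(key);
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: V): void {
    this.entries.set(key, { value, expiry: this.now() + this.ttlMs });
  }

  get size(): number {
    return this.entries.size;
  }
}

// Usage with a fake clock: the entry is served inside the TTL and dropped once it elapses.
let fakeNow = 0;
const cache = new TtlCache<string>(1000, () => fakeNow);
cache.set("GetUser:42", "alice");
const hit = cache.get("GetUser:42"); // "alice"
fakeNow = 1000;
const miss = cache.get("GetUser:42"); // undefined, and the entry is evicted

console.log(hit, miss, cache.size);
```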

Stream Logging Interceptor

class StreamLoggingInterceptor implements StreamInterceptor<any, any> {
  intercept(
    request: Request<any, any>,
    invoker: (request: Request<any, any>) => ClientReadableStream<any>
  ): ClientReadableStream<any> {
    const methodName = request.getMethodDescriptor().getName();
    const startTime = Date.now();
    let messageCount = 0;
    
    console.log(`[gRPC Stream] Starting stream: ${methodName}`);
    
    const originalStream = invoker(request);
    
    // Wrap the stream to add logging
    return new (class implements ClientReadableStream<any> {
      on(eventType: string, callback: (...args: any[]) => void): ClientReadableStream<any> {
        if (eventType === 'data') {
          const wrappedCallback = (data: any) => {
            messageCount++;
            console.log(`[gRPC Stream] Message ${messageCount} received from ${methodName}`);
            callback(data);
          };
          originalStream.on(eventType as any, wrappedCallback);
        } else if (eventType === 'end') {
          const wrappedCallback = () => {
            const duration = Date.now() - startTime;
            console.log(`[gRPC Stream] Stream ended: ${methodName} (${duration}ms, ${messageCount} messages)`);
            callback();
          };
          originalStream.on(eventType as any, wrappedCallback);
        } else if (eventType === 'error') {
          const wrappedCallback = (error: any) => {
            const duration = Date.now() - startTime;
            console.error(`[gRPC Stream] Stream error: ${methodName} (${duration}ms, ${messageCount} messages)`, error);
            callback(error);
          };
          originalStream.on(eventType as any, wrappedCallback);
        } else {
          originalStream.on(eventType as any, callback);
        }
        return this;
      }

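      removeListener(eventType: string, callback: (...args: any[]) => void): void {
        // Caveat: 'data', 'end', and 'error' listeners were registered as wrapped
        // callbacks in on(), so passing the original callback here does not detach them.
        originalStream.removeListener(eventType as any, callback);
      }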

      cancel(): void {
        console.log(`[gRPC Stream] Stream cancelled: ${methodName}`);
        originalStream.cancel();
      }
    })();
  }
}

Multiple Interceptors

// Create client with multiple interceptors
const client = new GrpcWebClientBase({
  unaryInterceptors: [
    new LoggingInterceptor(),
    new AuthInterceptor(() => getAuthToken()),
    new RetryInterceptor(3, 1000),
    new CacheInterceptor(300000) // 5 minute cache
  ],
  streamInterceptors: [
    new StreamLoggingInterceptor()
  ]
});

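// Execution order: upstream grpc-web wraps each interceptor around the previous
// one, so the LAST interceptor in the array sees the request first:
// 1. CacheInterceptor (checks/updates cache)
// 2. RetryInterceptor (handles retries)
// 3. AuthInterceptor (adds auth headers)
// 4. LoggingInterceptor (logs start)
// 5. Actual RPC call
// Responses then flow back in array order, from LoggingInterceptor to CacheInterceptor.
// Confirm this against the grpc-web version in use, and reverse the array if the
// opposite nesting is wanted.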
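
One way to confirm the execution order is to register interceptors that only record their name. The sketch below does this against a local `compose` function, which is an assumption: it folds the array by wrapping each interceptor around the invoker built so far, which is how the upstream grpc-web client is understood to build its chain. If the client in use composes differently, the recorded order will differ. All types and names here are stand-ins for illustration.

```typescript
type Invoker = (request: string) => Promise<string>;
interface Interceptor {
  intercept(request: string, invoker: Invoker): Promise<string>;
}

// Assumed composition: each interceptor wraps the invoker built so far,
// so the last array element ends up outermost.
function compose(interceptors: Interceptor[], rpc: Invoker): Invoker {
  let current = rpc;
  for (const interceptor of interceptors) {
    const next = current;
    current = (request) => interceptor.intercept(request, next);
  }
  return current;
}

const requestOrder: string[] = [];
const responseOrder: string[] = [];

// Records its name on the way in and again on the way out.
const tagged = (name: string): Interceptor => ({
  intercept(request, invoker) {
    requestOrder.push(name);
    return invoker(request).then((response) => {
      responseOrder.push(name);
      return response;
    });
  },
});

const call = compose(
  [tagged("logging"), tagged("auth"), tagged("retry"), tagged("cache")],
  (request) => Promise.resolve(`ok:${request}`)
);

const done = call("Echo").then((response) => {
  console.log(requestOrder.join(" > ")); // cache > retry > auth > logging
  console.log(responseOrder.join(" > ")); // logging > auth > retry > cache
  return response;
});
```

Under this composition, an interceptor that should observe every retry attempt (for example logging) belongs before the retry interceptor in the array, because it then sits inside the retry loop.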

Advanced Interceptor Patterns

Conditional Interceptor:

class ConditionalInterceptor implements UnaryInterceptor<any, any> {
  constructor(
    private condition: (request: Request<any, any>) => boolean,
    private interceptor: UnaryInterceptor<any, any>
  ) {}

  async intercept(
    request: Request<any, any>,
    invoker: (request: Request<any, any>) => Promise<UnaryResponse<any, any>>
  ): Promise<UnaryResponse<any, any>> {
    if (this.condition(request)) {
      return await this.interceptor.intercept(request, invoker);
    } else {
      return await invoker(request);
    }
  }
}

// Usage: Only apply auth to certain methods
const conditionalAuth = new ConditionalInterceptor(
  (request) => request.getMethodDescriptor().getName().includes('Secure'),
  new AuthInterceptor(() => getAuthToken())
);
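
Conditions tend to multiply, so it can help to build them from small predicates over the method name instead of writing one long inline function. The sketch below shows that idea. `hasPrefix`, `anyOf`, `not`, and the service paths are hypothetical names chosen for illustration.

```typescript
type MethodPredicate = (methodName: string) => boolean;

// Matches method names that begin with the given prefix.
const hasPrefix = (prefix: string): MethodPredicate =>
  (name) => name.indexOf(prefix) === 0;

// True when at least one of the given predicates matches.
const anyOf = (...predicates: MethodPredicate[]): MethodPredicate =>
  (name) => predicates.some((predicate) => predicate(name));

// Inverts a predicate.
const not = (predicate: MethodPredicate): MethodPredicate =>
  (name) => !predicate(name);

// Hypothetical policy: billing and admin services need auth, everything else does not.
const needsAuth = anyOf(hasPrefix("/billing.Billing/"), hasPrefix("/admin."));
const isPublic = not(needsAuth);

console.log(needsAuth("/billing.Billing/Charge")); // true
console.log(needsAuth("/public.Health/Check")); // false
console.log(isPublic("/public.Health/Check")); // true

// Wiring into the class above would look like:
//   new ConditionalInterceptor(
//     (request) => needsAuth(request.getMethodDescriptor().getName()),
//     new AuthInterceptor(() => getAuthToken())
//   );
```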

Circuit Breaker Interceptor:

class CircuitBreakerInterceptor implements UnaryInterceptor<any, any> {
  private failures = 0;
  private lastFailureTime = 0;
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';
  
  constructor(
    private failureThreshold: number = 5,
    private timeoutMs: number = 60000
  ) {}

  async intercept(
    request: Request<any, any>,
    invoker: (request: Request<any, any>) => Promise<UnaryResponse<any, any>>
  ): Promise<UnaryResponse<any, any>> {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailureTime > this.timeoutMs) {
        this.state = 'HALF_OPEN';
      } else {
        // The grpc-web typings declare RpcError(code, message, metadata); pass empty metadata
        throw new RpcError(StatusCode.UNAVAILABLE, 'Circuit breaker is OPEN', {});
      }
    }
    
    try {
      const response = await invoker(request);
      
      // Success - reset circuit breaker
      if (this.state === 'HALF_OPEN') {
        this.state = 'CLOSED';
        this.failures = 0;
      }
      
      return response;
    } catch (error) {
      this.failures++;
      this.lastFailureTime = Date.now();
      
      if (this.failures >= this.failureThreshold) {
        this.state = 'OPEN';
        console.log('Circuit breaker opened due to failures');
      }
      
      throw error;
    }
  }
}
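
The state transitions are the part most worth checking in isolation. The following is a sketch of the same CLOSED, OPEN, HALF_OPEN logic pulled out of the interceptor, with an injectable clock so the timeout can be exercised without waiting. `BreakerStateMachine` and its method names are hypothetical and exist only for this example.

```typescript
type BreakerState = "CLOSED" | "OPEN" | "HALF_OPEN";

class BreakerStateMachine {
  private failures = 0;
  private lastFailureTime = 0;
  private current: BreakerState = "CLOSED";
  private readonly failureThreshold: number;
  private readonly timeoutMs: number;
  private readonly now: () => number;

  constructor(failureThreshold = 5, timeoutMs = 60000, now: () => number = () => Date.now()) {
    this.failureThreshold = failureThreshold;
    this.timeoutMs = timeoutMs;
    this.now = now;
  }

  // Returns true when a call may proceed; moves OPEN to HALF_OPEN after the timeout.
  allowCall(): boolean {
    if (this.current === "OPEN") {
      if (this.now() - this.lastFailureTime > this.timeoutMs) {
        this.current = "HALF_OPEN";
      } else {
        return false;
      }
    }
    return true;
  }

  // A successful probe in HALF_OPEN closes the breaker and clears the counter.
  recordSuccess(): void {
    if (this.current === "HALF_OPEN") {
      this.current = "CLOSED";
      this.failures = 0;
    }
  }

  // Each failure is counted; reaching the threshold opens the breaker.
  recordFailure(): void {
    this.failures += 1;
    this.lastFailureTime = this.now();
    if (this.failures >= this.failureThreshold) {
      this.current = "OPEN";
    }
  }

  get state(): BreakerState {
    return this.current;
  }
}

// Walk through one full cycle with a fake clock.
let clock = 0;
const trace: string[] = [];
const breaker = new BreakerStateMachine(2, 1000, () => clock);
breaker.recordFailure();
breaker.recordFailure();
trace.push(breaker.state); // OPEN after two failures
const blocked = !breaker.allowCall(); // still inside the timeout
clock = 1001;
const probeAllowed = breaker.allowCall(); // timeout elapsed
trace.push(breaker.state); // HALF_OPEN
breaker.recordSuccess();
trace.push(breaker.state); // CLOSED

console.log(trace.join(" -> ")); // OPEN -> HALF_OPEN -> CLOSED
```

As in the interceptor above, the failure counter is only cleared when a HALF_OPEN probe succeeds, so failures accumulate across successful calls while the breaker is CLOSED.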

Interceptor Best Practices

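  1. Order Matters: Registration order determines which interceptor sees the request first, so arrange the array deliberately and confirm the direction against the grpc-web version in use
  2. Error Handling: Catch only the errors an interceptor can act on, and rethrow everything else unchanged so callers still see the original status code
  3. Performance: Every interceptor runs on every call, so keep per-call work small, especially for frequently called methods
  4. State Management: Prefer stateless interceptors; one interceptor instance is shared by all calls on a client, so any state it holds (caches, failure counters) is shared too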
  5. Testing: Test interceptors independently and in combination
  6. Documentation: Document any side effects or behavioral changes introduced by interceptors
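
For the testing point, an interceptor can be exercised without a server by passing it a fake invoker. The sketch below tests a simplified retrying interceptor against an invoker that fails a fixed number of times. It uses stand-in types instead of the grpc-web ones, and `retrying`, `flakyInvoker`, and `runChecks` are hypothetical names for this example.

```typescript
type Invoker = (request: string) => Promise<string>;

// Interceptor under test: retries up to maxRetries times, with no delay to keep tests fast.
function retrying(maxRetries: number) {
  return {
    async intercept(request: string, invoker: Invoker): Promise<string> {
      let lastError: unknown;
      for (let attempt = 0; attempt <= maxRetries; attempt++) {
        try {
          return await invoker(request);
        } catch (error) {
          lastError = error;
        }
      }
      throw lastError;
    },
  };
}

// Fake invoker: fails a fixed number of times, then succeeds, and counts its calls.
function flakyInvoker(failuresBeforeSuccess: number) {
  let calls = 0;
  const invoke: Invoker = (request) => {
    calls += 1;
    return calls <= failuresBeforeSuccess
      ? Promise.reject(new Error(`transient failure ${calls}`))
      : Promise.resolve(`ok:${request}`);
  };
  return { invoke, callCount: () => calls };
}

async function runChecks(): Promise<string[]> {
  const results: string[] = [];

  // Two failures, then success: the third attempt should return the response.
  const recovering = flakyInvoker(2);
  results.push(await retrying(3).intercept("Echo", recovering.invoke));
  results.push(`calls=${recovering.callCount()}`);

  // More failures than retries: the interceptor should give up and rethrow.
  const hopeless = flakyInvoker(10);
  try {
    await retrying(1).intercept("Echo", hopeless.invoke);
    results.push("unexpected success");
  } catch {
    results.push(`gave up after ${hopeless.callCount()} calls`);
  }
  return results;
}

const checks = runChecks();
checks.then((results) => console.log(results));
// logs [ 'ok:Echo', 'calls=3', 'gave up after 2 calls' ]
```

The same fake invoker can be reused to test interceptors in combination by nesting one `intercept` call inside the invoker passed to another.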