Robust retry mechanisms with exponential backoff, plus rate limiting for managing concurrent operations. These utilities are critical for handling transient failures and respecting service limits.
runWithRetry provides advanced retry functionality with exponential backoff, cancellation support, and progress monitoring.
/**
 * Executes async operation with exponential backoff retry logic
 * @param api - Async function to execute, receives AbortSignal for cancellation
 * @param fetchCallName - Name for telemetry and logging
 * @param logger - Telemetry logger for monitoring retry attempts
 * @param progress - Progress handler with cancellation and retry callbacks
 * @returns Promise resolving to the operation result
 */
function runWithRetry<T>(
  api: (cancel?: AbortSignal) => Promise<T>,
  fetchCallName: string,
  logger: ITelemetryLoggerExt,
  progress: IProgress
): Promise<T>;
/**
 * Calculates next retry delay using exponential backoff
 * @param delayMs - Current delay in milliseconds
 * @param error - Error that triggered the retry
 * @returns Next delay in milliseconds (max 60s for reachable endpoints, 8s for unreachable)
 */
function calculateMaxWaitTime(delayMs: number, error: unknown): number;
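If you implement your own wait loop instead of using runWithRetry, calculateMaxWaitTime can supply each successive delay. A minimal sketch under that assumption — doFetch, the attempt cap, and the starting delay of 0 are illustrative placeholders, not part of the package API:
import { calculateMaxWaitTime } from "@fluidframework/driver-utils";

// Hypothetical stand-in for the real network call.
declare function doFetch(): Promise<Response>;

async function fetchWithManualBackoff(maxAttempts = 5): Promise<Response> {
  let delayMs = 0; // assumed starting point; grows on each failure
  for (let attempt = 1; ; attempt++) {
    try {
      return await doFetch();
    } catch (error) {
      if (attempt >= maxAttempts) {
        throw error;
      }
      // Next delay, capped at 60s (reachable) / 8s (unreachable) per the docs above.
      delayMs = calculateMaxWaitTime(delayMs, error);
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
}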
/**
 * Progress interface for controlling cancellation and receiving retry notifications
 */
interface IProgress {
  /** Optional abort signal for cancelling the operation */
  cancel?: AbortSignal;
  /** Optional callback called before each retry attempt */
  onRetry?(delayInMs: number, error: any): void;
}

Usage Examples:
import { runWithRetry, IProgress } from "@fluidframework/driver-utils";
// Basic retry with timeout
const result = await runWithRetry(
  async (signal) => {
    return await fetch("/api/data", { signal });
  },
  "fetchApiData",
  logger,
  { cancel: AbortSignal.timeout(30000) }
);
// Retry with progress monitoring
const abortController = new AbortController();
const resultWithProgress = await runWithRetry(
  async (signal) => {
    return await expensiveOperation(signal);
  },
  "expensiveOperation",
  logger,
  {
    cancel: abortController.signal,
    onRetry: (delay, error) => {
      console.log(`Retrying in ${delay}ms due to: ${error.message}`);
      updateProgressUI(`Retrying in ${delay / 1000}s...`);
    }
  }
);

Concurrency limiting manages the number of simultaneous operations so that service limits are respected.
/**
 * Limits concurrent operations to specified maximum
 */
class RateLimiter {
  /**
   * @param maxRequests - Maximum number of concurrent requests allowed
   */
  constructor(maxRequests: number);
  /**
   * Schedule work to run with rate limiting
   * @param work - Async function to execute
   * @returns Promise resolving to work result
   */
  schedule<T>(work: () => Promise<T>): Promise<T>;
  /**
   * Number of requests currently waiting in queue
   */
  get waitQueueLength(): number;
}

Usage Examples:
import { RateLimiter } from "@fluidframework/driver-utils";
// Create rate limiter for max 3 concurrent requests
const rateLimiter = new RateLimiter(3);
// Schedule multiple operations
const operations = Array.from({ length: 10 }, (_, i) =>
  rateLimiter.schedule(async () => {
    console.log(`Starting operation ${i}`);
    const result = await performOperation(i);
    console.log(`Completed operation ${i}`);
    return result;
  })
);
// Wait for all operations to complete
const results = await Promise.all(operations);
console.log(`All operations completed. Queue length: ${rateLimiter.waitQueueLength}`);
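Because waitQueueLength exposes the backlog, a producer can apply simple backpressure before scheduling more work. A minimal sketch — the threshold and polling interval are arbitrary assumptions:
// Illustrative backpressure: pause producers while the queue is deep.
const MAX_BACKLOG = 20; // assumed threshold, tune for your workload

async function scheduleWithBackpressure<T>(
  limiter: RateLimiter,
  work: () => Promise<T>
): Promise<T> {
  // Poll until the backlog drains below the threshold.
  while (limiter.waitQueueLength >= MAX_BACKLOG) {
    await new Promise((resolve) => setTimeout(resolve, 100));
  }
  return limiter.schedule(work);
}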
Custom backoff strategies can be layered on top of runWithRetry:
import { runWithRetry } from "@fluidframework/driver-utils";
class RetryManager {
  static async withCustomBackoff<T>(
    operation: (signal?: AbortSignal) => Promise<T>,
    options: CustomRetryOptions
  ): Promise<T> {
    let attempt = 0;
    let delay = options.initialDelay;
    while (attempt < options.maxAttempts) {
      try {
        return await runWithRetry(
          operation,
          options.operationName,
          options.logger,
          {
            cancel: options.signal,
            onRetry: (delayMs, error) => {
              // Inner retries are handled by runWithRetry; just surface them.
              console.log(`Attempt ${attempt + 1}: retrying in ${delayMs}ms`);
              options.onRetry?.(attempt + 1, delayMs, error);
            }
          }
        );
      } catch (error) {
        // Count outer attempts only here, so inner retry notifications
        // don't double-increment the counter.
        attempt++;
        if (attempt >= options.maxAttempts) {
          throw error;
        }
        // Custom backoff calculation between outer attempts
        delay = Math.min(delay * options.backoffMultiplier, options.maxDelay);
        await this.sleep(delay);
      }
    }
    throw new Error(`Operation failed after ${options.maxAttempts} attempts`);
  }
  private static sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
interface CustomRetryOptions {
  initialDelay: number;
  maxDelay: number;
  backoffMultiplier: number;
  maxAttempts: number;
  operationName: string;
  logger: ITelemetryLoggerExt;
  signal?: AbortSignal;
  onRetry?: (attempt: number, delay: number, error: any) => void;
}
// Usage
const controller = new AbortController();
const result = await RetryManager.withCustomBackoff(
  async (signal) => await networkRequest(signal),
  {
    initialDelay: 1000,
    maxDelay: 30000,
    backoffMultiplier: 2,
    maxAttempts: 5,
    operationName: "customNetworkRequest",
    logger: myLogger,
    signal: controller.signal,
    onRetry: (attempt, delay, error) => {
      updateRetryStatus(`Attempt ${attempt} failed, retrying...`);
    }
  }
);

An adaptive limiter can widen or narrow its concurrency limit in response to the observed error rate:
import { RateLimiter } from "@fluidframework/driver-utils";
class AdaptiveRateLimiter {
  private baseLimiter: RateLimiter;
  private currentLimit: number;
  private errorCount = 0;
  private successCount = 0;
  private lastAdjustment = Date.now();
  constructor(
    initialLimit: number,
    private minLimit: number = 1,
    private maxLimit: number = 50
  ) {
    this.currentLimit = initialLimit;
    this.baseLimiter = new RateLimiter(initialLimit);
  }
  async schedule<T>(work: () => Promise<T>): Promise<T> {
    try {
      const result = await this.baseLimiter.schedule(work);
      this.recordSuccess();
      return result;
    } catch (error) {
      this.recordError();
      throw error;
    }
  }
  private recordSuccess(): void {
    this.successCount++;
    this.maybeAdjustLimit();
  }
  private recordError(): void {
    this.errorCount++;
    this.maybeAdjustLimit();
  }
  private maybeAdjustLimit(): void {
    const now = Date.now();
    // Adjust at most every 30 seconds
    if (now - this.lastAdjustment < 30000) {
      return;
    }
    const totalRequests = this.successCount + this.errorCount;
    if (totalRequests === 0) return;
    const errorRate = this.errorCount / totalRequests;
    let newLimit = this.currentLimit;
    if (errorRate > 0.1) {
      // High error rate: shrink the limit by 20%
      newLimit = Math.max(this.minLimit, Math.floor(this.currentLimit * 0.8));
    } else if (errorRate < 0.05) {
      // Low error rate: grow by 20% (ceil so growth never stalls at small limits)
      newLimit = Math.min(this.maxLimit, Math.ceil(this.currentLimit * 1.2));
    }
    if (newLimit !== this.currentLimit) {
      console.log(`Adjusting rate limit from ${this.currentLimit} to ${newLimit}`);
      this.currentLimit = newLimit;
      // Work already queued on the old limiter drains under the old limit.
      this.baseLimiter = new RateLimiter(newLimit);
    }
    // Reset counters for the next measurement window
    this.errorCount = 0;
    this.successCount = 0;
    this.lastAdjustment = now;
  }
  get waitQueueLength(): number {
    return this.baseLimiter.waitQueueLength;
  }
  get currentRateLimit(): number {
    return this.currentLimit;
  }
}
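A usage sketch for the adaptive limiter — performOperation is the same hypothetical placeholder as in the earlier RateLimiter example:
// Usage: oversubscribe the limiter and let it adapt to the observed error rate
const adaptive = new AdaptiveRateLimiter(5, 1, 20);
const tasks = Array.from({ length: 50 }, (_, i) =>
  adaptive.schedule(async () => performOperation(i))
);
await Promise.allSettled(tasks);
console.log(`Final limit: ${adaptive.currentRateLimit}, queued: ${adaptive.waitQueueLength}`);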
Combining runWithRetry with a circuit breaker stops retry storms against a service that is persistently failing:
import { runWithRetry } from "@fluidframework/driver-utils";
class CircuitBreakerRetryManager {
  private failureCount = 0;
  private lastFailureTime = 0;
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';
  constructor(
    private maxFailures: number = 5,
    private resetTimeoutMs: number = 60000
  ) {}
  async executeWithRetry<T>(
    operation: (signal?: AbortSignal) => Promise<T>,
    operationName: string,
    logger: ITelemetryLoggerExt,
    signal?: AbortSignal
  ): Promise<T> {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailureTime < this.resetTimeoutMs) {
        throw new Error('Circuit breaker is OPEN');
      } else {
        // Allow a single probe through after the reset timeout
        this.state = 'HALF_OPEN';
      }
    }
    try {
      const result = await runWithRetry(
        operation,
        operationName,
        logger,
        {
          cancel: signal,
          onRetry: (delay, error) => {
            logger.sendTelemetryEvent({
              eventName: 'CircuitBreakerRetry',
              state: this.state,
              failureCount: this.failureCount,
              delay
            });
          }
        }
      );
      // Success - close the breaker again
      if (this.state === 'HALF_OPEN') {
        this.state = 'CLOSED';
        this.failureCount = 0;
      }
      return result;
    } catch (error) {
      this.recordFailure();
      throw error;
    }
  }
  private recordFailure(): void {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    if (this.failureCount >= this.maxFailures) {
      this.state = 'OPEN';
    }
  }
  get circuitState(): string {
    return this.state;
  }
  reset(): void {
    this.state = 'CLOSED';
    this.failureCount = 0;
    this.lastFailureTime = 0;
  }
}
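A usage sketch for the breaker, assuming the same logger as above and the hypothetical fetchData helper used later in this section:
// Usage (fetchData is a hypothetical placeholder)
const breaker = new CircuitBreakerRetryManager(5, 60000);
try {
  const data = await breaker.executeWithRetry(
    async (signal) => fetchData(signal),
    "fetchData",
    logger
  );
} catch (error) {
  console.log(`Request failed; breaker state: ${breaker.circuitState}`);
}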
Rate limiting and retries compose naturally: schedule each retried operation through the limiter so retries also respect the concurrency cap.
import { RateLimiter, runWithRetry } from "@fluidframework/driver-utils";
class RateLimitedRetryManager {
  constructor(
    private rateLimiter: RateLimiter,
    private logger: ITelemetryLoggerExt
  ) {}
  async execute<T>(
    operation: (signal?: AbortSignal) => Promise<T>,
    operationName: string,
    signal?: AbortSignal
  ): Promise<T> {
    // Note: the limiter slot stays occupied while runWithRetry waits between retries.
    return this.rateLimiter.schedule(async () => {
      return runWithRetry(
        operation,
        operationName,
        this.logger,
        {
          cancel: signal,
          onRetry: (delay, error) => {
            this.logger.sendTelemetryEvent({
              eventName: 'RateLimitedRetry',
              operationName,
              queueLength: this.rateLimiter.waitQueueLength,
              retryDelay: delay,
              error: error.message
            });
          }
        }
      );
    });
  }
  async executeBatch<T>(
    operations: Array<{
      operation: (signal?: AbortSignal) => Promise<T>;
      name: string;
    }>,
    signal?: AbortSignal
  ): Promise<T[]> {
    const promises = operations.map(({ operation, name }) =>
      this.execute(operation, name, signal)
    );
    return Promise.all(promises);
  }
}
// Usage
const rateLimiter = new RateLimiter(3);
const retryManager = new RateLimitedRetryManager(rateLimiter, logger);
const controller = new AbortController();
// Execute single operation
const result = await retryManager.execute(
  async (signal) => await fetchData(signal),
  "fetchData",
  controller.signal
);
// Execute batch of operations
const batchResults = await retryManager.executeBatch([
  { operation: async (signal) => await fetchUserData(signal), name: "fetchUser" },
  { operation: async (signal) => await fetchPreferences(signal), name: "fetchPrefs" },
  { operation: async (signal) => await fetchSettings(signal), name: "fetchSettings" }
], controller.signal);
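Note that executeBatch uses Promise.all, so one rejected operation rejects the whole batch. When partial results are acceptable, callers can settle each operation independently; a minimal sketch reusing the manager above:
// Tolerate partial failure: collect successes and failures separately.
const settled = await Promise.allSettled([
  retryManager.execute(async (signal) => fetchUserData(signal), "fetchUser", controller.signal),
  retryManager.execute(async (signal) => fetchPreferences(signal), "fetchPrefs", controller.signal)
]);
const successes = settled.filter((r) => r.status === "fulfilled");
const failures = settled.filter((r) => r.status === "rejected");
console.log(`${successes.length} succeeded, ${failures.length} failed`);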