Asynchronous programming utilities, including delays, timeouts, and synchronization primitives. They are designed for modern async/await patterns and concurrent programming.
The following functions control the timing and execution of asynchronous operations.
/**
* Delays execution for the specified number of milliseconds.
* @param ms - Delay in milliseconds
* @param options - Configuration options; `options.signal` is an optional AbortSignal (see DelayOptions)
* @returns Promise that resolves after the delay
*/
function delay(ms: number, options?: DelayOptions): Promise<void>;
/**
* Creates a promise that rejects with a TimeoutError after the specified time.
* Its declared type is `Promise<never>`: it has no fulfilment value, so it is meant to be
* combined with other promises, as the `Promise.race` usage example in this document does.
* @param ms - Timeout in milliseconds
* @returns Promise that rejects with TimeoutError
*/
function timeout(ms: number): Promise<never>;
/**
 * Runs an asynchronous function and enforces a time limit on it.
 *
 * The first argument is the function that starts the work, not an already-created
 * promise: `withTimeout` invokes it and waits for the promise it returns.
 *
 * @param run - Function returning the promise to wait for
 * @param ms - Timeout in milliseconds
 * @returns Promise that settles with the outcome of `run`, or rejects with TimeoutError
 *          when `ms` milliseconds elapse first
 */
function withTimeout<T>(run: () => Promise<T>, ms: number): Promise<T>;
/**
* Options accepted by `delay`.
*/
interface DelayOptions {
/** Optional AbortSignal; the cancellable-delay example in this document passes `controller.signal` here to cancel a pending delay. */
signal?: AbortSignal;
}Usage Examples:
import { delay, timeout, withTimeout } from 'es-toolkit/promise';
// Basic delay
/**
 * Logs a start message, pauses for one second, then logs a completion message.
 */
async function example1() {
  const oneSecondMs = 1000;
  console.log('Starting...');
  await delay(oneSecondMs); // Wait 1 second
  console.log('Done!');
}
// Cancellable delay with AbortController
/**
 * Starts a 5-second delay and aborts it after 2 seconds through an AbortController,
 * so the cancellation branch runs instead of the completion branch.
 */
async function example2() {
  const abortController = new AbortController();
  const { signal } = abortController;

  // Cancel after 2 seconds
  setTimeout(() => {
    abortController.abort();
  }, 2000);

  try {
    await delay(5000, { signal });
    console.log('Completed'); // Won't reach here
  } catch {
    console.log('Delay was cancelled'); // Will execute
  }
}
// Adding a timeout to an asynchronous operation
/**
 * Fetches `url` and rejects if no response arrives within 10 seconds.
 *
 * @param url - Address to request
 * @returns The fetch Response
 * @throws TimeoutError (from `withTimeout`) when the 10 second limit is exceeded
 */
async function fetchWithTimeout(url: string): Promise<Response> {
  // withTimeout expects a function that starts the work; handing it an
  // already-created promise would make it try to call a non-function.
  return withTimeout(() => fetch(url), 10000); // 10 second timeout
}
// Race condition with timeout
/**
 * Settles with `promise` unless `ms` milliseconds pass first, in which case the result
 * rejects with an Error describing the elapsed limit.
 *
 * @param promise - Promise to race against the deadline
 * @param ms - Deadline in milliseconds
 * @returns Promise with the outcome of whichever settles first
 */
async function raceWithTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  // Translate the library's TimeoutError into a message that carries the limit.
  const deadline = timeout(ms).catch((): never => {
    throw new Error(`Operation timed out after ${ms}ms`);
  });
  return Promise.race([promise, deadline]);
}
// Retry with delay
/**
 * Runs `operation` until it succeeds or `maxRetries` attempts have failed, pausing
 * `delayMs` milliseconds between attempts.
 *
 * @param operation - Factory invoked once per attempt
 * @param maxRetries - Maximum number of attempts; must be at least 1
 * @param delayMs - Pause between a failed attempt and the next one, in milliseconds
 * @returns The result of the first successful attempt
 * @throws RangeError when `maxRetries` is below 1 or NaN (the operation is never called)
 * @throws The error of the last attempt when every attempt fails
 */
async function retryWithDelay<T>(
  operation: () => Promise<T>,
  maxRetries: number,
  delayMs: number
): Promise<T> {
  // Written as a negated comparison so NaN is rejected as well.
  if (!(maxRetries >= 1)) {
    throw new RangeError(`maxRetries must be at least 1, received ${maxRetries}`);
  }
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      // `>=` rather than `===` so a fractional limit still surfaces the real error.
      if (attempt >= maxRetries) throw error;
      console.log(`Attempt ${attempt} failed, retrying in ${delayMs}ms...`);
      await delay(delayMs);
    }
  }
}
// Usage examples
/**
 * Runs the helpers above in sequence: a short delay, a fetch guarded by a timeout,
 * and a request retried up to three times with one second between attempts.
 */
async function examples() {
  // Simple delay
  await delay(500);

  // Fetch with timeout
  try {
    const response = await fetchWithTimeout('https://api.example.com/data');
    const data = await response.json();
    console.log(data);
  } catch {
    console.log('Request timed out or failed');
  }

  // Retry with delay
  const loadUnreliable = async () => {
    const reply = await fetch('/api/unreliable-endpoint');
    return reply.json();
  };
  const result = await retryWithDelay(loadUnreliable, 3, 1000);
}
Classes for coordinating access to shared resources in concurrent asynchronous operations.
/**
* Mutual exclusion lock for async functions.
* The usage examples in this document call `acquire()` before a critical section and
* `release()` in a `finally` block so the lock is returned even when the section throws.
*/
class Mutex {
/**
* Acquires the mutex lock
* @returns Promise that resolves when lock is acquired
*/
acquire(): Promise<void>;
/**
* Releases the mutex lock
*/
release(): void;
/**
* Checks if mutex is currently locked
* NOTE(review): presumably true while any caller holds the lock, not only the current one - confirm against the implementation.
*/
get isLocked(): boolean;
}
/**
* Counting semaphore for resource management.
* The usage examples in this document pair every `acquire()` with a `release()` to
* bound how many asynchronous operations run at once.
*/
class Semaphore {
/**
* Creates semaphore with specified number of permits
* @param permits - Number of available permits
*/
constructor(permits: number);
/**
* Acquires a permit from the semaphore
* @returns Promise that resolves when permit is acquired
*/
acquire(): Promise<void>;
/**
* Releases a permit back to the semaphore
*/
release(): void;
/**
* Gets number of available permits
*/
get available(): number;
}Usage Examples:
import { Mutex, Semaphore } from 'es-toolkit/promise';
// Mutex for exclusive access
/**
 * Counter whose read-modify-write updates are serialized through a Mutex, so an update
 * that spans an `await` cannot interleave with another one.
 */
class Counter {
  private value = 0;
  private mutex = new Mutex();

  /** Adds one to the counter and resolves with the updated value. */
  increment(): Promise<number> {
    return this.applyDelta(1);
  }

  /** Subtracts one from the counter and resolves with the updated value. */
  decrement(): Promise<number> {
    return this.applyDelta(-1);
  }

  /** Returns the current value without taking the lock. */
  getValue(): number {
    return this.value;
  }

  /** Applies `delta` inside the mutex-protected critical section. */
  private async applyDelta(delta: number): Promise<number> {
    await this.mutex.acquire();
    try {
      // Critical section - only one update at a time
      const snapshot = this.value;
      await delay(10); // Simulate async work
      this.value = snapshot + delta;
      return this.value;
    } finally {
      this.mutex.release();
    }
  }
}
// Usage
const counter = new Counter();
/**
 * Starts three increments and one decrement at once and logs the outcome once all
 * four have settled.
 */
async function testCounter() {
  // These operations will be serialized by the mutex
  const results = await Promise.all([
    counter.increment(),
    counter.increment(),
    counter.decrement(),
    counter.increment(),
  ]);
  console.log('Final value:', counter.getValue()); // Will be 2
  console.log('Operation results:', results);
}
// Semaphore for rate limiting
/**
 * HTTP client that allows at most three requests to be in progress at the same time.
 */
class ApiClient {
  private semaphore = new Semaphore(3); // Max 3 concurrent requests

  /**
   * Fetches `url` while holding one of the three permits and resolves with the parsed
   * JSON body.
   *
   * @param url - Address to request
   * @returns The parsed JSON body
   */
  async makeRequest(url: string): Promise<any> {
    await this.semaphore.acquire();
    try {
      console.log(`Making request to ${url}. Available permits: ${this.semaphore.available}`);
      const response = await fetch(url);
      await delay(1000); // Simulate processing time
      // Await here: returning the bare promise would run `finally` (and free the
      // permit) before the body has actually been read.
      return await response.json();
    } finally {
      this.semaphore.release();
    }
  }
}
// Usage - only 3 requests will run concurrently
const client = new ApiClient();
const urls = [
  'https://api.example.com/user/1',
  'https://api.example.com/user/2',
  'https://api.example.com/user/3',
  'https://api.example.com/user/4',
  'https://api.example.com/user/5',
  'https://api.example.com/user/6',
];
/**
 * Requests every URL in `urls` through the shared client and resolves with all
 * results in the same order.
 */
async function fetchAllUsers() {
  const inFlight: Array<Promise<any>> = [];
  for (const url of urls) {
    inFlight.push(client.makeRequest(url));
  }
  return Promise.all(inFlight);
}
// Database connection pool using semaphore
/**
 * Simulated connection pool: a semaphore caps how many queries execute at once.
 */
class DatabasePool {
  private connectionSlots: Semaphore;

  /**
   * @param maxConnections - Number of queries allowed to run concurrently
   */
  constructor(maxConnections: number) {
    this.connectionSlots = new Semaphore(maxConnections);
  }

  /**
   * Waits for a free slot, simulates a query with a random 100-600 ms delay, and
   * resolves with a placeholder result cast to `T`.
   *
   * @param query - Query text (not inspected by this simulation)
   */
  async executeQuery<T>(query: string): Promise<T> {
    const slots = this.connectionSlots;
    await slots.acquire();
    try {
      console.log(`Executing query. Available connections: ${slots.available}`);
      // Simulate database operation
      const latencyMs = Math.random() * 500 + 100;
      await delay(latencyMs);
      return { result: 'query result' } as T;
    } finally {
      slots.release();
    }
  }

  /** Number of slots that are currently free. */
  get availableConnections(): number {
    return this.connectionSlots.available;
  }
}
// File processing with resource limits
/**
 * Simulated file pipeline: a semaphore limits how many files are handled at once and
 * a mutex serializes the step that touches the shared resource.
 */
class FileProcessor {
  private processingMutex = new Mutex();
  private resourceSemaphore = new Semaphore(2); // Max 2 concurrent file operations

  /**
   * Runs the simulated read, critical section and write steps for one file.
   *
   * @param filename - Name used in the log messages
   */
  async processFile(filename: string): Promise<void> {
    const { processingMutex: criticalSection, resourceSemaphore: slots } = this;

    await slots.acquire();
    try {
      console.log(`Starting to process ${filename}`);
      // Simulate file reading
      await delay(500);

      // Critical section for shared resource access
      await criticalSection.acquire();
      try {
        console.log(`Processing ${filename} in critical section`);
        await delay(200);
      } finally {
        criticalSection.release();
      }

      // Simulate file writing
      await delay(300);
      console.log(`Finished processing ${filename}`);
    } finally {
      slots.release();
    }
  }
}
// Usage
const processor = new FileProcessor();
const files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt'];
/**
 * Starts processing of every entry in `files` at once and logs when all have finished.
 */
async function processAllFiles() {
  await Promise.all(files.map(name => processor.processFile(name)));
  console.log('All files processed');
}
import { Semaphore, delay } from 'es-toolkit/promise';
/**
 * Queue that runs submitted tasks with a bounded level of concurrency.
 */
class TaskQueue<T> {
  // Entries are wrappers that settle the promise returned by `add`, so they yield void.
  private queue: Array<() => Promise<void>> = [];
  private readonly concurrency: number;
  private semaphore: Semaphore;
  private processing = false;

  /**
   * @param concurrency - Maximum number of tasks allowed to run at the same time
   */
  constructor(concurrency: number) {
    this.concurrency = concurrency;
    this.semaphore = new Semaphore(concurrency);
  }

  /**
   * Enqueues `task` and resolves or rejects with its outcome once it has run.
   *
   * @param task - Factory that starts the work when a slot is free
   * @returns Promise mirroring the outcome of `task`
   */
  async add(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const wrappedTask = async (): Promise<void> => {
        try {
          resolve(await task());
        } catch (error) {
          reject(error);
        }
      };
      this.queue.push(wrappedTask);
      // The dispatcher reports through each task's own promise; nothing to await here.
      void this.processQueue();
    });
  }

  /** Dispatches queued tasks as permits become free; only one dispatcher loop runs at a time. */
  private async processQueue(): Promise<void> {
    if (this.processing || this.queue.length === 0) return;
    this.processing = true;
    while (this.queue.length > 0) {
      await this.semaphore.acquire();
      const task = this.queue.shift();
      if (task) {
        // Don't await here - allow concurrent execution
        void task().finally(() => this.semaphore.release());
      }
    }
    this.processing = false;
  }

  /** Number of tasks still waiting in the queue. */
  get pending(): number {
    return this.queue.length;
  }

  /**
   * Number of permits currently taken, i.e. tasks in flight (briefly including a permit
   * the dispatcher has already reserved for the next task).
   */
  get running(): number {
    // `available` counts free permits, so the in-flight count is the complement.
    return this.concurrency - this.semaphore.available;
  }
}
// Usage
const taskQueue = new TaskQueue<string>(3); // Max 3 concurrent tasks
/**
 * Submits ten tasks, each waiting a random time of up to two seconds, and logs their
 * results once all have completed.
 */
async function addTasks() {
  const tasks = Array.from({ length: 10 }, (_, index) => async () => {
    await delay(Math.random() * 2000);
    return `Task ${index + 1} completed`;
  });
  const results = await Promise.all(tasks.map(task => taskQueue.add(task)));
  console.log(results);
}
import { Mutex, withTimeout, delay } from 'es-toolkit/promise';
/**
 * Mutex wrapper that gives up when the lock cannot be obtained within a time limit.
 */
class DistributedLock {
  private mutex = new Mutex();
  private lockTimeout: number;

  /**
   * @param timeoutMs - Maximum time to wait for the lock, in milliseconds
   */
  constructor(timeoutMs: number = 30000) {
    this.lockTimeout = timeoutMs;
  }

  /**
   * Runs `operation` while holding the lock.
   *
   * @param operation - Work to perform inside the lock
   * @returns The result of `operation`
   * @throws Error when the lock is not obtained within the configured timeout
   * @throws Whatever `operation` throws, unchanged
   */
  async withLock<T>(operation: () => Promise<T>): Promise<T> {
    const lockPromise = this.mutex.acquire();
    try {
      // withTimeout expects a function that returns the promise to wait for.
      await withTimeout(() => lockPromise, this.lockTimeout);
    } catch (error) {
      // The Mutex API offers no way to cancel a pending acquire(), so this request may
      // still be granted later. Hand the lock straight back when that happens;
      // otherwise it would be held forever. The current holder's lock is left untouched.
      void lockPromise.then(
        () => this.mutex.release(),
        () => undefined
      );
      if (error instanceof Error && error.name === 'TimeoutError') {
        throw new Error(`Failed to acquire lock within ${this.lockTimeout}ms`);
      }
      throw error;
    }

    // From here on this call owns the lock, so it is the only one allowed to release it.
    try {
      return await operation();
    } finally {
      this.mutex.release();
    }
  }
}
// Usage
const distributedLock = new DistributedLock(5000); // 5 second timeout
/**
 * Runs a simulated two-second operation under the shared lock and resolves with its
 * completion message.
 */
async function criticalOperation() {
  const guardedWork = async () => {
    console.log('Performing critical operation...');
    await delay(2000);
    return 'Operation completed';
  };
  return distributedLock.withLock(guardedWork);
}
import { Semaphore, delay } from 'es-toolkit/promise';
/**
 * Limits how many operations may start within a time window.
 */
class RateLimiter {
  private semaphore: Semaphore;
  private windowMs: number;
  // Start timestamps of the requests that began within the current window.
  private requests: number[] = [];

  /**
   * @param requestsPerWindow - Number of operations allowed to start per window
   * @param windowMs - Window length in milliseconds
   */
  constructor(requestsPerWindow: number, windowMs: number) {
    this.semaphore = new Semaphore(requestsPerWindow);
    this.windowMs = windowMs;
  }

  /**
   * Waits for a free slot in the current window, then runs `operation`.
   *
   * @param operation - Work to run once a slot is available
   * @returns The result of `operation`
   */
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    await this.semaphore.acquire();
    const now = Date.now();

    // The window is measured from the moment the request starts, which is also how
    // `currentUsage` counts. Scheduling the release only after the operation finishes
    // would hold the slot for the operation's duration plus a full window.
    setTimeout(() => {
      this.semaphore.release();
    }, this.windowMs);

    this.requests.push(now);
    // Clean up old requests
    this.requests = this.requests.filter(time => now - time < this.windowMs);

    return operation();
  }

  /** Number of requests that started within the last `windowMs` milliseconds. */
  get currentUsage(): number {
    const now = Date.now();
    this.requests = this.requests.filter(time => now - time < this.windowMs);
    return this.requests.length;
  }
}
// Usage - 10 requests per minute
const rateLimiter = new RateLimiter(10, 60000);
/**
 * Requests `endpoint` through the shared rate limiter and resolves with the parsed
 * JSON body.
 */
async function makeApiCall(endpoint: string) {
  const callEndpoint = async () => {
    console.log(`Making API call to ${endpoint}. Current usage: ${rateLimiter.currentUsage}`);
    const response = await fetch(endpoint);
    return response.json();
  };
  return rateLimiter.execute(callEndpoint);
}
import { Semaphore, Mutex, delay } from 'es-toolkit/promise';
/**
 * Bounded buffer shared by producers and consumers.
 *
 * `producerSemaphore` counts free slots, `consumerSemaphore` counts stored items, and
 * `bufferMutex` guards the array itself.
 *
 * NOTE(review): `consumerSemaphore` starts with 0 permits and relies on `release()`
 * being able to raise the count above that initial value. Confirm the Semaphore
 * implementation does not cap permits at the constructor argument; if it does, items
 * produced while no consumer is waiting would never be signalled.
 */
class ProducerConsumer<T> {
  private buffer: T[] = [];
  private bufferMutex = new Mutex();
  private producerSemaphore: Semaphore;
  private consumerSemaphore = new Semaphore(0);

  /**
   * @param bufferSize - Maximum number of items held at once
   */
  constructor(bufferSize: number) {
    this.producerSemaphore = new Semaphore(bufferSize);
  }

  /** Waits for a free slot, appends `item`, then signals that an item is available. */
  async produce(item: T): Promise<void> {
    await this.producerSemaphore.acquire();
    await this.bufferMutex.acquire();
    try {
      this.enqueue(item);
    } finally {
      this.bufferMutex.release();
    }
    this.consumerSemaphore.release();
  }

  /** Waits for an item, removes the oldest one, then signals that a slot is free. */
  async consume(): Promise<T> {
    await this.consumerSemaphore.acquire();
    await this.bufferMutex.acquire();
    let taken: T;
    try {
      taken = this.dequeue();
    } finally {
      this.bufferMutex.release();
    }
    this.producerSemaphore.release();
    return taken;
  }

  /** Number of items currently stored. */
  get bufferSize(): number {
    return this.buffer.length;
  }

  /** Appends to the buffer and logs; callers hold `bufferMutex`. */
  private enqueue(item: T): void {
    this.buffer.push(item);
    console.log(`Produced: ${item}. Buffer size: ${this.buffer.length}`);
  }

  /** Removes the oldest item and logs; callers hold `bufferMutex`. */
  private dequeue(): T {
    const item = this.buffer.shift()!;
    console.log(`Consumed: ${item}. Buffer size: ${this.buffer.length}`);
    return item;
  }
}
// Usage
const pc = new ProducerConsumer<string>(5); // Buffer size of 5
/** Produces ten labelled items, pausing up to one second after each. */
async function producer(id: number) {
  let produced = 0;
  while (produced < 10) {
    await pc.produce(`Producer-${id}-Item-${produced}`);
    await delay(Math.random() * 1000);
    produced += 1;
  }
}
/** Consumes five items, pausing up to 1.5 seconds after each. */
async function consumer(id: number) {
  let consumed = 0;
  while (consumed < 5) {
    const item = await pc.consume();
    console.log(`Consumer-${id} got: ${item}`);
    await delay(Math.random() * 1500);
    consumed += 1;
  }
}
/** Runs two producers (20 items in total) against four consumers (20 items in total). */
async function runProducerConsumer() {
  const producers = [1, 2].map(id => producer(id));
  const consumers = [1, 2, 3, 4].map(id => consumer(id));
  await Promise.all([...producers, ...consumers]);
}
import { withTimeout, delay } from 'es-toolkit/promise';
/**
 * Acquires two resources, runs a long operation under a 3 second timeout, and releases
 * every resource acquired so far on all exit paths.
 *
 * @returns The completion message when the operation finishes in time
 * @throws The timeout or operation error, after cleanup has run
 */
async function operationWithCleanup(): Promise<string> {
  const resources: any[] = [];
  try {
    return await withTimeout(async () => {
      // Acquire resources
      resources.push(await acquireResource());
      resources.push(await acquireResource());
      // Long-running operation
      await delay(5000);
      return 'Operation completed';
    }, 3000); // 3 second timeout
  } finally {
    // Release on success as well as on timeout or error: `resources` is local, so
    // nothing outside this function could ever release them.
    await Promise.all(resources.map(resource => releaseResource(resource)));
  }
}
/**
 * Simulates acquiring a resource: logs a message and resolves with an object carrying
 * a random numeric `id`.
 */
async function acquireResource() {
  console.log('Acquiring resource...');
  const id = Math.random();
  return { id };
}
/**
 * Simulates releasing a resource by logging its `id`.
 *
 * @param resource - Object previously returned by `acquireResource`
 */
async function releaseResource(resource: any) {
  const { id } = resource;
  console.log(`Releasing resource ${id}`);
}
import { delay, Mutex } from 'es-toolkit/promise';
/**
 * Circuit breaker that stops calling an operation after repeated failures and probes
 * it again once a cool-down period has passed.
 *
 * States: CLOSED (calls pass through), OPEN (calls are rejected immediately) and
 * HALF_OPEN (one call is let through to probe after the cool-down). Calls are
 * serialized: the mutex is held for the whole duration of each operation.
 */
class CircuitBreaker {
  private failures = 0;
  private lastFailureTime = 0;
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';
  private mutex = new Mutex();

  /**
   * @param failureThreshold - Consecutive failures after which the breaker opens
   * @param resetTimeoutMs - Cool-down in milliseconds before a probe call is allowed
   */
  constructor(
    private failureThreshold: number = 5,
    private resetTimeoutMs: number = 60000
  ) {}

  /**
   * Runs `operation` unless the breaker is open and still cooling down.
   *
   * @param operation - Work to protect
   * @returns The result of `operation`
   * @throws Error('Circuit breaker is OPEN') while the breaker is cooling down
   * @throws Whatever `operation` throws, after recording the failure
   */
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    await this.mutex.acquire();
    try {
      this.admit();

      let outcome: T;
      try {
        outcome = await operation();
      } catch (failure) {
        this.recordFailure();
        throw failure;
      }
      this.recordSuccess();
      return outcome;
    } finally {
      this.mutex.release();
    }
  }

  /** Rejects the call while OPEN and cooling down; otherwise moves OPEN to HALF_OPEN. */
  private admit(): void {
    if (this.state !== 'OPEN') return;
    const sinceLastFailure = Date.now() - this.lastFailureTime;
    if (sinceLastFailure < this.resetTimeoutMs) {
      throw new Error('Circuit breaker is OPEN');
    }
    this.state = 'HALF_OPEN';
  }

  /** Clears the failure count and closes the breaker. */
  private recordSuccess(): void {
    this.failures = 0;
    this.state = 'CLOSED';
  }

  /** Counts a failure and opens the breaker once the threshold is reached. */
  private recordFailure(): void {
    this.failures += 1;
    this.lastFailureTime = Date.now();
    if (this.failures >= this.failureThreshold) {
      this.state = 'OPEN';
    }
  }

  /** Current state name: 'CLOSED', 'OPEN' or 'HALF_OPEN'. */
  get currentState(): string {
    return this.state;
  }
}