Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences
—
Operators and patterns for handling errors in reactive streams with recovery mechanisms, retry logic, and error transformation.
Catch errors and recover with alternative observable.
/**
* Catch errors on the source observable and switch to alternative observable
* @param selector - Function that receives error and caught observable, returns alternative
* @returns Operator function catching errors and switching to alternative
*/
function catchError<T, O extends ObservableInput<any>>(
selector: (err: any, caught: Observable<T>) => O
): OperatorFunction<T, T | ObservedValueOf<O>>;Usage Examples:
import { of, throwError, catchError } from "rxjs";
import { ajax } from "rxjs/ajax";
// Simple error recovery
throwError('Something went wrong!').pipe(
catchError(err => {
console.log('Error caught:', err);
return of('Default value');
})
).subscribe(value => console.log(value)); // 'Default value'
// HTTP request with fallback
ajax.getJSON('/api/data').pipe(
catchError(err => {
if (err.status === 404) {
return of({ data: [], message: 'No data available' });
}
return throwError(err); // Re-throw other errors
})
).subscribe(
data => console.log('Data:', data),
err => console.error('Unhandled error:', err)
);
// Retry with different endpoint
ajax.getJSON('/api/primary-server').pipe(
catchError(err => {
console.log('Primary server failed, trying backup');
return ajax.getJSON('/api/backup-server');
}),
catchError(err => {
console.log('Backup server also failed');
return of({ error: true, message: 'All servers unavailable' });
})
).subscribe(result => console.log('Result:', result));Retry failed observable a specified number of times.
/**
* Retry failed observable up to specified count
* @param count - Number of retry attempts (default: Infinity)
* @returns Operator function retrying on error
*/
function retry<T>(count?: number): OperatorFunction<T, T>;
function retry<T>(config: RetryConfig): OperatorFunction<T, T>;
interface RetryConfig {
count?: number;
delay?: number | ((error: any, retryCount: number) => number);
resetOnSuccess?: boolean;
}Usage Examples:
import { ajax } from "rxjs/ajax";
import { retry } from "rxjs/operators";
// Simple retry
ajax.getJSON('/api/unreliable-endpoint').pipe(
retry(3) // Retry up to 3 times
).subscribe(
data => console.log('Success:', data),
err => console.error('Failed after 3 retries:', err)
);
// Retry with configuration
ajax.getJSON('/api/data').pipe(
retry({
count: 3,
delay: 1000, // Wait 1 second between retries
resetOnSuccess: true
})
).subscribe(
data => console.log('Data:', data),
err => console.error('All retries failed:', err)
);
// Retry with exponential backoff
ajax.getJSON('/api/data').pipe(
retry({
count: 5,
delay: (error, retryCount) => retryCount * 1000 // 1s, 2s, 3s, 4s, 5s
})
).subscribe(result => console.log(result));Retry when another observable emits.
/**
* Retry when the notifier observable emits
* @param notifier - Function that receives error observable and returns retry trigger
* @returns Operator function retrying when notifier emits
*/
function retryWhen<T>(
notifier: (errors: Observable<any>) => ObservableInput<any>
): OperatorFunction<T, T>;Usage Examples:
import { ajax } from "rxjs/ajax";
import { retryWhen, delay, take, tap } from "rxjs/operators";
// Retry with delay
ajax.getJSON('/api/data').pipe(
retryWhen(errors =>
errors.pipe(
tap(err => console.log('Error occurred, retrying in 2s:', err)),
delay(2000),
take(3) // Retry maximum 3 times
)
)
).subscribe(
data => console.log('Success:', data),
err => console.error('Final error:', err)
);
// Retry with exponential backoff
ajax.getJSON('/api/data').pipe(
retryWhen(errors =>
errors.pipe(
scan((retryCount, err) => {
if (retryCount >= 3) {
throw err; // Stop retrying after 3 attempts
}
return retryCount + 1;
}, 0),
delay(1000) // Wait 1 second between retries
)
)
);RxJS provides several built-in error types for common scenarios.
/**
* Error thrown when a timeout occurs
*/
class TimeoutError extends Error {
readonly name: "TimeoutError";
constructor(info?: any);
}
/**
* Error thrown when sequence is empty but value was expected
*/
class EmptyError extends Error {
readonly name: "EmptyError";
constructor(message?: string);
}
/**
* Error thrown when argument is out of range
*/
class ArgumentOutOfRangeError extends Error {
readonly name: "ArgumentOutOfRangeError";
constructor(message?: string);
}
/**
* Error thrown when no elements match criteria
*/
class NotFoundError extends Error {
readonly name: "NotFoundError";
constructor(message?: string);
}
/**
* Error thrown when sequence doesn't match expected pattern
*/
class SequenceError extends Error {
readonly name: "SequenceError";
constructor(message?: string);
}
/**
* Error thrown when object has been unsubscribed
*/
class ObjectUnsubscribedError extends Error {
readonly name: "ObjectUnsubscribedError";
constructor(message?: string);
}
/**
* Error thrown when unsubscription fails
*/
class UnsubscriptionError extends Error {
readonly name: "UnsubscriptionError";
constructor(errors: any[]);
readonly errors: any[];
}
/**
* Error thrown during AJAX operations
*/
class AjaxError extends Error {
readonly name: "AjaxError";
readonly request: AjaxRequest;
readonly status: number;
readonly responseType: XMLHttpRequestResponseType;
readonly response: any;
constructor(message: string, xhr: XMLHttpRequest, request: AjaxRequest);
}
/**
* Error thrown when AJAX request times out
*/
class AjaxTimeoutError extends AjaxError {
readonly name: "AjaxTimeoutError";
}import { ajax } from "rxjs/ajax";
import { catchError, retry, retryWhen, delay, take } from "rxjs/operators";
// Classify errors and apply different strategies
function handleApiError<T>(source: Observable<T>) {
return source.pipe(
retryWhen(errors =>
errors.pipe(
switchMap((error, index) => {
// Network errors - retry with exponential backoff
if (error.name === 'TypeError' || error.status === 0) {
if (index < 3) {
return timer(Math.pow(2, index) * 1000); // 1s, 2s, 4s
}
}
// Server errors - retry with linear backoff
if (error.status >= 500) {
if (index < 2) {
return timer(2000); // 2s delay
}
}
// Don't retry client errors
throw error;
})
)
),
catchError(err => {
// Final error handling
console.error('Request failed after retries:', err);
if (err.status === 404) {
return of({ data: null, error: 'Resource not found' });
}
if (err.status >= 500) {
return of({ data: null, error: 'Server error, please try again later' });
}
return of({ data: null, error: 'Request failed' });
})
);
}
// Usage
handleApiError(ajax.getJSON('/api/users/123')).subscribe(result => {
if (result.error) {
console.log('Error:', result.error);
} else {
console.log('User data:', result.data);
}
});import { catchError, tap } from "rxjs/operators";
// Global error handler operator
function globalErrorHandler<T>() {
return (source: Observable<T>) => source.pipe(
catchError(error => {
// Log to monitoring service
console.error('Global error:', error);
// Send to error reporting service
if (typeof window !== 'undefined' && window.ErrorReporting) {
window.ErrorReporting.captureException(error);
}
// Show user-friendly message
if (error.status === 401) {
// Redirect to login
window.location.href = '/login';
return EMPTY;
}
// Re-throw for component-level handling
return throwError(error);
})
);
}
// Apply globally to HTTP requests
const httpClient = {
get: (url: string) => ajax.getJSON(url).pipe(globalErrorHandler()),
post: (url: string, data: any) => ajax.post(url, data).pipe(globalErrorHandler())
};import { timer, throwError, of } from "rxjs";
import { switchMap, catchError, tap } from "rxjs/operators";
class CircuitBreaker {
private failures = 0;
private lastFailureTime = 0;
private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';
constructor(
private threshold = 5,
private timeout = 60000 // 1 minute
) {}
execute<T>(operation: () => Observable<T>): Observable<T> {
return defer(() => {
if (this.state === 'OPEN') {
if (Date.now() - this.lastFailureTime > this.timeout) {
this.state = 'HALF_OPEN';
} else {
return throwError(new Error('Circuit breaker is OPEN'));
}
}
return operation().pipe(
tap(() => {
// Success - reset circuit
this.failures = 0;
this.state = 'CLOSED';
}),
catchError(err => {
this.failures++;
this.lastFailureTime = Date.now();
if (this.failures >= this.threshold) {
this.state = 'OPEN';
}
return throwError(err);
})
);
});
}
}
// Usage
const breaker = new CircuitBreaker(3, 30000); // 3 failures, 30s timeout
breaker.execute(() => ajax.getJSON('/api/unreliable')).subscribe(
data => console.log('Success:', data),
err => console.error('Circuit breaker error:', err)
);import { timeout, catchError } from "rxjs/operators";
// Request with timeout
ajax.getJSON('/api/slow-endpoint').pipe(
timeout(5000), // 5 second timeout
catchError(err => {
if (err instanceof TimeoutError) {
console.log('Request timed out');
return of({ error: 'Request timeout', data: null });
}
return throwError(err);
})
).subscribe(result => console.log(result));
// Timeout with custom error
ajax.getJSON('/api/data').pipe(
timeout({
each: 5000,
with: () => throwError(new Error('Custom timeout message'))
})
).subscribe(
data => console.log('Data:', data),
err => console.error('Timeout error:', err.message)
);interface RetryConfig {
count?: number;
delay?: number | ((error: any, retryCount: number) => number);
resetOnSuccess?: boolean;
}
type ObservedValueOf<O> = O extends ObservableInput<infer T> ? T : never;
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;