Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences
npx @tessl/cli install tessl/npm-rxjs@7.8.0RxJS is the most comprehensive reactive programming library for JavaScript and TypeScript, providing powerful tools for composing asynchronous and event-based programs using observable sequences. It offers an extensive collection of operators for transforming, filtering, combining, and managing streams of data, with features including backpressure handling, error propagation, resource management, and comprehensive testing utilities.
npm install rxjsimport { Observable, Subject, BehaviorSubject, map, filter, mergeMap } from "rxjs";For operators specifically:
import { map, filter, mergeMap, catchError } from "rxjs/operators";For specialized modules:
import { ajax } from "rxjs/ajax";
import { webSocket } from "rxjs/webSocket";
import { TestScheduler } from "rxjs/testing";
import { fromFetch } from "rxjs/fetch";CommonJS:
const { Observable, Subject, map, filter } = require("rxjs");
const { ajax } = require("rxjs/ajax");import { Observable, Subject, map, filter, mergeMap, catchError } from "rxjs";
// Create observables from various sources
const numbers$ = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
// Use operators to transform data
const processedNumbers$ = numbers$.pipe(
map(x => x * 2),
filter(x => x > 2),
mergeMap(x => [x, x + 1])
);
// Subscribe to receive values
processedNumbers$.subscribe({
next: value => console.log(value),
error: err => console.error(err),
complete: () => console.log('Complete!')
});
// Work with subjects for multicasting
const subject = new Subject<string>();
subject.subscribe(value => console.log('Observer 1:', value));
subject.subscribe(value => console.log('Observer 2:', value));
subject.next('Hello World');RxJS is built around several foundational concepts:
The library uses a pull-based approach where operators create new observables rather than mutating existing ones, enabling powerful composition patterns and predictable data flow.
Foundation classes for reactive programming including Observable, ConnectableObservable, and GroupedObservable for different stream patterns.
class Observable<T> {
constructor(subscribe?: (observer: Observer<T>) => TeardownLogic);
subscribe(observer?: Partial<Observer<T>>): Subscription;
pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Observable<B>;
}
class ConnectableObservable<T> extends Observable<T> {
connect(): Subscription;
refCount(): Observable<T>;
}
class GroupedObservable<K, T> extends Observable<T> {
readonly key: K;
}Special observables that can act as both observer and observable, enabling multicasting patterns.
class Subject<T> extends Observable<T> {
next(value: T): void;
error(err: any): void;
complete(): void;
}
class BehaviorSubject<T> extends Subject<T> {
constructor(initialValue: T);
readonly value: T;
}
class ReplaySubject<T> extends Subject<T> {
constructor(bufferSize?: number, windowTime?: number, timestampProvider?: TimestampProvider);
}
class AsyncSubject<T> extends Subject<T> {}Static functions for creating observables from various sources including events, promises, iterables, and custom logic.
function from<T>(input: ObservableInput<T>): Observable<T>;
function of<T>(...args: T[]): Observable<T>;
function fromEvent<T>(target: any, eventName: string): Observable<T>;
function interval(period: number): Observable<number>;
function timer(dueTime: number | Date, period?: number): Observable<number>;
function combineLatest<T>(sources: ObservableInput<T>[]): Observable<T[]>;
function merge<T>(...sources: ObservableInput<T>[]): Observable<T>;
function forkJoin<T>(sources: ObservableInput<T>[]): Observable<T[]>;Operators for selecting specific values from observable streams based on various criteria.
function filter<T>(predicate: (value: T, index: number) => boolean): OperatorFunction<T, T>;
function take<T>(count: number): OperatorFunction<T, T>;
function skip<T>(count: number): OperatorFunction<T, T>;
function first<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, T>;
function last<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, T>;
function distinct<T>(keySelector?: (value: T) => any): OperatorFunction<T, T>;
function debounceTime<T>(dueTime: number): OperatorFunction<T, T>;
function throttleTime<T>(duration: number): OperatorFunction<T, T>;Operators for transforming values emitted by observables into new forms and structures.
function map<T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R>;
function mergeMap<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>;
function switchMap<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>;
function concatMap<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>;
function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed: R): OperatorFunction<T, R>;
function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, T[]>;
function groupBy<T, K>(keySelector: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;Operators for combining multiple observable streams in various ways.
function combineLatestWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;
function mergeWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;
function concatWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;
function startWith<T>(...values: T[]): OperatorFunction<T, T>;
function withLatestFrom<T, R>(...sources: ObservableInput<R>[]): OperatorFunction<T, [T, ...R[]]>;
function zipWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;Operators and patterns for handling errors in reactive streams with recovery mechanisms.
function catchError<T, O>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>;
function retry<T>(count?: number): OperatorFunction<T, T>;
function retryWhen<T>(notifier: (errors: Observable<any>) => Observable<any>): OperatorFunction<T, T>;
class TimeoutError extends Error {
readonly name: "TimeoutError";
}
class EmptyError extends Error {
readonly name: "EmptyError";
}Control timing and concurrency of observable execution with various scheduling strategies.
interface SchedulerLike {
schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay?: number, state?: T): Subscription;
}
const asyncScheduler: SchedulerLike;
const asapScheduler: SchedulerLike;
const queueScheduler: SchedulerLike;
const animationFrameScheduler: SchedulerLike;
class VirtualTimeScheduler extends AsyncScheduler {
flush(): void;
}HTTP request capabilities with full observable integration and response streaming.
function ajax(request: string | AjaxConfig): Observable<AjaxResponse<any>>;
interface AjaxConfig {
url?: string;
method?: string;
headers?: Record<string, any>;
body?: any;
timeout?: number;
responseType?: XMLHttpRequestResponseType;
}
class AjaxResponse<T> {
readonly response: T;
readonly status: number;
readonly responseText: string;
readonly request: AjaxConfig;
}Real-time bidirectional communication with WebSocket integration for reactive streams.
function webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>;
class WebSocketSubject<T> extends Subject<T> {
multiplex<R>(
subMsg: () => any,
unsubMsg: () => any,
messageFilter: (value: T) => boolean
): Observable<R>;
close(): void;
}
interface WebSocketSubjectConfig<T> {
url: string;
protocol?: string | string[];
serializer?: (value: T) => any;
deserializer?: (e: MessageEvent) => T;
}Modern fetch-based HTTP requests with full Observable integration, streaming support, and comprehensive error handling.
function fromFetch<T>(
input: string | Request,
initWithSelector?: RequestInit & {
selector?: (response: Response) => ObservableInput<T>;
}
): Observable<T extends never ? Response : T>;Comprehensive testing framework with marble testing and virtual time scheduling.
class TestScheduler extends VirtualTimeScheduler {
run<T>(callback: (helpers: RunHelpers) => T): T;
createHotObservable<T>(marbles: string, values?: any, error?: any): HotObservable<T>;
createColdObservable<T>(marbles: string, values?: any, error?: any): ColdObservable<T>;
expectObservable<T>(observable: Observable<T>): Expectation<T>;
}
interface RunHelpers {
cold: typeof TestScheduler.prototype.createColdObservable;
hot: typeof TestScheduler.prototype.createHotObservable;
expectObservable: typeof TestScheduler.prototype.expectObservable;
flush: typeof TestScheduler.prototype.flush;
}Utilities for converting observables to promises for integration with async/await patterns.
function firstValueFrom<T>(source: Observable<T>): Promise<T>;
function lastValueFrom<T>(source: Observable<T>): Promise<T>;Global configuration object for customizing RxJS behavior and error handling.
const config: GlobalConfig;
interface GlobalConfig {
onUnhandledError: ((err: any) => void) | null;
onStoppedNotification: ((notification: Notification<any>, subscriber: Subscriber<any>) => void) | null;
Promise?: PromiseConstructorLike;
useDeprecatedSynchronousErrorHandling: boolean;
useDeprecatedNextContext: boolean;
}type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
interface Subscription {
unsubscribe(): void;
readonly closed: boolean;
}
interface OperatorFunction<T, R> {
(source: Observable<T>): Observable<R>;
}
interface MonoTypeOperatorFunction<T> extends OperatorFunction<T, T> {}
type TeardownLogic = Subscription | Unsubscribable | (() => void) | void;