Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences
—
Static functions for creating observables from various sources including events, promises, iterables, and custom logic.
Convert various input types to observables.
/**
* Convert various input types to observables
* @param input - Array, promise, iterable, or observable-like object
* @returns Observable emitting values from the input
*/
function from<T>(input: ObservableInput<T>): Observable<T>;
function from<T>(input: ObservableInput<T>, scheduler: SchedulerLike): Observable<T>;Usage Examples:
import { from } from "rxjs";
// From array
from([1, 2, 3, 4]).subscribe(x => console.log(x));
// From promise
from(fetch('/api/data')).subscribe(response => console.log(response));
// From async iterable
async function* asyncGenerator() {
yield 1;
yield 2;
yield 3;
}
from(asyncGenerator()).subscribe(x => console.log(x));
// From string (iterable)
from('hello').subscribe(char => console.log(char)); // h, e, l, l, oCreate observable that emits provided values in sequence.
/**
* Create observable that emits provided arguments in sequence
* @param args - Values to emit
* @returns Observable emitting the provided values
*/
function of<T>(): Observable<never>;
function of<T>(value: T): Observable<T>;
function of<T>(...args: T[]): Observable<T>;
function of<T>(...args: (T | SchedulerLike)[]): Observable<T>;Usage Examples:
import { of } from "rxjs";
// Emit multiple values
of(1, 2, 3, 4).subscribe(x => console.log(x));
// Emit single value
of('hello').subscribe(x => console.log(x));
// Emit complex objects
of(
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' }
).subscribe(user => console.log(user));Create observable from DOM events or Node.js EventEmitter.
/**
* Create observable from DOM events or EventEmitter
* @param target - Event target (DOM element, EventEmitter, etc.)
* @param eventName - Name of the event to listen for
* @param options - Event listener options
* @returns Observable emitting event objects
*/
function fromEvent<T>(
target: any,
eventName: string,
options?: EventListenerOptions | ((...args: any[]) => T)
): Observable<T>;Usage Examples:
import { fromEvent } from "rxjs";
import { map, throttleTime } from "rxjs/operators";
// DOM events
const button = document.getElementById('myButton');
const clicks$ = fromEvent(button, 'click');
clicks$.subscribe(event => console.log('Button clicked!', event));
// Window resize events with throttling
const resize$ = fromEvent(window, 'resize').pipe(
throttleTime(200),
map(() => ({ width: window.innerWidth, height: window.innerHeight }))
);
resize$.subscribe(size => console.log('Window resized:', size));
// Node.js EventEmitter
const EventEmitter = require('events');
const emitter = new EventEmitter();
const myEvents$ = fromEvent(emitter, 'data');
myEvents$.subscribe(data => console.log('Received:', data));Create observable that emits sequential numbers at specified intervals.
/**
* Create observable that emits sequential numbers at regular intervals
* @param period - Interval between emissions in milliseconds
* @param scheduler - Scheduler to control timing
* @returns Observable emitting incremental numbers
*/
function interval(period: number, scheduler?: SchedulerLike): Observable<number>;Usage Examples:
import { interval } from "rxjs";
import { take, map } from "rxjs/operators";
// Emit every second
const timer$ = interval(1000);
timer$.pipe(take(5)).subscribe(x => console.log('Timer:', x)); // 0, 1, 2, 3, 4
// Create clock
interval(1000).pipe(
map(() => new Date().toLocaleTimeString())
).subscribe(time => console.log('Current time:', time));Create observable that emits after a delay, optionally repeating at intervals.
/**
* Create observable that emits after initial delay, optionally repeating
* @param dueTime - Initial delay in milliseconds or specific Date
* @param period - Repeat interval in milliseconds (optional)
* @param scheduler - Scheduler to control timing
* @returns Observable emitting numbers starting from 0
*/
function timer(dueTime: number | Date, period?: number, scheduler?: SchedulerLike): Observable<number>;
function timer(dueTime: number | Date, scheduler?: SchedulerLike): Observable<0>;Usage Examples:
import { timer } from "rxjs";
// Single emission after 3 seconds
timer(3000).subscribe(() => console.log('3 seconds have passed'));
// Start after 2 seconds, then emit every 1 second
timer(2000, 1000).pipe(take(5)).subscribe(x => console.log('Timer value:', x));
// Timer at specific time
const tomorrow = new Date();
tomorrow.setDate(tomorrow.getDate() + 1);
timer(tomorrow).subscribe(() => console.log('Tomorrow has arrived!'));Combine latest values from multiple observables into arrays.
/**
* Combine latest values from multiple observables
* @param sources - Array of observables or individual observables
* @returns Observable emitting arrays of latest values
*/
function combineLatest<T>(sources: ObservableInput<T>[]): Observable<T[]>;
function combineLatest<T, R>(sources: ObservableInput<T>[], project: (...values: T[]) => R): Observable<R>;
function combineLatest<T1, T2>(v1: ObservableInput<T1>, v2: ObservableInput<T2>): Observable<[T1, T2]>;
function combineLatest<T1, T2, T3>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<[T1, T2, T3]>;
function combineLatest<T1, T2, T3, T4>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<[T1, T2, T3, T4]>;
function combineLatest<T1, T2, T3, T4, T5>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<[T1, T2, T3, T4, T5]>;
function combineLatest<T1, T2, T3, T4, T5, T6>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<[T1, T2, T3, T4, T5, T6]>;Usage Examples:
import { combineLatest, timer, of } from "rxjs";
import { map } from "rxjs/operators";
// Combine user input and server data
const username$ = of('Alice');
const permissions$ = of(['read', 'write']);
const isOnline$ = timer(0, 5000).pipe(map(x => x % 2 === 0));
combineLatest([username$, permissions$, isOnline$]).subscribe(
([username, permissions, isOnline]) => {
console.log(`User: ${username}, Permissions: ${permissions}, Online: ${isOnline}`);
}
);Merge multiple observables into a single stream.
/**
* Merge multiple observables into one
* @param sources - Observables to merge
* @param concurrent - Maximum concurrent subscriptions
* @param scheduler - Scheduler for managing subscriptions
* @returns Observable emitting values from all sources
*/
function merge<T>(...sources: ObservableInput<T>[]): Observable<T>;
function merge<T>(...sources: (ObservableInput<T> | SchedulerLike | number)[]): Observable<T>;
function merge<T>(sources: ObservableInput<T>[], concurrent?: number, scheduler?: SchedulerLike): Observable<T>;Usage Examples:
import { merge, interval, fromEvent } from "rxjs";
import { map } from "rxjs/operators";
// Merge timer and user clicks
const timer$ = interval(1000).pipe(map(() => 'timer'));
const clicks$ = fromEvent(document, 'click').pipe(map(() => 'click'));
merge(timer$, clicks$).subscribe(source => {
console.log('Event from:', source);
});
// Merge with concurrency limit
const urls = ['url1', 'url2', 'url3'];
const requests$ = urls.map(url => from(fetch(url)));
merge(...requests$, 2).subscribe(response => {
console.log('Response received:', response);
});Wait for all observables to complete, then emit final values as array.
/**
* Wait for all observables to complete and emit their final values as array
* @param sources - Array of observables or object with observable properties
* @returns Observable emitting array or object of final values
*/
function forkJoin<T>(sources: ObservableInput<T>[]): Observable<T[]>;
function forkJoin<T extends Record<string, ObservableInput<any>>>(sources: T): Observable<{ [K in keyof T]: ObservedValueOf<T[K]> }>;
function forkJoin<T1, T2>(v1: ObservableInput<T1>, v2: ObservableInput<T2>): Observable<[T1, T2]>;
function forkJoin<T1, T2, T3>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<[T1, T2, T3]>;
function forkJoin<T1, T2, T3, T4>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<[T1, T2, T3, T4]>;Usage Examples:
import { forkJoin, timer, of } from "rxjs";
import { delay } from "rxjs/operators";
// Wait for multiple async operations
const user$ = of({ id: 1, name: 'Alice' }).pipe(delay(1000));
const posts$ = of([{ id: 1, title: 'Post 1' }]).pipe(delay(2000));
const comments$ = of([{ id: 1, text: 'Comment 1' }]).pipe(delay(1500));
forkJoin([user$, posts$, comments$]).subscribe(
([user, posts, comments]) => {
console.log('All data loaded:', { user, posts, comments });
}
);
// Object syntax
forkJoin({
user: user$,
posts: posts$,
comments: comments$
}).subscribe(data => {
console.log('User:', data.user);
console.log('Posts:', data.posts);
console.log('Comments:', data.comments);
});/**
* Create observable that emits no values and immediately completes
*/
function empty(scheduler?: SchedulerLike): Observable<never>;
/**
* Create observable that never emits any values
*/
function never(): Observable<never>;
/**
* Create observable that immediately emits an error
*/
function throwError(errorOrErrorFactory: any | (() => any), scheduler?: SchedulerLike): Observable<never>;
/**
* Create observable from event pattern (addHandler/removeHandler)
*/
function fromEventPattern<T>(
addHandler: (handler: NodeEventHandler) => any,
removeHandler?: (handler: NodeEventHandler, signal?: any) => void,
resultSelector?: (...args: any[]) => T
): Observable<T>;
/**
* Defer observable creation until subscription
*/
function defer<R extends ObservableInput<any>>(observableFactory: () => R): Observable<ObservedValueOf<R>>;
/**
* Generate sequence of values based on state and condition
*/
function generate<T, S>(
initialState: S,
condition: (state: S) => boolean,
iterate: (state: S) => S,
resultSelector?: (state: S) => T,
scheduler?: SchedulerLike
): Observable<T>;
/**
* Create range of sequential numbers
*/
function range(start: number, count?: number, scheduler?: SchedulerLike): Observable<number>;
/**
* Choose between observables based on condition
*/
function iif<T, F>(
condition: () => boolean,
trueResult?: ObservableInput<T>,
falseResult?: ObservableInput<F>
): Observable<T | F>;
/**
* Convert callback-based function to observable-returning function
*/
function bindCallback<T>(
callbackFunc: (...args: any[]) => void,
resultSelector?: (...args: any[]) => T,
scheduler?: SchedulerLike
): (...args: any[]) => Observable<T>;
/**
* Convert Node.js callback-based function to observable-returning function
*/
function bindNodeCallback<T>(
callbackFunc: (...args: any[]) => void,
resultSelector?: (...args: any[]) => T,
scheduler?: SchedulerLike
): (...args: any[]) => Observable<T>;
/**
* Create connectable observable that can be shared among subscribers
*/
function connectable<T>(source: ObservableInput<T>, config?: ConnectableConfig<T>): ConnectableObservable<T>;
/**
* Continue with next observable on error (concatenation with error recovery)
*/
function onErrorResumeNext<T, R>(...sources: ObservableInput<any>[]): Observable<T | R>;
/**
* Create key-value pair emissions from object properties
*/
function pairs<T>(obj: Record<string, T>, scheduler?: SchedulerLike): Observable<[string, T]>;
/**
* Split source observable into two based on predicate
*/
function partition<T>(
source: ObservableInput<T>,
predicate: (value: T, index: number) => boolean,
thisArg?: any
): [Observable<T>, Observable<T>];
/**
* Race multiple observables - emit from first to emit
*/
function race<T>(...sources: ObservableInput<T>[]): Observable<T>;
/**
* Create scheduled observable with custom scheduler
*/
function scheduled<T>(input: ObservableInput<T>, scheduler: SchedulerLike): Observable<T>;
/**
* Manage resource lifecycle with automatic cleanup
*/
function using<T, R>(
resourceFactory: () => R,
observableFactory: (resource: R) => ObservableInput<T>
): Observable<T>;
/**
* Combine corresponding values from multiple observables into tuples
*/
function zip<T, R>(...sources: ObservableInput<any>[]): Observable<R[]>;
/**
* Concatenate observables in sequence
*/
function concat<T>(...sources: ObservableInput<T>[]): Observable<T>;
/**
* Create observable from animation frames
*/
function animationFrames(timestampProvider?: TimestampProvider): Observable<{ timestamp: number; elapsed: number }>;/**
* Empty observable constant
*/
const EMPTY: Observable<never>;
/**
* Never-emitting observable constant
*/
const NEVER: Observable<never>;type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
type ObservedValueOf<O> = O extends ObservableInput<infer T> ? T : never;
interface EventListenerOptions {
capture?: boolean;
once?: boolean;
passive?: boolean;
}
type NodeEventHandler = (...args: any[]) => void;