Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences
—
Operators for transforming values emitted by observables into new forms and structures, including mapping, flattening, scanning, and buffering operations.
Transform each value using a projection function.
/**
* Transform each emitted value using a projection function
* @param project - Function to transform each value
* @returns Operator function applying transformation to each value
*/
function map<T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R>;Usage Examples:
import { of, map } from "rxjs";
// Double each number
of(1, 2, 3, 4).pipe(
map(x => x * 2)
).subscribe(x => console.log(x)); // 2, 4, 6, 8
// Transform objects
of(
{ firstName: 'John', lastName: 'Doe' },
{ firstName: 'Jane', lastName: 'Smith' }
).pipe(
map(person => `${person.firstName} ${person.lastName}`)
).subscribe(name => console.log(name)); // John Doe, Jane Smith
// With index
of('a', 'b', 'c').pipe(
map((letter, index) => `${index}: ${letter}`)
).subscribe(result => console.log(result)); // 0: a, 1: b, 2: cMap each value to an observable and merge all inner observables.
/**
* Map each value to an observable and merge all inner observables
* @param project - Function that returns an observable for each value
* @param concurrent - Maximum number of concurrent inner subscriptions
* @returns Operator function flattening mapped observables
*/
function mergeMap<T, R>(
project: (value: T, index: number) => ObservableInput<R>,
concurrent?: number
): OperatorFunction<T, R>;
function mergeMap<T, R, O>(
project: (value: T, index: number) => ObservableInput<R>,
resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => O,
concurrent?: number
): OperatorFunction<T, O>;
/**
* Alias for mergeMap
*/
function flatMap<T, R>(
project: (value: T, index: number) => ObservableInput<R>,
concurrent?: number
): OperatorFunction<T, R>;Usage Examples:
import { of, mergeMap, delay } from "rxjs";
import { ajax } from "rxjs/ajax";
// Map to HTTP requests
of('user1', 'user2', 'user3').pipe(
mergeMap(userId => ajax.getJSON(`/api/users/${userId}`))
).subscribe(user => console.log('Loaded user:', user));
// Control concurrency
of(1, 2, 3, 4, 5).pipe(
mergeMap(n => of(n).pipe(delay(1000)), 2) // Max 2 concurrent
).subscribe(x => console.log('Value:', x));
// Map to arrays (flattened)
of([1, 2], [3, 4], [5, 6]).pipe(
mergeMap(arr => arr)
).subscribe(x => console.log(x)); // 1, 2, 3, 4, 5, 6Map each value to an observable, canceling previous inner observables.
/**
* Map each value to an observable, canceling previous inner observables
* @param project - Function that returns an observable for each value
* @returns Operator function switching to new mapped observables
*/
function switchMap<T, R>(
project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R>;
function switchMap<T, R, O>(
project: (value: T, index: number) => ObservableInput<R>,
resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => O
): OperatorFunction<T, O>;Usage Examples:
import { fromEvent, switchMap, debounceTime, map } from "rxjs";
import { ajax } from "rxjs/ajax";
// Search with automatic cancellation
const searchInput = document.getElementById('search');
fromEvent(searchInput, 'input').pipe(
map(event => event.target.value),
debounceTime(300),
switchMap(term => ajax.getJSON(`/api/search?q=${term}`))
).subscribe(results => {
console.log('Search results:', results);
// Previous search requests are cancelled automatically
});
// Latest value wins
of(1, 2, 3).pipe(
switchMap(n => interval(1000).pipe(
map(i => `${n}-${i}`),
take(3)
))
).subscribe(x => console.log(x)); // Only outputs from last (3): 3-0, 3-1, 3-2Map each value to an observable and concatenate in order.
/**
* Map each value to an observable and concatenate in sequential order
* @param project - Function that returns an observable for each value
* @returns Operator function concatenating mapped observables in order
*/
function concatMap<T, R>(
project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R>;
function concatMap<T, R, O>(
project: (value: T, index: number) => ObservableInput<R>,
resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => O
): OperatorFunction<T, O>;Usage Examples:
import { of, concatMap, delay } from "rxjs";
// Sequential processing (waits for each to complete)
of(1, 2, 3).pipe(
concatMap(n => of(`Processing ${n}`).pipe(delay(1000)))
).subscribe(x => console.log(x));
// Outputs: Processing 1 (after 1s), Processing 2 (after 2s), Processing 3 (after 3s)
// Preserve order
of('file1.txt', 'file2.txt', 'file3.txt').pipe(
concatMap(filename => ajax.getJSON(`/api/files/${filename}`))
).subscribe(fileData => {
console.log('File loaded in order:', fileData);
});Map to an observable, ignoring new values while inner observable is active.
/**
* Map each value to an observable, ignoring new values while inner observable is active
* @param project - Function that returns an observable for each value
* @returns Operator function exhausting mapped observables
*/
function exhaustMap<T, R>(
project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R>;
function exhaustMap<T, R, O>(
project: (value: T, index: number) => ObservableInput<R>,
resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => O
): OperatorFunction<T, O>;Usage Examples:
import { fromEvent, exhaustMap } from "rxjs";
import { ajax } from "rxjs/ajax";
// Prevent multiple simultaneous requests
const saveButton = document.getElementById('save');
fromEvent(saveButton, 'click').pipe(
exhaustMap(() => ajax.post('/api/save', { data: 'example' }))
).subscribe(
response => console.log('Saved:', response),
err => console.error('Save failed:', err)
);
// Clicks during ongoing save are ignoredApply accumulator function over time, emitting intermediate results.
/**
* Apply accumulator function over time, emitting intermediate results
* @param accumulator - Function to compute accumulated value
* @param seed - Initial accumulated value
* @returns Operator function scanning with accumulator
*/
function scan<T, R>(
accumulator: (acc: R, value: T, index: number) => R,
seed: R
): OperatorFunction<T, R>;
function scan<T>(
accumulator: (acc: T, value: T, index: number) => T
): OperatorFunction<T, T>;Usage Examples:
import { of, scan } from "rxjs";
// Running sum
of(1, 2, 3, 4).pipe(
scan((acc, value) => acc + value, 0)
).subscribe(sum => console.log('Running sum:', sum));
// Output: 1, 3, 6, 10
// Build object over time
of(
{ type: 'SET_NAME', payload: 'Alice' },
{ type: 'SET_AGE', payload: 25 },
{ type: 'SET_EMAIL', payload: 'alice@example.com' }
).pipe(
scan((state, action) => {
switch (action.type) {
case 'SET_NAME': return { ...state, name: action.payload };
case 'SET_AGE': return { ...state, age: action.payload };
case 'SET_EMAIL': return { ...state, email: action.payload };
default: return state;
}
}, {})
).subscribe(state => console.log('State:', state));Buffer values until boundary observable emits.
/**
* Buffer values until boundary observable emits
* @param closingNotifier - Observable that triggers buffer emission
* @returns Operator function buffering values until boundary
*/
function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFunction<T, T[]>;Usage Examples:
import { interval, buffer, fromEvent } from "rxjs";
// Buffer interval values until button click
const source$ = interval(1000);
const clicks$ = fromEvent(document.getElementById('flush'), 'click');
source$.pipe(
buffer(clicks$)
).subscribe(buffered => console.log('Buffered values:', buffered));
// Click after 3 seconds: [0, 1, 2]Buffer values until specific count is reached.
/**
* Buffer values until buffer reaches specified size
* @param bufferSize - Size of buffer
* @param startBufferEvery - Interval to start new buffer
* @returns Operator function buffering by count
*/
function bufferCount<T>(bufferSize: number, startBufferEvery?: number): OperatorFunction<T, T[]>;Usage Examples:
import { of, bufferCount } from "rxjs";
// Buffer every 3 values
of(1, 2, 3, 4, 5, 6, 7, 8).pipe(
bufferCount(3)
).subscribe(buffer => console.log('Buffer:', buffer));
// Output: [1, 2, 3], [4, 5, 6], [7, 8]
// Overlapping buffers
of(1, 2, 3, 4, 5, 6).pipe(
bufferCount(3, 1) // New buffer every 1, size 3
).subscribe(buffer => console.log('Buffer:', buffer));
// Output: [1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]Buffer values for specific time periods.
/**
* Buffer values for specified time periods
* @param bufferTimeSpan - Time span of buffer in milliseconds
* @param bufferCreationInterval - Interval to start new buffer
* @param maxBufferSize - Maximum buffer size
* @param scheduler - Optional scheduler
* @returns Operator function buffering by time
*/
function bufferTime<T>(
bufferTimeSpan: number,
bufferCreationInterval?: number,
maxBufferSize?: number,
scheduler?: SchedulerLike
): OperatorFunction<T, T[]>;Usage Examples:
import { interval, bufferTime } from "rxjs";
// Buffer values for 2 seconds
interval(500).pipe(
bufferTime(2000)
).subscribe(buffer => console.log('2-second buffer:', buffer));
// Every 2 seconds: [0, 1, 2], [3, 4, 5, 6], etc./**
* Buffer values from opening of one observable until closing of another
* @param openings - Observable that opens the buffer
* @param closingSelector - Function that returns observable to close buffer
* @returns Operator function buffering between opening and closing
*/
function bufferToggle<T, O>(
openings: ObservableInput<O>,
closingSelector: (value: O) => ObservableInput<any>
): OperatorFunction<T, T[]>;
/**
* Buffer values until boundary observable emits
* @param closingSelector - Function returning boundary observable
* @returns Operator function buffering until boundary
*/
function bufferWhen<T>(closingSelector: () => ObservableInput<any>): OperatorFunction<T, T[]>;Usage Examples:
import { interval, bufferToggle, bufferWhen, fromEvent, timer } from "rxjs";
// Buffer between mouse down and mouse up
const mouseDown$ = fromEvent(document, 'mousedown');
const mouseUp$ = fromEvent(document, 'mouseup');
interval(100).pipe(
bufferToggle(mouseDown$, () => mouseUp$)
).subscribe(buffered => {
console.log('Values during mouse press:', buffered);
});
// Buffer until random interval
interval(100).pipe(
bufferWhen(() => timer(Math.random() * 2000))
).subscribe(buffered => {
console.log('Random buffer:', buffered);
});Group values by key into separate observables.
/**
* Group values by key into separate GroupedObservable instances
* @param keySelector - Function to select grouping key
* @param elementSelector - Function to select element for group
* @param durationSelector - Function returning observable that determines group lifetime
* @returns Operator function grouping by key
*/
function groupBy<T, K>(
keySelector: (value: T) => K
): OperatorFunction<T, GroupedObservable<K, T>>;
function groupBy<T, K, R>(
keySelector: (value: T) => K,
elementSelector: (value: T) => R
): OperatorFunction<T, GroupedObservable<K, R>>;
function groupBy<T, K, R>(
keySelector: (value: T) => K,
elementSelector?: (value: T) => R,
durationSelector?: (grouped: GroupedObservable<K, R>) => ObservableInput<any>
): OperatorFunction<T, GroupedObservable<K, R>>;Usage Examples:
import { of, groupBy, mergeMap, toArray } from "rxjs";
// Group by category
of(
{ category: 'fruit', name: 'apple' },
{ category: 'vegetable', name: 'carrot' },
{ category: 'fruit', name: 'banana' },
{ category: 'vegetable', name: 'lettuce' }
).pipe(
groupBy(item => item.category),
mergeMap(group =>
group.pipe(
toArray(),
map(items => ({ category: group.key, items }))
)
)
).subscribe(result => console.log(result));
// Output: { category: 'fruit', items: [apple, banana] }
// { category: 'vegetable', items: [carrot, lettuce] }/**
* Convert all emissions and notifications to Notification objects
* @returns Operator function materializing notifications
*/
function materialize<T>(): OperatorFunction<T, Notification<T>>;
/**
* Convert Notification objects back to emissions
* @returns Operator function dematerializing notifications
*/
function dematerialize<T>(): OperatorFunction<Notification<T>, T>;Usage Examples:
import { of, materialize, dematerialize, map } from "rxjs";
// Materialize all notifications
of(1, 2, 3).pipe(
materialize()
).subscribe(notification => {
console.log('Kind:', notification.kind);
if (notification.kind === 'N') {
console.log('Value:', notification.value);
}
});
// Convert back from notifications
const notifications$ = of(
{ kind: 'N', value: 1 },
{ kind: 'N', value: 2 },
{ kind: 'C' }
);
notifications$.pipe(
dematerialize()
).subscribe(
value => console.log('Value:', value),
err => console.error('Error:', err),
() => console.log('Complete!')
);/**
* Map each value to constant value
* @param value - Constant value to map to
* @returns Operator function mapping to constant
*/
function mapTo<R>(value: R): OperatorFunction<any, R>;
/**
* Recursively projects each source value to an observable which is merged in the output observable
* @param project - Function returning observable for recursion
* @param concurrent - Maximum concurrent recursions
* @returns Operator function expanding recursively
*/
function expand<T, R>(
project: (value: T, index: number) => ObservableInput<R>,
concurrent?: number,
scheduler?: SchedulerLike
): OperatorFunction<T, R>;
/**
* Emit previous and current value as pair
* @returns Operator function emitting pairs of consecutive values
*/
function pairwise<T>(): OperatorFunction<T, [T, T]>;
/**
* Select properties from source objects
* @param properties - Property keys to pluck
* @returns Operator function plucking properties
*/
function pluck<T, K1 extends keyof T>(k1: K1): OperatorFunction<T, T[K1]>;
function pluck<T, K1 extends keyof T, K2 extends keyof T[K1]>(k1: K1, k2: K2): OperatorFunction<T, T[K1][K2]>;
function pluck<T, K1 extends keyof T, K2 extends keyof T[K1], K3 extends keyof T[K1][K2]>(k1: K1, k2: K2, k3: K3): OperatorFunction<T, T[K1][K2][K3]>;
/**
* Apply multiple operators in sequence using mergeScan
* @param accumulator - Accumulator function returning observable
* @param seed - Initial seed value
* @param concurrent - Maximum concurrent inner subscriptions
* @returns Operator function applying mergeScan pattern
*/
function mergeScan<T, R>(
accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
seed: R,
concurrent?: number
): OperatorFunction<T, R>;
/**
* Apply switchMap pattern with accumulator
* @param accumulator - Accumulator function returning observable
* @param seed - Initial seed value
* @returns Operator function applying switchScan pattern
*/
function switchScan<T, R>(
accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
seed: R
): OperatorFunction<T, R>;
/**
* Count emissions that pass optional predicate
* @param predicate - Optional predicate to filter counted emissions
* @returns Operator function emitting count of emissions
*/
function count<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, number>;
/**
* Test if all emissions satisfy predicate
* @param predicate - Predicate function to test emissions
* @returns Operator function emitting boolean result
*/
function every<T>(predicate: (value: T, index: number) => boolean): OperatorFunction<T, boolean>;
/**
* Find maximum value using optional comparer
* @param comparer - Optional comparison function
* @returns Operator function emitting maximum value
*/
function max<T>(comparer?: (x: T, y: T) => number): OperatorFunction<T, T>;
/**
* Find minimum value using optional comparer
* @param comparer - Optional comparison function
* @returns Operator function emitting minimum value
*/
function min<T>(comparer?: (x: T, y: T) => number): OperatorFunction<T, T>;
/**
* Reduce emissions to single accumulated value
* @param accumulator - Accumulator function
* @param seed - Initial seed value
* @returns Operator function emitting final accumulated value
*/
function reduce<T, R>(
accumulator: (acc: R, value: T, index: number) => R,
seed: R
): OperatorFunction<T, R>;
function reduce<T>(accumulator: (acc: T, value: T, index: number) => T): OperatorFunction<T, T>;
/**
* Collect all emissions into array
* @returns Operator function emitting array of all values
*/
function toArray<T>(): OperatorFunction<T, T[]>;
/**
* Materialize notifications as emission objects
* @returns Operator function emitting notification objects
*/
function materialize<T>(): OperatorFunction<T, Notification<T>>;
/**
* Dematerialize notification objects back to emissions
* @returns Operator function converting notifications to emissions
*/
function dematerialize<T>(): OperatorFunction<Notification<T>, T>;
/**
* Add timestamp to each emission
* @param timestampProvider - Provider for timestamp values
* @returns Operator function adding timestamps
*/
function timestamp<T>(timestampProvider?: TimestampProvider): OperatorFunction<T, Timestamp<T>>;
/**
* Add time interval between emissions
* @param timestampProvider - Provider for timestamp values
* @returns Operator function adding time intervals
*/
function timeInterval<T>(timestampProvider?: TimestampProvider): OperatorFunction<T, TimeInterval<T>>;interface GroupedObservable<K, T> extends Observable<T> {
readonly key: K;
}
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;
type ObservableInput<T> = Observable<T> | Promise<T> | Iterable<T>;