Reactive Extensions for modern JavaScript, providing comprehensive reactive programming with observable sequences.
—
Operators for combining multiple observable streams in various ways, including merging, concatenating, and timing-based combinations.
Combine latest values with other observables.
/**
 * Combines the latest value from the source with the latest values from the
 * provided observables.
 *
 * Each emission is an array whose first element comes from the source and
 * whose remaining elements come from `sources`, in argument order.
 *
 * @param sources - Other observables to combine with
 * @returns Operator function combining latest values as arrays
 */
function combineLatestWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;

Usage Examples:
import { fromEvent, interval, combineLatestWith, map } from "rxjs";

// Combine timer with user interactions
const timer$ = interval(1000);
const clicks$ = fromEvent(document, 'click').pipe(map(() => 'click'));

// Log the timer value together with the most recent click marker.
timer$.pipe(
  combineLatestWith(clicks$)
).subscribe(([time, lastClick]) => {
  console.log(`Time: ${time}, Last action: ${lastClick}`);
});

Merge source with other observables.
/**
 * Merges the source observable with the provided observables into a single
 * output stream.
 *
 * All inputs share the element type `T`, so the output element type is `T`
 * as well.
 *
 * @param sources - Other observables to merge with
 * @returns Operator function merging all sources
 */
function mergeWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;

Usage Examples:
import { of, mergeWith, delay } from "rxjs";

// Merge multiple streams
const source1$ = of(1, 2, 3);
const source2$ = of(4, 5, 6).pipe(delay(1000));
const source3$ = of(7, 8, 9).pipe(delay(2000));

source1$.pipe(
  mergeWith(source2$, source3$)
).subscribe(value => console.log('Merged value:', value));
// Output: 1, 2, 3 (immediately), then 4, 5, 6 (after 1s), then 7, 8, 9 (after 2s)

Concatenate source with other observables in sequence.
/**
 * Concatenates the source observable with the provided observables, one
 * after another.
 *
 * Values from `sources` follow the source's values, in argument order.
 *
 * @param sources - Other observables to concatenate after source
 * @returns Operator function concatenating in order
 */
function concatWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;

Usage Examples:
import { of, concatWith, delay } from "rxjs";

// Sequential execution
const intro$ = of('Starting...');
const process$ = of('Processing...').pipe(delay(1000));
const complete$ = of('Complete!').pipe(delay(1000));

intro$.pipe(
  concatWith(process$, complete$)
).subscribe(message => console.log(message));
// Output: Starting... (immediately), Processing... (after 1s), Complete! (after 2s total)

Emit specified values before source values.
/**
 * Emits the specified values before any values from the source observable.
 *
 * @param values - Values to emit first, in argument order
 * @returns Operator function prepending values
 */
function startWith<T>(...values: T[]): OperatorFunction<T, T>;
/**
 * Variant whose argument list may also contain a `SchedulerLike`.
 *
 * NOTE(review): RxJS 7 documents the scheduler argument as deprecated;
 * confirm against the targeted RxJS version before relying on it.
 *
 * @param values - Values to emit first, optionally mixed with a scheduler
 * @returns Operator function prepending values
 */
function startWith<T>(...values: (T | SchedulerLike)[]): OperatorFunction<T, T>;

Usage Examples:
import { of, startWith } from "rxjs";
import { ajax } from "rxjs/ajax";

// Add initial values
of(4, 5, 6).pipe(
  startWith(1, 2, 3)
).subscribe(x => console.log(x)); // 1, 2, 3, 4, 5, 6

// Start with loading state
const data$ = ajax.getJSON('/api/data');
data$.pipe(
  startWith({ loading: true })
).subscribe(result => console.log(result));
// Immediately emits { loading: true }, then actual data

Emit specified values after source completes.
/**
 * Emits the specified values after the source observable completes.
 *
 * @param values - Values to emit after completion, in argument order
 * @returns Operator function appending values
 */
function endWith<T>(...values: T[]): OperatorFunction<T, T>;
/**
 * Variant whose argument list may also contain a `SchedulerLike`.
 *
 * NOTE(review): RxJS 7 documents the scheduler argument as deprecated;
 * confirm against the targeted RxJS version before relying on it.
 *
 * @param values - Values to emit after completion, optionally mixed with a scheduler
 * @returns Operator function appending values
 */
function endWith<T>(...values: (T | SchedulerLike)[]): OperatorFunction<T, T>;

Usage Examples:
import { of, endWith } from "rxjs";

// Add final values
of(1, 2, 3).pipe(
  endWith('complete', 'done')
).subscribe(x => console.log(x)); // 1, 2, 3, 'complete', 'done'

Combine source with latest values from other observables when source emits.
/**
 * Combines each source emission with the latest values from other
 * observables.
 *
 * The result is an array whose first element is the source value and whose
 * remaining elements come from `sources`, in argument order.
 *
 * @param sources - Other observables to get latest values from
 * @returns Operator function combining with latest values
 */
function withLatestFrom<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;
/**
 * Variant that takes a projection function as its final argument. The
 * projection receives the source value followed by the latest values and
 * its return value is what the resulting observable emits.
 *
 * @param sourcesAndProject - Other observables, followed by the projection function
 * @returns Operator function emitting the projected values
 */
function withLatestFrom<T, A, R>(
  ...sourcesAndProject: [...ObservableInput<A>[], (...values: [T, ...A[]]) => R]
): OperatorFunction<T, R>;

Usage Examples:
import { fromEvent, interval, withLatestFrom, map } from "rxjs";

// Get latest timer value on button click
const button = document.getElementById('button');
if (button === null) {
  // getElementById returns null when no element matches; fail loudly
  // instead of handing null to fromEvent.
  throw new Error("Element with id 'button' not found");
}
const clicks$ = fromEvent(button, 'click');
const timer$ = interval(1000);

clicks$.pipe(
  withLatestFrom(timer$),
  // The click event itself is not needed, only the timer value.
  map(([, time]) => `Clicked at timer value: ${time}`)
).subscribe(message => console.log(message));

Zip source with other observables.
/**
 * Zips the source observable with the provided observables, pairing up
 * corresponding values.
 *
 * Each emission is an array whose first element comes from the source and
 * whose remaining elements come from `sources`, in argument order.
 *
 * @param sources - Other observables to zip with
 * @returns Operator function zipping corresponding values
 */
function zipWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;

Usage Examples:
import { of, zipWith } from "rxjs";

// Zip corresponding values
const letters$ = of('a', 'b', 'c');
const numbers$ = of(1, 2, 3);
const symbols$ = of('!', '@', '#');

letters$.pipe(
  zipWith(numbers$, symbols$)
).subscribe(([letter, number, symbol]) => {
  console.log(`${letter}${number}${symbol}`); // a1!, b2@, c3#
});

Race source with other observables, emit from first to emit.
/**
 * Races the source against the other observables and emits values from
 * whichever one emits first.
 *
 * @param sources - Other observables to race with
 * @returns Operator function racing with other sources
 */
function raceWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;

Usage Examples:
import { timer, raceWith, map } from "rxjs";

// Race different timers
const fast$ = timer(1000).pipe(map(() => 'fast'));
const slow$ = timer(3000).pipe(map(() => 'slow'));

fast$.pipe(
  raceWith(slow$)
).subscribe(winner => console.log('Winner:', winner)); // 'fast' (after 1s)

/**
 * Flattens a higher-order observable by merging all inner observables.
 *
 * @param concurrent - Maximum concurrent inner subscriptions (optional)
 * @returns Operator function merging all inner observables
 */
function mergeAll<T>(concurrent?: number): OperatorFunction<ObservableInput<T>, T>;
/**
 * Flattens a higher-order observable by concatenating its inner observables,
 * so that they run one after another rather than at the same time.
 *
 * @returns Operator function concatenating all inner observables
 */
function concatAll<T>(): OperatorFunction<ObservableInput<T>, T>;
/**
 * Flattens a higher-order observable by switching to the latest inner
 * observable.
 *
 * @returns Operator function switching to latest inner observable
 */
function switchAll<T>(): OperatorFunction<ObservableInput<T>, T>;
/**
 * Flattens a higher-order observable by exhausting: new inner observables
 * are ignored while one is still active.
 *
 * @returns Operator function exhausting inner observables
 */
function exhaustAll<T>(): OperatorFunction<ObservableInput<T>, T>;
/**
 * Flattens a higher-order observable by combining the latest values from all
 * inner observables into an array.
 *
 * @returns Operator function combining latest from inner observables
 */
function combineLatestAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;
/**
 * Variant that passes the latest values to `project` and emits its result
 * instead of the raw array.
 *
 * @param project - Maps the latest values to the emitted result
 * @returns Operator function emitting the projected result
 */
function combineLatestAll<T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>;
/**
 * Flattens a higher-order observable by zipping its inner observables,
 * emitting arrays of corresponding values.
 *
 * @returns Operator function zipping inner observables
 */
function zipAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;
/**
 * Variant that passes each group of zipped values to `project` and emits its
 * result instead of the raw array.
 *
 * @param project - Maps the zipped values to the emitted result
 * @returns Operator function emitting the projected result
 */
function zipAll<T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>;

Usage Examples:
import { of, map, mergeAll, concatAll, switchAll, delay } from "rxjs";

// Create higher-order observable
const higherOrder$ = of(1, 2, 3).pipe(
  map(n => of(`inner-${n}`).pipe(delay(n * 1000)))
);

// Different flattening strategies:

// mergeAll - all inner observables run concurrently
higherOrder$.pipe(mergeAll()).subscribe(x => console.log('Merge:', x));
// Output: inner-1 (1s), inner-2 (2s), inner-3 (3s)

// concatAll - inner observables run sequentially
higherOrder$.pipe(concatAll()).subscribe(x => console.log('Concat:', x));
// Output: inner-1 (1s), inner-2 (3s), inner-3 (6s)

// switchAll - switch to latest inner observable
higherOrder$.pipe(switchAll()).subscribe(x => console.log('Switch:', x));
// Output: inner-3 (3s) only

import { BehaviorSubject, of, combineLatest, startWith, switchMap } from "rxjs";
import { ajax } from "rxjs/ajax";
// Conditional data loading
// Explicit type arguments: without them the subjects are inferred as
// BehaviorSubject<null> and BehaviorSubject<never[]>, and the
// `permissions.includes('read')` check below does not type-check.
const userId$ = new BehaviorSubject<string | null>(null);
const userPermissions$ = new BehaviorSubject<string[]>([]);

const userData$ = combineLatest([userId$, userPermissions$]).pipe(
  switchMap(([userId, permissions]) => {
    // Only request the user once an id is set and 'read' is granted.
    if (userId && permissions.includes('read')) {
      return ajax.getJSON(`/api/users/${userId}`);
    }
    return of(null);
  }),
  startWith({ loading: true })
);

import { fromEvent, map, merge, scan, startWith } from "rxjs";
// Combine multiple action streams

// Shape of the state accumulated from all event streams.
interface AppState {
  lastUserAction: number | null;
  systemState: 'active' | 'closing';
  errors: unknown[];
}

// Used both as the `scan` seed and as the first emitted value, so the two
// cannot drift apart.
const initialState: AppState = { lastUserAction: null, systemState: 'active', errors: [] };

// `as const` keeps each `type` a string literal so the switch below can
// narrow the event union (and reach `event.error` only for API errors).
const userActions$ = fromEvent(userButton, 'click').pipe(map(() => ({ type: 'USER_ACTION' as const })));
const systemEvents$ = fromEvent(window, 'beforeunload').pipe(map(() => ({ type: 'SYSTEM_EVENT' as const })));
const apiEvents$ = apiErrorStream$.pipe(map(error => ({ type: 'API_ERROR' as const, error })));

const allEvents$ = merge(userActions$, systemEvents$, apiEvents$).pipe(
  scan((state: AppState, event): AppState => {
    switch (event.type) {
      case 'USER_ACTION': return { ...state, lastUserAction: Date.now() };
      case 'SYSTEM_EVENT': return { ...state, systemState: 'closing' };
      case 'API_ERROR': return { ...state, errors: [...state.errors, event.error] };
      default: return state;
    }
  }, initialState),
  startWith(initialState)
);

/**
 * Input accepted by the operators in this document wherever an observable is
 * expected: an `Observable`, a `Promise`, or an `Iterable`.
 *
 * NOTE(review): the ObservableInput type shipped by RxJS may accept more
 * shapes than this simplified alias — confirm against the library typings.
 */
type ObservableInput<T> = Observable<T> | Promise<T> | Iterable<T>;
/** A function that takes a source `Observable<T>` and returns an `Observable<R>`. */
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;