CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-rxjs

Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences

Pending
Overview
Eval results
Files

combination-operators.mddocs/

Combination Operators

Operators for combining multiple observable streams in various ways, including merging, concatenating, and timing-based combinations.

Capabilities

combineLatestWith

Combine latest values with other observables.

/**
 * Combine latest values from source with provided observables
 * @param sources - Other observables to combine with
 * @returns Operator function combining latest values as arrays
 */
function combineLatestWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;

Usage Examples:

import { interval, combineLatestWith, map } from "rxjs";

// Combine timer with user interactions
const timer$ = interval(1000);
const clicks$ = fromEvent(document, 'click').pipe(map(() => 'click'));

timer$.pipe(
  combineLatestWith(clicks$)
).subscribe(([time, lastClick]) => {
  console.log(`Time: ${time}, Last action: ${lastClick}`);
});

mergeWith

Merge source with other observables.

/**
 * Merge source observable with provided observables
 * @param sources - Other observables to merge with
 * @returns Operator function merging all sources
 */
function mergeWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;

Usage Examples:

import { of, mergeWith, delay } from "rxjs";

// Merge multiple streams
const source1$ = of(1, 2, 3);
const source2$ = of(4, 5, 6).pipe(delay(1000));
const source3$ = of(7, 8, 9).pipe(delay(2000));

source1$.pipe(
  mergeWith(source2$, source3$)
).subscribe(value => console.log('Merged value:', value));
// Output: 1, 2, 3 (immediately), then 4, 5, 6 (after 1s), then 7, 8, 9 (after 2s)

concatWith

Concatenate source with other observables in sequence.

/**
 * Concatenate source observable with provided observables sequentially
 * @param sources - Other observables to concatenate after source
 * @returns Operator function concatenating in order
 */
function concatWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;

Usage Examples:

import { of, concatWith, delay } from "rxjs";

// Sequential execution
const intro$ = of('Starting...');
const process$ = of('Processing...').pipe(delay(1000));
const complete$ = of('Complete!').pipe(delay(1000));

intro$.pipe(
  concatWith(process$, complete$)
).subscribe(message => console.log(message));
// Output: Starting... (immediately), Processing... (after 1s), Complete! (after 2s total)

startWith

Emit specified values before source values.

/**
 * Emit specified values before source observable values
 * @param values - Values to emit first
 * @returns Operator function prepending values
 */
function startWith<T>(...values: T[]): OperatorFunction<T, T>;
function startWith<T>(...values: (T | SchedulerLike)[]): OperatorFunction<T, T>;

Usage Examples:

import { of, startWith } from "rxjs";

// Add initial values
of(4, 5, 6).pipe(
  startWith(1, 2, 3)
).subscribe(x => console.log(x)); // 1, 2, 3, 4, 5, 6

// Start with loading state
const data$ = ajax.getJSON('/api/data');
data$.pipe(
  startWith({ loading: true })
).subscribe(result => console.log(result));
// Immediately emits { loading: true }, then actual data

endWith

Emit specified values after source completes.

/**
 * Emit specified values after source observable completes
 * @param values - Values to emit after completion
 * @returns Operator function appending values
 */
function endWith<T>(...values: T[]): OperatorFunction<T, T>;
function endWith<T>(...values: (T | SchedulerLike)[]): OperatorFunction<T, T>;

Usage Examples:

import { of, endWith } from "rxjs";

// Add final values
of(1, 2, 3).pipe(
  endWith('complete', 'done')
).subscribe(x => console.log(x)); // 1, 2, 3, 'complete', 'done'

withLatestFrom

Combine source with latest values from other observables when source emits.

/**
 * Combine each source emission with latest values from other observables
 * @param sources - Other observables to get latest values from
 * @returns Operator function combining with latest values
 */
function withLatestFrom<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;
function withLatestFrom<T, A, R>(
  ...sourcesAndProject: [...ObservableInput<A>[], (...values: [T, ...A[]]) => R]
): OperatorFunction<T, R>;

Usage Examples:

import { fromEvent, interval, withLatestFrom, map } from "rxjs";

// Get latest timer value on button click
const clicks$ = fromEvent(document.getElementById('button'), 'click');
const timer$ = interval(1000);

clicks$.pipe(
  withLatestFrom(timer$),
  map(([click, time]) => `Clicked at timer value: ${time}`)
).subscribe(message => console.log(message));

zipWith

Zip source with other observables.

/**
 * Zip source observable with provided observables
 * @param sources - Other observables to zip with
 * @returns Operator function zipping corresponding values
 */
function zipWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;

Usage Examples:

import { of, zipWith } from "rxjs";

// Zip corresponding values
const letters$ = of('a', 'b', 'c');
const numbers$ = of(1, 2, 3);
const symbols$ = of('!', '@', '#');

letters$.pipe(
  zipWith(numbers$, symbols$)
).subscribe(([letter, number, symbol]) => {
  console.log(`${letter}${number}${symbol}`); // a1!, b2@, c3#
});

raceWith

Race source with other observables, emit from first to emit.

/**
 * Race source with other observables, emit values from the first to emit
 * @param sources - Other observables to race with
 * @returns Operator function racing with other sources
 */
function raceWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;

Usage Examples:

import { timer, raceWith, map } from "rxjs";

// Race different timers
const fast$ = timer(1000).pipe(map(() => 'fast'));
const slow$ = timer(3000).pipe(map(() => 'slow'));

fast$.pipe(
  raceWith(slow$)
).subscribe(winner => console.log('Winner:', winner)); // 'fast' (after 1s)

Flattening Combination Operators

/**
 * Flatten higher-order observable by merging all inner observables
 * @param concurrent - Maximum concurrent inner subscriptions
 * @returns Operator function merging all inner observables
 */
function mergeAll<T>(concurrent?: number): OperatorFunction<ObservableInput<T>, T>;

/**
 * Flatten higher-order observable by concatenating inner observables
 * @returns Operator function concatenating all inner observables
 */
function concatAll<T>(): OperatorFunction<ObservableInput<T>, T>;

/**
 * Flatten higher-order observable by switching to latest inner observable
 * @returns Operator function switching to latest inner observable
 */
function switchAll<T>(): OperatorFunction<ObservableInput<T>, T>;

/**
 * Flatten higher-order observable by exhausting (ignoring while active)
 * @returns Operator function exhausting inner observables
 */
function exhaustAll<T>(): OperatorFunction<ObservableInput<T>, T>;

/**
 * Flatten higher-order observable by combining latest from all inner observables
 * @returns Operator function combining latest from inner observables
 */
function combineLatestAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;
function combineLatestAll<T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>;

/**
 * Flatten higher-order observable by zipping inner observables
 * @returns Operator function zipping inner observables
 */
function zipAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;
function zipAll<T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>;

Usage Examples:

import { of, map, mergeAll, concatAll, switchAll } from "rxjs";
import { delay } from "rxjs/operators";

// Create higher-order observable
const higherOrder$ = of(1, 2, 3).pipe(
  map(n => of(`inner-${n}`).pipe(delay(n * 1000)))
);

// Different flattening strategies:

// mergeAll - all inner observables run concurrently
higherOrder$.pipe(mergeAll()).subscribe(x => console.log('Merge:', x));
// Output: inner-1 (1s), inner-2 (2s), inner-3 (3s)

// concatAll - inner observables run sequentially  
higherOrder$.pipe(concatAll()).subscribe(x => console.log('Concat:', x));
// Output: inner-1 (1s), inner-2 (3s), inner-3 (6s)

// switchAll - switch to latest inner observable
higherOrder$.pipe(switchAll()).subscribe(x => console.log('Switch:', x));
// Output: inner-3 (3s) only

Advanced Combination Patterns

Conditional Combination

import { of, combineLatest, startWith, switchMap } from "rxjs";

// Conditional data loading
const userId$ = new BehaviorSubject(null);
const userPermissions$ = new BehaviorSubject([]);

const userData$ = combineLatest([userId$, userPermissions$]).pipe(
  switchMap(([userId, permissions]) => {
    if (userId && permissions.includes('read')) {
      return ajax.getJSON(`/api/users/${userId}`);
    }
    return of(null);
  }),
  startWith({ loading: true })
);

Multi-source State Management

import { merge, scan, startWith } from "rxjs";

// Combine multiple action streams
const userActions$ = fromEvent(userButton, 'click').pipe(map(() => ({ type: 'USER_ACTION' })));
const systemEvents$ = fromEvent(window, 'beforeunload').pipe(map(() => ({ type: 'SYSTEM_EVENT' })));
const apiEvents$ = apiErrorStream$.pipe(map(error => ({ type: 'API_ERROR', error })));

const allEvents$ = merge(userActions$, systemEvents$, apiEvents$).pipe(
  scan((state, event) => {
    switch (event.type) {
      case 'USER_ACTION': return { ...state, lastUserAction: Date.now() };
      case 'SYSTEM_EVENT': return { ...state, systemState: 'closing' };
      case 'API_ERROR': return { ...state, errors: [...state.errors, event.error] };
      default: return state;
    }
  }, { lastUserAction: null, systemState: 'active', errors: [] }),
  startWith({ lastUserAction: null, systemState: 'active', errors: [] })
);

Types

type ObservableInput<T> = Observable<T> | Promise<T> | Iterable<T>;
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

Install with Tessl CLI

npx tessl i tessl/npm-rxjs

docs

ajax-operations.md

combination-operators.md

core-types.md

error-handling.md

fetch-operations.md

filtering-operators.md

index.md

observable-creation.md

schedulers.md

subjects.md

testing-utilities.md

transformation-operators.md

websocket-operations.md

tile.json