CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-rxjs

Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences

Pending
Overview
Eval results
Files

docs/transformation-operators.md

Transformation Operators

Operators for transforming values emitted by observables into new forms and structures, including mapping, flattening, scanning, and buffering operations.

Capabilities

map

Transform each value using a projection function.

/**
 * Transform each emitted value using a projection function
 * @param project - Function to transform each value; it is called with the
 *   value and its zero-based emission index (the first value gets index 0)
 * @returns Operator function applying transformation to each value
 */
function map<T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R>;

Usage Examples:

import { of, map } from "rxjs";

// Double each number
of(1, 2, 3, 4).pipe(
  map(x => x * 2)
).subscribe(x => console.log(x)); // 2, 4, 6, 8

// Transform objects
of(
  { firstName: 'John', lastName: 'Doe' },
  { firstName: 'Jane', lastName: 'Smith' }
).pipe(
  map(person => `${person.firstName} ${person.lastName}`)
).subscribe(name => console.log(name)); // John Doe, Jane Smith

// With index
of('a', 'b', 'c').pipe(
  map((letter, index) => `${index}: ${letter}`)
).subscribe(result => console.log(result)); // 0: a, 1: b, 2: c

mergeMap (flatMap)

Map each value to an observable and merge all inner observables.

/**
 * Map each value to an observable and merge all inner observables
 * @param project - Function that returns an observable for each value; any
 *   ObservableInput is accepted (e.g. an Observable, a Promise or an array)
 * @param concurrent - Maximum number of concurrent inner subscriptions
 *   (per the RxJS API reference the default is Infinity, i.e. no limit)
 * @returns Operator function flattening mapped observables
 */
function mergeMap<T, R>(
  project: (value: T, index: number) => ObservableInput<R>, 
  concurrent?: number
): OperatorFunction<T, R>;
/**
 * Overload whose resultSelector combines each outer value with each inner
 * value (plus both indices) into the emitted result.
 * NOTE(review): the RxJS 7 API reference marks resultSelector overloads as
 * deprecated (planned removal in v8); prefer a `map` inside `project`.
 */
function mergeMap<T, R, O>(
  project: (value: T, index: number) => ObservableInput<R>,
  resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => O,
  concurrent?: number
): OperatorFunction<T, O>;

/**
 * Alias for mergeMap
 * @deprecated Renamed to mergeMap. NOTE(review): the RxJS 7 API reference
 *   lists this alias for removal in v8 - prefer mergeMap in new code.
 */
function flatMap<T, R>(
  project: (value: T, index: number) => ObservableInput<R>, 
  concurrent?: number
): OperatorFunction<T, R>;

Usage Examples:

import { of, mergeMap, delay } from "rxjs";
import { ajax } from "rxjs/ajax";

// Map to HTTP requests
of('user1', 'user2', 'user3').pipe(
  mergeMap(userId => ajax.getJSON(`/api/users/${userId}`))
).subscribe(user => console.log('Loaded user:', user));

// Control concurrency
of(1, 2, 3, 4, 5).pipe(
  mergeMap(n => of(n).pipe(delay(1000)), 2) // Max 2 concurrent
).subscribe(x => console.log('Value:', x));

// Map to arrays (flattened)
of([1, 2], [3, 4], [5, 6]).pipe(
  mergeMap(arr => arr)
).subscribe(x => console.log(x)); // 1, 2, 3, 4, 5, 6

switchMap

Map each value to an observable, canceling previous inner observables.

/**
 * Map each value to an observable, canceling previous inner observables
 * @param project - Function that returns an observable for each value; it is
 *   called with the value and its emission index
 * @returns Operator function switching to new mapped observables
 */
function switchMap<T, R>(
  project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R>;
/**
 * Overload whose resultSelector combines each outer value with each inner
 * value (plus both indices) into the emitted result.
 * NOTE(review): the RxJS 7 API reference marks resultSelector overloads as
 * deprecated (planned removal in v8); prefer a `map` inside `project`.
 */
function switchMap<T, R, O>(
  project: (value: T, index: number) => ObservableInput<R>,
  resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => O
): OperatorFunction<T, O>;

Usage Examples:

import { fromEvent, interval, of, switchMap, debounceTime, map, take } from "rxjs";
import { ajax } from "rxjs/ajax";

// Search with automatic cancellation
// The cast records the assumption that #search exists and is an <input>;
// getElementById itself is typed as HTMLElement | null.
const searchInput = document.getElementById('search') as HTMLInputElement;
fromEvent(searchInput, 'input').pipe(
  map(() => searchInput.value),
  debounceTime(300),
  // Encode the term so characters such as & or # cannot corrupt the query string
  switchMap(term => ajax.getJSON(`/api/search?q=${encodeURIComponent(term)}`))
).subscribe(results => {
  console.log('Search results:', results);
  // Previous search requests are cancelled automatically
});

// Latest value wins
// of() emits 1, 2, 3 synchronously, so the inner intervals for 1 and 2 are
// unsubscribed before they can emit anything.
of(1, 2, 3).pipe(
  switchMap(n => interval(1000).pipe(
    map(i => `${n}-${i}`),
    take(3)
  ))
).subscribe(x => console.log(x)); // Only outputs from last (3): 3-0, 3-1, 3-2

concatMap

Map each value to an observable and concatenate in order.

/**
 * Map each value to an observable and concatenate in sequential order
 * @param project - Function that returns an observable for each value; it is
 *   called with the value and its emission index
 * @returns Operator function concatenating mapped observables in order
 */
function concatMap<T, R>(
  project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R>;
/**
 * Overload whose resultSelector combines each outer value with each inner
 * value (plus both indices) into the emitted result.
 * NOTE(review): the RxJS 7 API reference marks resultSelector overloads as
 * deprecated (planned removal in v8); prefer a `map` inside `project`.
 */
function concatMap<T, R, O>(
  project: (value: T, index: number) => ObservableInput<R>,
  resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => O
): OperatorFunction<T, O>;

Usage Examples:

import { of, concatMap, delay } from "rxjs";
import { ajax } from "rxjs/ajax";

// Sequential processing (waits for each to complete)
of(1, 2, 3).pipe(
  concatMap(n => of(`Processing ${n}`).pipe(delay(1000)))
).subscribe(x => console.log(x));
// Outputs: Processing 1 (after 1s), Processing 2 (after 2s), Processing 3 (after 3s)

// Preserve order
// Each request starts only after the previous one has completed, so results
// arrive in the same order as the file names.
of('file1.txt', 'file2.txt', 'file3.txt').pipe(
  concatMap(filename => ajax.getJSON(`/api/files/${filename}`))
).subscribe(fileData => {
  console.log('File loaded in order:', fileData);
});

exhaustMap

Map to an observable, ignoring new values while inner observable is active.

/**
 * Map each value to an observable, ignoring new values while inner observable is active
 * @param project - Function that returns an observable for each value; it is
 *   called with the value and its emission index
 * @returns Operator function exhausting mapped observables
 */
function exhaustMap<T, R>(
  project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R>;
/**
 * Overload whose resultSelector combines each outer value with each inner
 * value (plus both indices) into the emitted result.
 * NOTE(review): the RxJS 7 API reference marks resultSelector overloads as
 * deprecated (planned removal in v8); prefer a `map` inside `project`.
 */
function exhaustMap<T, R, O>(
  project: (value: T, index: number) => ObservableInput<R>,
  resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => O
): OperatorFunction<T, O>;

Usage Examples:

import { fromEvent, exhaustMap } from "rxjs";
import { ajax } from "rxjs/ajax";

// Prevent multiple simultaneous requests
// The cast records the assumption that #save exists; getElementById itself
// is typed as HTMLElement | null.
const saveButton = document.getElementById('save') as HTMLButtonElement;
fromEvent(saveButton, 'click').pipe(
  exhaustMap(() => ajax.post('/api/save', { data: 'example' }))
).subscribe({
  // Observer object instead of positional callbacks, which RxJS 7 deprecates
  next: response => console.log('Saved:', response),
  error: err => console.error('Save failed:', err)
});
// Clicks during ongoing save are ignored

scan

Apply accumulator function over time, emitting intermediate results.

/**
 * Apply accumulator function over time, emitting intermediate results
 * @param accumulator - Function to compute accumulated value; it is called
 *   with the current accumulation, the incoming value and its emission index
 * @param seed - Initial accumulated value
 * @returns Operator function scanning with accumulator
 */
function scan<T, R>(
  accumulator: (acc: R, value: T, index: number) => R, 
  seed: R
): OperatorFunction<T, R>;
/**
 * Overload without a seed: the accumulation has the same type as the source
 * values. NOTE(review): per the RxJS API reference the first source value is
 * then used as the initial accumulation and emitted as-is - confirm.
 */
function scan<T>(
  accumulator: (acc: T, value: T, index: number) => T
): OperatorFunction<T, T>;

Usage Examples:

import { of, scan } from "rxjs";

// Running sum
of(1, 2, 3, 4).pipe(
  scan((acc, value) => acc + value, 0)
).subscribe(sum => console.log('Running sum:', sum));
// Output: 1, 3, 6, 10

// Build object over time
of(
  { type: 'SET_NAME', payload: 'Alice' },
  { type: 'SET_AGE', payload: 25 },
  { type: 'SET_EMAIL', payload: 'alice@example.com' }
).pipe(
  scan((state, action) => {
    switch (action.type) {
      case 'SET_NAME': return { ...state, name: action.payload };
      case 'SET_AGE': return { ...state, age: action.payload };
      case 'SET_EMAIL': return { ...state, email: action.payload };
      default: return state;
    }
  }, {})
).subscribe(state => console.log('State:', state));

buffer

Buffer values until boundary observable emits.

/**
 * Buffer values until boundary observable emits
 * @param closingNotifier - Observable that triggers buffer emission
 * @returns Operator function buffering values until boundary
 */
function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFunction<T, T[]>;

Usage Examples:

import { interval, buffer, fromEvent } from "rxjs";

// Buffer interval values until button click
const source$ = interval(1000);
const clicks$ = fromEvent(document.getElementById('flush'), 'click');

source$.pipe(
  buffer(clicks$)
).subscribe(buffered => console.log('Buffered values:', buffered));
// Click after 3 seconds: [0, 1, 2]

bufferCount

Buffer values until specific count is reached.

/**
 * Buffer values until buffer reaches specified size
 * @param bufferSize - Size of buffer
 * @param startBufferEvery - Number of source values between the starts of
 *   consecutive buffers. When omitted, buffers do not overlap (bufferCount(3)
 *   yields [1, 2, 3], [4, 5, 6], ...); a value smaller than bufferSize yields
 *   overlapping buffers (bufferCount(3, 1) yields [1, 2, 3], [2, 3, 4], ...)
 * @returns Operator function buffering by count
 */
function bufferCount<T>(bufferSize: number, startBufferEvery?: number): OperatorFunction<T, T[]>;

Usage Examples:

import { of, bufferCount } from "rxjs";

// Buffer every 3 values
of(1, 2, 3, 4, 5, 6, 7, 8).pipe(
  bufferCount(3)
).subscribe(buffer => console.log('Buffer:', buffer));
// Output: [1, 2, 3], [4, 5, 6], [7, 8]

// Overlapping buffers
of(1, 2, 3, 4, 5, 6).pipe(
  bufferCount(3, 1) // New buffer every 1, size 3
).subscribe(buffer => console.log('Buffer:', buffer));
// Output: [1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6], [6]
// (buffers that are still open when the source completes are flushed
// even though they have not reached the requested size)

bufferTime

Buffer values for specific time periods.

/**
 * Buffer values for specified time periods
 * @param bufferTimeSpan - Time span of buffer in milliseconds
 * @param bufferCreationInterval - Interval to start new buffer
 * @param maxBufferSize - Maximum buffer size
 * @param scheduler - Optional scheduler
 * @returns Operator function buffering by time
 */
function bufferTime<T>(
  bufferTimeSpan: number,
  bufferCreationInterval?: number,
  maxBufferSize?: number,
  scheduler?: SchedulerLike
): OperatorFunction<T, T[]>;

Usage Examples:

import { interval, bufferTime } from "rxjs";

// Buffer values for 2 seconds
interval(500).pipe(
  bufferTime(2000)
).subscribe(buffer => console.log('2-second buffer:', buffer));
// Every 2 seconds: [0, 1, 2], [3, 4, 5, 6], etc.

bufferToggle and bufferWhen

/**
 * Buffer values from opening of one observable until closing of another
 * @param openings - Observable that opens the buffer
 * @param closingSelector - Function that returns observable to close buffer
 * @returns Operator function buffering between opening and closing
 */
function bufferToggle<T, O>(
  openings: ObservableInput<O>,
  closingSelector: (value: O) => ObservableInput<any>
): OperatorFunction<T, T[]>;

/**
 * Buffer values until the observable returned by closingSelector emits, then
 * emit the collected buffer
 * @param closingSelector - Factory function returning the boundary observable.
 *   NOTE(review): per the RxJS API reference it is invoked again each time a
 *   new buffer is opened, so every buffer can get its own closing signal.
 * @returns Operator function buffering until boundary
 */
function bufferWhen<T>(closingSelector: () => ObservableInput<any>): OperatorFunction<T, T[]>;

Usage Examples:

import { interval, bufferToggle, bufferWhen, fromEvent, timer } from "rxjs";

// Buffer between mouse down and mouse up
const mouseDown$ = fromEvent(document, 'mousedown');
const mouseUp$ = fromEvent(document, 'mouseup');

interval(100).pipe(
  bufferToggle(mouseDown$, () => mouseUp$)
).subscribe(buffered => {
  console.log('Values during mouse press:', buffered);
});

// Buffer until random interval
interval(100).pipe(
  bufferWhen(() => timer(Math.random() * 2000))
).subscribe(buffered => {
  console.log('Random buffer:', buffered);
});

groupBy

Group values by key into separate observables.

/**
 * Group values by key into separate GroupedObservable instances
 * @param keySelector - Function to select grouping key
 * @param elementSelector - Function to select element for group (second and
 *   third overloads only; without it the groups carry the source values)
 * @param durationSelector - Function returning observable that determines
 *   group lifetime (third overload only)
 * @returns Operator function grouping by key; each emitted group exposes its
 *   key through the readonly `key` property
 */
function groupBy<T, K>(
  keySelector: (value: T) => K
): OperatorFunction<T, GroupedObservable<K, T>>;
function groupBy<T, K, R>(
  keySelector: (value: T) => K,
  elementSelector: (value: T) => R
): OperatorFunction<T, GroupedObservable<K, R>>;
// NOTE(review): the RxJS 7 API reference deprecates the positional
// elementSelector/durationSelector arguments in favor of an options object
// ({ element, duration }) - confirm against the installed version.
function groupBy<T, K, R>(
  keySelector: (value: T) => K,
  elementSelector?: (value: T) => R,
  durationSelector?: (grouped: GroupedObservable<K, R>) => ObservableInput<any>
): OperatorFunction<T, GroupedObservable<K, R>>;

Usage Examples:

import { of, groupBy, map, mergeMap, toArray } from "rxjs";

// Group by category
of(
  { category: 'fruit', name: 'apple' },
  { category: 'vegetable', name: 'carrot' },
  { category: 'fruit', name: 'banana' },
  { category: 'vegetable', name: 'lettuce' }
).pipe(
  groupBy(item => item.category),
  // Collect every group into one array and pair it with the group's key
  mergeMap(group =>
    group.pipe(
      toArray(),
      map(items => ({ category: group.key, items }))
    )
  )
).subscribe(result => console.log(result));
// Output: { category: 'fruit', items: [apple, banana] }
//         { category: 'vegetable', items: [carrot, lettuce] }
// (items abbreviated: each entry is the full source object)

Materialization Operators

/**
 * Convert all emissions and notifications to Notification objects
 * @returns Operator function materializing notifications
 */
function materialize<T>(): OperatorFunction<T, Notification<T>>;

/**
 * Convert Notification objects back to emissions
 * @returns Operator function dematerializing notifications  
 */
function dematerialize<T>(): OperatorFunction<Notification<T>, T>;

Usage Examples:

import { of, materialize, dematerialize } from "rxjs";

// Materialize all notifications
of(1, 2, 3).pipe(
  materialize()
).subscribe(notification => {
  console.log('Kind:', notification.kind);
  if (notification.kind === 'N') {
    console.log('Value:', notification.value);
  }
});

// Convert back from notifications
// `as const` keeps `kind` as the literal 'N' / 'C' instead of widening it to
// string, which dematerialize's notification type requires.
const notifications$ = of(
  { kind: 'N', value: 1 } as const,
  { kind: 'N', value: 2 } as const,
  { kind: 'C' } as const
);

notifications$.pipe(
  dematerialize()
).subscribe({
  // Observer object instead of positional callbacks, which RxJS 7 deprecates
  next: value => console.log('Value:', value),
  error: err => console.error('Error:', err),
  complete: () => console.log('Complete!')
});

Advanced Transformation Operators

/**
 * Map each value to constant value
 * @param value - Constant value to map to
 * @returns Operator function mapping to constant
 * @deprecated NOTE(review): recent RxJS 7 releases mark mapTo as deprecated
 *   in favor of `map(() => value)` - confirm against the installed version.
 */
function mapTo<R>(value: R): OperatorFunction<any, R>;

/**
 * Recursively projects each source value to an observable which is merged in the output observable
 * @param project - Function returning observable for recursion; it is called
 *   with the value and its emission index
 * @param concurrent - Maximum concurrent recursions
 * @param scheduler - Optional scheduler. NOTE(review): per the RxJS API
 *   reference it is used for subscribing to each projected inner observable
 *   and is deprecated in v7 - confirm.
 * @returns Operator function expanding recursively
 */
function expand<T, R>(
  project: (value: T, index: number) => ObservableInput<R>,
  concurrent?: number,
  scheduler?: SchedulerLike
): OperatorFunction<T, R>;

/**
 * Emit previous and current value as pair
 * @returns Operator function emitting pairs of consecutive values
 */
function pairwise<T>(): OperatorFunction<T, [T, T]>;

/**
 * Select a (possibly nested) property from source objects
 * @param k1 - Key of the property to read from each source value
 * @param k2 - Key to read from the value found at k1 (nested lookup)
 * @param k3 - Key to read from the value found at k1.k2 (nested lookup)
 * @returns Operator function plucking properties
 * @deprecated NOTE(review): the RxJS 7 API reference marks pluck as
 *   deprecated in favor of `map` with optional chaining, e.g.
 *   `map(x => x?.k1?.k2)` - confirm against the installed version.
 */
function pluck<T, K1 extends keyof T>(k1: K1): OperatorFunction<T, T[K1]>;
function pluck<T, K1 extends keyof T, K2 extends keyof T[K1]>(k1: K1, k2: K2): OperatorFunction<T, T[K1][K2]>;
function pluck<T, K1 extends keyof T, K2 extends keyof T[K1], K3 extends keyof T[K1][K2]>(k1: K1, k2: K2, k3: K3): OperatorFunction<T, T[K1][K2][K3]>;

/**
 * Like scan, but the accumulator returns an observable: the values of each
 * returned inner observable are merged into the output
 * (presumably each emitted value also becomes the accumulation passed to the
 * next accumulator call - confirm against the RxJS API reference)
 * @param accumulator - Accumulator function returning observable; it is
 *   called with the current accumulation, the source value and its index
 * @param seed - Initial seed value
 * @param concurrent - Maximum concurrent inner subscriptions
 * @returns Operator function applying mergeScan pattern
 */
function mergeScan<T, R>(
  accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
  seed: R,
  concurrent?: number
): OperatorFunction<T, R>;

/**
 * Like scan, but the accumulator returns an observable and inner observables
 * are handled the switchMap way
 * (presumably a new source value unsubscribes the previous inner observable
 * before the accumulator's next one is subscribed - confirm against the RxJS
 * API reference)
 * @param accumulator - Accumulator function returning observable; it is
 *   called with the current accumulation, the source value and its index
 * @param seed - Initial seed value
 * @returns Operator function applying switchScan pattern
 */
function switchScan<T, R>(
  accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
  seed: R
): OperatorFunction<T, R>;

/**
 * Count emissions that pass optional predicate
 * @param predicate - Optional predicate to filter counted emissions
 * @returns Operator function emitting count of emissions
 */
function count<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, number>;

/**
 * Test if all emissions satisfy predicate
 * @param predicate - Predicate function to test emissions
 * @returns Operator function emitting boolean result
 */
function every<T>(predicate: (value: T, index: number) => boolean): OperatorFunction<T, boolean>;

/**
 * Find maximum value using optional comparer
 * @param comparer - Optional comparison function
 * @returns Operator function emitting maximum value
 */
function max<T>(comparer?: (x: T, y: T) => number): OperatorFunction<T, T>;

/**
 * Find minimum value using optional comparer
 * @param comparer - Optional comparison function
 * @returns Operator function emitting minimum value
 */
function min<T>(comparer?: (x: T, y: T) => number): OperatorFunction<T, T>;

/**
 * Reduce emissions to single accumulated value
 * @param accumulator - Accumulator function
 * @param seed - Initial seed value
 * @returns Operator function emitting final accumulated value
 */
function reduce<T, R>(
  accumulator: (acc: R, value: T, index: number) => R,
  seed: R
): OperatorFunction<T, R>;
function reduce<T>(accumulator: (acc: T, value: T, index: number) => T): OperatorFunction<T, T>;

/**
 * Collect all emissions into array
 * @returns Operator function emitting array of all values
 */
function toArray<T>(): OperatorFunction<T, T[]>;

/**
 * Materialize notifications as emission objects
 * NOTE(review): this repeats the materialize declaration already listed under
 * "Materialization Operators"; both signatures are identical.
 * @returns Operator function emitting notification objects
 */
function materialize<T>(): OperatorFunction<T, Notification<T>>;

/**
 * Dematerialize notification objects back to emissions
 * NOTE(review): this repeats the dematerialize declaration already listed
 * under "Materialization Operators"; both signatures are identical.
 * @returns Operator function converting notifications to emissions
 */
function dematerialize<T>(): OperatorFunction<Notification<T>, T>;

/**
 * Add timestamp to each emission
 * @param timestampProvider - Provider for timestamp values
 * @returns Operator function adding timestamps
 */
function timestamp<T>(timestampProvider?: TimestampProvider): OperatorFunction<T, Timestamp<T>>;

/**
 * Add time interval between emissions
 * @param timestampProvider - Provider for timestamp values.
 *   NOTE(review): the RxJS 7 implementation appears to declare this parameter
 *   as `scheduler?: SchedulerLike` rather than a TimestampProvider - confirm
 *   against the RxJS API reference before relying on this signature.
 * @returns Operator function adding time intervals
 */
function timeInterval<T>(timestampProvider?: TimestampProvider): OperatorFunction<T, TimeInterval<T>>;

Types

interface GroupedObservable<K, T> extends Observable<T> {
  readonly key: K;
}

// A function from a source Observable<T> to a result Observable<R>; this is
// the return type of every operator declared in this document.
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;
// Anything an operator can subscribe to as an inner source.
// NOTE(review): simplified - the RxJS 7 type also admits PromiseLike,
// ArrayLike, AsyncIterable, interop observables and ReadableStream-like
// objects; confirm against the RxJS API reference.
type ObservableInput<T> = Observable<T> | Promise<T> | Iterable<T>;

Install with Tessl CLI

npx tessl i tessl/npm-rxjs

docs

ajax-operations.md

combination-operators.md

core-types.md

error-handling.md

fetch-operations.md

filtering-operators.md

index.md

observable-creation.md

schedulers.md

subjects.md

testing-utilities.md

transformation-operators.md

websocket-operations.md

tile.json