Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences
—
Operators for selecting specific values from observable streams based on various criteria including predicates, time windows, and positional filters.
Filter values based on a predicate function.
/**
* Filter values based on a predicate function
* @param predicate - Function that returns true for values to keep
* @returns Operator function filtering the source observable
*/
function filter<T>(predicate: (value: T, index: number) => boolean): OperatorFunction<T, T>;
function filter<T, S extends T>(predicate: (value: T, index: number) => value is S): OperatorFunction<T, S>;Usage Examples:
import { of, filter } from "rxjs";
// Filter even numbers
of(1, 2, 3, 4, 5, 6).pipe(
filter(x => x % 2 === 0)
).subscribe(x => console.log(x)); // 2, 4, 6
// Filter objects by property
of(
{ name: 'Alice', age: 25 },
{ name: 'Bob', age: 17 },
{ name: 'Charlie', age: 30 }
).pipe(
filter(person => person.age >= 18)
).subscribe(person => console.log(person.name)); // Alice, CharlieTake only the first n values.
/**
* Take only the first count values from the source
* @param count - Number of values to take
* @returns Operator function taking first count values
*/
function take<T>(count: number): OperatorFunction<T, T>;Usage Examples:
import { interval, take } from "rxjs";
// Take first 5 values from interval
interval(1000).pipe(
take(5)
).subscribe(x => console.log(x)); // 0, 1, 2, 3, 4 (then completes)Skip the first n values.
/**
* Skip the first count values from the source
* @param count - Number of values to skip
* @returns Operator function skipping first count values
*/
function skip<T>(count: number): OperatorFunction<T, T>;Usage Examples:
import { of, skip } from "rxjs";
// Skip first 2 values
of(1, 2, 3, 4, 5).pipe(
skip(2)
).subscribe(x => console.log(x)); // 3, 4, 5Emit only the first value (or first matching a condition).
/**
* Emit only the first value or first value matching predicate
* @param predicate - Optional predicate function
* @param defaultValue - Default value if no match found
* @returns Operator function emitting first matching value
*/
function first<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, T>;
function first<T, D>(predicate: (value: T, index: number) => boolean, defaultValue: D): OperatorFunction<T, T | D>;
function first<T, D>(predicate: null | undefined, defaultValue: D): OperatorFunction<T, T | D>;Usage Examples:
import { of, first } from "rxjs";
// First value
of(1, 2, 3).pipe(first()).subscribe(x => console.log(x)); // 1
// First even number
of(1, 3, 4, 5, 6).pipe(
first(x => x % 2 === 0)
).subscribe(x => console.log(x)); // 4
// With default value
of(1, 3, 5).pipe(
first(x => x % 2 === 0, -1)
).subscribe(x => console.log(x)); // -1 (no even numbers found)Emit only the last value (or last matching a condition).
/**
* Emit only the last value or last value matching predicate
* @param predicate - Optional predicate function
* @param defaultValue - Default value if no match found
* @returns Operator function emitting last matching value
*/
function last<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, T>;
function last<T, D>(predicate: (value: T, index: number) => boolean, defaultValue: D): OperatorFunction<T, T | D>;
function last<T, D>(predicate: null | undefined, defaultValue: D): OperatorFunction<T, T | D>;Usage Examples:
import { of, last } from "rxjs";
// Last value
of(1, 2, 3).pipe(last()).subscribe(x => console.log(x)); // 3
// Last even number
of(1, 2, 4, 5, 6, 7).pipe(
last(x => x % 2 === 0)
).subscribe(x => console.log(x)); // 6Emit only distinct values.
/**
* Emit only values that are distinct from previous values
* @param keySelector - Optional function to select comparison key
* @param flushes - Optional observable that flushes the distinct key collection
* @returns Operator function emitting distinct values
*/
function distinct<T>(keySelector?: (value: T) => any): OperatorFunction<T, T>;
function distinct<T>(keySelector: (value: T) => any, flushes: Observable<any>): OperatorFunction<T, T>;Usage Examples:
import { of, distinct } from "rxjs";
// Distinct numbers
of(1, 1, 2, 2, 3, 1).pipe(
distinct()
).subscribe(x => console.log(x)); // 1, 2, 3
// Distinct by property
of(
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' },
{ id: 1, name: 'Alice' },
{ id: 3, name: 'Charlie' }
).pipe(
distinct(person => person.id)
).subscribe(person => console.log(person.name)); // Alice, Bob, CharlieEmit only when current value differs from previous value.
/**
* Emit only when current value differs from the previous value
* @param compare - Optional comparison function
* @param keySelector - Optional key selector function
* @returns Operator function emitting when value changes
*/
function distinctUntilChanged<T>(compare?: (previous: T, current: T) => boolean): OperatorFunction<T, T>;
function distinctUntilChanged<T, K>(compare: (previous: K, current: K) => boolean, keySelector: (value: T) => K): OperatorFunction<T, T>;Usage Examples:
import { of, distinctUntilChanged } from "rxjs";
// Consecutive duplicates removed
of(1, 1, 2, 2, 2, 1, 3).pipe(
distinctUntilChanged()
).subscribe(x => console.log(x)); // 1, 2, 1, 3
// Custom comparison
of(
{ id: 1, name: 'Alice' },
{ id: 1, name: 'Alice Updated' },
{ id: 2, name: 'Bob' }
).pipe(
distinctUntilChanged((prev, curr) => prev.id === curr.id)
).subscribe(person => console.log(person)); // First Alice, Bob onlyEmit values only after a silence period.
/**
* Emit values only after specified silence duration
* @param dueTime - Silence duration in milliseconds
* @param scheduler - Optional scheduler
* @returns Operator function debouncing by time
*/
function debounceTime<T>(dueTime: number, scheduler?: SchedulerLike): OperatorFunction<T, T>;Usage Examples:
import { fromEvent, debounceTime, map } from "rxjs";
// Debounce user input
const searchInput = document.getElementById('search');
fromEvent(searchInput, 'input').pipe(
map(event => event.target.value),
debounceTime(300) // Wait 300ms after user stops typing
).subscribe(searchTerm => {
console.log('Search for:', searchTerm);
// Perform search here
});Emit values at most once per time period.
/**
* Emit values at most once per specified time period
* @param duration - Throttle duration in milliseconds
* @param scheduler - Optional scheduler
* @param config - Throttle configuration
* @returns Operator function throttling by time
*/
function throttleTime<T>(
duration: number,
scheduler?: SchedulerLike,
config?: ThrottleConfig
): OperatorFunction<T, T>;
interface ThrottleConfig {
leading?: boolean;
trailing?: boolean;
}Usage Examples:
import { fromEvent, throttleTime } from "rxjs";
// Throttle mouse moves
fromEvent(document, 'mousemove').pipe(
throttleTime(100) // At most once per 100ms
).subscribe(event => {
console.log('Mouse position:', event.clientX, event.clientY);
});
// Throttle with trailing emission
fromEvent(button, 'click').pipe(
throttleTime(1000, undefined, { leading: true, trailing: true })
).subscribe(() => console.log('Button clicked (throttled)'));Take values until another observable emits.
/**
* Take values until notifier observable emits
* @param notifier - Observable that terminates the source
* @returns Operator function taking until notifier emits
*/
function takeUntil<T>(notifier: ObservableInput<any>): OperatorFunction<T, T>;Usage Examples:
import { interval, fromEvent, takeUntil } from "rxjs";
// Take interval values until button click
const stop$ = fromEvent(document.getElementById('stop'), 'click');
interval(1000).pipe(
takeUntil(stop$)
).subscribe(x => console.log('Timer:', x));
// Stops when stop button is clickedTake values while condition is true.
/**
* Take values while predicate returns true
* @param predicate - Function returning boolean condition
* @param inclusive - Whether to include value that failed predicate
* @returns Operator function taking while condition is true
*/
function takeWhile<T>(predicate: (value: T, index: number) => boolean, inclusive?: boolean): OperatorFunction<T, T>;Usage Examples:
import { of, takeWhile } from "rxjs";
// Take while less than 5
of(1, 2, 3, 4, 5, 6, 7).pipe(
takeWhile(x => x < 5)
).subscribe(x => console.log(x)); // 1, 2, 3, 4
// Include the failing value
of(1, 2, 3, 4, 5, 6, 7).pipe(
takeWhile(x => x < 5, true)
).subscribe(x => console.log(x)); // 1, 2, 3, 4, 5/**
* Skip values until notifier observable emits
* @param notifier - Observable that starts emitting values
* @returns Operator function skipping until notifier emits
*/
function skipUntil<T>(notifier: ObservableInput<any>): OperatorFunction<T, T>;
/**
* Skip values while predicate returns true
* @param predicate - Function returning boolean condition
* @returns Operator function skipping while condition is true
*/
function skipWhile<T>(predicate: (value: T, index: number) => boolean): OperatorFunction<T, T>;/**
* Emit value at specific index
* @param index - Zero-based index of desired value
* @param defaultValue - Default value if index not found
* @returns Operator function emitting value at index
*/
function elementAt<T>(index: number): OperatorFunction<T, T>;
function elementAt<T, D>(index: number, defaultValue: D): OperatorFunction<T, T | D>;
/**
* Emit single value that matches predicate
* @param predicate - Optional predicate function
* @returns Operator function emitting single matching value
*/
function single<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, T>;/**
* Emit default value if the source observable is empty
* @param defaultValue - Value to emit if source is empty
* @returns Operator function providing default value
*/
function defaultIfEmpty<T, R>(defaultValue: R): OperatorFunction<T, T | R>;
/**
* Emit true if source is empty, false otherwise
* @returns Operator function checking if source is empty
*/
function isEmpty<T>(): OperatorFunction<T, boolean>;Usage Examples:
import { EMPTY, of, defaultIfEmpty, isEmpty } from "rxjs";
// Empty source gets default value
EMPTY.pipe(
defaultIfEmpty('No data available')
).subscribe(x => console.log(x)); // 'No data available'
// Non-empty source passes through
of(1, 2, 3).pipe(
defaultIfEmpty('No data')
).subscribe(x => console.log(x)); // 1, 2, 3
// Check if empty
of(1, 2, 3).pipe(isEmpty()).subscribe(x => console.log('Empty:', x)); // false
EMPTY.pipe(isEmpty()).subscribe(x => console.log('Empty:', x)); // true/**
* Sample values when another observable emits
* @param notifier - Observable that triggers sampling
* @returns Operator function sampling on notifier emissions
*/
function sample<T>(notifier: ObservableInput<any>): OperatorFunction<T, T>;
/**
* Sample values at regular time intervals
* @param period - Sample period in milliseconds
* @param scheduler - Optional scheduler
* @returns Operator function sampling by time
*/
function sampleTime<T>(period: number, scheduler?: SchedulerLike): OperatorFunction<T, T>;
/**
* Audit values using another observable
* @param durationSelector - Function returning observable for audit duration
* @returns Operator function auditing with duration selector
*/
function audit<T>(durationSelector: (value: T) => ObservableInput<any>): OperatorFunction<T, T>;
/**
* Audit values for specified time duration
* @param duration - Audit duration in milliseconds
* @param scheduler - Optional scheduler
* @returns Operator function auditing by time
*/
function auditTime<T>(duration: number, scheduler?: SchedulerLike): OperatorFunction<T, T>;
/**
* Ignore all emitted values, only pass through completion and error
* @returns Operator function ignoring all values
*/
function ignoreElements<T>(): OperatorFunction<T, never>;
/**
* Find first emission matching predicate
* @param predicate - Predicate function to match emissions
* @param defaultValue - Default value if no match found
* @returns Operator function emitting first matching value
*/
function find<T>(predicate: (value: T, index: number) => boolean, defaultValue?: T): OperatorFunction<T, T | undefined>;
/**
* Find index of first emission matching predicate
* @param predicate - Predicate function to match emissions
* @returns Operator function emitting index of first match
*/
function findIndex<T>(predicate: (value: T, index: number) => boolean): OperatorFunction<T, number>;
/**
* Skip last n emissions
* @param skipCount - Number of emissions to skip from end
* @returns Operator function skipping last emissions
*/
function skipLast<T>(skipCount: number): OperatorFunction<T, T>;
/**
* Take last n emissions
* @param count - Number of emissions to take from end
* @returns Operator function taking last emissions
*/
function takeLast<T>(count: number): OperatorFunction<T, T>;
/**
* Get element at specified index
* @param index - Index of element to get
* @param defaultValue - Default value if index out of bounds
* @returns Operator function emitting element at index
*/
function elementAt<T>(index: number, defaultValue?: T): OperatorFunction<T, T>;
/**
* Test if observable is empty
* @returns Operator function emitting boolean indicating if empty
*/
function isEmpty<T>(): OperatorFunction<T, boolean>;
/**
* Emit only single value, error if more than one
* @param predicate - Optional predicate to filter single value
* @returns Operator function emitting single value
*/
function single<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, T>;interface ThrottleConfig {
leading?: boolean;
trailing?: boolean;
}
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;
type MonoTypeOperatorFunction<T> = OperatorFunction<T, T>;