tessl/npm-rxjs

Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences

Workspace: tessl
Visibility: Public
Describes: npmpkg:npm/rxjs@7.8.x

To install, run

npx @tessl/cli install tessl/npm-rxjs@7.8.0

RxJS

RxJS is a reactive programming library for JavaScript and TypeScript for composing asynchronous and event-based programs using observable sequences. It provides operators for transforming, filtering, and combining streams of data, along with rate-limiting operators (such as debounceTime and throttleTime), error propagation, resource cleanup through subscriptions, and testing utilities.

Package Information

  • Package Name: rxjs
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install rxjs

Core Imports

import { Observable, Subject, BehaviorSubject, map, filter, mergeMap } from "rxjs";

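Operators are also exported from "rxjs/operators", their original location (since RxJS 7.2 they can be imported directly from "rxjs", as above):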

import { map, filter, mergeMap, catchError } from "rxjs/operators";

For specialized modules:

import { ajax } from "rxjs/ajax";
import { webSocket } from "rxjs/webSocket";
import { TestScheduler } from "rxjs/testing";
import { fromFetch } from "rxjs/fetch";

CommonJS:

const { Observable, Subject, map, filter } = require("rxjs");
const { ajax } = require("rxjs/ajax");

Basic Usage

import { Observable, Subject, map, filter, mergeMap } from "rxjs";

// Create an observable with custom emission logic
const numbers$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

// Use operators to transform data
const processedNumbers$ = numbers$.pipe(
  map(x => x * 2),
  filter(x => x > 2),
  mergeMap(x => [x, x + 1])
);

// Subscribe to receive values (logs 4, 5, 6, 7, then 'Complete!')
processedNumbers$.subscribe({
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('Complete!')
});

// Work with subjects for multicasting
const subject = new Subject<string>();
subject.subscribe(value => console.log('Observer 1:', value));
subject.subscribe(value => console.log('Observer 2:', value));
subject.next('Hello World');

Architecture

RxJS is built around several foundational concepts:

  • Observables: Core reactive streams that emit values over time
  • Operators: Pure functions for transforming observables (map, filter, mergeMap, etc.)
  • Subjects: Special observables that can multicast to multiple observers
  • Schedulers: Control timing and concurrency of observable execution
  • Subscriptions: Represent execution of observables and provide cleanup mechanisms

The library uses a push-based model: a producer pushes values to its observers, rather than consumers pulling values on demand. Operators create new observables rather than mutating existing ones, enabling powerful composition patterns and predictable data flow.
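
A minimal sketch of the composition point above, assuming nothing beyond the `of` and `map` exports: `pipe` returns a new observable and leaves the source untouched. The variable names are illustrative only.

```typescript
import { of, map } from "rxjs";

const source$ = of(1, 2, 3);
// pipe() returns a new observable; source$ itself is not modified.
const doubled$ = source$.pipe(map((x) => x * 2));

const fromSource: number[] = [];
const fromDoubled: number[] = [];

// Each subscribe() starts an independent execution that delivers
// values to the observer's next callback.
source$.subscribe((x) => fromSource.push(x));
doubled$.subscribe((x) => fromDoubled.push(x));

console.log(fromSource);  // [1, 2, 3]
console.log(fromDoubled); // [2, 4, 6]
```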

Capabilities

Core Observable Types

Foundation classes for reactive programming: Observable, plus ConnectableObservable and GroupedObservable for multicast and grouped stream patterns. ConnectableObservable is deprecated in RxJS 7 and scheduled for removal in v8.

class Observable<T> {
  constructor(subscribe?: (subscriber: Subscriber<T>) => TeardownLogic);
  subscribe(observerOrNext?: Partial<Observer<T>> | ((value: T) => void)): Subscription;
  pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
  pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Observable<B>;
}

class ConnectableObservable<T> extends Observable<T> {
  connect(): Subscription;
  refCount(): Observable<T>;
}

class GroupedObservable<K, T> extends Observable<T> {
  readonly key: K;
}
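
As a rough illustration of the constructor and its teardown return value, the sketch below builds a custom observable and checks that the teardown function runs on unsubscribe. `counter$` and `tornDown` are hypothetical names used only for this example.

```typescript
import { Observable } from "rxjs";

let tornDown = false;

// Emits 0, 1, 2 synchronously and then stays open until unsubscribed.
const counter$ = new Observable<number>((subscriber) => {
  for (let i = 0; i < 3; i++) {
    subscriber.next(i);
  }
  // The returned function is the teardown logic; it runs on unsubscribe.
  return () => {
    tornDown = true;
  };
});

const received: number[] = [];
const subscription = counter$.subscribe((v) => received.push(v));
subscription.unsubscribe();

console.log(received);            // [0, 1, 2]
console.log(tornDown);            // true
console.log(subscription.closed); // true
```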

Subject Types

Special observables that can act as both observer and observable, enabling multicasting patterns.

class Subject<T> extends Observable<T> {
  next(value: T): void;
  error(err: any): void;
  complete(): void;
}

class BehaviorSubject<T> extends Subject<T> {
  constructor(initialValue: T);
  readonly value: T;
}

class ReplaySubject<T> extends Subject<T> {
  constructor(bufferSize?: number, windowTime?: number, timestampProvider?: TimestampProvider);
}

class AsyncSubject<T> extends Subject<T> {}
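
The four subject classes differ mainly in what a late subscriber receives. The following sketch, with illustrative variable names, pushes three values into each subject before anyone subscribes and records what each late subscriber sees.

```typescript
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from "rxjs";

const plain = new Subject<number>();
const behavior = new BehaviorSubject<number>(0);
const replay = new ReplaySubject<number>(2); // buffer the last 2 values
const asyncSubject = new AsyncSubject<number>();

const subjects: Subject<number>[] = [plain, behavior, replay, asyncSubject];
for (const s of subjects) {
  s.next(1);
  s.next(2);
  s.next(3);
}

const log: string[] = [];
plain.subscribe((v) => log.push(`plain:${v}`));        // nothing: earlier values are missed
behavior.subscribe((v) => log.push(`behavior:${v}`));  // current value: 3
replay.subscribe((v) => log.push(`replay:${v}`));      // buffered values: 2, 3
asyncSubject.subscribe((v) => log.push(`async:${v}`)); // nothing until completion
asyncSubject.complete();                               // now emits the last value: 3

console.log(log);
// ["behavior:3", "replay:2", "replay:3", "async:3"]
console.log(behavior.value); // 3
```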

Observable Creation

Creation functions for building observables from various sources including events, promises, iterables, and custom logic.

function from<T>(input: ObservableInput<T>): Observable<T>;
function of<T>(...args: T[]): Observable<T>;
function fromEvent<T>(target: any, eventName: string): Observable<T>;
function interval(period: number): Observable<number>;
function timer(dueTime: number | Date, period?: number): Observable<number>;
function combineLatest<T>(sources: ObservableInput<T>[]): Observable<T[]>;
function merge<T>(...sources: ObservableInput<T>[]): Observable<T>;
function forkJoin<T>(sources: ObservableInput<T>[]): Observable<T[]>;
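
A short sketch of a few of these creation functions, using only synchronous sources so the results can be read directly. Note how `combineLatest` and `forkJoin` behave when the first source finishes before the second one starts.

```typescript
import { of, from, merge, combineLatest, forkJoin } from "rxjs";

// merge: forwards values from every source as they arrive.
const merged: number[] = [];
merge(of(1, 2), from([3, 4])).subscribe((v) => merged.push(v));

// combineLatest: emits the latest value of each source once all have emitted.
// The first source has already finished at 2 when the second one starts.
const combined: Array<[number, string]> = [];
combineLatest([of(1, 2), of("a", "b")]).subscribe((pair) => combined.push(pair));

// forkJoin: emits once, with the last value of each source, when all complete.
const joined: Array<[number, string]> = [];
forkJoin([of(1, 2), of("a", "b")]).subscribe((pair) => joined.push(pair));

console.log(merged);   // [1, 2, 3, 4]
console.log(combined); // [[2, "a"], [2, "b"]]
console.log(joined);   // [[2, "b"]]
```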

Filtering Operators

Operators for selecting specific values from observable streams based on various criteria.

function filter<T>(predicate: (value: T, index: number) => boolean): OperatorFunction<T, T>;
function take<T>(count: number): OperatorFunction<T, T>;
function skip<T>(count: number): OperatorFunction<T, T>;
function first<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, T>;
function last<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, T>;
function distinct<T>(keySelector?: (value: T) => any): OperatorFunction<T, T>;
function debounceTime<T>(dueTime: number): OperatorFunction<T, T>;
function throttleTime<T>(duration: number): OperatorFunction<T, T>;
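
The value-based operators above can be chained as in this sketch. The time-based ones (`debounceTime`, `throttleTime`) are left out because they need a running clock; the input array is arbitrary sample data.

```typescript
import { from, distinct, filter, first, skip, take } from "rxjs";

const source$ = from([1, 1, 2, 3, 3, 4, 5, 6]);

const selected: number[] = [];
source$
  .pipe(
    distinct(),                 // 1, 2, 3, 4, 5, 6
    skip(1),                    // 2, 3, 4, 5, 6
    filter((n) => n % 2 === 0), // 2, 4, 6
    take(2)                     // 2, 4
  )
  .subscribe((n) => selected.push(n));

// first() with a predicate emits the first match and then completes.
let firstAboveThree = -1;
source$.pipe(first((n) => n > 3)).subscribe((n) => {
  firstAboveThree = n;
});

console.log(selected);        // [2, 4]
console.log(firstAboveThree); // 4
```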

Transformation Operators

Operators for transforming values emitted by observables into new forms and structures.

function map<T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R>;
function mergeMap<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>;
function switchMap<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>;
function concatMap<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>;
function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed: R): OperatorFunction<T, R>;
function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, T[]>;
function groupBy<T, K>(keySelector: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;
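
To make the signatures more concrete, here is a small sketch of `scan`, `concatMap`, and `groupBy` on synchronous inputs. The `toArray` operator is used only to collect each group into an array for display.

```typescript
import { from, of, concatMap, groupBy, map, mergeMap, scan, toArray } from "rxjs";

// scan: running total, emitting every intermediate state.
const totals: number[] = [];
of(1, 2, 3, 4)
  .pipe(scan((acc, n) => acc + n, 0))
  .subscribe((t) => totals.push(t));

// concatMap: map each value to an inner observable and run them one after another.
const expanded: string[] = [];
of("a", "b")
  .pipe(concatMap((c) => of(`${c}1`, `${c}2`)))
  .subscribe((s) => expanded.push(s));

// groupBy: split the stream into one GroupedObservable per key.
const groups: Array<[string, number[]]> = [];
from([1, 2, 3, 4, 5])
  .pipe(
    groupBy((n) => (n % 2 === 0 ? "even" : "odd")),
    mergeMap((group$) =>
      group$.pipe(
        toArray(),
        map((items) => [group$.key, items] as [string, number[]])
      )
    )
  )
  .subscribe((g) => groups.push(g));

console.log(totals);   // [1, 3, 6, 10]
console.log(expanded); // ["a1", "a2", "b1", "b2"]
console.log(groups);   // one [key, items] pair per group
```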

Combination Operators

Operators for combining multiple observable streams in various ways.

function combineLatestWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;
function mergeWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;
function concatWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;
function startWith<T>(...values: T[]): OperatorFunction<T, T>;
function withLatestFrom<T, R>(...sources: ObservableInput<R>[]): OperatorFunction<T, [T, ...R[]]>;
function zipWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;
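
A brief sketch of how some of these operators combine streams. The `mode$` subject and the string labels are hypothetical and exist only to give `withLatestFrom` a second source with a current value.

```typescript
import { BehaviorSubject, of, concatWith, startWith, withLatestFrom, zipWith } from "rxjs";

// startWith prepends values; concatWith appends another source after completion.
const sequence: number[] = [];
of(2, 3)
  .pipe(startWith(1), concatWith(of(4, 5)))
  .subscribe((n) => sequence.push(n));

// zipWith pairs values by index and stops when the shorter source runs out.
const zipped: Array<[number, string]> = [];
of(1, 2, 3)
  .pipe(zipWith(of("a", "b")))
  .subscribe((pair) => zipped.push(pair));

// withLatestFrom attaches the latest value of another source to each emission.
const mode$ = new BehaviorSubject<string>("dark");
const tagged: string[] = [];
of("click")
  .pipe(withLatestFrom(mode$))
  .subscribe(([event, mode]) => tagged.push(`${event}:${mode}`));

console.log(sequence); // [1, 2, 3, 4, 5]
console.log(zipped);   // [[1, "a"], [2, "b"]]
console.log(tagged);   // ["click:dark"]
```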

Error Handling

Operators and patterns for handling errors in reactive streams with recovery mechanisms.

function catchError<T, O>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>;
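function retry<T>(count?: number): OperatorFunction<T, T>;
function retry<T>(config: RetryConfig): OperatorFunction<T, T>;
// Deprecated in RxJS 7; prefer retry with a delay option
function retryWhen<T>(notifier: (errors: Observable<any>) => Observable<any>): OperatorFunction<T, T>;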

class TimeoutError extends Error {
  readonly name: "TimeoutError";
}

class EmptyError extends Error {
  readonly name: "EmptyError";
}
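
The recovery patterns can be sketched as follows. `flaky$` is a hypothetical source that fails on its first two subscriptions, standing in for something like an unreliable request.

```typescript
import { EMPTY, defer, of, throwError, catchError, first, retry } from "rxjs";

let attempts = 0;
// Hypothetical flaky source: errors on attempts 1 and 2, succeeds on attempt 3.
const flaky$ = defer(() => {
  attempts++;
  return attempts < 3
    ? throwError(() => new Error(`attempt ${attempts} failed`))
    : of("ok");
});

// retry(2) resubscribes up to two times after an error.
const results: string[] = [];
flaky$.pipe(retry(2)).subscribe((v) => results.push(v));

// catchError replaces a failed stream with a fallback observable.
const recovered: string[] = [];
throwError(() => new Error("boom"))
  .pipe(catchError((err: Error) => of(`recovered from ${err.message}`)))
  .subscribe((v) => recovered.push(v));

// first() on a stream that completes without a value errors with EmptyError.
let emptyErrorName = "";
EMPTY.pipe(first()).subscribe({
  error: (err) => {
    emptyErrorName = err.name;
  },
});

console.log(attempts, results); // 3 ["ok"]
console.log(recovered);         // ["recovered from boom"]
console.log(emptyErrorName);    // "EmptyError"
```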

Schedulers

Control timing and concurrency of observable execution with various scheduling strategies.

interface SchedulerLike {
  schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay?: number, state?: T): Subscription;
}

const asyncScheduler: SchedulerLike;
const asapScheduler: SchedulerLike;
const queueScheduler: SchedulerLike;
const animationFrameScheduler: SchedulerLike;

class VirtualTimeScheduler extends AsyncScheduler {
  flush(): void;
}
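
A rough sketch of two scheduling effects: `observeOn(asyncScheduler)` defers delivery past the current synchronous code, and a `VirtualTimeScheduler` lets delayed work run without waiting in real time. Passing the virtual scheduler to `delay` is one possible way to use it, chosen here for brevity.

```typescript
import { VirtualTimeScheduler, asyncScheduler, of, delay, observeOn } from "rxjs";

// Delivery on asyncScheduler happens on a later timer tick.
const order: string[] = [];
of("async value")
  .pipe(observeOn(asyncScheduler))
  .subscribe((v) => order.push(v));
order.push("sync code");
const orderAtSubscribe = [...order]; // only "sync code" so far

// Virtual time: a 1000 ms delay that completes as soon as flush() is called.
const virtual = new VirtualTimeScheduler();
const delayed: string[] = [];
of("tick")
  .pipe(delay(1000, virtual))
  .subscribe((v) => delayed.push(v));
const countBeforeFlush = delayed.length; // nothing delivered yet
virtual.flush();

console.log(orderAtSubscribe); // ["sync code"]
console.log(countBeforeFlush); // 0
console.log(delayed);          // ["tick"]
console.log(virtual.frame);    // 1000
```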

AJAX Operations

HTTP requests built on XMLHttpRequest and exposed as observables, with optional upload and download progress events.

function ajax(request: string | AjaxConfig): Observable<AjaxResponse<any>>;

interface AjaxConfig {
  url: string;
  method?: string;
  headers?: Record<string, any>;
  body?: any;
  timeout?: number;
  responseType?: XMLHttpRequestResponseType;
}

class AjaxResponse<T> {
  readonly response: T;
  readonly status: number;
  readonly responseText: string;
  readonly request: AjaxConfig;
}

WebSocket Operations

Real-time bidirectional communication with WebSocket integration for reactive streams.

function webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>;

class WebSocketSubject<T> extends Subject<T> {
  multiplex<R>(
    subMsg: () => any,
    unsubMsg: () => any,
    messageFilter: (value: T) => boolean
  ): Observable<R>;
  close(): void;
}

interface WebSocketSubjectConfig<T> {
  url: string;
  protocol?: string | string[];
  serializer?: (value: T) => any;
  deserializer?: (e: MessageEvent) => T;
}

Fetch Operations

Fetch-based HTTP requests exposed as observables. Unsubscribing aborts the in-flight request, and an optional selector maps the Response into the emitted value.

function fromFetch<T>(
  input: string | Request,
  init: RequestInit & {
    selector: (response: Response) => ObservableInput<T>;
  }
): Observable<T>;
function fromFetch(input: string | Request, init?: RequestInit): Observable<Response>;
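
One way the selector option might be used is to wrap `fromFetch` in a small typed helper. `getJson` and the URL below are hypothetical; the sketch only builds the observable, and no request is made until something subscribes.

```typescript
import { Observable } from "rxjs";
import { fromFetch } from "rxjs/fetch";

// Hypothetical helper: fetch a URL and emit the parsed JSON body.
function getJson<T>(url: string): Observable<T> {
  return fromFetch(url, {
    selector: (response) => {
      // fetch resolves on HTTP errors too, so reject non-2xx responses here.
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      return response.json() as Promise<T>;
    },
  });
}

// Lazy: this line does not start a request. Placeholder URL for illustration.
const user$ = getJson<{ id: number; name: string }>("https://example.invalid/users/1");

// To run it:
// user$.subscribe({ next: (user) => console.log(user.name), error: console.error });
```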

Testing Utilities

Comprehensive testing framework with marble testing and virtual time scheduling.

class TestScheduler extends VirtualTimeScheduler {
  run<T>(callback: (helpers: RunHelpers) => T): T;
  createHotObservable<T>(marbles: string, values?: any, error?: any): HotObservable<T>;
  createColdObservable<T>(marbles: string, values?: any, error?: any): ColdObservable<T>;
  expectObservable<T>(observable: Observable<T>): Expectation<T>;
}

interface RunHelpers {
  cold: typeof TestScheduler.prototype.createColdObservable;
  hot: typeof TestScheduler.prototype.createHotObservable;
  expectObservable: typeof TestScheduler.prototype.expectObservable;
  flush: typeof TestScheduler.prototype.flush;
}
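
A minimal marble test might look like the sketch below. The comparison callback passed to the `TestScheduler` constructor is an assumption made to keep the example framework-free; in a real suite it would usually delegate to the test framework's deep-equality assertion.

```typescript
import { map } from "rxjs";
import { TestScheduler } from "rxjs/testing";

let comparisons = 0;

// The callback receives the actual and expected frames for each expectation.
const scheduler = new TestScheduler((actual, expected) => {
  comparisons++;
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error("marble mismatch");
  }
});

scheduler.run(({ cold, expectObservable }) => {
  // "-" is one frame of virtual time, letters are emissions, "|" is completion.
  const source$ = cold("-a-b-c|", { a: 1, b: 2, c: 3 });
  const result$ = source$.pipe(map((x) => x * 10));

  expectObservable(result$).toBe("-a-b-c|", { a: 10, b: 20, c: 30 });
});

console.log(comparisons); // 1
```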

Promise Conversion

Utilities for converting observables to promises for integration with async/await patterns.

function firstValueFrom<T>(source: Observable<T>): Promise<T>;
function lastValueFrom<T>(source: Observable<T>): Promise<T>;
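
A short sketch of both functions inside an async function. The `defaultValue` option covers a source that completes without emitting; without it the promise rejects with an EmptyError. `demo` is a hypothetical wrapper.

```typescript
import { EMPTY, of, firstValueFrom, lastValueFrom } from "rxjs";

async function demo(): Promise<[number, number, string]> {
  const first = await firstValueFrom(of(1, 2, 3)); // resolves with 1
  const last = await lastValueFrom(of(1, 2, 3));   // resolves with 3
  // EMPTY completes without a value, so the default is used instead.
  const fallback = await firstValueFrom(EMPTY, { defaultValue: "none" });
  return [first, last, fallback];
}

const demoResult = demo();
demoResult.then((values) => console.log(values)); // [1, 3, "none"]
```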

Configuration System

Global configuration object for customizing RxJS behavior and error handling.

const config: GlobalConfig;

interface GlobalConfig {
  onUnhandledError: ((err: any) => void) | null;
  onStoppedNotification: ((notification: Notification<any>, subscriber: Subscriber<any>) => void) | null;
  Promise?: PromiseConstructorLike;
  useDeprecatedSynchronousErrorHandling: boolean;
  useDeprecatedNextContext: boolean;
}

Types

type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;

interface Observer<T> {
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}

interface Subscription {
  unsubscribe(): void;
  readonly closed: boolean;
}

interface OperatorFunction<T, R> {
  (source: Observable<T>): Observable<R>;
}

interface MonoTypeOperatorFunction<T> extends OperatorFunction<T, T> {}

type TeardownLogic = Subscription | Unsubscribable | (() => void) | void;
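
These types are what custom operators are written against. The sketch below defines two hypothetical operators, `onlyPositive` and `toLabel`, by composing existing ones.

```typescript
import { of, filter, map } from "rxjs";
import type { MonoTypeOperatorFunction, Observable, OperatorFunction } from "rxjs";

// Same input and output type, so MonoTypeOperatorFunction fits.
function onlyPositive(): MonoTypeOperatorFunction<number> {
  return (source) => source.pipe(filter((n) => n > 0));
}

// Changes the element type from number to string.
function toLabel(prefix: string): OperatorFunction<number, string> {
  return (source: Observable<number>) => source.pipe(map((n) => `${prefix}${n}`));
}

const labels: string[] = [];
of(-1, 2, 0, 3)
  .pipe(onlyPositive(), toLabel("#"))
  .subscribe((label) => labels.push(label));

console.log(labels); // ["#2", "#3"]
```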