Library for composing asynchronous and event-based operations in JavaScript using Observable sequences and fluent query operators
npx @tessl/cli install tessl/npm-rx@4.1.0
RxJS is a comprehensive library for composing asynchronous and event-based programs using observable sequences and fluent query operators. It unifies Promises, callbacks, and evented data sources into a consistent reactive programming model, enabling powerful composition patterns for handling complex asynchronous operations, error handling, cancellation, and synchronization in both browser and Node.js environments.
npm install rx
var Rx = require('rx');
For browser environments:
<script src="rx.all.js"></script>
For selective module loading:
var Rx = require('rx');
require('rx/dist/rx.aggregates');
require('rx/dist/rx.async');
require('rx/dist/rx.time');
var Rx = require('rx');
// Create an observable from an array
var source = Rx.Observable.fromArray([1, 2, 3, 4, 5]);
// Transform and filter the data
var subscription = source
.filter(function(x) { return x % 2 === 0; })
.map(function(x) { return x * x; })
.subscribe(
function(x) { console.log('Next: ' + x); },
function(err) { console.log('Error: ' + err); },
function() { console.log('Completed'); }
);
// Clean up subscription
subscription.dispose();
RxJS is built around several key components:
Core methods for creating observable sequences from various sources including arrays, events, callbacks, and custom functions.
// Static creation methods
Rx.Observable.create(function(observer) { /* ... */ });
Rx.Observable.fromArray([1, 2, 3]);
Rx.Observable.range(1, 10);
Rx.Observable.interval(1000);
Rx.Observable.timer(2000, 1000);
Operators for transforming observable sequences including map, flatMap, scan, and buffer operations.
// Instance transformation methods
observable.map(function(x) { return x * 2; });
observable.flatMap(function(x) { return Rx.Observable.range(0, x); });
observable.scan(function(acc, x) { return acc + x; }, 0);
observable.buffer(Rx.Observable.interval(1000));
Operators for filtering observable sequences including filter, take, skip, distinct, and debounce operations.
// Instance filtering methods
observable.filter(function(x) { return x > 5; });
observable.take(10);
observable.takeUntil(stopSignal);
observable.skip(5);
observable.distinctUntilChanged();
Operators for combining multiple observable sequences including merge, zip, combineLatest, and concat operations.
// Static combination methods
Rx.Observable.merge(obs1, obs2, obs3);
Rx.Observable.zip(obs1, obs2, function(x, y) { return x + y; });
Rx.Observable.combineLatest(obs1, obs2, function(x, y) { return {x: x, y: y}; });
// Instance combination methods
observable.merge(otherObservable);
observable.concat(nextObservable);
Operators for aggregating observable sequences including reduce, count, min, max, and statistical operations.
// Aggregation methods
observable.reduce(function(acc, x) { return acc + x; }, 0);
observable.count();
observable.min();
observable.max();
observable.average();
observable.sum();
Methods for working with asynchronous patterns including callbacks, events, and promises.
// Async creation methods
Rx.Observable.fromCallback(fs.readFile);
Rx.Observable.fromEvent(button, 'click');
Rx.Observable.fromPromise(fetch('/api/data'));
Rx.Observable.spawn(generatorFunction);
Operators for time-based operations including delay, throttle, debounce, and interval scheduling.
// Time-based methods
observable.delay(1000);
observable.debounce(500);
observable.throttle(100);
observable.timeout(5000);
observable.sample(Rx.Observable.interval(1000));
Hot observables that can multicast values to multiple observers and act as both observable and observer.
// Subject types
var subject = new Rx.Subject();
var behaviorSubject = new Rx.BehaviorSubject(initialValue);
var replaySubject = new Rx.ReplaySubject(bufferSize);
var asyncSubject = new Rx.AsyncSubject();
Control timing and concurrency of observable operations with different scheduling strategies.
// Scheduler types
Rx.Scheduler.immediate;
Rx.Scheduler.currentThread;
Rx.Scheduler.default;
// Scheduling methods
scheduler.schedule(state, action);
scheduler.scheduleFuture(state, dueTime, action);
scheduler.scheduleRecursive(state, action);
Testing utilities for unit testing reactive code with virtual time and marble testing.
// Testing utilities
var testScheduler = new Rx.TestScheduler();
var hotObservable = testScheduler.createHotObservable(/* records */);
var coldObservable = testScheduler.createColdObservable(/* records */);
Resource management objects for cleaning up subscriptions and managing the lifetime of observables and observers.
// Disposable types
var disposable = Rx.Disposable.create(action);
var composite = new Rx.CompositeDisposable();
var serial = new Rx.SerialDisposable();
var singleAssignment = new Rx.SingleAssignmentDisposable();
var refCount = new Rx.RefCountDisposable(disposable);
// Core interfaces (for documentation purposes only; these declarations do not exist in the JavaScript runtime)
interface Observable<T> {
subscribe(observer: Observer<T>): Disposable;
subscribe(onNext?: function, onError?: function, onCompleted?: function): Disposable;
}
interface Observer<T> {
onNext(value: T): void;
onError(error: any): void;
onCompleted(): void;
}
interface Disposable {
dispose(): void;
}
interface Subject<T> extends Observable<T>, Observer<T> {
hasObservers(): boolean;
}
interface Scheduler {
now(): number;
schedule(state: any, action: function): Disposable;
scheduleFuture(state: any, dueTime: number, action: function): Disposable;
scheduleRecursive(state: any, action: function): Disposable;
schedulePeriodic(state: any, period: number, action: function): Disposable;
}