Library for composing asynchronous and event-based operations in JavaScript using Observable sequences and fluent query operators
—
Hot observables that can multicast values to multiple observers and act as both observable and observer.
Basic subject that multicasts values to all subscribed observers.
/**
* Creates a subject that multicasts values to multiple observers
* @constructor
* @returns {Subject} New subject instance
*/
function Subject();
// Static creation method
/**
* Creates an anonymous subject with separate observer and observable
* @param {Observer} [observer] - Observer to send values to
* @param {Observable} [observable] - Observable to subscribe to
* @returns {AnonymousSubject} Anonymous subject instance
*/
Rx.Subject.create = function(observer, observable);

Usage Example:
var subject = new Rx.Subject();
// Subscribe multiple observers
subject.subscribe(function(x) { console.log('Observer 1: ' + x); });
subject.subscribe(function(x) { console.log('Observer 2: ' + x); });
// Emit values to all observers
subject.onNext(42);
subject.onNext(56);
subject.onCompleted();

Subject that requires an initial value and emits its current value to new subscribers.
/**
* Creates a behavior subject with an initial value
* @constructor
* @param {*} initialValue - Initial value to emit to subscribers
* @returns {BehaviorSubject} New behavior subject instance
*/
function BehaviorSubject(initialValue);
/**
* Gets the current value of the behavior subject
* @returns {*} Current value
*/
BehaviorSubject.prototype.getValue = function();

Usage Example:
var behaviorSubject = new Rx.BehaviorSubject(42);
behaviorSubject.subscribe(function(x) {
console.log('Subscriber 1: ' + x);
});
// Immediately emits: Subscriber 1: 42
behaviorSubject.onNext(56);
// Emits: Subscriber 1: 56
behaviorSubject.subscribe(function(x) {
console.log('Subscriber 2: ' + x);
});
// Immediately emits: Subscriber 2: 56

Subject that replays a specified number of values to new subscribers.
/**
* Creates a replay subject that buffers values for new subscribers
* @constructor
* @param {number} [bufferSize] - Maximum number of values to buffer (unlimited if not specified)
* @param {number} [windowSize] - Maximum time window in milliseconds to buffer values
* @param {Scheduler} [scheduler] - Scheduler to use for time-based operations
* @returns {ReplaySubject} New replay subject instance
*/
function ReplaySubject(bufferSize, windowSize, scheduler);

Usage Example:
var replaySubject = new Rx.ReplaySubject(2); // Buffer last 2 values
replaySubject.onNext(1);
replaySubject.onNext(2);
replaySubject.onNext(3);
replaySubject.subscribe(function(x) {
console.log('Late subscriber: ' + x);
});
// Immediately emits: Late subscriber: 2, Late subscriber: 3

Subject that only emits the last value when the sequence completes.
/**
* Creates an async subject that emits only the last value on completion
* @constructor
* @returns {AsyncSubject} New async subject instance
*/
function AsyncSubject();

Usage Example:
var asyncSubject = new Rx.AsyncSubject();
asyncSubject.subscribe(function(x) {
console.log('Result: ' + x);
});
asyncSubject.onNext(1);
asyncSubject.onNext(2);
asyncSubject.onNext(3);
// Nothing emitted yet
asyncSubject.onCompleted();
// Now emits: Result: 3

Subject created with separate observer and observable components.
/**
* Anonymous subject with separate observer and observable
* @constructor
* @param {Observer} observer - Observer to send values to
* @param {Observable} observable - Observable to subscribe to
* @returns {AnonymousSubject} Anonymous subject instance
*/
function AnonymousSubject(observer, observable);

All subjects implement both the Observable and Observer interfaces:
/**
* Sends a value to all subscribed observers
* @param {*} value - Value to emit
*/
subject.onNext = function(value);
/**
* Sends an error to all subscribed observers and terminates the sequence
* @param {*} error - Error to emit
*/
subject.onError = function(error);
/**
* Signals completion to all subscribed observers and terminates the sequence
*/
subject.onCompleted = function();

/**
* Subscribes an observer to the subject
* @param {Observer|function} observer - Observer or onNext callback
* @param {function} [onError] - Error callback
* @param {function} [onCompleted] - Completion callback
* @returns {Disposable} Subscription disposable
*/
subject.subscribe = function(observer, onError, onCompleted);

/**
* Checks if the subject has any observers
* @returns {boolean} True if subject has observers
*/
subject.hasObservers = function();
/**
* Disposes the subject and releases all observers
*/
subject.dispose = function();

Subjects are hot observables, meaning they emit values regardless of whether anyone is subscribed:
Usage Example:
// Cold observable - creates new sequence for each subscriber
var cold = Rx.Observable.interval(1000);
// Hot observable - shares single sequence among all subscribers
var subject = new Rx.Subject();
Rx.Observable.interval(1000).subscribe(subject);
// Multiple subscriptions to subject share the same sequence
subject.subscribe(observer1);
subject.subscribe(observer2);

Subjects make it possible to convert cold observables into hot ones using the multicast operators:
/**
* Multicasts the source sequence using a subject
* @param {Subject|function} subjectOrSelector - Subject or subject factory function
* @param {function} [selector] - Selector function for connection lifetime
* @returns {ConnectableObservable} Connectable observable that multicasts through subject
*/
observable.multicast = function(subjectOrSelector, selector);
/**
* Shares the source sequence using a subject (shorthand for multicast + refCount)
* @returns {Observable} Observable sequence shared among multiple subscribers
*/
observable.share = function();