
tessl/npm-rx

Library for composing asynchronous and event-based operations in JavaScript using Observable sequences and fluent query operators


Subject Types

Subjects are hot observables that multicast values to multiple observers and act as both an Observable and an Observer.

Capabilities

Subject

Basic subject that multicasts values to all subscribed observers.

/**
 * Creates a subject that multicasts values to multiple observers
 * @constructor
 * @returns {Subject} New subject instance
 */
function Subject();

// Static creation method
/**
 * Creates an anonymous subject with separate observer and observable
 * @param {Observer} [observer] - Observer to send values to
 * @param {Observable} [observable] - Observable to subscribe to
 * @returns {AnonymousSubject} Anonymous subject instance
 */
Rx.Subject.create = function(observer, observable);

Usage Example:

var subject = new Rx.Subject();

// Subscribe multiple observers
subject.subscribe(function(x) { console.log('Observer 1: ' + x); });
subject.subscribe(function(x) { console.log('Observer 2: ' + x); });

// Emit values to all observers
subject.onNext(42);
subject.onNext(56);
subject.onCompleted();

Behavior Subject

Subject that requires an initial value and emits its current (most recent) value to each new subscriber as soon as it subscribes.

/**
 * Creates a behavior subject with an initial value
 * @constructor
 * @param {*} initialValue - Initial value to emit to subscribers
 * @returns {BehaviorSubject} New behavior subject instance
 */
function BehaviorSubject(initialValue);

/**
 * Gets the current value of the behavior subject
 * @returns {*} Current value
 */
BehaviorSubject.prototype.getValue = function();

Usage Example:

var behaviorSubject = new Rx.BehaviorSubject(42);

behaviorSubject.subscribe(function(x) { 
  console.log('Subscriber 1: ' + x); 
});
// Immediately emits: Subscriber 1: 42

behaviorSubject.onNext(56);
// Emits: Subscriber 1: 56

behaviorSubject.subscribe(function(x) { 
  console.log('Subscriber 2: ' + x); 
});
// Immediately emits: Subscriber 2: 56

Replay Subject

Subject that buffers emitted values and replays them, up to a specified count and/or age, to new subscribers.

/**
 * Creates a replay subject that buffers values for new subscribers
 * @constructor
 * @param {number} [bufferSize] - Maximum number of values to buffer (unlimited if not specified)
 * @param {number} [windowSize] - Maximum age in milliseconds of buffered values to replay (unlimited if not specified)
 * @param {Scheduler} [scheduler] - Scheduler to use for time-based operations
 * @returns {ReplaySubject} New replay subject instance
 */
function ReplaySubject(bufferSize, windowSize, scheduler);

Usage Example:

var replaySubject = new Rx.ReplaySubject(2); // Buffer last 2 values

replaySubject.onNext(1);
replaySubject.onNext(2);
replaySubject.onNext(3);

replaySubject.subscribe(function(x) { 
  console.log('Late subscriber: ' + x); 
});
// Immediately emits: Late subscriber: 2, Late subscriber: 3

Async Subject

Subject that emits only the last value, and only once the sequence completes. If the sequence completes without any value, observers receive just the completion.

/**
 * Creates an async subject that emits only the last value on completion
 * @constructor
 * @returns {AsyncSubject} New async subject instance
 */
function AsyncSubject();

Usage Example:

var asyncSubject = new Rx.AsyncSubject();

asyncSubject.subscribe(function(x) { 
  console.log('Result: ' + x); 
});

asyncSubject.onNext(1);
asyncSubject.onNext(2);
asyncSubject.onNext(3);
// Nothing emitted yet

asyncSubject.onCompleted();
// Now emits: Result: 3

Anonymous Subject

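Subject assembled from a separate observer and observable: values pushed into the subject are forwarded to the observer, and subscriptions are forwarded to the observable.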

/**
 * Anonymous subject with separate observer and observable
 * @constructor
 * @param {Observer} observer - Observer to send values to
 * @param {Observable} observable - Observable to subscribe to
 * @returns {AnonymousSubject} Anonymous subject instance
 */
function AnonymousSubject(observer, observable);

Subject Interface Methods

All subjects implement both the Observable and the Observer interfaces:

Observer Methods

/**
 * Sends a value to all subscribed observers
 * @param {*} value - Value to emit
 */
subject.onNext = function(value);

/**
 * Sends an error to all subscribed observers and terminates the sequence
 * @param {*} error - Error to emit
 */
subject.onError = function(error);

/**
 * Signals completion to all subscribed observers and terminates the sequence
 */
subject.onCompleted = function();

Observable Methods

/**
 * Subscribes an observer to the subject
 * @param {Observer|function} observer - Observer or onNext callback
 * @param {function} [onError] - Error callback
 * @param {function} [onCompleted] - Completion callback
 * @returns {Disposable} Subscription disposable
 */
subject.subscribe = function(observer, onError, onCompleted);

Subject-Specific Methods

/**
 * Checks if the subject has any observers
 * @returns {boolean} True if subject has observers
 */
subject.hasObservers = function();

/**
 * Disposes the subject and releases all observers
 */
subject.dispose = function();

Hot vs Cold Observables

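Subjects are hot observables: they emit values whether or not anyone is subscribed, and all current subscribers share the same sequence (a plain Subject does not replay earlier values to late subscribers):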

Usage Example:

// Cold observable - creates new sequence for each subscriber
var cold = Rx.Observable.interval(1000);

// Hot observable - shares single sequence among all subscribers
var subject = new Rx.Subject();
Rx.Observable.interval(1000).subscribe(subject);

// Multiple subscriptions to subject share the same sequence
subject.subscribe(function(x) { console.log('Observer 1: ' + x); });
subject.subscribe(function(x) { console.log('Observer 2: ' + x); });

Multicast Patterns

Subjects enable converting cold observables to hot using multicast operators:

/**
 * Multicasts the source sequence using a subject
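 * @param {Subject|function} subjectOrSelector - Subject to multicast through, or a factory function that returns one
 * @param {function} [selector] - Selector that receives the multicasted sequence; only used when the first argument is a factory function
 * @returns {ConnectableObservable|Observable} Connectable observable that multicasts through the subject (a plain Observable when a selector is given)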
 */
observable.multicast = function(subjectOrSelector, selector);

/**
 * Shares a single subscription to the source among all subscribers (shorthand for publish().refCount(), i.e. multicast through a Subject with reference counting)
 * @returns {Observable} Observable sequence shared among multiple subscribers
 */
observable.share = function();

Install with Tessl CLI

npx tessl i tessl/npm-rx
