Library for composing asynchronous and event-based operations in JavaScript using Observable sequences and fluent query operators
—
Core methods for creating observable sequences from various sources including arrays, events, callbacks, and custom functions.
Creates an observable from a subscribe function.
/**
* Creates an observable sequence from a subscribe function
* @param {function} subscribe - Function that accepts an observer and returns a disposable
* @returns {Observable} New observable sequence
*/
Rx.Observable.create = function(subscribe);Usage Example:
var source = Rx.Observable.create(function(observer) {
observer.onNext(42);
observer.onNext(56);
observer.onCompleted();
// Return cleanup function
return function() {
console.log('disposed');
};
});Defers observable creation until subscription time.
/**
* Defers observable creation until subscription
* @param {function} observableFactory - Factory function that returns an observable
* @returns {Observable} Deferred observable
*/
Rx.Observable.defer = function(observableFactory);Creates an empty observable that immediately completes.
/**
* Creates an empty observable sequence
* @param {Scheduler} [scheduler] - Scheduler to send completion notification on
* @returns {Observable} Empty observable sequence
*/
Rx.Observable.empty = function(scheduler);Creates an observable that never emits or completes.
/**
* Creates an observable sequence that never terminates
* @returns {Observable} Non-terminating observable sequence
*/
Rx.Observable.never = function();Creates an observable that emits a single value.
/**
* Creates an observable that emits a single value then completes
* @param {*} value - Value to emit
* @param {Scheduler} [scheduler] - Scheduler to send value on
* @returns {Observable} Observable sequence containing the single value
*/
Rx.Observable.return = function(value, scheduler);
Rx.Observable.just = function(value, scheduler); // Alias for returnCreates an observable that immediately emits an error.
/**
* Creates an observable that immediately emits an error
* @param {*} error - Error to emit
* @param {Scheduler} [scheduler] - Scheduler to send error on
* @returns {Observable} Observable sequence that terminates with error
*/
Rx.Observable.throw = function(error, scheduler);Creates an observable sequence of integers within a range.
/**
* Creates an observable sequence of integers within a specified range
* @param {number} start - Start value of the range
* @param {number} count - Number of sequential integers to generate
* @param {Scheduler} [scheduler] - Scheduler to send values on
* @returns {Observable} Observable sequence containing range of integers
*/
Rx.Observable.range = function(start, count, scheduler);Usage Example:
var source = Rx.Observable.range(1, 5);
// Emits: 1, 2, 3, 4, 5Creates an observable that repeats a value.
/**
* Creates an observable that repeats the given value
* @param {*} value - Value to repeat
* @param {number} [repeatCount] - Number of times to repeat (infinite if not specified)
* @param {Scheduler} [scheduler] - Scheduler to send values on
* @returns {Observable} Observable sequence repeating the value
*/
Rx.Observable.repeat = function(value, repeatCount, scheduler);Generates an observable sequence by running a state-driven loop.
/**
* Generates an observable sequence by running a state-driven loop
* @param {*} initialState - Initial state
* @param {function} condition - Condition to continue iteration
* @param {function} iterate - State update function
* @param {function} resultSelector - Result selection function
* @param {Scheduler} [scheduler] - Scheduler to send values on
* @returns {Observable} Generated observable sequence
*/
Rx.Observable.generate = function(initialState, condition, iterate, resultSelector, scheduler);Usage Example:
var source = Rx.Observable.generate(
0, // initial state
function(x) { return x < 3; }, // condition
function(x) { return x + 1; }, // iterate
function(x) { return x * x; } // result selector
);
// Emits: 0, 1, 4Creates an observable from an array.
/**
* Creates an observable sequence from an array
* @param {Array} array - Array to convert to observable
* @param {Scheduler} [scheduler] - Scheduler to send values on
* @returns {Observable} Observable sequence containing array elements
*/
Rx.Observable.fromArray = function(array, scheduler);Creates an observable from an iterable.
/**
* Creates an observable from an iterable object
* @param {Iterable} iterable - Iterable to convert
* @param {function} [mapFn] - Map function to transform each element
* @param {*} [thisArg] - Value to use as this when executing mapFn
* @param {Scheduler} [scheduler] - Scheduler to send values on
* @returns {Observable} Observable sequence from iterable
*/
Rx.Observable.from = function(iterable, mapFn, thisArg, scheduler);Creates an observable from the arguments.
/**
* Creates an observable sequence from the arguments
* @param {...*} args - Arguments to emit as values
* @returns {Observable} Observable sequence containing the arguments
*/
Rx.Observable.of = function(...args);
/**
* Creates an observable sequence from arguments with scheduler
* @param {Scheduler} scheduler - Scheduler to send values on
* @param {...*} args - Arguments to emit as values
* @returns {Observable} Observable sequence containing the arguments
*/
Rx.Observable.ofWithScheduler = function(scheduler, ...args);Creates an observable from object key-value pairs.
/**
* Creates an observable of key-value pairs from an object
* @param {Object} obj - Object to convert to key-value pairs
* @param {Scheduler} [scheduler] - Scheduler to send values on
* @returns {Observable} Observable sequence of [key, value] pairs
*/
Rx.Observable.pairs = function(obj, scheduler);Usage Example:
var source = Rx.Observable.pairs({a: 1, b: 2});
// Emits: ['a', 1], ['b', 2]Creates an observable with automatic resource management.
/**
* Creates an observable with resource management
* @param {function} resourceFactory - Function that returns a disposable resource
* @param {function} observableFactory - Function that uses the resource to create an observable
* @returns {Observable} Observable sequence with automatic resource cleanup
*/
Rx.Observable.using = function(resourceFactory, observableFactory);Usage Example:
var source = Rx.Observable.using(
function() { return new FileHandle(); }, // resource factory
function(resource) { // observable factory
return Rx.Observable.fromCallback(resource.read.bind(resource));
}
);Runs all observable sequences in parallel and collects their last emitted values.
/**
* Runs all observable sequences in parallel and collects their last values
* @param {...Observable} sources - Observable sequences to combine
* @returns {Observable} Observable sequence with combined last values as array
*/
Rx.Observable.forkJoin = function(...sources);
/**
* Alternative signature with result selector
* @param {Array<Observable>} sources - Array of observable sequences
* @param {function} [resultSelector] - Function to transform the result array
* @returns {Observable} Observable sequence with combined result
*/
Rx.Observable.forkJoin = function(sources, resultSelector);Usage Example:
var source1 = Rx.Observable.timer(1000).map(function() { return 'A'; });
var source2 = Rx.Observable.timer(2000).map(function() { return 'B'; });
var combined = Rx.Observable.forkJoin(source1, source2);
// Emits: ['A', 'B'] after 2 secondsReturns one of two observables based on a condition.
/**
* Returns one of two observables based on a condition
* @param {function} condition - Function that returns a boolean
* @param {Observable} thenSource - Observable to return if condition is true
* @param {Observable} [elseSource] - Observable to return if condition is false (defaults to empty)
* @returns {Observable} Conditional observable sequence
*/
Rx.Observable.if = function(condition, thenSource, elseSource);Usage Example:
var condition = function() { return Math.random() > 0.5; };
var source = Rx.Observable.if(
condition,
Rx.Observable.just('Heads'),
Rx.Observable.just('Tails')
);Repeats source observable while condition holds true.
/**
* Repeats source observable while condition holds true
* @param {function} condition - Function that returns a boolean
* @param {Observable} source - Observable to repeat
* @returns {Observable} Observable sequence that repeats while condition is true
*/
Rx.Observable.whileDo = function(condition, source);Continues with the next observable sequence on error.
/**
* Continues with the next observable sequence on error for all sources
* @param {...Observable} sources - Observable sequences to chain
* @returns {Observable} Observable sequence that continues on error
*/
Rx.Observable.onErrorResumeNext = function(...sources);Merges all observable sequences but delays errors until all sources complete.
/**
* Merges all sources but delays error propagation until all complete
* @param {...Observable} sources - Observable sequences to merge
* @returns {Observable} Merged observable sequence with delayed errors
*/
Rx.Observable.mergeDelayError = function(...sources);Checks if an object is an observable.
/**
* Determines whether an object is an observable
* @param {*} obj - Object to test
* @returns {boolean} True if the object is an observable
*/
Rx.Observable.isObservable = function(obj);Usage Example:
var obs = Rx.Observable.just(42);
var isObs = Rx.Observable.isObservable(obs); // true
var notObs = Rx.Observable.isObservable("hello"); // falseGenerates an observable sequence with time-based state iteration.
/**
* Generates an observable sequence with absolute time
* @param {*} initialState - Initial state
* @param {function} condition - Condition to determine whether to continue iteration
* @param {function} iterate - Iteration step function
* @param {function} resultSelector - Function to select the result value
* @param {function} timeSelector - Function to select the time for each iteration
* @param {Scheduler} [scheduler] - Scheduler to run timers on
* @returns {Observable} Generated observable sequence
*/
Rx.Observable.generateWithAbsoluteTime = function(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
/**
* Generates an observable sequence with relative time
* @param {*} initialState - Initial state
* @param {function} condition - Condition to determine whether to continue iteration
* @param {function} iterate - Iteration step function
* @param {function} resultSelector - Function to select the result value
* @param {function} timeSelector - Function to select the relative time for each iteration
* @param {Scheduler} [scheduler] - Scheduler to run timers on
* @returns {Observable} Generated observable sequence
*/
Rx.Observable.generateWithRelativeTime = function(initialState, condition, iterate, resultSelector, timeSelector, scheduler);