Library for composing asynchronous and event-based operations in JavaScript using Observable sequences and fluent query operators
—
Operators for transforming observable sequences including map, flatMap, scan, and buffer operations.
Transforms each element of an observable sequence.
/**
* Projects each element of an observable sequence into a new form
* @param {function} selector - Transform function to apply to each element
* @param {*} [thisArg] - Object to use as this when executing selector
* @returns {Observable} Observable sequence with transformed elements
*/
observable.map = function(selector, thisArg);
observable.select = function(selector, thisArg); // Alias for mapUsage Example:
var source = Rx.Observable.range(1, 5);
var doubled = source.map(function(x) { return x * 2; });
// Emits: 2, 4, 6, 8, 10Projects each element to an observable and flattens the result.
/**
* Projects each element to an observable and flattens the resulting sequences
* @param {function} selector - Function that transforms each element into an observable
* @param {function} [resultSelector] - Function to combine source and projected elements
* @param {*} [thisArg] - Object to use as this when executing selector
* @returns {Observable} Flattened observable sequence
*/
observable.flatMap = function(selector, resultSelector, thisArg);
observable.selectMany = function(selector, resultSelector, thisArg); // Alias
observable.mergeMap = function(selector, resultSelector, thisArg); // AliasUsage Example:
var source = Rx.Observable.range(1, 3);
var flattened = source.flatMap(function(x) {
return Rx.Observable.range(x, 2);
});
// Emits: 1, 1, 2, 2, 3, 3Projects to observables and switches to the latest one.
/**
* Projects each element to an observable and switches to the latest observable
* @param {function} selector - Function that transforms each element into an observable
* @param {function} [resultSelector] - Function to combine source and projected elements
* @param {*} [thisArg] - Object to use as this when executing selector
* @returns {Observable} Observable sequence switching to latest projection
*/
observable.flatMapLatest = function(selector, resultSelector, thisArg);
observable.switchMap = function(selector, resultSelector, thisArg); // AliasProjects to observables and concatenates them sequentially.
/**
* Projects each element to an observable and concatenates sequences sequentially
* @param {function} selector - Function that transforms each element into an observable
* @param {function} [resultSelector] - Function to combine source and projected elements
* @param {*} [thisArg] - Object to use as this when executing selector
* @returns {Observable} Concatenated observable sequence
*/
observable.flatMapConcat = function(selector, resultSelector, thisArg);
observable.concatMap = function(selector, resultSelector, thisArg); // AliasProjects to observables and ignores new projections while current is active.
/**
* Projects to observables, ignoring new ones while current observable is active
* @param {function} selector - Function that transforms each element into an observable
* @param {function} [resultSelector] - Function to combine source and projected elements
* @param {*} [thisArg] - Object to use as this when executing selector
* @returns {Observable} Observable sequence exhausting projections
*/
observable.flatMapFirst = function(selector, resultSelector, thisArg);
observable.exhaustMap = function(selector, resultSelector, thisArg); // AliasApplies an accumulator function over the sequence and returns each intermediate result.
/**
* Applies an accumulator function and emits each intermediate result
* @param {function} accumulator - Accumulator function taking current accumulation and next value
* @param {*} [seed] - Initial accumulator value
* @returns {Observable} Observable sequence of accumulated values
*/
observable.scan = function(accumulator, seed);Usage Example:
var source = Rx.Observable.range(1, 5);
var accumulated = source.scan(function(acc, x) { return acc + x; }, 0);
// Emits: 0, 1, 3, 6, 10, 15Prepends values to the beginning of the sequence.
/**
* Prepends a sequence of values to an observable sequence
* @param {...*} values - Values to prepend to the sequence
* @returns {Observable} Observable sequence with prepended values
*/
observable.startWith = function(...values);Usage Example:
var source = Rx.Observable.range(3, 3);
var withStart = source.startWith(1, 2);
// Emits: 1, 2, 3, 4, 5Retrieves the value of a specified property from each element.
/**
* Retrieves the value of a specified property from all elements
* @param {...(string|number)} properties - Property path to pluck
* @returns {Observable} Observable sequence of plucked values
*/
observable.pluck = function(...properties);Usage Example:
var source = Rx.Observable.fromArray([
{name: 'Alice', age: 25},
{name: 'Bob', age: 30}
]);
var names = source.pluck('name');
// Emits: 'Alice', 'Bob'Buffers elements into arrays of specified size.
/**
* Buffers the source observable sequence elements into arrays of specified size
* @param {number} count - Maximum element count of a buffer
* @param {number} [skip] - Number of elements to skip between creation of consecutive buffers
* @returns {Observable} Observable sequence of arrays
*/
observable.bufferWithCount = function(count, skip);
observable.bufferCount = function(count, skip); // AliasUsage Example:
var source = Rx.Observable.range(1, 10);
var buffered = source.bufferWithCount(3);
// Emits: [1,2,3], [4,5,6], [7,8,9], [10]Windows elements into observables of specified size.
/**
* Projects elements into zero or more windows of specified size
* @param {number} count - Maximum element count of a window
* @param {number} [skip] - Number of elements to skip between creation of consecutive windows
* @returns {Observable} Observable sequence of observable windows
*/
observable.windowWithCount = function(count, skip);
observable.windowCount = function(count, skip); // AliasProjects elements using separate functions for onNext, onError, and onCompleted.
/**
* Projects elements using observer method handlers
* @param {function} onNext - Function to project next values
* @param {function} onError - Function to project error values
* @param {function} onCompleted - Function to project completion
* @param {*} [thisArg] - Object to use as this when executing functions
* @returns {Observable} Projected observable sequence
*/
observable.flatMapObserver = function(onNext, onError, onCompleted, thisArg);Concatenates all inner observable sequences.
/**
* Concatenates all inner observable sequences sequentially
* @returns {Observable} Observable sequence containing concatenated elements
*/
observable.concatAll = function();Merges all inner observable sequences.
/**
* Merges all inner observable sequences into one sequence
* @returns {Observable} Observable sequence containing merged elements
*/
observable.mergeAll = function();Switches to the latest inner observable sequence.
/**
* Transforms an observable of observables by switching to the latest observable
* @returns {Observable} Observable sequence that switches to latest inner observable
*/
observable.switch = function();
observable.switchLatest = function(); // AliasApplies a transducer to transform the observable sequence.
/**
* Applies a transducer to transform the observable sequence
* @param {Object} transducer - Transducer object with @@transducer/step method
* @returns {Observable} Transformed observable sequence
*/
observable.transduce = function(transducer);Continues the observable sequence with another sequence on error.
/**
* Continues the observable sequence with another sequence on error
* @param {function|Observable} handlerOrSecond - Error handler function or fallback observable
* @returns {Observable} Observable sequence continuing on error
*/
observable.catch = function(handlerOrSecond);
observable.catchError = function(handlerOrSecond); // AliasUsage Example:
var source = Rx.Observable.throw(new Error('Oops!'));
var withCatch = source.catch(function(err) {
return Rx.Observable.just('Recovered');
});
// Emits: 'Recovered'Repeats the observable sequence on error up to specified retry count.
/**
* Repeats the observable sequence on error up to specified retry count
* @param {number} [retryCount] - Number of retries, infinite if not specified
* @returns {Observable} Observable sequence that retries on error
*/
observable.retry = function(retryCount);Usage Example:
var source = someFailingObservable();
var withRetry = source.retry(3);
// Will retry up to 3 times before giving upRepeats the observable sequence on error when the notifier emits a value.
/**
* Repeats the observable sequence on error when the notifier emits
* @param {function} notificationHandler - Function that receives error notifications and returns an observable
* @returns {Observable} Observable sequence that retries conditionally
*/
observable.retryWhen = function(notificationHandler);Usage Example:
var source = someFailingObservable();
var withRetryWhen = source.retryWhen(function(errors) {
return errors.delay(1000); // Retry after 1 second delay
});Continues with the next observable sequence on error.
/**
* Continues with the next observable sequence on error without propagating the error
* @param {Observable} second - Observable sequence to continue with on error
* @returns {Observable} Observable sequence that continues on error
*/
observable.onErrorResumeNext = function(second);Wraps notifications in Notification objects.
/**
* Materializes the implicit notifications of an observable sequence as explicit notifications
* @returns {Observable} Observable sequence of Notification objects
*/
observable.materialize = function();Unwraps Notification objects back to notifications.
/**
* Dematerializes the explicit notifications of an observable sequence as implicit notifications
* @returns {Observable} Observable sequence with dematerialized notifications
*/
observable.dematerialize = function();Performs side effects for each element without modifying the sequence.
/**
* Performs side effects for each element without modifying the sequence
* @param {function|Observer} observerOrOnNext - Observer or onNext callback
* @param {function} [onError] - OnError callback
* @param {function} [onCompleted] - OnCompleted callback
* @returns {Observable} Original observable sequence
*/
observable.do = function(observerOrOnNext, onError, onCompleted);
observable.tap = function(observerOrOnNext, onError, onCompleted); // Alias
observable.doAction = function(observerOrOnNext, onError, onCompleted); // AliasUsage Example:
var source = Rx.Observable.range(1, 3);
var withSideEffect = source
.do(function(x) { console.log('Processing: ' + x); })
.map(function(x) { return x * 2; });
// Logs: 'Processing: 1', 'Processing: 2', 'Processing: 3'
// Emits: 2, 4, 6Performs side effects for each next element.
/**
* Performs side effects for each next element
* @param {function} onNext - Callback for next values
* @param {*} [thisArg] - Object to use as this when executing callback
* @returns {Observable} Original observable sequence
*/
observable.doOnNext = function(onNext, thisArg);
observable.tapOnNext = function(onNext, thisArg); // AliasPerforms side effects for each error.
/**
* Performs side effects for each error
* @param {function} onError - Callback for errors
* @param {*} [thisArg] - Object to use as this when executing callback
* @returns {Observable} Original observable sequence
*/
observable.doOnError = function(onError, thisArg);
observable.tapOnError = function(onError, thisArg); // AliasPerforms side effects on sequence completion.
/**
* Performs side effects on sequence completion
* @param {function} onCompleted - Callback for completion
* @param {*} [thisArg] - Object to use as this when executing callback
* @returns {Observable} Original observable sequence
*/
observable.doOnCompleted = function(onCompleted, thisArg);
observable.tapOnCompleted = function(onCompleted, thisArg); // Alias