Library for composing asynchronous and event-based operations in JavaScript using Observable sequences and fluent query operators
—
Methods for working with asynchronous patterns including callbacks, events, and promises.
Creates an observable from a callback-style function (one that takes a callback as its last argument, with no error-first convention).
/**
* Creates an observable from a function that takes a callback as its last argument
* @param {function} func - Function that takes arguments and a trailing callback(result)
* @param {*} [context] - Context to bind the function to
* @param {function} [selector] - Function to transform the callback result
* @returns {function} Function that returns an observable when called with arguments
*/
Rx.Observable.fromCallback = function(func, context, selector);

Usage Example:
var fs = require('fs');
var readFile = Rx.Observable.fromCallback(fs.readFile);
var source = readFile('file.txt', 'utf8');
source.subscribe(
function(data) { console.log('File contents: ' + data); },
function(err) { console.log('Error: ' + err); }
);

Creates an observable from a Node.js-style (error-first) callback function.
/**
* Creates an observable from a Node.js-style callback function
* @param {function} func - Function that takes arguments and a callback(error, ...results)
* @param {*} [context] - Context to bind the function to
* @param {function} [selector] - Function to transform the callback results
* @returns {function} Function that returns an observable when called with arguments
*/
Rx.Observable.fromNodeCallback = function(func, context, selector);

Creates an observable from DOM events or EventEmitter events.
/**
* Creates an observable from DOM events or EventEmitter events
* @param {Element|EventEmitter} element - DOM element or EventEmitter
* @param {string} eventName - Name of the event to listen for
* @param {function} [selector] - Function to transform the event object
* @returns {Observable} Observable sequence of events
*/
Rx.Observable.fromEvent = function(element, eventName, selector);

Usage Example:
var button = document.getElementById('myButton');
var clicks = Rx.Observable.fromEvent(button, 'click');
clicks.subscribe(function(event) {
console.log('Button clicked!');
});

Creates an observable from custom event patterns.
/**
* Creates an observable from a custom event pattern
* @param {function} addHandler - Function to add event handler
* @param {function} [removeHandler] - Function to remove event handler
* @param {function} [selector] - Function to transform the event arguments
* @returns {Observable} Observable sequence of events
*/
Rx.Observable.fromEventPattern = function(addHandler, removeHandler, selector);

Usage Example:
var source = Rx.Observable.fromEventPattern(
function(handler) { emitter.addListener('data', handler); },
function(handler) { emitter.removeListener('data', handler); }
);

Creates an observable from a Promise.
/**
* Creates an observable from a Promise
* @param {Promise} promise - Promise to convert to observable
* @returns {Observable} Observable sequence that emits promise result
*/
Rx.Observable.fromPromise = function(promise);

Usage Example:
var promise = fetch('/api/data');
var source = Rx.Observable.fromPromise(promise);
source.subscribe(
function(response) { console.log('Response:', response); },
function(error) { console.log('Error:', error); }
);

Converts an observable to a Promise.
/**
* Converts the observable sequence to a Promise
* @param {function} [promiseCtor] - Promise constructor to use
* @returns {Promise} Promise that resolves with the last emitted value
*/
observable.toPromise = function(promiseCtor);

Usage Example:
var source = Rx.Observable.return(42);
var promise = source.toPromise();
promise.then(function(value) {
console.log('Promise resolved with:', value);
});

Creates an observable from a generator function.
/**
* Creates an observable from a generator function using spawn-like behavior
* @param {function} generatorFunction - Generator function to execute
* @returns {Observable} Observable sequence from generator execution
*/
Rx.Observable.spawn = function(generatorFunction);

Usage Example:
var source = Rx.Observable.spawn(function* () {
var a = yield Rx.Observable.return(1);
var b = yield Rx.Observable.return(2);
return a + b;
});

Invokes a function asynchronously and returns the result as an observable.
/**
* Invokes a function asynchronously and returns the result as an observable
* @param {function} func - Function to execute asynchronously
* @param {*} [context] - Context to bind the function to
* @param {Scheduler} [scheduler] - Scheduler to execute the function on
* @returns {Observable} Observable sequence containing the function result
*/
Rx.Observable.start = function(func, context, scheduler);

Converts a synchronous function into an asynchronous function that returns an observable of its result.
/**
* Converts a synchronous function into an asynchronous one; each call runs the original function on the scheduler and returns an observable of its result
* @param {function} func - Function to convert
* @param {*} [context] - Context to bind the function to
* @param {Scheduler} [scheduler] - Scheduler to execute on
* @returns {function} Function that returns an observable
*/
Rx.Observable.toAsync = function(func, context, scheduler);

Starts an async function and returns the result as an observable.
/**
* Starts an async function and returns the result as an observable
* @param {function} functionAsync - Async function that returns a Promise
* @returns {Observable} Observable sequence containing the async function result
*/
Rx.Observable.startAsync = function(functionAsync);

Wraps a generator function into a function that returns an observable.
/**
* Wraps a generator function into a function that returns an observable
* @param {function} fn - Generator function to wrap
* @returns {function} Wrapped function that works with observables
*/
Rx.Observable.wrap = function(fn);

Async operations integrate with RxJS error handling:
/**
* Handles errors from async operations
* @param {function} handler - Error handling function that returns an observable
* @returns {Observable} Observable sequence with error handling
*/
observable.catch = function(handler);
/**
* Retries the async operation on error
* @param {number} [retryCount] - Number of retry attempts
* @returns {Observable} Observable sequence with retry logic
*/
observable.retry = function(retryCount);

Usage Example:
var source = Rx.Observable.fromPromise(fetch('/api/data'))
.retry(3)
.catch(function(error) {
return Rx.Observable.return({error: 'Failed to fetch data'});
});