Library for composing asynchronous and event-based operations in JavaScript using Observable sequences and fluent query operators
—
Control timing and concurrency of observable operations with different scheduling strategies.
Built-in schedulers for different execution contexts.
/**
* Immediate scheduler - executes work synchronously
* @type {Scheduler}
*/
Rx.Scheduler.immediate;
/**
* Current thread scheduler - executes work on current thread with trampoline
* @type {Scheduler}
*/
Rx.Scheduler.currentThread;
/**
* Default scheduler - executes work asynchronously (alias for async)
* @type {Scheduler}
*/
Rx.Scheduler.default;
/**
* Async scheduler - executes work asynchronously using setTimeout
* @type {Scheduler}
*/
Rx.Scheduler.async;

Utility methods for working with schedulers.
/**
* Checks if an object is a scheduler
* @param {*} obj - Object to test
* @returns {boolean} True if object is a scheduler
*/
Rx.Scheduler.isScheduler = function(obj);
/**
* Gets the current time
* @returns {number} Current time in milliseconds
*/
Rx.Scheduler.now = function();
/**
* Normalizes a time span value
* @param {number} timeSpan - Time span to normalize
* @returns {number} Normalized time span
*/
Rx.Scheduler.normalize = function(timeSpan);

Core scheduling methods available on all scheduler instances.
/**
* Schedules work to be executed immediately
* @param {*} state - Initial state passed to action
* @param {function} action - Action to execute
* @returns {Disposable} Disposable to cancel the scheduled work
*/
scheduler.schedule = function(state, action);
/**
* Schedules work to be executed after a delay
* @param {*} state - Initial state passed to action
* @param {number} dueTime - Delay in milliseconds
* @param {function} action - Action to execute
* @returns {Disposable} Disposable to cancel the scheduled work
*/
scheduler.scheduleFuture = function(state, dueTime, action);
/**
* Schedules recursive work
* @param {*} state - Initial state passed to action
* @param {function} action - Recursive action that can reschedule itself
* @returns {Disposable} Disposable to cancel the scheduled work
*/
scheduler.scheduleRecursive = function(state, action);
/**
* Schedules periodic work
* @param {*} state - Initial state passed to action
* @param {number} period - Period in milliseconds between executions
* @param {function} action - Action to execute periodically
* @returns {Disposable} Disposable to cancel the periodic work
*/
scheduler.schedulePeriodic = function(state, period, action);
/**
* Gets the current time according to this scheduler
* @returns {number} Current time in milliseconds
*/
scheduler.now = function();

Schedulers can be specified for various operations:
/**
* Specifies the scheduler to use for subscription
* @param {Scheduler} scheduler - Scheduler to use for subscription
* @returns {Observable} Observable with specified subscription scheduler
*/
observable.subscribeOn = function(scheduler);
/**
* Specifies the scheduler to use for observation
* @param {Scheduler} scheduler - Scheduler to use for observation
* @returns {Observable} Observable with specified observation scheduler
*/
observable.observeOn = function(scheduler);

Usage Examples:
// Execute subscription on current thread scheduler
var source = Rx.Observable.range(1, 5)
.subscribeOn(Rx.Scheduler.currentThread);
// Observe results on async scheduler
var asyncSource = source.observeOn(Rx.Scheduler.async);
// Create observable with specific scheduler
var timerSource = Rx.Observable.timer(1000, Rx.Scheduler.async);

Action functions receive specific parameters for scheduling:
/**
* Template for the action callback passed to scheduler.schedule(state, action).
* Note the argument order: the callback receives the scheduler first and the
* state second.
* @param {Scheduler} scheduler - The scheduler instance
* @param {*} state - The state passed to schedule
* @returns {Disposable} Optional disposable to cancel work
*/
function scheduleAction(scheduler, state) {
// Perform work with state
// optionalDisposable is a placeholder for whatever disposable (if any) cancels that work
return optionalDisposable;
}
/**
* Template for the recursive action callback passed to
* scheduler.scheduleRecursive(state, action).
* @param {*} state - The current state
* @param {function} recurse - Function to call for recursion; it is invoked
*   with the state for the next iteration
* @returns {Disposable} Optional disposable to cancel work
*/
function recursiveAction(state, recurse) {
// Perform work with state
// shouldContinue and newState are placeholders for the caller's own
// continuation condition and next-iteration state
if (shouldContinue) {
recurse(newState); // Schedule next iteration
}
return optionalDisposable;
}
/**
* Periodic action function for schedulePeriodic method
* @param {*} state - The current state
* @returns {*} New state for next iteration
*/
function periodicAction(state) {
// Perform periodic work
return newState; // State for next execution
}

Creating custom schedulers for specialized timing needs:
/**
* Base scheduler constructor for custom schedulers
* @param {function} now - Function that returns current time
* @param {function} schedule - Function to schedule immediate work
* @param {function} scheduleFuture - Function to schedule delayed work
* @param {function} scheduleRecursive - Function to schedule recursive work
* @constructor
*/
function CustomScheduler(now, schedule, scheduleFuture, scheduleRecursive) {
this.now = now;
this.schedule = schedule;
this.scheduleFuture = scheduleFuture;
this.scheduleRecursive = scheduleRecursive;
}

| Scheduler | Execution Context | Use Case |
|---|---|---|
| immediate | Synchronous | Testing, immediate execution |
| currentThread | Current thread with trampoline | Avoiding stack overflow |
| default/async | Asynchronous with setTimeout | General async operations |

Usage Example:
// Compare scheduler behaviors
var immediate = Rx.Observable.return(1, Rx.Scheduler.immediate);
var async = Rx.Observable.return(1, Rx.Scheduler.async);
console.log('Before subscribe');
immediate.subscribe(function(x) { console.log('Immediate: ' + x); });
async.subscribe(function(x) { console.log('Async: ' + x); });
console.log('After subscribe');
// Output:
// Before subscribe
// Immediate: 1
// After subscribe
// Async: 1