CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-rx

Library for composing asynchronous and event-based operations in JavaScript using Observable sequences and fluent query operators

Pending
Overview
Eval results
Files

schedulers.mddocs/

Schedulers

Control timing and concurrency of observable operations with different scheduling strategies.

Capabilities

Scheduler Types

Built-in schedulers for different execution contexts.

/**
 * Immediate scheduler - executes work synchronously
 * @type {Scheduler}
 */
Rx.Scheduler.immediate;

/**
 * Current thread scheduler - executes work on current thread with trampoline
 * @type {Scheduler}
 */
Rx.Scheduler.currentThread;

/**
 * Default scheduler - executes work asynchronously (alias for async)
 * @type {Scheduler}
 */
Rx.Scheduler.default;

/**
 * Async scheduler - executes work asynchronously using setTimeout
 * @type {Scheduler}
 */
Rx.Scheduler.async;

Scheduler Static Methods

Utility methods for working with schedulers.

/**
 * Checks if an object is a scheduler
 * @param {*} obj - Object to test
 * @returns {boolean} True if object is a scheduler
 */
Rx.Scheduler.isScheduler = function(obj);

/**
 * Gets the current time
 * @returns {number} Current time in milliseconds
 */
Rx.Scheduler.now = function();

/**
 * Normalizes a time span value
 * @param {number} timeSpan - Time span to normalize
 * @returns {number} Normalized time span
 */
Rx.Scheduler.normalize = function(timeSpan);

Scheduler Instance Methods

Core scheduling methods available on all scheduler instances.

/**
 * Schedules work to be executed immediately
 * @param {*} state - Initial state passed to action
 * @param {function} action - Action to execute
 * @returns {Disposable} Disposable to cancel the scheduled work
 */
scheduler.schedule = function(state, action);

/**
 * Schedules work to be executed after a delay
 * @param {*} state - Initial state passed to action
 * @param {number} dueTime - Delay in milliseconds
 * @param {function} action - Action to execute
 * @returns {Disposable} Disposable to cancel the scheduled work
 */
scheduler.scheduleFuture = function(state, dueTime, action);

/**
 * Schedules recursive work
 * @param {*} state - Initial state passed to action
 * @param {function} action - Recursive action that can reschedule itself
 * @returns {Disposable} Disposable to cancel the scheduled work
 */
scheduler.scheduleRecursive = function(state, action);

/**
 * Schedules periodic work
 * @param {*} state - Initial state passed to action
 * @param {number} period - Period in milliseconds between executions
 * @param {function} action - Action to execute periodically
 * @returns {Disposable} Disposable to cancel the periodic work
 */
scheduler.schedulePeriodic = function(state, period, action);

/**
 * Gets the current time according to this scheduler
 * @returns {number} Current time in milliseconds
 */
scheduler.now = function();

Using Schedulers with Observables

Schedulers can be specified for various operations:

/**
 * Specifies the scheduler to use for subscription
 * @param {Scheduler} scheduler - Scheduler to use for subscription
 * @returns {Observable} Observable with specified subscription scheduler
 */
observable.subscribeOn = function(scheduler);

/**
 * Specifies the scheduler to use for observation
 * @param {Scheduler} scheduler - Scheduler to use for observation
 * @returns {Observable} Observable with specified observation scheduler
 */
observable.observeOn = function(scheduler);

Usage Examples:

// Execute subscription on current thread scheduler
var source = Rx.Observable.range(1, 5)
  .subscribeOn(Rx.Scheduler.currentThread);

// Observe results on async scheduler
var asyncSource = source.observeOn(Rx.Scheduler.async);

// Create observable with specific scheduler
var timerSource = Rx.Observable.timer(1000, Rx.Scheduler.async);

Action Function Format

Action functions receive specific parameters for scheduling:

/**
 * Action function for schedule method
 * @param {Scheduler} scheduler - The scheduler instance
 * @param {*} state - The state passed to schedule
 * @returns {Disposable} Optional disposable to cancel work
 */
function scheduleAction(scheduler, state) {
  // Perform work with state
  return optionalDisposable;
}

/**
 * Recursive action function for scheduleRecursive method
 * @param {*} state - The current state
 * @param {function} recurse - Function to call for recursion
 * @returns {Disposable} Optional disposable to cancel work
 */
function recursiveAction(state, recurse) {
  // Perform work with state
  if (shouldContinue) {
    recurse(newState); // Schedule next iteration
  }
  return optionalDisposable;
}

/**
 * Periodic action function for schedulePeriodic method
 * @param {*} state - The current state
 * @returns {*} New state for next iteration
 */
function periodicAction(state) {
  // Perform periodic work
  return newState; // State for next execution
}

Custom Schedulers

Creating custom schedulers for specialized timing needs:

/**
 * Base scheduler constructor for custom schedulers
 * @param {function} now - Function that returns current time
 * @param {function} schedule - Function to schedule immediate work
 * @param {function} scheduleFuture - Function to schedule delayed work
 * @param {function} scheduleRecursive - Function to schedule recursive work
 * @constructor
 */
function CustomScheduler(now, schedule, scheduleFuture, scheduleRecursive) {
  this.now = now;
  this.schedule = schedule;
  this.scheduleFuture = scheduleFuture;
  this.scheduleRecursive = scheduleRecursive;
}

Scheduler Behavior Comparison

SchedulerExecution ContextUse Case
immediateSynchronousTesting, immediate execution
currentThreadCurrent thread with trampolineAvoiding stack overflow
default/asyncAsynchronous with setTimeoutGeneral async operations

Usage Example:

// Compare scheduler behaviors
var immediate = Rx.Observable.return(1, Rx.Scheduler.immediate);
var async = Rx.Observable.return(1, Rx.Scheduler.async);

console.log('Before subscribe');
immediate.subscribe(function(x) { console.log('Immediate: ' + x); });
async.subscribe(function(x) { console.log('Async: ' + x); });
console.log('After subscribe');

// Output:
// Before subscribe
// Immediate: 1
// After subscribe
// Async: 1

Install with Tessl CLI

npx tessl i tessl/npm-rx

docs

async-operations.md

disposables.md

index.md

observable-aggregation.md

observable-combination.md

observable-creation.md

observable-filtering.md

observable-transformation.md

schedulers.md

subjects.md

testing.md

time-operations.md

tile.json