Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences
—
Control timing and concurrency of observable execution with various scheduling strategies for different execution contexts and performance requirements.
RxJS provides several built-in schedulers for different use cases.
/**
* Scheduler interface for controlling timing and concurrency.
* Exposes a way to schedule work and a way to read the scheduler's current time.
*/
interface SchedulerLike {
/**
* Schedule work to be executed.
* The work function is typed with `this` as the SchedulerAction<T> for the scheduled work.
* @param work - Function to execute; receives the optional state
* @param delay - Optional delay in milliseconds
* @param state - Optional state to pass to work function
* @returns Subscription for cancelling the scheduled work
*/
schedule<T>(
work: (this: SchedulerAction<T>, state?: T) => void,
delay?: number,
state?: T
): Subscription;
/**
* Current time according to scheduler
* @returns The scheduler's current time as a number
*/
now(): number;
}
/**
* Scheduler action interface
*/
interface SchedulerAction<T> extends Subscription {
/**
* Schedule this action to run again
* @param state - Optional state
* @param delay - Optional delay
* @returns This action for chaining
*/
schedule(state?: T, delay?: number): SchedulerAction<T>;
}
Uses setTimeout/setInterval for scheduling work asynchronously.
/**
* Async scheduler using setTimeout/setInterval (default for time-based operations)
*/
const asyncScheduler: SchedulerLike;
const async: SchedulerLike; // Alias for asyncScheduler
Usage Examples:
import { of, asyncScheduler } from "rxjs";
import { observeOn, subscribeOn } from "rxjs/operators";
// Schedule subscription on async scheduler
of(1, 2, 3).pipe(
subscribeOn(asyncScheduler)
).subscribe(x => console.log('Async subscription:', x));
// Schedule observation on async scheduler
of(1, 2, 3).pipe(
observeOn(asyncScheduler)
).subscribe(x => console.log('Async observation:', x));
// Direct scheduling
asyncScheduler.schedule(() => {
console.log('Scheduled work executed');
}, 1000); // Execute after 1 second
Uses Promise.resolve() or setImmediate for scheduling work as soon as possible.
/**
* ASAP scheduler using Promise.resolve() for microtask scheduling
*/
const asapScheduler: SchedulerLike;
const asap: SchedulerLike; // Alias for asapScheduler
Usage Examples:
import { of, asapScheduler } from "rxjs";
import { observeOn } from "rxjs/operators";
// Schedule on microtask queue (higher priority than setTimeout)
of(1, 2, 3).pipe(
observeOn(asapScheduler)
).subscribe(x => console.log('ASAP:', x));
console.log('Synchronous code');
// Output order: 'Synchronous code', then 'ASAP: 1', 'ASAP: 2', 'ASAP: 3'
Executes work immediately on the current thread (synchronous).
/**
* Queue scheduler for synchronous execution (immediate)
*/
const queueScheduler: SchedulerLike;
const queue: SchedulerLike; // Alias for queueScheduler
Usage Examples:
import { of, queueScheduler } from "rxjs";
import { observeOn } from "rxjs/operators";
// Synchronous execution
of(1, 2, 3).pipe(
observeOn(queueScheduler)
).subscribe(x => console.log('Queue:', x));
console.log('After subscription');
// Output: 'Queue: 1', 'Queue: 2', 'Queue: 3', 'After subscription'
Uses requestAnimationFrame for scheduling work aligned with browser rendering.
/**
* Animation frame scheduler using requestAnimationFrame
*/
const animationFrameScheduler: SchedulerLike;
const animationFrame: SchedulerLike; // Alias for animationFrameScheduler
Usage Examples:
import { interval, animationFrameScheduler } from "rxjs";
import { map } from "rxjs/operators";
// Smooth animation loop
interval(0, animationFrameScheduler).pipe(
map(() => performance.now())
).subscribe(timestamp => {
// Update animation at ~60fps
updateAnimation(timestamp);
});
// Schedule DOM updates
animationFrameScheduler.schedule(() => {
element.style.left = '100px';
console.log('DOM updated on next frame');
});
Scheduler for testing with virtual time control.
/**
* Virtual time scheduler for testing time-based operations
*/
class VirtualTimeScheduler extends AsyncScheduler {
/**
* Current virtual time
*/
frame: number;
/**
* Collection of scheduled actions
*/
actions: Array<AsyncAction<any>>;
/**
* Execute the scheduled work.
* Takes no arguments as declared here.
*/
flush(): void;
/**
* Get current virtual time
* @returns Current virtual time as a number
*/
now(): number;
}
/**
* Virtual time action
*/
class VirtualAction<T> extends AsyncAction<T> {
/**
* Index in scheduler queue
*/
index: number;
/**
* Whether action is active
*/
active: boolean;
}
Usage Examples:
import { of, VirtualTimeScheduler } from "rxjs";
import { delay } from "rxjs/operators";
// Testing time-dependent operations
const scheduler = new VirtualTimeScheduler();
const source$ = of(1, 2, 3).pipe(
  delay(1000, scheduler) // Use virtual scheduler
);
// Subscribing only schedules the delayed values on the virtual scheduler
source$.subscribe(x => console.log('Value:', x));
// Fast-forward virtual time
scheduler.flush(); // Immediately executes delayed operations
Control which scheduler observables use for emission.
/**
* Re-emit notifications on specified scheduler
* @param scheduler - Scheduler to observe on
* @param delay - Optional delay in milliseconds
* @returns Operator function changing observation scheduler
*/
function observeOn<T>(scheduler: SchedulerLike, delay?: number): OperatorFunction<T, T>;
Usage Examples:
import { of, asyncScheduler, queueScheduler } from "rxjs";
import { observeOn } from "rxjs/operators";
// Change from synchronous to asynchronous
of(1, 2, 3).pipe(
observeOn(asyncScheduler)
).subscribe(x => console.log('Async:', x));
console.log('Synchronous code');
// Output: 'Synchronous code', then async values
Control which scheduler an observable uses for subscription.
/**
* Subscribe to source on specified scheduler
* @param scheduler - Scheduler to subscribe on
* @param delay - Optional delay in milliseconds
* @returns Operator function changing subscription scheduler
*/
function subscribeOn<T>(scheduler: SchedulerLike, delay?: number): OperatorFunction<T, T>;
Usage Examples:
import { of, asyncScheduler } from "rxjs";
import { subscribeOn } from "rxjs/operators";
// Defer subscription to async scheduler
of(1, 2, 3).pipe(
subscribeOn(asyncScheduler)
).subscribe(x => console.log('Deferred subscription:', x));
console.log('Immediate code');
// Output: 'Immediate code', then subscription happens asynchronously
import { Scheduler, AsyncAction } from "rxjs";
// Custom scheduler with logging
// NOTE(review): AsyncAction is handed to the base Scheduler here. Confirm that
// AsyncAction does not rely on AsyncScheduler-specific members (e.g. flush) at
// runtime, and that AsyncAction is exported from the "rxjs" entry point.
class LoggingScheduler extends Scheduler {
/**
* @param SchedulerAction - Action class forwarded to the base Scheduler
* @param now - Time source forwarded to the base Scheduler; defaults to Date.now
*/
constructor(SchedulerAction: typeof AsyncAction, now: () => number = Date.now) {
super(SchedulerAction, now);
}
/**
* Log the requested delay, then delegate to the base class implementation.
* @param work - Function to execute
* @param delay - Optional delay; interpolated into the log message as-is
* @param state - Optional state passed through to the base implementation
* @returns The Subscription returned by the base class schedule call
*/
schedule<T>(
work: (this: SchedulerAction<T>, state?: T) => void,
delay?: number,
state?: T
): Subscription {
console.log(`Scheduling work with delay: ${delay}ms`);
return super.schedule(work, delay, state);
}
}
// Instance used by the usage example that follows
const loggingScheduler = new LoggingScheduler(AsyncAction);
// Use custom scheduler
of(1, 2, 3).pipe(
delay(1000, loggingScheduler)
).subscribe(x => console.log('Value:', x));
import {
asyncScheduler,
asapScheduler,
queueScheduler,
animationFrameScheduler
} from "rxjs";
/**
 * Re-emit a source observable's notifications on the scheduler that is
 * mapped to the requested execution context.
 * @param source - Observable whose notifications are re-emitted
 * @param context - Key selecting which scheduler to observe on
 * @returns The source piped through observeOn with the selected scheduler
 */
function createOptimizedObservable<T>(
  source: Observable<T>,
  context: 'animation' | 'io' | 'computation' | 'immediate'
): Observable<T> {
  // Lookup table: one scheduler per supported context.
  const schedulerByContext = {
    immediate: queueScheduler,
    computation: asapScheduler,
    io: asyncScheduler,
    animation: animationFrameScheduler
  };
  const chosenScheduler = schedulerByContext[context];
  return source.pipe(observeOn(chosenScheduler));
}
// Usage
const data$ = ajax.getJSON('/api/data');
// Optimize for different contexts
const animationData$ = createOptimizedObservable(data$, 'animation');
const ioData$ = createOptimizedObservable(data$, 'io');
const computationData$ = createOptimizedObservable(data$, 'computation');
import { range, queueScheduler, asyncScheduler } from "rxjs";
import { observeOn, map } from "rxjs/operators";
// Names used by the UI-update and network examples further down
import { interval, animationFrameScheduler } from "rxjs";
import { ajax } from "rxjs/ajax";
// Heavy computation - the queue scheduler keeps delivery synchronous.
// This runs on the current call stack, so it does NOT avoid blocking.
range(1, 10000).pipe(
  map(n => heavyComputation(n)),
  observeOn(queueScheduler) // Keep synchronous for performance
).subscribe(result => console.log('Computed:', result));
// UI updates - use animation frame scheduler
const updates$ = interval(16).pipe( // ~60fps
observeOn(animationFrameScheduler),
map(() => ({ x: Math.random() * 100, y: Math.random() * 100 }))
);
updates$.subscribe(pos => {
element.style.transform = `translate(${pos.x}px, ${pos.y}px)`;
});
// Network requests - use async scheduler
ajax.getJSON('/api/data').pipe(
observeOn(asyncScheduler),
map(processData)
).subscribe(data => updateUI(data));
import { TestScheduler } from "rxjs/testing";
import { delay, take } from "rxjs/operators";
// The assertion callback compares the recorded frames with the expected ones
const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, hot, expectObservable }) => {
  // Test delay operator.
  // Inside run() every marble frame is 1ms of virtual time, so a 3ms delay
  // shifts each value by exactly three frames.
  const source$ = cold('a-b-c|');
  const expected = '---a-b-(c|)';
  // take(3) completes together with the third value, so the expected marble
  // does not depend on how delay() treats the source's completion.
  const result$ = source$.pipe(delay(3), take(3));
  expectObservable(result$).toBe(expected);
});
interface SchedulerLike {
now(): number;
schedule<T>(
work: (this: SchedulerAction<T>, state?: T) => void,
delay?: number,
state?: T
): Subscription;
}
interface SchedulerAction<T> extends Subscription {
schedule(state?: T, delay?: number): SchedulerAction<T>;
}
/**
 * Base scheduler class implementing SchedulerLike.
 * Declared concrete (not abstract) so that subclasses can delegate to
 * `super.schedule(...)`.
 */
class Scheduler implements SchedulerLike {
  /**
   * @param SchedulerAction - Action class used for scheduled work
   * @param now - Optional function returning the current time
   */
  constructor(
    SchedulerAction: typeof Action,
    now?: () => number
  );
  /** Class-level time source. */
  static now: () => number;
  /**
   * Schedule work to be executed.
   * @param work - Function to execute
   * @param delay - Optional delay
   * @param state - Optional state to pass to the work function
   * @returns Subscription for the scheduled work
   */
  schedule<T>(
    work: (this: SchedulerAction<T>, state?: T) => void,
    delay?: number,
    state?: T
  ): Subscription;
  /** Current time according to this scheduler. */
  now(): number;
}