CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-rxjs

Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences

Pending
Overview
Eval results
Files

schedulers.mddocs/

Schedulers

Control timing and concurrency of observable execution with various scheduling strategies for different execution contexts and performance requirements.

Capabilities

Scheduler Types

RxJS provides several built-in schedulers for different use cases.

/**
 * Scheduler interface for controlling timing and concurrency
 */
interface SchedulerLike {
  /**
   * Schedule work to be executed
   * @param work - Function to execute
   * @param delay - Delay in milliseconds
   * @param state - Optional state to pass to work function
   * @returns Subscription for cancelling the scheduled work
   */
  schedule<T>(
    work: (this: SchedulerAction<T>, state?: T) => void,
    delay?: number,
    state?: T
  ): Subscription;
  
  /**
   * Current time according to scheduler
   */
  now(): number;
}

/**
 * Scheduler action interface
 */
interface SchedulerAction<T> extends Subscription {
  /**
   * Schedule this action to run again
   * @param state - Optional state
   * @param delay - Optional delay
   * @returns This action for chaining
   */
  schedule(state?: T, delay?: number): SchedulerAction<T>;
}

asyncScheduler

Uses setTimeout/setInterval for scheduling work asynchronously.

/**
 * Async scheduler using setTimeout/setInterval (default for time-based operations)
 */
const asyncScheduler: SchedulerLike;
const async: SchedulerLike; // Alias for asyncScheduler

Usage Examples:

import { of, asyncScheduler } from "rxjs";
import { observeOn, subscribeOn } from "rxjs/operators";

// Schedule subscription on async scheduler  
of(1, 2, 3).pipe(
  subscribeOn(asyncScheduler)
).subscribe(x => console.log('Async subscription:', x));

// Schedule observation on async scheduler
of(1, 2, 3).pipe(
  observeOn(asyncScheduler)
).subscribe(x => console.log('Async observation:', x));

// Direct scheduling
asyncScheduler.schedule(() => {
  console.log('Scheduled work executed');
}, 1000); // Execute after 1 second

asapScheduler

Uses Promise.resolve() or setImmediate for scheduling work as soon as possible.

/**
 * ASAP scheduler using Promise.resolve() for microtask scheduling
 */
const asapScheduler: SchedulerLike;
const asap: SchedulerLike; // Alias for asapScheduler

Usage Examples:

import { of, asapScheduler } from "rxjs";
import { observeOn } from "rxjs/operators";

// Schedule on microtask queue (higher priority than setTimeout)
of(1, 2, 3).pipe(
  observeOn(asapScheduler)
).subscribe(x => console.log('ASAP:', x));

console.log('Synchronous code');
// Output order: 'Synchronous code', then 'ASAP: 1', 'ASAP: 2', 'ASAP: 3'

queueScheduler

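Executes work synchronously on the current call stack when no delay is given. Work scheduled from inside an already-running task is placed on a queue and runs once that task finishes; when a delay is given, it behaves like asyncScheduler.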

/**
 * Queue scheduler for synchronous execution (immediate)
 */
const queueScheduler: SchedulerLike;
const queue: SchedulerLike; // Alias for queueScheduler

Usage Examples:

import { of, queueScheduler } from "rxjs";
import { observeOn } from "rxjs/operators";

// Synchronous execution
of(1, 2, 3).pipe(
  observeOn(queueScheduler)
).subscribe(x => console.log('Queue:', x));

console.log('After subscription');
// Output: 'Queue: 1', 'Queue: 2', 'Queue: 3', 'After subscription'

animationFrameScheduler

Uses requestAnimationFrame for scheduling work aligned with browser rendering.

/**
 * Animation frame scheduler using requestAnimationFrame
 */
const animationFrameScheduler: SchedulerLike;
const animationFrame: SchedulerLike; // Alias for animationFrameScheduler

Usage Examples:

import { interval, animationFrameScheduler } from "rxjs";
import { map } from "rxjs/operators";

// Smooth animation loop
interval(0, animationFrameScheduler).pipe(
  map(() => performance.now())
).subscribe(timestamp => {
  // Update animation at ~60fps
  updateAnimation(timestamp);
});

// Schedule DOM updates
animationFrameScheduler.schedule(() => {
  element.style.left = '100px';
  console.log('DOM updated on next frame');
});

VirtualTimeScheduler

Scheduler for testing with virtual time control.

/**
 * Virtual time scheduler for testing time-based operations
 */
class VirtualTimeScheduler extends AsyncScheduler {
  /**
   * Current virtual time
   */
  frame: number;
  
  /**
   * Collection of scheduled actions
   */
  actions: Array<AsyncAction<any>>;
  
  /**
   * Run the scheduled actions right away by advancing virtual time instead of
   * waiting for real time to pass. Takes no arguments.
   */
  flush(): void;
  
  /**
   * Get current virtual time
   */
  now(): number;
}

/**
 * Virtual time action
 */
class VirtualAction<T> extends AsyncAction<T> {
  /**
   * Index in scheduler queue
   */
  index: number;
  
  /**
   * Whether action is active
   */
  active: boolean;
}

Usage Examples:

import { of, VirtualTimeScheduler } from "rxjs";
import { delay } from "rxjs/operators";

// Testing time-dependent operations
const scheduler = new VirtualTimeScheduler();

// The delay is measured on the virtual scheduler, so no real second elapses
const source$ = of(1, 2, 3).pipe(
  delay(1000, scheduler) // Use virtual scheduler
);

// Nothing is logged yet: the delayed work is only queued on the scheduler
source$.subscribe(x => console.log('Value:', x));

// Fast-forward virtual time
scheduler.flush(); // Immediately executes delayed operations

Operator Integration

observeOn

Control which scheduler an observable uses to emit its notifications.

/**
 * Re-emit notifications on specified scheduler
 * @param scheduler - Scheduler to observe on
 * @param delay - Optional delay in milliseconds
 * @returns Operator function changing observation scheduler
 */
function observeOn<T>(scheduler: SchedulerLike, delay?: number): OperatorFunction<T, T>;

Usage Examples:

import { of, asyncScheduler, queueScheduler } from "rxjs";
import { observeOn } from "rxjs/operators";

// Change from synchronous to asynchronous
of(1, 2, 3).pipe(
  observeOn(asyncScheduler)
).subscribe(x => console.log('Async:', x));

console.log('Synchronous code');
// Output: 'Synchronous code', then async values

subscribeOn

Control which scheduler an observable uses to perform its subscription.

/**
 * Subscribe to source on specified scheduler
 * @param scheduler - Scheduler to subscribe on
 * @param delay - Optional delay in milliseconds
 * @returns Operator function changing subscription scheduler
 */
function subscribeOn<T>(scheduler: SchedulerLike, delay?: number): OperatorFunction<T, T>;

Usage Examples:

import { of, asyncScheduler } from "rxjs";
import { subscribeOn } from "rxjs/operators";

// Defer subscription to async scheduler
of(1, 2, 3).pipe(
  subscribeOn(asyncScheduler)
).subscribe(x => console.log('Deferred subscription:', x));

console.log('Immediate code');
// Output: 'Immediate code', then subscription happens asynchronously

Advanced Scheduler Patterns

Custom Scheduler

import {
  asyncScheduler,
  of,
  SchedulerAction,
  SchedulerLike,
  Subscription
} from "rxjs";
import { delay } from "rxjs/operators";

/**
 * Custom scheduler with logging.
 *
 * Wraps an existing scheduler and delegates to it instead of subclassing
 * Scheduler: the action classes a Scheduler subclass would need (such as
 * AsyncAction) are not exported from the public "rxjs" entry point, whereas
 * SchedulerLike is all that operators require.
 */
class LoggingScheduler implements SchedulerLike {
  /**
   * @param inner - Scheduler that actually runs the work (asyncScheduler by default)
   */
  constructor(private readonly inner: SchedulerLike = asyncScheduler) {}

  /**
   * Current time as reported by the wrapped scheduler
   */
  now(): number {
    return this.inner.now();
  }

  /**
   * Log the requested delay, then hand the work to the wrapped scheduler
   * @param work - Function to execute
   * @param delay - Delay in milliseconds (0 when omitted)
   * @param state - Optional state to pass to work function
   * @returns Subscription for cancelling the scheduled work
   */
  schedule<T>(
    work: (this: SchedulerAction<T>, state?: T) => void,
    delay: number = 0,
    state?: T
  ): Subscription {
    console.log(`Scheduling work with delay: ${delay}ms`);
    return this.inner.schedule(work, delay, state);
  }
}

const loggingScheduler = new LoggingScheduler();

// Use custom scheduler
of(1, 2, 3).pipe(
  delay(1000, loggingScheduler)
).subscribe(x => console.log('Value:', x));

Scheduler Selection

import {
  Observable,
  SchedulerLike,
  animationFrameScheduler,
  asapScheduler,
  asyncScheduler,
  queueScheduler
} from "rxjs";
import { ajax } from "rxjs/ajax";
import { observeOn } from "rxjs/operators";

// Scheduler used for each execution context; built once rather than on every call
const contextSchedulers: Record<
  'animation' | 'io' | 'computation' | 'immediate',
  SchedulerLike
> = {
  animation: animationFrameScheduler,
  io: asyncScheduler,
  computation: asapScheduler,
  immediate: queueScheduler
};

/**
 * Re-emit a source's notifications on the scheduler that suits the given context
 * @param source - Observable whose notifications should be rescheduled
 * @param context - Kind of work the consumer performs
 * @returns Observable delivering the same notifications on the chosen scheduler
 */
function createOptimizedObservable<T>(
  source: Observable<T>,
  context: 'animation' | 'io' | 'computation' | 'immediate'
): Observable<T> {
  return source.pipe(
    observeOn(contextSchedulers[context])
  );
}

// Usage
const data$ = ajax.getJSON('/api/data');

// Optimize for different contexts
const animationData$ = createOptimizedObservable(data$, 'animation');
const ioData$ = createOptimizedObservable(data$, 'io');
const computationData$ = createOptimizedObservable(data$, 'computation');

Performance Optimization

import {
  animationFrameScheduler,
  asyncScheduler,
  interval,
  queueScheduler,
  range
} from "rxjs";
import { ajax } from "rxjs/ajax";
import { observeOn, map } from "rxjs/operators";

// Heavy computation - queue scheduler delivers results synchronously, without
// timer overhead. It does NOT yield to the event loop, so the work still
// blocks until the whole range has been processed.
range(1, 10000).pipe(
  map(n => heavyComputation(n)), 
  observeOn(queueScheduler) // Keep synchronous for performance
).subscribe(result => console.log('Computed:', result));

// UI updates - use animation frame scheduler
const updates$ = interval(16).pipe( // ~60fps
  observeOn(animationFrameScheduler),
  map(() => ({ x: Math.random() * 100, y: Math.random() * 100 }))
);

updates$.subscribe(pos => {
  element.style.transform = `translate(${pos.x}px, ${pos.y}px)`;
});

// Network requests - use async scheduler
ajax.getJSON('/api/data').pipe(
  observeOn(asyncScheduler),
  map(processData)
).subscribe(data => updateUI(data));

Testing with Schedulers

import { TestScheduler } from "rxjs/testing";
import { delay, take } from "rxjs/operators";

// The callback decides how actual and expected notification frames are compared
const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

testScheduler.run(({ cold, expectObservable }) => {
  // Test delay operator
  // Inside run(), each '-' is 1ms of virtual time, so delay(3) shifts every
  // notification right by three frames.
  const source$ = cold('a-b-(c|)');
  const expected = '   ---a-b-(c|)';
  
  const result$ = source$.pipe(delay(3));
  
  expectObservable(result$).toBe(expected);
});

Types

interface SchedulerLike {
  now(): number;
  schedule<T>(
    work: (this: SchedulerAction<T>, state?: T) => void,
    delay?: number,
    state?: T
  ): Subscription;
}

interface SchedulerAction<T> extends Subscription {
  schedule(state?: T, delay?: number): SchedulerAction<T>;
}

/**
 * Base scheduler. This is a concrete class: schedule() has a working default
 * implementation, so subclasses may override it and delegate to super.schedule().
 */
class Scheduler implements SchedulerLike {
  /**
   * @param SchedulerAction - Action class instantiated for each scheduled unit of work
   * @param now - Optional clock function; see the static `now` below
   */
  constructor(
    SchedulerAction: typeof Action,
    now?: () => number
  );
  
  /**
   * Clock function shared by schedulers
   */
  static now: () => number;
  
  /**
   * Schedule work to be executed
   * @param work - Function to execute
   * @param delay - Delay in milliseconds
   * @param state - Optional state to pass to work function
   * @returns Subscription for cancelling the scheduled work
   */
  schedule<T>(
    work: (this: SchedulerAction<T>, state?: T) => void,
    delay?: number,
    state?: T
  ): Subscription;
  
  /**
   * Current time according to scheduler
   */
  now(): number;
}

Install with Tessl CLI

npx tessl i tessl/npm-rxjs

docs

ajax-operations.md

combination-operators.md

core-types.md

error-handling.md

fetch-operations.md

filtering-operators.md

index.md

observable-creation.md

schedulers.md

subjects.md

testing-utilities.md

transformation-operators.md

websocket-operations.md

tile.json