0
# Schedulers
1
2
Control the timing and concurrency of observable execution using scheduling strategies suited to different execution contexts and performance requirements.
3
4
## Capabilities
5
6
### Scheduler Types
7
8
RxJS provides several built-in schedulers for different use cases.
9
10
```typescript { .api }
11
/**
12
* Scheduler interface for controlling timing and concurrency
13
*/
14
interface SchedulerLike {
15
/**
16
* Schedule work to be executed
17
* @param work - Function to execute
18
* @param delay - Delay in milliseconds
19
* @param state - Optional state to pass to work function
20
* @returns Subscription for cancelling the scheduled work
21
*/
22
schedule<T>(
23
work: (this: SchedulerAction<T>, state?: T) => void,
24
delay?: number,
25
state?: T
26
): Subscription;
27
28
/**
29
* Current time according to scheduler
30
*/
31
now(): number;
32
}
33
34
/**
35
* Scheduler action interface
36
*/
37
interface SchedulerAction<T> extends Subscription {
38
/**
39
* Schedule this action to run again
40
* @param state - Optional state
41
* @param delay - Optional delay
42
* @returns This action for chaining
43
*/
44
schedule(state?: T, delay?: number): SchedulerAction<T>;
45
}
46
```
47
48
### asyncScheduler
49
50
Uses setTimeout/setInterval for scheduling work asynchronously.
51
52
```typescript { .api }
53
/**
54
* Async scheduler using setTimeout/setInterval (default for time-based operations)
55
*/
56
const asyncScheduler: SchedulerLike;
57
const async: SchedulerLike; // Alias for asyncScheduler
58
```
59
60
**Usage Examples:**
61
62
```typescript
63
import { of, asyncScheduler } from "rxjs";
64
import { observeOn, subscribeOn } from "rxjs/operators";
65
66
// Schedule subscription on async scheduler
67
of(1, 2, 3).pipe(
68
subscribeOn(asyncScheduler)
69
).subscribe(x => console.log('Async subscription:', x));
70
71
// Schedule observation on async scheduler
72
of(1, 2, 3).pipe(
73
observeOn(asyncScheduler)
74
).subscribe(x => console.log('Async observation:', x));
75
76
// Direct scheduling
77
asyncScheduler.schedule(() => {
78
console.log('Scheduled work executed');
79
}, 1000); // Execute after 1 second
80
```
81
82
### asapScheduler
83
84
Uses Promise.resolve() or setImmediate for scheduling work as soon as possible.
85
86
```typescript { .api }
87
/**
88
* ASAP scheduler using Promise.resolve() for microtask scheduling
89
*/
90
const asapScheduler: SchedulerLike;
91
const asap: SchedulerLike; // Alias for asapScheduler
92
```
93
94
**Usage Examples:**
95
96
```typescript
97
import { of, asapScheduler } from "rxjs";
98
import { observeOn } from "rxjs/operators";
99
100
// Schedule on the microtask queue (runs after the current synchronous code, but before any setTimeout callback)
101
of(1, 2, 3).pipe(
102
observeOn(asapScheduler)
103
).subscribe(x => console.log('ASAP:', x));
104
105
console.log('Synchronous code');
106
// Output order: 'Synchronous code', then 'ASAP: 1', 'ASAP: 2', 'ASAP: 3'
107
```
108
109
### queueScheduler
110
111
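Executes work synchronously on the current call stack when no delay is given. Work scheduled from inside a running task is queued and executed after that task finishes, rather than recursively; when a delay is given, it behaves like `asyncScheduler`.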
112
113
```typescript { .api }
114
/**
115
* Queue scheduler for synchronous execution (immediate)
116
*/
117
const queueScheduler: SchedulerLike;
118
const queue: SchedulerLike; // Alias for queueScheduler
119
```
120
121
**Usage Examples:**
122
123
```typescript
124
import { of, queueScheduler } from "rxjs";
125
import { observeOn } from "rxjs/operators";
126
127
// Synchronous execution
128
of(1, 2, 3).pipe(
129
observeOn(queueScheduler)
130
).subscribe(x => console.log('Queue:', x));
131
132
console.log('After subscription');
133
// Output: 'Queue: 1', 'Queue: 2', 'Queue: 3', 'After subscription'
134
```
135
136
### animationFrameScheduler
137
138
Uses requestAnimationFrame for scheduling work aligned with browser rendering.
139
140
```typescript { .api }
141
/**
142
* Animation frame scheduler using requestAnimationFrame
143
*/
144
const animationFrameScheduler: SchedulerLike;
145
const animationFrame: SchedulerLike; // Alias for animationFrameScheduler
146
```
147
148
**Usage Examples:**
149
150
```typescript
151
import { interval, animationFrameScheduler } from "rxjs";
152
import { map } from "rxjs/operators";
153
154
// Smooth animation loop
155
interval(0, animationFrameScheduler).pipe(
156
map(() => performance.now())
157
).subscribe(timestamp => {
158
// Update animation at ~60fps
159
updateAnimation(timestamp);
160
});
161
162
// Schedule DOM updates
163
animationFrameScheduler.schedule(() => {
164
element.style.left = '100px';
165
console.log('DOM updated on next frame');
166
});
167
```
168
169
### VirtualTimeScheduler
170
171
Scheduler for testing with virtual time control.
172
173
```typescript { .api }
174
/**
175
* Virtual time scheduler for testing time-based operations
176
*/
177
class VirtualTimeScheduler extends AsyncScheduler {
178
/**
179
* Current virtual time
180
*/
181
frame: number;
182
183
/**
184
* Collection of scheduled actions
185
*/
186
actions: Array<AsyncAction<any>>;
187
188
/**
189
* Execute all scheduled work in order, advancing virtual time until the queue is empty
190
* or the scheduler's `maxFrames` limit is reached (takes no arguments)
191
*/
192
flush(): void;
193
194
/**
195
* Get current virtual time
196
*/
197
now(): number;
198
}
199
200
/**
201
* Virtual time action
202
*/
203
class VirtualAction<T> extends AsyncAction<T> {
204
/**
205
* Index in scheduler queue
206
*/
207
index: number;
208
209
/**
210
* Whether action is active
211
*/
212
active: boolean;
213
}
214
```
215
216
**Usage Examples:**
217
218
```typescript
219
import { VirtualTimeScheduler } from "rxjs";
220
import { delay } from "rxjs/operators";
221
222
// Testing time-dependent operations
223
const scheduler = new VirtualTimeScheduler();
224
225
const source$ = of(1, 2, 3).pipe(
226
delay(1000, scheduler) // Use virtual scheduler
227
);
228
229
source$.subscribe(x => console.log('Value:', x));
230
231
// Fast-forward virtual time
232
scheduler.flush(); // Immediately executes delayed operations
233
```
234
235
## Operator Integration
236
237
### observeOn
238
239
Control which scheduler an observable uses to emit its notifications.
240
241
```typescript { .api }
242
/**
243
* Re-emit notifications on specified scheduler
244
* @param scheduler - Scheduler to observe on
245
* @param delay - Optional delay in milliseconds
246
* @returns Operator function changing observation scheduler
247
*/
248
function observeOn<T>(scheduler: SchedulerLike, delay?: number): OperatorFunction<T, T>;
249
```
250
251
**Usage Examples:**
252
253
```typescript
254
import { of, asyncScheduler, queueScheduler } from "rxjs";
255
import { observeOn } from "rxjs/operators";
256
257
// Change from synchronous to asynchronous
258
of(1, 2, 3).pipe(
259
observeOn(asyncScheduler)
260
).subscribe(x => console.log('Async:', x));
261
262
console.log('Synchronous code');
263
// Output: 'Synchronous code', then async values
264
```
265
266
### subscribeOn
267
268
Control which scheduler an observable uses for subscription.
269
270
```typescript { .api }
271
/**
272
* Subscribe to source on specified scheduler
273
* @param scheduler - Scheduler to subscribe on
274
* @param delay - Optional delay in milliseconds
275
* @returns Operator function changing subscription scheduler
276
*/
277
function subscribeOn<T>(scheduler: SchedulerLike, delay?: number): OperatorFunction<T, T>;
278
```
279
280
**Usage Examples:**
281
282
```typescript
283
import { of, asyncScheduler } from "rxjs";
284
import { subscribeOn } from "rxjs/operators";
285
286
// Defer subscription to async scheduler
287
of(1, 2, 3).pipe(
288
subscribeOn(asyncScheduler)
289
).subscribe(x => console.log('Deferred subscription:', x));
290
291
console.log('Immediate code');
292
// Output: 'Immediate code', then subscription happens asynchronously
293
```
294
295
## Advanced Scheduler Patterns
296
297
### Custom Scheduler
298
299
```typescript
300
import { Scheduler, AsyncAction } from "rxjs";
301
302
// Custom scheduler with logging
303
class LoggingScheduler extends Scheduler {
304
constructor(SchedulerAction: typeof AsyncAction, now: () => number = Date.now) {
305
super(SchedulerAction, now);
306
}
307
308
schedule<T>(
309
work: (this: SchedulerAction<T>, state?: T) => void,
310
delay?: number,
311
state?: T
312
): Subscription {
313
console.log(`Scheduling work with delay: ${delay}ms`);
314
return super.schedule(work, delay, state);
315
}
316
}
317
318
const loggingScheduler = new LoggingScheduler(AsyncAction);
319
320
// Use custom scheduler
321
of(1, 2, 3).pipe(
322
delay(1000, loggingScheduler)
323
).subscribe(x => console.log('Value:', x));
324
```
325
326
### Scheduler Selection
327
328
```typescript
329
import {
330
asyncScheduler,
331
asapScheduler,
332
queueScheduler,
333
animationFrameScheduler
334
} from "rxjs";
335
336
function createOptimizedObservable<T>(
337
source: Observable<T>,
338
context: 'animation' | 'io' | 'computation' | 'immediate'
339
): Observable<T> {
340
const schedulers = {
341
animation: animationFrameScheduler,
342
io: asyncScheduler,
343
computation: asapScheduler,
344
immediate: queueScheduler
345
};
346
347
return source.pipe(
348
observeOn(schedulers[context])
349
);
350
}
351
352
// Usage
353
const data$ = ajax.getJSON('/api/data');
354
355
// Optimize for different contexts
356
const animationData$ = createOptimizedObservable(data$, 'animation');
357
const ioData$ = createOptimizedObservable(data$, 'io');
358
const computationData$ = createOptimizedObservable(data$, 'computation');
359
```
360
361
### Performance Optimization
362
363
```typescript
364
import { range, queueScheduler, asyncScheduler } from "rxjs";
365
import { observeOn, map } from "rxjs/operators";
366
367
// Heavy computation - queue scheduler keeps execution synchronous (note: this blocks the current call stack until the work finishes)
368
range(1, 10000).pipe(
369
map(n => heavyComputation(n)),
370
observeOn(queueScheduler) // Keep synchronous for performance
371
).subscribe(result => console.log('Computed:', result));
372
373
// UI updates - use animation frame scheduler
374
const updates$ = interval(16).pipe( // ~60fps
375
observeOn(animationFrameScheduler),
376
map(() => ({ x: Math.random() * 100, y: Math.random() * 100 }))
377
);
378
379
updates$.subscribe(pos => {
380
element.style.transform = `translate(${pos.x}px, ${pos.y}px)`;
381
});
382
383
// Network requests - use async scheduler
384
ajax.getJSON('/api/data').pipe(
385
observeOn(asyncScheduler),
386
map(processData)
387
).subscribe(data => updateUI(data));
388
```
389
390
## Testing with Schedulers
391
392
```typescript
393
import { TestScheduler } from "rxjs/testing";
394
import { delay, take } from "rxjs/operators";
395
396
const testScheduler = new TestScheduler((actual, expected) => {
397
expect(actual).toEqual(expected);
398
});
399
400
testScheduler.run(({ cold, hot, expectObservable }) => {
401
// Test delay operator
402
const source$ = cold('a-b-c|');
403
const expected = ' ---a-b-c|';
404
405
const result$ = source$.pipe(delay(30));
406
407
expectObservable(result$).toBe(expected);
408
});
409
```
410
411
## Types
412
413
```typescript { .api }
414
interface SchedulerLike {
415
now(): number;
416
schedule<T>(
417
work: (this: SchedulerAction<T>, state?: T) => void,
418
delay?: number,
419
state?: T
420
): Subscription;
421
}
422
423
interface SchedulerAction<T> extends Subscription {
424
schedule(state?: T, delay?: number): SchedulerAction<T>;
425
}
426
427
abstract class Scheduler implements SchedulerLike {
428
constructor(
429
SchedulerAction: typeof Action,
430
now?: () => number
431
);
432
433
static now: () => number;
434
435
abstract schedule<T>(
436
work: (this: SchedulerAction<T>, state?: T) => void,
437
delay?: number,
438
state?: T
439
): Subscription;
440
441
now(): number;
442
}
443
```