or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

ajax-operations.md combination-operators.md core-types.md error-handling.md fetch-operations.md filtering-operators.md index.md observable-creation.md schedulers.md subjects.md testing-utilities.md transformation-operators.md websocket-operations.md

docs/schedulers.md

0

# Schedulers

1

2

Control timing and concurrency of observable execution with various scheduling strategies for different execution contexts and performance requirements.

3

4

## Capabilities

5

6

### Scheduler Types

7

8

RxJS provides several built-in schedulers for different use cases.

9

10

```typescript { .api }

11

/**

12

* Scheduler interface for controlling timing and concurrency

13

*/

14

interface SchedulerLike {

15

/**

16

* Schedule work to be executed

17

* @param work - Function to execute

18

* @param delay - Delay in milliseconds

19

* @param state - Optional state to pass to work function

20

* @returns Subscription for cancelling the scheduled work

21

*/

22

schedule<T>(

23

work: (this: SchedulerAction<T>, state?: T) => void,

24

delay?: number,

25

state?: T

26

): Subscription;

27

28

/**

29

* Current time according to scheduler

30

*/

31

now(): number;

32

}

33

34

/**

35

* Scheduler action interface

36

*/

37

interface SchedulerAction<T> extends Subscription {

38

/**

39

* Schedule this action to run again

40

* @param state - Optional state

41

* @param delay - Optional delay

42

* @returns This action for chaining

43

*/

44

schedule(state?: T, delay?: number): SchedulerAction<T>;

45

}

46

```

47

48

### asyncScheduler

49

50

Uses setTimeout/setInterval for scheduling work asynchronously.

51

52

```typescript { .api }

53

/**

54

* Async scheduler using setTimeout/setInterval (default for time-based operations)

55

*/

56

const asyncScheduler: SchedulerLike;

57

const async: SchedulerLike; // Alias for asyncScheduler

58

```

59

60

**Usage Examples:**

61

62

```typescript

63

import { of, asyncScheduler } from "rxjs";

64

import { observeOn, subscribeOn } from "rxjs/operators";

65

66

// Schedule subscription on async scheduler

67

of(1, 2, 3).pipe(

68

subscribeOn(asyncScheduler)

69

).subscribe(x => console.log('Async subscription:', x));

70

71

// Schedule observation on async scheduler

72

of(1, 2, 3).pipe(

73

observeOn(asyncScheduler)

74

).subscribe(x => console.log('Async observation:', x));

75

76

// Direct scheduling

77

asyncScheduler.schedule(() => {

78

console.log('Scheduled work executed');

79

}, 1000); // Execute after 1 second

80

```

81

82

### asapScheduler

83

84

Uses Promise.resolve() or setImmediate for scheduling work as soon as possible.

85

86

```typescript { .api }

87

/**

88

* ASAP scheduler using Promise.resolve() for microtask scheduling

89

*/

90

const asapScheduler: SchedulerLike;

91

const asap: SchedulerLike; // Alias for asapScheduler

92

```

93

94

**Usage Examples:**

95

96

```typescript

97

import { of, asapScheduler } from "rxjs";

98

import { observeOn } from "rxjs/operators";

99

100

// Schedule on microtask queue (higher priority than setTimeout)

101

of(1, 2, 3).pipe(

102

observeOn(asapScheduler)

103

).subscribe(x => console.log('ASAP:', x));

104

105

console.log('Synchronous code');

106

// Output order: 'Synchronous code', then 'ASAP: 1', 'ASAP: 2', 'ASAP: 3'

107

```

108

109

### queueScheduler

110

111

Executes work immediately on the current thread (synchronous).

112

113

```typescript { .api }

114

/**

115

* Queue scheduler for synchronous execution (immediate)

116

*/

117

const queueScheduler: SchedulerLike;

118

const queue: SchedulerLike; // Alias for queueScheduler

119

```

120

121

**Usage Examples:**

122

123

```typescript

124

import { of, queueScheduler } from "rxjs";

125

import { observeOn } from "rxjs/operators";

126

127

// Synchronous execution

128

of(1, 2, 3).pipe(

129

observeOn(queueScheduler)

130

).subscribe(x => console.log('Queue:', x));

131

132

console.log('After subscription');

133

// Output: 'Queue: 1', 'Queue: 2', 'Queue: 3', 'After subscription'

134

```

135

136

### animationFrameScheduler

137

138

Uses requestAnimationFrame for scheduling work aligned with browser rendering.

139

140

```typescript { .api }

141

/**

142

* Animation frame scheduler using requestAnimationFrame

143

*/

144

const animationFrameScheduler: SchedulerLike;

145

const animationFrame: SchedulerLike; // Alias for animationFrameScheduler

146

```

147

148

**Usage Examples:**

149

150

```typescript

151

import { interval, animationFrameScheduler } from "rxjs";

152

import { map } from "rxjs/operators";

153

154

// Smooth animation loop

155

interval(0, animationFrameScheduler).pipe(

156

map(() => performance.now())

157

).subscribe(timestamp => {

158

// Update animation at ~60fps

159

updateAnimation(timestamp);

160

});

161

162

// Schedule DOM updates

163

animationFrameScheduler.schedule(() => {

164

element.style.left = '100px';

165

console.log('DOM updated on next frame');

166

});

167

```

168

169

### VirtualTimeScheduler

170

171

Scheduler for testing with virtual time control.

172

173

```typescript { .api }

174

/**

175

* Virtual time scheduler for testing time-based operations

176

*/

177

class VirtualTimeScheduler extends AsyncScheduler {

178

/**

179

* Current virtual time

180

*/

181

frame: number;

182

183

/**

184

* Collection of scheduled actions

185

*/

186

actions: Array<AsyncAction<any>>;

187

188

/**

189

* Execute all scheduled work in virtual-time order

190

* Takes no arguments; runs until no scheduled work remains (bounded by `maxFrames`)

191

*/

192

flush(): void;

193

194

/**

195

* Get current virtual time

196

*/

197

now(): number;

198

}

199

200

/**

201

* Virtual time action

202

*/

203

class VirtualAction<T> extends AsyncAction<T> {

204

/**

205

* Index in scheduler queue

206

*/

207

index: number;

208

209

/**

210

* Whether action is active

211

*/

212

active: boolean;

213

}

214

```

215

216

**Usage Examples:**

217

218

```typescript

219

import { of, VirtualTimeScheduler } from "rxjs";

220

import { delay } from "rxjs/operators";

221

222

// Testing time-dependent operations

223

const scheduler = new VirtualTimeScheduler();

224

225

const source$ = of(1, 2, 3).pipe(

226

delay(1000, scheduler) // Use virtual scheduler

227

);

228

229

source$.subscribe(x => console.log('Value:', x));

230

231

// Fast-forward virtual time

232

scheduler.flush(); // Immediately executes delayed operations

233

```

234

235

## Operator Integration

236

237

### observeOn

238

239

Control which scheduler an observable uses to deliver its notifications.

240

241

```typescript { .api }

242

/**

243

* Re-emit notifications on specified scheduler

244

* @param scheduler - Scheduler to observe on

245

* @param delay - Optional delay in milliseconds

246

* @returns Operator function changing observation scheduler

247

*/

248

function observeOn<T>(scheduler: SchedulerLike, delay?: number): OperatorFunction<T, T>;

249

```

250

251

**Usage Examples:**

252

253

```typescript

254

import { of, asyncScheduler, queueScheduler } from "rxjs";

255

import { observeOn } from "rxjs/operators";

256

257

// Change from synchronous to asynchronous

258

of(1, 2, 3).pipe(

259

observeOn(asyncScheduler)

260

).subscribe(x => console.log('Async:', x));

261

262

console.log('Synchronous code');

263

// Output: 'Synchronous code', then async values

264

```

265

266

### subscribeOn

267

268

Control which scheduler an observable uses for subscription.

269

270

```typescript { .api }

271

/**

272

* Subscribe to source on specified scheduler

273

* @param scheduler - Scheduler to subscribe on

274

* @param delay - Optional delay in milliseconds

275

* @returns Operator function changing subscription scheduler

276

*/

277

function subscribeOn<T>(scheduler: SchedulerLike, delay?: number): OperatorFunction<T, T>;

278

```

279

280

**Usage Examples:**

281

282

```typescript

283

import { of, asyncScheduler } from "rxjs";

284

import { subscribeOn } from "rxjs/operators";

285

286

// Defer subscription to async scheduler

287

of(1, 2, 3).pipe(

288

subscribeOn(asyncScheduler)

289

).subscribe(x => console.log('Deferred subscription:', x));

290

291

console.log('Immediate code');

292

// Output: 'Immediate code', then subscription happens asynchronously

293

```

294

295

## Advanced Scheduler Patterns

296

297

### Custom Scheduler

298

299

```typescript

300

import { Scheduler, AsyncAction } from "rxjs";

301

302

// Custom scheduler with logging

303

class LoggingScheduler extends Scheduler {

304

constructor(SchedulerAction: typeof AsyncAction, now: () => number = Date.now) {

305

super(SchedulerAction, now);

306

}

307

308

schedule<T>(

309

work: (this: SchedulerAction<T>, state?: T) => void,

310

delay?: number,

311

state?: T

312

): Subscription {

313

console.log(`Scheduling work with delay: ${delay}ms`);

314

return super.schedule(work, delay, state);

315

}

316

}

317

318

const loggingScheduler = new LoggingScheduler(AsyncAction);

319

320

// Use custom scheduler

321

of(1, 2, 3).pipe(

322

delay(1000, loggingScheduler)

323

).subscribe(x => console.log('Value:', x));

324

```

325

326

### Scheduler Selection

327

328

```typescript

329

import {

330

asyncScheduler,

331

asapScheduler,

332

queueScheduler,

333

animationFrameScheduler

334

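} from "rxjs";
import { Observable } from "rxjs";
import { observeOn } from "rxjs/operators";
import { ajax } from "rxjs/ajax";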

335

336

function createOptimizedObservable<T>(

337

source: Observable<T>,

338

context: 'animation' | 'io' | 'computation' | 'immediate'

339

): Observable<T> {

340

const schedulers = {

341

animation: animationFrameScheduler,

342

io: asyncScheduler,

343

computation: asapScheduler,

344

immediate: queueScheduler

345

};

346

347

return source.pipe(

348

observeOn(schedulers[context])

349

);

350

}

351

352

// Usage

353

const data$ = ajax.getJSON('/api/data');

354

355

// Optimize for different contexts

356

const animationData$ = createOptimizedObservable(data$, 'animation');

357

const ioData$ = createOptimizedObservable(data$, 'io');

358

const computationData$ = createOptimizedObservable(data$, 'computation');

359

```

360

361

### Performance Optimization

362

363

```typescript

364

import { range, interval, queueScheduler, asyncScheduler, animationFrameScheduler } from "rxjs";
import { ajax } from "rxjs/ajax";

365

import { observeOn, map } from "rxjs/operators";

366

367

// Heavy computation - the queue scheduler runs it synchronously (this blocks until the work is done)

368

range(1, 10000).pipe(

369

map(n => heavyComputation(n)),

370

observeOn(queueScheduler) // Keep synchronous for performance

371

).subscribe(result => console.log('Computed:', result));

372

373

// UI updates - use animation frame scheduler

374

const updates$ = interval(16).pipe( // ~60fps

375

observeOn(animationFrameScheduler),

376

map(() => ({ x: Math.random() * 100, y: Math.random() * 100 }))

377

);

378

379

updates$.subscribe(pos => {

380

element.style.transform = `translate(${pos.x}px, ${pos.y}px)`;

381

});

382

383

// Network requests - use async scheduler

384

ajax.getJSON('/api/data').pipe(

385

observeOn(asyncScheduler),

386

map(processData)

387

).subscribe(data => updateUI(data));

388

```

389

390

## Testing with Schedulers

391

392

```typescript

393

import { TestScheduler } from "rxjs/testing";

394

import { delay, take } from "rxjs/operators";

395

396

const testScheduler = new TestScheduler((actual, expected) => {

397

expect(actual).toEqual(expected);

398

});

399

400

testScheduler.run(({ cold, hot, expectObservable }) => {

401

// Test delay operator

402

const source$ = cold('a-b-c|');

403

const expected = ' ---a-b-c|';

404

405

// Inside run(), each '-' frame is 1ms of virtual time, so a 3-frame shift is delay(3)
const result$ = source$.pipe(delay(3));

406

407

expectObservable(result$).toBe(expected);

408

});

409

```

410

411

## Types

412

413

```typescript { .api }

414

interface SchedulerLike {

415

now(): number;

416

schedule<T>(

417

work: (this: SchedulerAction<T>, state?: T) => void,

418

delay?: number,

419

state?: T

420

): Subscription;

421

}

422

423

interface SchedulerAction<T> extends Subscription {

424

schedule(state?: T, delay?: number): SchedulerAction<T>;

425

}

426

427

class Scheduler implements SchedulerLike {

428

constructor(

429

SchedulerAction: typeof Action,

430

now?: () => number

431

);

432

433

static now: () => number;

434

435

schedule<T>(

436

work: (this: SchedulerAction<T>, state?: T) => void,

437

delay?: number,

438

state?: T

439

): Subscription;

440

441

now(): number;

442

}

443

```