
# Schedulers and Threading

Control execution context and threading for reactive streams. Schedulers abstract away threading details and provide a way to specify where operations should run and where observers should be notified.

## Capabilities

### Built-in Schedulers

RxJava provides several built-in schedulers for common use cases.

```java { .api }
/**
 * Factory class for built-in schedulers
 */
public final class Schedulers {
    /**
     * IO scheduler for blocking I/O operations (file, network, database)
     * Uses an unbounded thread pool that grows as needed
     */
    public static Scheduler io();

    /**
     * Computation scheduler for CPU-intensive work
     * Thread pool size defaults to the number of available processors
     */
    public static Scheduler computation();

    /**
     * Creates a new thread for each unit of work (one thread per Worker)
     */
    public static Scheduler newThread();

    /**
     * Single-threaded scheduler with FIFO execution
     * Useful for event loops and sequential processing
     */
    public static Scheduler single();

    /**
     * Trampoline scheduler that queues work on the current thread
     * Executes immediately if no other work is queued
     */
    public static Scheduler trampoline();

    /**
     * Creates a scheduler from a custom Executor
     */
    public static Scheduler from(Executor executor);

    // Note: RxJava 2.x (io.reactivex) has no Schedulers.test() factory;
    // create a test scheduler with `new TestScheduler()` instead.
}
```
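The trampoline behaviour described above (queue work on the current thread, run it immediately when nothing else is queued) can be illustrated with plain JDK classes. The sketch below is a simplified, single-threaded illustration and not RxJava's implementation; `TrampolineSketch` and its `schedule` and `demo` methods are hypothetical names introduced only for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical illustration of trampolining; not RxJava's implementation. */
final class TrampolineSketch {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean draining;

    /** Runs the task now if nothing is running; otherwise queues it behind the current task. */
    void schedule(Runnable task) {
        queue.add(task);
        if (draining) {
            return; // a task is already running; the loop below will pick this one up
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    /** Schedules an outer task that schedules an inner task, recording the order of events. */
    static List<String> demo() {
        TrampolineSketch trampoline = new TrampolineSketch();
        List<String> log = new ArrayList<>();
        trampoline.schedule(() -> {
            log.add("outer start");
            trampoline.schedule(() -> log.add("inner"));
            log.add("outer end");
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [outer start, outer end, inner]
    }
}
```

The inner task is deferred until the outer task finishes, which is why nested scheduling on a trampoline does not grow the call stack.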

### Core Scheduler Interface

`Scheduler` is the abstract base class that all schedulers extend.

```java { .api }
/**
 * Abstract scheduler that coordinates scheduling across time
 */
public abstract class Scheduler {
    /**
     * Creates a Worker for scheduling tasks
     * Tasks scheduled on the same Worker run sequentially and never overlap
     */
    public abstract Worker createWorker();

    /**
     * Schedules a task to run as soon as possible
     */
    public Disposable scheduleDirect(Runnable run);

    /**
     * Schedules a task to run after a delay
     */
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit);

    /**
     * Schedules a task to run periodically
     */
    public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit);

    /**
     * Returns the current time in the requested unit
     */
    public long now(TimeUnit unit);

    /**
     * Starts the scheduler (for lifecycle management)
     */
    public void start();

    /**
     * Shuts down the scheduler
     */
    public void shutdown();
}
```
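To make the direct scheduling methods above more concrete, the following sketch maps them onto the JDK's `ScheduledExecutorService`: a one-shot delayed task, a periodic task, and a cancellation handle that plays the role of a `Disposable`. This is an analogy under stated assumptions, not how RxJava implements its schedulers; `DirectSchedulingSketch` and `demo` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical JDK-only analogy for delayed and periodic direct scheduling. */
final class DirectSchedulingSketch {

    /** Returns true if the delayed task ran, the periodic task ticked, and cancellation took effect. */
    static boolean demo() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            CountDownLatch delayedRan = new CountDownLatch(1);
            Runnable oneShot = delayedRan::countDown;
            // Analogous to scheduling a task directly with a delay.
            executor.schedule(oneShot, 50, TimeUnit.MILLISECONDS);

            CountDownLatch ticks = new CountDownLatch(3);
            Runnable tick = ticks::countDown;
            // Analogous to periodic scheduling: initial delay, then a fixed period.
            ScheduledFuture<?> periodic =
                    executor.scheduleAtFixedRate(tick, 10, 10, TimeUnit.MILLISECONDS);

            boolean ran = delayedRan.await(2, TimeUnit.SECONDS)
                    && ticks.await(2, TimeUnit.SECONDS);

            // Analogous to Disposable.dispose(): no further executions are started.
            periodic.cancel(false);
            return ran && periodic.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("delayed task ran and periodic task cancelled: " + demo());
    }
}
```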

### Worker Interface

A `Worker` is a sequential scheduler: tasks scheduled on the same worker run one at a time, in order, typically on a single thread.

```java { .api }
/**
 * Sequential scheduler that executes tasks one at a time
 */
public abstract static class Worker implements Disposable {
    /**
     * Schedules a task to run as soon as possible
     * (non-abstract; delegates to the delayed overload with a zero delay)
     */
    public Disposable schedule(Runnable run);

    /**
     * Schedules a task to run after a delay
     */
    public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);

    /**
     * Schedules a task to run periodically
     */
    public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit);

    /**
     * Returns the current time in the requested unit
     */
    public long now(TimeUnit unit);

    /**
     * Cancels all scheduled tasks and cleans up resources
     */
    public abstract void dispose();

    /**
     * Returns true if this worker has been disposed
     */
    public abstract boolean isDisposed();
}
```
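The sequential guarantee of a worker can be pictured as a single-threaded executor with a dispose switch. The sketch below is a JDK-only analogy, assuming one dedicated thread per worker; it is not RxJava's `Worker` implementation, and `WorkerSketch` with its methods is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical worker: one thread, FIFO order, disposable. */
final class WorkerSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Queues a task; tasks run one at a time in submission order. */
    void schedule(Runnable task) {
        executor.execute(task);
    }

    /** Drops pending tasks and releases the thread. */
    void dispose() {
        executor.shutdownNow();
    }

    boolean isDisposed() {
        return executor.isShutdown();
    }

    /** Schedules five tasks and returns the order in which they ran. */
    static List<Integer> demo() {
        WorkerSketch worker = new WorkerSketch();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(5);
        for (int i = 1; i <= 5; i++) {
            final int id = i;
            worker.schedule(() -> {
                order.add(id);
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.dispose(); // always release the worker's thread
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2, 3, 4, 5]
    }
}
```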

### Threading Operators

Control which scheduler reactive streams use for subscription and observation.

```java { .api }
/**
 * Available on all reactive types (Observable, Flowable, Single, Maybe, Completable)
 * T stands for the reactive type the operator is called on
 */

/**
 * Specifies the Scheduler on which the source will operate
 * Affects where the subscription and upstream operations run
 */
public final T subscribeOn(Scheduler scheduler);

/**
 * Specifies the Scheduler on which observers will be notified
 * Affects where downstream operators and observer callbacks run
 */
public final T observeOn(Scheduler scheduler);

// The delayError and bufferSize overloads exist only on Observable and Flowable
public final T observeOn(Scheduler scheduler, boolean delayError);
public final T observeOn(Scheduler scheduler, boolean delayError, int bufferSize);
```
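The split between "where the source runs" and "where downstream callbacks run" has a rough JDK analogue in `CompletableFuture`, shown here only to illustrate the idea of hopping threads between stages. It is not equivalent to RxJava's operators; `ThreadHopSketch`, `demo`, and the thread names `io-1` and `computation-1` are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical analogy: produce on one executor, consume on another. */
final class ThreadHopSketch {

    /** Returns { thread that produced the value, thread that handled it }. */
    static String[] demo() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-1"));
        ExecutorService computation =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "computation-1"));
        try {
            return CompletableFuture
                    // Roughly like subscribeOn(io): the source work runs here.
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // Roughly like observeOn(computation): downstream work runs here.
                    .thenApplyAsync(
                            producer -> new String[] {producer, Thread.currentThread().getName()},
                            computation)
                    .join();
        } finally {
            io.shutdown();
            computation.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = demo();
        System.out.println("produced on " + threads[0] + ", handled on " + threads[1]);
    }
}
```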

### Test Scheduler

Special scheduler for testing with virtual time control.

```java { .api }
/**
 * Scheduler for testing that allows manual time control
 */
public final class TestScheduler extends Scheduler {
    /**
     * Advances virtual time by the specified amount
     */
    public void advanceTimeBy(long delayTime, TimeUnit unit);

    /**
     * Moves virtual time to the specified absolute point
     */
    public void advanceTimeTo(long delayTime, TimeUnit unit);

    /**
     * Runs all pending tasks that are due at or before the current virtual time
     */
    public void triggerActions();

    /**
     * Returns the current virtual time in the requested unit
     */
    public long now(TimeUnit unit);

    /**
     * Creates a Worker bound to this test scheduler
     */
    public Worker createWorker();
}
```
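Virtual time, as exposed by the test scheduler above, boils down to a clock that only moves when told to and a queue of tasks ordered by due time. The following JDK-only sketch shows that idea under simplifying assumptions (single-threaded, no cancellation); it is not the `TestScheduler` source, and `VirtualTimeSketch` and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical virtual-time scheduler: the clock advances only on request. */
final class VirtualTimeSketch {
    private static final class TimedTask {
        final long dueNanos;
        final long sequence; // keeps FIFO order among tasks due at the same time
        final Runnable task;

        TimedTask(long dueNanos, long sequence, Runnable task) {
            this.dueNanos = dueNanos;
            this.sequence = sequence;
            this.task = task;
        }
    }

    private final PriorityQueue<TimedTask> queue = new PriorityQueue<>(
            Comparator.comparingLong((TimedTask t) -> t.dueNanos)
                    .thenComparingLong(t -> t.sequence));
    private long nowNanos;
    private long counter;

    void schedule(Runnable task, long delay, TimeUnit unit) {
        queue.add(new TimedTask(nowNanos + unit.toNanos(delay), counter++, task));
    }

    /** Moves the clock forward and runs every task that becomes due on the way. */
    void advanceTimeBy(long amount, TimeUnit unit) {
        long target = nowNanos + unit.toNanos(amount);
        while (!queue.isEmpty() && queue.peek().dueNanos <= target) {
            TimedTask next = queue.poll();
            nowNanos = next.dueNanos; // the clock reads the task's due time while it runs
            next.task.run();
        }
        nowNanos = target;
    }

    long now(TimeUnit unit) {
        return unit.convert(nowNanos, TimeUnit.NANOSECONDS);
    }

    static List<String> demo() {
        VirtualTimeSketch scheduler = new VirtualTimeSketch();
        List<String> log = new ArrayList<>();
        scheduler.schedule(() -> log.add("b@" + scheduler.now(TimeUnit.SECONDS)), 2, TimeUnit.SECONDS);
        scheduler.schedule(() -> log.add("a@" + scheduler.now(TimeUnit.SECONDS)), 1, TimeUnit.SECONDS);
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS); // only "a" is due
        log.add("now=" + scheduler.now(TimeUnit.SECONDS));
        scheduler.advanceTimeBy(5, TimeUnit.SECONDS); // "b" runs at virtual second 2
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a@1, now=1, b@2]
    }
}
```

No real time passes in this sketch, which is the property that makes time-based tests fast and deterministic.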

## Usage Examples

**Basic Threading with subscribeOn and observeOn:**

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

Observable<String> source = Observable.fromCallable(() -> {
        // This runs on an IO thread due to subscribeOn
        System.out.println("Source thread: " + Thread.currentThread().getName());
        Thread.sleep(1000); // Simulate blocking I/O
        return "Data from server";
    })
    .subscribeOn(Schedulers.io())          // Source runs on IO scheduler
    .observeOn(Schedulers.computation());  // Observer notified on computation scheduler

source.subscribe(data -> {
    // This runs on a computation thread due to observeOn
    System.out.println("Observer thread: " + Thread.currentThread().getName());
    System.out.println("Received: " + data);
});
```

**Choosing the Right Scheduler:**

```java
// fetchFromNetwork(), heavyComputation() and updateUI() are placeholders for your own code

// I/O operations (network, file, database)
Observable<String> networkCall = Observable.fromCallable(() -> fetchFromNetwork())
    .subscribeOn(Schedulers.io());

// CPU-intensive computations
Observable<Integer> computation = Observable.range(1, 1000000)
    .map(i -> heavyComputation(i))
    .subscribeOn(Schedulers.computation());

// UI updates (Android example; AndroidSchedulers comes from the RxAndroid library)
networkCall
    .observeOn(AndroidSchedulers.mainThread()) // Android-specific
    .subscribe(data -> updateUI(data));

// Sequential processing
Observable<String> sequential = Observable.just("task1", "task2", "task3")
    .subscribeOn(Schedulers.single())
    .doOnNext(task -> System.out.println("Processing: " + task));
```

**Custom Scheduler from Executor:**

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

// Create custom thread pool
ExecutorService customExecutor = Executors.newFixedThreadPool(4);
Scheduler customScheduler = Schedulers.from(customExecutor);

Observable.range(1, 10)
    .subscribeOn(customScheduler)
    .subscribe(
        value -> System.out.println("Value: " + value + " on " + Thread.currentThread().getName()),
        error -> error.printStackTrace(),
        () -> {
            System.out.println("Completed");
            customExecutor.shutdown(); // Don't forget to shut down the executor
        }
    );
```

**Direct Scheduling with Scheduler:**

```java
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

Scheduler scheduler = Schedulers.io();

// Schedule immediate task
Disposable task1 = scheduler.scheduleDirect(() -> {
    System.out.println("Immediate task executed");
});

// Schedule delayed task
Disposable task2 = scheduler.scheduleDirect(() -> {
    System.out.println("Delayed task executed");
}, 2, TimeUnit.SECONDS);

// Schedule periodic task (on Scheduler the method is schedulePeriodicallyDirect)
Disposable task3 = scheduler.schedulePeriodicallyDirect(() -> {
    System.out.println("Periodic task: " + System.currentTimeMillis());
}, 1, 3, TimeUnit.SECONDS);

// Cancel tasks when done
Thread.sleep(10000); // throws InterruptedException; declare or handle it
task3.dispose();
```

**Using Worker for Sequential Tasks:**

```java
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

Scheduler.Worker worker = Schedulers.io().createWorker();

// All tasks scheduled on this worker run sequentially on the same thread
worker.schedule(() -> System.out.println("Task 1"));
worker.schedule(() -> System.out.println("Task 2"), 1, TimeUnit.SECONDS);
worker.schedule(() -> System.out.println("Task 3"), 2, TimeUnit.SECONDS);

// Clean up worker when done
Thread.sleep(5000); // throws InterruptedException; declare or handle it
worker.dispose();
```

**Testing with TestScheduler:**

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;

TestScheduler testScheduler = new TestScheduler();

Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, testScheduler)
    .take(3);

TestObserver<Long> testObserver = source.test();

// Initially no emissions
testObserver.assertValueCount(0);

// Advance time by 1 second
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
testObserver.assertValueCount(1);
testObserver.assertValues(0L);

// Advance time by 2 more seconds
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
testObserver.assertValueCount(3);
testObserver.assertValues(0L, 1L, 2L);
testObserver.assertComplete();
```

**Complex Threading Example:**

```java
Observable<String> pipeline = Observable.fromCallable(() -> {
        // Heavy I/O operation
        System.out.println("Fetching data on: " + Thread.currentThread().getName());
        Thread.sleep(2000);
        return "raw-data";
    })
    .subscribeOn(Schedulers.io())         // I/O operation on IO scheduler
    .observeOn(Schedulers.computation())  // Switch to computation before the CPU work
    .map(data -> {
        // CPU-intensive transformation; runs on computation because observeOn precedes it
        System.out.println("Processing data on: " + Thread.currentThread().getName());
        return data.toUpperCase() + "-processed";
    })
    .flatMap(processed -> {
        // Another I/O operation
        return Observable.fromCallable(() -> {
            System.out.println("Saving data on: " + Thread.currentThread().getName());
            Thread.sleep(1000);
            return processed + "-saved";
        }).subscribeOn(Schedulers.io());  // Back to I/O for saving
    })
    .observeOn(Schedulers.single());      // Final result on single thread

pipeline.subscribe(
    result -> System.out.println("Final result on: " + Thread.currentThread().getName() + " -> " + result),
    error -> error.printStackTrace()
);
```

**Error Handling with Schedulers:**

```java
Observable.fromCallable(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("Random error");
        }
        return "Success";
    })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation(), true) // delayError = true
    .retry(3)                                  // Resubscribe up to 3 times on error
    .subscribe(
        result -> System.out.println("Result: " + result),
        error -> System.err.println("Final error: " + error)
    );
```

## Scheduler Guidelines

**When to use each scheduler:**

- **`Schedulers.io()`**: File I/O, network calls, database operations, blocking operations
- **`Schedulers.computation()`**: CPU-intensive work, mathematical computations, image processing
- **`Schedulers.newThread()`**: When you need a guaranteed separate thread (use sparingly)
- **`Schedulers.single()`**: Sequential processing, event loops, coordination
- **`Schedulers.trampoline()`**: Testing, when you want synchronous execution
- **`Schedulers.from(executor)`**: Custom thread pools, specific threading requirements

**Best Practices:**

1. Use `subscribeOn()` to specify where the source Observable does its work
2. Use `observeOn()` to specify where observers receive notifications
3. Avoid blocking operations on the computation scheduler
4. Dispose of workers when you are done with them, and shut down the executors behind custom schedulers
5. Use `TestScheduler` for time-based testing
6. Be careful with thread safety when sharing state between threads
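
The last point in the list above is easy to overlook once callbacks run on several scheduler threads. As a minimal JDK-only sketch (not RxJava-specific), the example below updates a shared counter from four pool threads using `AtomicInteger`, so no increments are lost; `SharedStateSketch` and `demo` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical example of sharing a counter safely between pool threads. */
final class SharedStateSketch {

    /** Increments one shared counter 1000 times from each of 4 threads. */
    static int demo() {
        AtomicInteger counter = new AtomicInteger(); // atomic updates, safe across threads
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            pool.execute(() -> {
                for (int i = 0; i < 1000; i++) {
                    counter.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 4000
    }
}
```

A plain `int` field updated the same way could lose increments, because `count++` is a read followed by a write rather than a single atomic step.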

## Types

```java { .api }
/**
 * Time unit enumeration (java.util.concurrent.TimeUnit from the JDK)
 */
public enum TimeUnit {
    NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS
}

/**
 * Interface for runnable introspection
 */
public interface SchedulerRunnableIntrospection {
    Runnable getWrappedRunnable();
}

/**
 * Timed value wrapper
 */
public final class Timed<T> {
    public long time();      // time value, expressed in unit()
    public TimeUnit unit();  // unit of the time value
    public T value();        // wrapped value
}
```
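
The `TimeUnit` argument accepted by the `now(TimeUnit unit)` methods in this document selects the unit of the returned value. The sketch below shows the conversion with the JDK's `TimeUnit.convert`, assuming a millisecond-based clock reading; the `TimeUnitSketch` class and its two-argument `now` helper are hypothetical and exist only for illustration.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing how a clock reading maps to a requested unit. */
final class TimeUnitSketch {

    /** Converts a millisecond clock reading into the requested unit (truncating). */
    static long now(long clockMillis, TimeUnit unit) {
        return unit.convert(clockMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println(now(1500, TimeUnit.MILLISECONDS)); // 1500
        System.out.println(now(1500, TimeUnit.SECONDS));      // 1 (fraction is dropped)
        System.out.println(now(1500, TimeUnit.MICROSECONDS)); // 1500000
    }
}
```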