# Schedulers and Threading

Control execution context and threading for reactive streams. Schedulers abstract away threading details and provide a way to specify where operations should run and where observers should be notified.

## Capabilities

### Built-in Schedulers

RxJava provides several built-in schedulers for common use cases.

```java { .api }
/**
 * Factory class for built-in schedulers
 */
public final class Schedulers {
    /**
     * IO scheduler for blocking I/O operations (file, network, database)
     * Backed by an unbounded, cached thread pool that grows as needed
     */
    public static Scheduler io();

    /**
     * Computation scheduler for CPU-intensive work
     * By default, the thread pool size equals the number of available processors
     */
    public static Scheduler computation();

    /**
     * Creates a new thread for each unit of work
     */
    public static Scheduler newThread();

    /**
     * Single-threaded scheduler with FIFO execution
     * Useful for event loops and sequential processing
     */
    public static Scheduler single();

    /**
     * Trampoline scheduler that queues work on the current thread
     * Executes immediately if no other work is queued
     */
    public static Scheduler trampoline();

    /**
     * Creates a scheduler from a custom Executor
     */
    public static Scheduler from(Executor executor);

    // Note: in RxJava 2.x (io.reactivex), TestScheduler is not obtained from
    // this factory; instantiate it directly with new TestScheduler().
}
```

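The trampoline behaviour described above (queue work on the current thread, run it immediately when nothing else is queued) can be illustrated without RxJava. The following is a conceptual sketch in plain Java, not RxJava's implementation; `TrampolineSketch`, `schedule`, and `demo` are hypothetical names introduced for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration of trampolining; not RxJava's actual code.
final class TrampolineSketch {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean draining;

    // Runs the task now if nothing is executing; otherwise queues it to run
    // after the current task finishes. Intended for single-threaded use only.
    void schedule(Runnable task) {
        queue.add(task);
        if (draining) {
            return; // an outer call is already draining the queue
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    // Shows that nested work is deferred instead of being run recursively.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        TrampolineSketch trampoline = new TrampolineSketch();
        trampoline.schedule(() -> {
            log.add("outer start");
            trampoline.schedule(() -> log.add("inner"));
            log.add("outer end");
        });
        return log;
    }
}
```

Calling `TrampolineSketch.demo()` yields the entries `outer start`, `outer end`, `inner`, in that order: the nested task waits until the outer task has finished.
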
### Core Scheduler Interface

Abstract base class for all schedulers.

```java { .api }
/**
 * Abstract scheduler that coordinates scheduling across time
 */
public abstract class Scheduler {
    /**
     * Creates a Worker for scheduling tasks
     * Tasks scheduled on the same Worker run sequentially and never overlap
     */
    public abstract Worker createWorker();

    /**
     * Schedules a task to run immediately
     */
    public Disposable scheduleDirect(Runnable run);

    /**
     * Schedules a task to run after a delay
     */
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit);

    /**
     * Schedules a task to run periodically
     */
    public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit);

    /**
     * Returns the current time in the given unit
     */
    public long now(TimeUnit unit);

    /**
     * Starts the scheduler (for lifecycle management)
     */
    public void start();

    /**
     * Shuts down the scheduler
     */
    public void shutdown();
}
```

### Worker Interface

A Worker is a sequential scheduler: tasks submitted to the same Worker run one at a time without overlapping, typically on a single thread.

```java { .api }
/**
 * Sequential scheduler that executes tasks one at a time, typically on a single thread
 * Declared as a nested class of Scheduler (Scheduler.Worker)
 */
public abstract static class Worker implements Disposable {
    /**
     * Schedules a task to run immediately
     */
    public Disposable schedule(Runnable run);

    /**
     * Schedules a task to run after a delay
     */
    public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);

    /**
     * Schedules a task to run periodically
     */
    public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit);

    /**
     * Returns the current time in the given unit
     */
    public long now(TimeUnit unit);

    /**
     * Cancels all scheduled tasks and cleans up resources
     */
    public abstract void dispose();

    /**
     * Returns true if this worker has been disposed
     */
    public abstract boolean isDisposed();
}
```

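The sequential guarantee of a Worker does not by itself require a dedicated thread. As a conceptual sketch in plain Java (not RxJava's implementation), the hypothetical `SerialWorkerSketch` below serialises tasks on top of any `Executor` by pairing a queue with a work-in-progress counter. It leaves out disposal, delays, and error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not RxJava's Worker implementation.
final class SerialWorkerSketch {
    private final Executor executor;
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // tasks not yet finished

    SerialWorkerSketch(Executor executor) {
        this.executor = executor;
    }

    // Enqueues the task; only the caller that moves wip from 0 to 1 starts a drain.
    void schedule(Runnable task) {
        queue.offer(task);
        if (wip.getAndIncrement() == 0) {
            executor.execute(this::drain);
        }
    }

    // Runs queued tasks one at a time until the counter drops back to zero.
    private void drain() {
        do {
            Runnable next = queue.poll();
            if (next != null) {
                next.run();
            }
        } while (wip.decrementAndGet() != 0);
    }

    // Schedules 100 tasks on a 4-thread pool and records the order they ran in.
    static List<Integer> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            SerialWorkerSketch worker = new SerialWorkerSketch(pool);
            List<Integer> seen = new ArrayList<>(); // plain list: tasks never overlap
            CountDownLatch done = new CountDownLatch(100);
            for (int i = 0; i < 100; i++) {
                int value = i;
                worker.schedule(() -> {
                    seen.add(value);
                    done.countDown();
                });
            }
            done.await();
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Even though the pool has four threads, `demo()` returns the values 0 through 99 in submission order, because at most one drain loop is active at any time.
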
### Threading Operators

Control which scheduler a reactive stream uses for subscription and for observation.

```java { .api }
/**
 * Available on all reactive types (Observable, Flowable, Single, Maybe, Completable)
 * T stands for the reactive type the operator is called on
 */

/**
 * Specifies the Scheduler on which the source will operate
 * Affects where the subscription and upstream operations run
 */
public final T subscribeOn(Scheduler scheduler);

/**
 * Specifies the Scheduler on which observers will be notified
 * Affects where downstream operators and observer callbacks (onNext, onError, onComplete) run
 * The delayError and bufferSize overloads exist only on Observable and Flowable
 */
public final T observeOn(Scheduler scheduler);
public final T observeOn(Scheduler scheduler, boolean delayError);
public final T observeOn(Scheduler scheduler, boolean delayError, int bufferSize);
```

### Test Scheduler

Special scheduler for testing with virtual time control.

```java { .api }
/**
 * Scheduler for testing that allows manual time control
 */
public final class TestScheduler extends Scheduler {
    /**
     * Advances virtual time by the specified amount
     */
    public void advanceTimeBy(long delayTime, TimeUnit unit);

    /**
     * Advances virtual time to the specified point
     */
    public void advanceTimeTo(long delayTime, TimeUnit unit);

    /**
     * Triggers all tasks scheduled for the current virtual time
     */
    public void triggerActions();

    /**
     * Returns current virtual time
     */
    public long now(TimeUnit unit);

    /**
     * Creates a Worker bound to this test scheduler
     */
    public Worker createWorker();
}
```

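To make the idea of virtual time concrete, here is a conceptual sketch in plain Java of how such a scheduler can work: tasks sit in a queue ordered by due time, and advancing the clock runs whatever has become due. `VirtualTimeSketch` and its members are hypothetical and only mirror the shape of the API above; RxJava's `TestScheduler` is more complete.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of virtual time; not RxJava's TestScheduler.
final class VirtualTimeSketch {
    private static final class Entry {
        final long dueNanos;
        final long seq; // keeps FIFO order among tasks with equal due times
        final Runnable task;

        Entry(long dueNanos, long seq, Runnable task) {
            this.dueNanos = dueNanos;
            this.seq = seq;
            this.task = task;
        }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>(
        Comparator.<Entry>comparingLong(e -> e.dueNanos).thenComparingLong(e -> e.seq));
    private long nowNanos;
    private long counter;

    void schedule(Runnable task, long delay, TimeUnit unit) {
        queue.add(new Entry(nowNanos + unit.toNanos(delay), counter++, task));
    }

    long now(TimeUnit unit) {
        return unit.convert(nowNanos, TimeUnit.NANOSECONDS);
    }

    // Moves the clock forward and runs every task that has become due, in time order.
    void advanceTimeBy(long delay, TimeUnit unit) {
        long target = nowNanos + unit.toNanos(delay);
        while (!queue.isEmpty() && queue.peek().dueNanos <= target) {
            Entry next = queue.poll();
            nowNanos = next.dueNanos; // each task observes its own due time
            next.task.run();
        }
        nowNanos = target;
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        VirtualTimeSketch clock = new VirtualTimeSketch();
        clock.schedule(() -> log.add("b@" + clock.now(TimeUnit.SECONDS)), 2, TimeUnit.SECONDS);
        clock.schedule(() -> log.add("a@" + clock.now(TimeUnit.SECONDS)), 1, TimeUnit.SECONDS);
        clock.advanceTimeBy(1, TimeUnit.SECONDS); // only "a" is due
        clock.advanceTimeBy(5, TimeUnit.SECONDS); // "b" becomes due at 2 s
        return log;
    }
}
```

`demo()` returns `a@1` followed by `b@2`. No real time passes, which is why tests built on virtual time stay fast and deterministic.
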
## Usage Examples

**Basic Threading with subscribeOn and observeOn:**

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

Observable<String> source = Observable.fromCallable(() -> {
        // This runs on IO thread due to subscribeOn
        System.out.println("Source thread: " + Thread.currentThread().getName());
        Thread.sleep(1000); // Simulate blocking I/O
        return "Data from server";
    })
    .subscribeOn(Schedulers.io()) // Source runs on IO scheduler
    .observeOn(Schedulers.computation()); // Observer notified on computation scheduler

source.subscribe(data -> {
    // This runs on computation thread due to observeOn
    System.out.println("Observer thread: " + Thread.currentThread().getName());
    System.out.println("Received: " + data);
});
```

**Choosing the Right Scheduler:**

```java
// fetchFromNetwork(), heavyComputation() and updateUI() stand in for application code

// I/O operations (network, file, database)
Observable<String> networkCall = Observable.fromCallable(() -> fetchFromNetwork())
    .subscribeOn(Schedulers.io());

// CPU-intensive computations
Observable<Integer> computation = Observable.range(1, 1000000)
    .map(i -> heavyComputation(i))
    .subscribeOn(Schedulers.computation());

// UI updates (Android example)
networkCall
    .observeOn(AndroidSchedulers.mainThread()) // Android-specific (RxAndroid)
    .subscribe(data -> updateUI(data));

// Sequential processing
Observable<String> sequential = Observable.just("task1", "task2", "task3")
    .subscribeOn(Schedulers.single())
    .doOnNext(task -> System.out.println("Processing: " + task));
```

**Custom Scheduler from Executor:**

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

// Create custom thread pool
ExecutorService customExecutor = Executors.newFixedThreadPool(4);
Scheduler customScheduler = Schedulers.from(customExecutor);

Observable.range(1, 10)
    .subscribeOn(customScheduler)
    .subscribe(
        value -> System.out.println("Value: " + value + " on " + Thread.currentThread().getName()),
        error -> error.printStackTrace(),
        () -> {
            System.out.println("Completed");
            customExecutor.shutdown(); // The caller owns the executor and must shut it down
        }
    );
```

**Direct Scheduling with Scheduler:**

```java
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

Scheduler scheduler = Schedulers.io();

// Schedule immediate task
Disposable task1 = scheduler.scheduleDirect(() -> {
    System.out.println("Immediate task executed");
});

// Schedule delayed task
Disposable task2 = scheduler.scheduleDirect(() -> {
    System.out.println("Delayed task executed");
}, 2, TimeUnit.SECONDS);

// Schedule periodic task (on Scheduler the method is schedulePeriodicallyDirect)
Disposable task3 = scheduler.schedulePeriodicallyDirect(() -> {
    System.out.println("Periodic task: " + System.currentTimeMillis());
}, 1, 3, TimeUnit.SECONDS);

// Let the periodic task run for a while, then cancel it
Thread.sleep(10000);
task3.dispose();
```

**Using Worker for Sequential Tasks:**

```java
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

Scheduler.Worker worker = Schedulers.io().createWorker();

// All tasks scheduled on this worker run sequentially on the same thread
worker.schedule(() -> System.out.println("Task 1"));
worker.schedule(() -> System.out.println("Task 2"), 1, TimeUnit.SECONDS);
worker.schedule(() -> System.out.println("Task 3"), 2, TimeUnit.SECONDS);

// Clean up worker when done
Thread.sleep(5000);
worker.dispose();
```

**Testing with TestScheduler:**

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;

TestScheduler testScheduler = new TestScheduler();

Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, testScheduler)
    .take(3);

TestObserver<Long> testObserver = source.test();

// Initially no emissions
testObserver.assertValueCount(0);

// Advance time by 1 second
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
testObserver.assertValueCount(1);
testObserver.assertValues(0L);

// Advance time by 2 more seconds
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
testObserver.assertValueCount(3);
testObserver.assertValues(0L, 1L, 2L);
testObserver.assertComplete();
```

**Complex Threading Example:**

```java
Observable<String> pipeline = Observable.fromCallable(() -> {
        // Heavy I/O operation
        System.out.println("Fetching data on: " + Thread.currentThread().getName());
        Thread.sleep(2000);
        return "raw-data";
    })
    .subscribeOn(Schedulers.io()) // I/O operation on IO scheduler

    // observeOn only affects operators below it, so it must precede the map
    .observeOn(Schedulers.computation()) // Switch to computation for CPU work
    .map(data -> {
        // CPU intensive transformation
        System.out.println("Processing data on: " + Thread.currentThread().getName());
        return data.toUpperCase() + "-processed";
    })

    .flatMap(processed -> {
        // Another I/O operation
        return Observable.fromCallable(() -> {
            System.out.println("Saving data on: " + Thread.currentThread().getName());
            Thread.sleep(1000);
            return processed + "-saved";
        }).subscribeOn(Schedulers.io()); // Back to I/O for saving
    })

    .observeOn(Schedulers.single()); // Final result on single thread

pipeline.subscribe(
    result -> System.out.println("Final result on: " + Thread.currentThread().getName() + " -> " + result),
    error -> error.printStackTrace()
);
```

**Error Handling with Schedulers:**

```java
Observable.fromCallable(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("Random error");
        }
        return "Success";
    })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation(), true) // delayError = true
    .retry(3) // Resubscribe up to 3 times before passing the error on
    .subscribe(
        result -> System.out.println("Result: " + result),
        error -> System.err.println("Final error: " + error)
    );
```

## Scheduler Guidelines

**When to use each scheduler:**

- **`Schedulers.io()`**: File I/O, network calls, database operations, and other blocking operations
- **`Schedulers.computation()`**: CPU-intensive work, mathematical computations, image processing
- **`Schedulers.newThread()`**: When you need a guaranteed separate thread (use sparingly)
- **`Schedulers.single()`**: Sequential processing, event loops, coordination
- **`Schedulers.trampoline()`**: Testing, or when you want work to run synchronously on the current thread
- **`Schedulers.from(executor)`**: Custom thread pools, specific threading requirements

**Best Practices:**

1. Use `subscribeOn()` to specify where the source Observable does its work
2. Use `observeOn()` to specify where observers receive notifications
3. Avoid blocking operations on the computation scheduler
4. Dispose of workers you create, and shut down executors that back custom schedulers
5. Use `TestScheduler` for time-based testing
6. Be careful with thread safety when sharing state between threads

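Best practices 4 and 6 can be sketched for the executor-backed case. An executor handed to `Schedulers.from(executor)` stays owned by the caller, so the caller is the one who shuts it down. The helper below is plain Java with hypothetical names (`ExecutorCleanupSketch`, `runAndShutdown`); the RxJava call appears only in a comment, and the timeout value is an arbitrary assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of releasing a caller-owned thread pool.
final class ExecutorCleanupSketch {
    // Runs the given number of tasks on a pool and always releases the pool.
    static int runAndShutdown(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger(); // thread-safe shared state
        try {
            // With RxJava, the pool would be wrapped here: Schedulers.from(pool)
            for (int i = 0; i < tasks; i++) {
                pool.execute(completed::incrementAndGet);
            }
        } finally {
            pool.shutdown(); // stop accepting work; already queued tasks still run
        }
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up on tasks that are still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }
}
```

The counter is an `AtomicInteger` rather than a plain `int` because several pool threads update it concurrently.
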
## Types

```java { .api }
/**
 * Time unit enumeration
 */
public enum TimeUnit {
    NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS
}

/**
 * Interface for runnable introspection
 */
public interface SchedulerRunnableIntrospection {
    Runnable getWrappedRunnable();
}

/**
 * Timed value wrapper
 */
public final class Timed<T> {
    public long time();
    public TimeUnit unit();
    public T value();
}
```
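
The time-based methods in this section take a value together with a `TimeUnit`. As a small RxJava-free illustration, the sketch below shows the standard `java.util.concurrent.TimeUnit` conversions and one plausible way a `now(TimeUnit)` method could be derived from the system clock. `TimeUnitSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing TimeUnit conversions.
final class TimeUnitSketch {
    // Converts a delay expressed in any unit to milliseconds.
    static long toMillis(long delay, TimeUnit unit) {
        return unit.toMillis(delay);
    }

    // One plausible shape for now(TimeUnit): wall-clock milliseconds
    // converted to the requested unit.
    static long now(TimeUnit unit) {
        return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
}
```

Conversions to a coarser unit truncate: `TimeUnit.SECONDS.convert(1500, TimeUnit.MILLISECONDS)` is 1, not 2.
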