# Schedulers and Concurrency

RxJava's Scheduler system provides an abstraction over execution contexts and threading models. Schedulers control where and when the stages of a reactive stream execute, which makes concurrency management and thread switching explicit.

## Capabilities

### Built-in Schedulers

Pre-configured schedulers for common execution contexts.

```java { .api }
// Static factory methods on io.reactivex.rxjava3.schedulers.Schedulers

/**
 * Scheduler for computation-intensive work on a fixed thread pool
 * @return Scheduler optimized for CPU-intensive tasks
 */
public static Scheduler computation();

/**
 * Scheduler for I/O-bound work on a dynamically-sized thread pool
 * @return Scheduler optimized for I/O operations
 */
public static Scheduler io();

/**
 * Single-threaded scheduler for sequential execution
 * @return Scheduler that executes tasks sequentially on one thread
 */
public static Scheduler single();

/**
 * Scheduler that queues work on the current thread and runs it
 * after the currently running task completes
 * @return Scheduler for queued, in-order execution on the calling thread
 */
public static Scheduler trampoline();

/**
 * Scheduler that creates a new thread for each unit of work
 * @return Scheduler that spawns new threads
 */
public static Scheduler newThread();

/**
 * Create a scheduler from an existing Executor
 * @param executor the Executor to wrap
 * @return Scheduler backed by the provided Executor
 */
public static Scheduler from(Executor executor);

/**
 * Create a scheduler from an existing Executor with optional task interruption
 * @param executor the Executor to wrap
 * @param interruptibleWorker if true, disposing a task interrupts the thread running it
 * @return Scheduler backed by the provided Executor
 */
public static Scheduler from(Executor executor, boolean interruptibleWorker);
```
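The trampoline behaviour is the least intuitive of the built-in schedulers, so a small plain-Java sketch of the queueing idea may help. `TrampolineSketch` is a hypothetical class written for this illustration and is not RxJava's implementation (which, among other things, keeps its queue per thread and supports delays). It only shows that a task scheduled from inside a running task is deferred until the running task returns.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration class; single-threaded use only, not part of RxJava.
final class TrampolineSketch {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean draining;

    // Enqueue a task; only the outermost call drains the queue.
    void schedule(Runnable task) {
        queue.add(task);
        if (draining) {
            return; // a task is already running: this one runs later, in FIFO order
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    // Runs a task that schedules a nested task and records the execution order.
    static List<String> demo() {
        TrampolineSketch trampoline = new TrampolineSketch();
        List<String> log = new ArrayList<>();
        trampoline.schedule(() -> {
            log.add("outer start");
            trampoline.schedule(() -> log.add("inner")); // deferred, not run inline
            log.add("outer end");
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [outer start, outer end, inner]
    }
}
```

Because nested work is queued instead of being called recursively, the call stack stays flat, which is why a trampoline is often used to avoid stack overflow in recursive scheduling.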

### Scheduler Operations

Core operations available on all Schedulers.

```java { .api }
/**
 * Abstract base class for all Schedulers
 */
public abstract class Scheduler {

    /**
     * Create a Worker for this Scheduler
     * @return Worker instance for sequential task execution
     */
    public abstract Worker createWorker();

    /**
     * Schedule a task for immediate execution
     * @param run the Runnable to execute
     * @return Disposable for canceling the scheduled task
     */
    public Disposable scheduleDirect(Runnable run);

    /**
     * Schedule a task with a delay
     * @param run the Runnable to execute
     * @param delay the delay before execution
     * @param unit the time unit for the delay
     * @return Disposable for canceling the scheduled task
     */
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit);

    /**
     * Schedule a task periodically
     * @param run the Runnable to execute repeatedly
     * @param initialDelay delay before first execution
     * @param period period between subsequent executions
     * @param unit the time unit for delays and period
     * @return Disposable for canceling the scheduled task
     */
    public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit);

    /**
     * Get the Scheduler's current time in the requested unit
     * @param unit the time unit to return
     * @return current time in the specified unit
     */
    public long now(TimeUnit unit);

    /**
     * Start the Scheduler (if applicable)
     */
    public void start();

    /**
     * Shutdown the Scheduler and clean up resources
     */
    public void shutdown();
}
```

### Worker Operations

Workers provide sequential task execution within a Scheduler.

```java { .api }
/**
 * Abstract base class for Scheduler Workers
 * (declared as the nested class Scheduler.Worker)
 */
public abstract static class Worker implements Disposable {

    /**
     * Schedule a task for immediate execution
     * (delegates to the delayed overload with a zero delay)
     * @param run the Runnable to execute
     * @return Disposable for canceling the scheduled task
     */
    public Disposable schedule(Runnable run);

    /**
     * Schedule a task with a delay
     * @param run the Runnable to execute
     * @param delay the delay before execution
     * @param unit the time unit for the delay
     * @return Disposable for canceling the scheduled task
     */
    public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);

    /**
     * Schedule a task periodically
     * @param run the Runnable to execute repeatedly
     * @param initialDelay delay before first execution
     * @param period period between subsequent executions
     * @param unit the time unit for delays and period
     * @return Disposable for canceling the scheduled task
     */
    public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit);

    /**
     * Get the Worker's current time in the requested unit
     * @param unit the time unit to return
     * @return current time in the specified unit
     */
    public long now(TimeUnit unit);

    /**
     * Dispose of this Worker and cancel all scheduled tasks
     */
    public abstract void dispose();

    /**
     * Check if this Worker has been disposed
     * @return true if disposed, false otherwise
     */
    public abstract boolean isDisposed();
}
```

### Scheduler Usage Patterns

Common patterns for using schedulers with reactive streams. `ReactiveType<T>` below is a placeholder for the concrete type the operator is called on.

```java { .api }
/**
 * Run the subscription, and with it the source's emission logic, on the given Scheduler
 * Available on all reactive types (Observable, Flowable, Single, Maybe, Completable)
 */
public final ReactiveType<T> subscribeOn(Scheduler scheduler);

/**
 * Deliver downstream signals (items, errors, completion) on the given Scheduler
 * Available on all reactive types (Observable, Flowable, Single, Maybe, Completable)
 */
public final ReactiveType<T> observeOn(Scheduler scheduler);

/**
 * observeOn with explicit error ordering and buffer size
 * Available on Observable and Flowable
 * @param delayError if true, an error is delivered only after all preceding items
 * @param bufferSize size of the internal queue between upstream and the Scheduler
 */
public final ReactiveType<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize);
```

### Custom Scheduler Creation

Create custom schedulers for specific use cases.

```java { .api }
/**
 * Factory methods for custom Schedulers
 */
public final class Schedulers {
    /**
     * Create a Scheduler backed by an existing Executor
     * @param executor the Executor (for example an ExecutorService) to wrap
     * @return custom Scheduler instance
     */
    public static Scheduler from(Executor executor);
}

public final class RxJavaPlugins {
    /**
     * Create a single-threaded Scheduler
     * @param threadFactory factory for the single thread
     * @return single-threaded Scheduler
     */
    public static Scheduler createSingleScheduler(ThreadFactory threadFactory);
}

/**
 * Test Scheduler for unit testing with virtual time
 */
public final class TestScheduler extends Scheduler {
    /**
     * Create a TestScheduler instance
     */
    public TestScheduler();

    /**
     * Move virtual time forward by the given amount and run every task that becomes due
     * @param delayTime amount of time to advance by
     * @param unit time unit
     */
    public void advanceTimeBy(long delayTime, TimeUnit unit);

    /**
     * Run all tasks that are due at or before the current virtual time
     */
    public void triggerActions();
}
```
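The virtual-time idea behind `TestScheduler` can be sketched without RxJava. The `VirtualTimeSketch` class below is hypothetical and much simpler than the real thing: it assumes single-threaded use, keeps tasks in a priority queue ordered by due time, and only moves its clock when `advanceTimeBy` is called. It is meant to show why tests written this way are deterministic, not how RxJava implements it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of RxJava.
final class VirtualTimeSketch {
    private static final class Timed {
        final long dueNanos;
        final long seq; // insertion order, used to break ties
        final Runnable task;

        Timed(long dueNanos, long seq, Runnable task) {
            this.dueNanos = dueNanos;
            this.seq = seq;
            this.task = task;
        }
    }

    private final PriorityQueue<Timed> queue = new PriorityQueue<>(
            Comparator.comparingLong((Timed t) -> t.dueNanos).thenComparingLong(t -> t.seq));
    private long nowNanos;
    private long counter;

    // Register a task relative to the current virtual time; nothing runs yet.
    void schedule(Runnable task, long delay, TimeUnit unit) {
        queue.add(new Timed(nowNanos + unit.toNanos(delay), counter++, task));
    }

    // Move the clock forward and run every task that becomes due, in time order.
    void advanceTimeBy(long amount, TimeUnit unit) {
        long target = nowNanos + unit.toNanos(amount);
        while (!queue.isEmpty() && queue.peek().dueNanos <= target) {
            Timed next = queue.poll();
            nowNanos = next.dueNanos; // the task observes its own due time
            next.task.run();
        }
        nowNanos = target;
    }

    long now(TimeUnit unit) {
        return unit.convert(nowNanos, TimeUnit.NANOSECONDS);
    }

    static List<String> demo() {
        VirtualTimeSketch clock = new VirtualTimeSketch();
        List<String> log = new ArrayList<>();
        clock.schedule(() -> log.add("B at " + clock.now(TimeUnit.SECONDS) + "s"), 2, TimeUnit.SECONDS);
        clock.schedule(() -> log.add("A at " + clock.now(TimeUnit.SECONDS) + "s"), 1, TimeUnit.SECONDS);
        clock.schedule(() -> log.add("C at " + clock.now(TimeUnit.SECONDS) + "s"), 5, TimeUnit.SECONDS);
        clock.advanceTimeBy(3, TimeUnit.SECONDS); // C is not due yet and stays queued
        log.add("now " + clock.now(TimeUnit.SECONDS) + "s");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [A at 1s, B at 2s, now 3s]
    }
}
```

No real time passes in this sketch, so a test covering several seconds of scheduled work finishes immediately and always produces the same order.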

## Types

```java { .api }
/**
 * Implemented by scheduled-task wrappers so that the original Runnable can be inspected
 */
public interface SchedulerRunnableIntrospection {
    /**
     * Get the wrapped Runnable
     * @return the underlying Runnable
     */
    Runnable getWrappedRunnable();
}

/**
 * Hook interface for Scheduler plugins
 */
public interface SchedulerSupplier extends Supplier<Scheduler> {
    /**
     * Supply a Scheduler instance
     * @return Scheduler instance
     */
    Scheduler get();
}
```

**Usage Examples:**

```java
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import java.util.concurrent.*;

// Basic scheduler usage
Observable.just("Hello")
    .subscribeOn(Schedulers.io())        // Subscribe on an I/O thread
    .observeOn(Schedulers.single())      // Observe on the single thread
    .subscribe(System.out::println);

// Computation-intensive work
Observable.range(1, 1000000)
    .subscribeOn(Schedulers.computation())
    .map(x -> (long) x * x)              // Widen to long: 1000000 * 1000000 overflows int
    .observeOn(Schedulers.single())
    .subscribe(System.out::println);

// I/O operations
Single.fromCallable(() -> {
        // Simulate database call
        Thread.sleep(1000);
        return "Database result";
    })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.single())
    .subscribe(System.out::println);

// Custom executor integration
// The caller owns customExecutor and must shut it down when it is no longer needed
ExecutorService customExecutor = Executors.newFixedThreadPool(4);
Scheduler customScheduler = Schedulers.from(customExecutor);

Observable.range(1, 10)
    .subscribeOn(customScheduler)
    .subscribe(System.out::println);

// Direct scheduling
Scheduler scheduler = Schedulers.io();
Disposable task = scheduler.scheduleDirect(() -> {
    System.out.println("Scheduled task executed");
}, 1, TimeUnit.SECONDS);

// Worker usage for sequential tasks
Scheduler.Worker worker = Schedulers.computation().createWorker();
worker.schedule(() -> System.out.println("Task 1"));
worker.schedule(() -> System.out.println("Task 2"), 100, TimeUnit.MILLISECONDS);

// Clean up once the tasks have run; disposing a Worker cancels its pending tasks
Thread.sleep(200);
worker.dispose();

// Test scheduling for unit tests
TestScheduler testScheduler = new TestScheduler();
Observable.interval(1, TimeUnit.SECONDS, testScheduler)
    .take(3)
    .subscribe(System.out::println);

// Advance virtual time; the three items are emitted without any real waiting
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);

// Periodic scheduling
Disposable periodicTask = Schedulers.single()
    .schedulePeriodicallyDirect(
        () -> System.out.println("Periodic task"),
        0,  // Initial delay
        1,  // Period
        TimeUnit.SECONDS
    );

// Cancel after some time
Thread.sleep(5000);
periodicTask.dispose();
```

## Scheduler Guidelines

### When to Use Each Scheduler

- **`Schedulers.computation()`**: CPU-intensive work, mathematical computations, image processing
- **`Schedulers.io()`**: I/O operations, network calls, file operations, database access
- **`Schedulers.single()`**: Sequential background work that must run in order on one shared thread
- **`Schedulers.trampoline()`**: Queued execution on the current thread, testing, avoiding stack overflow in recursive scheduling
- **`Schedulers.newThread()`**: Long-running operations that need dedicated threads
- **Custom schedulers**: Specialized threading requirements, integration with existing thread pools

### Best Practices

1. **Use `subscribeOn()` to control where the source operates**
2. **Use `observeOn()` to control where downstream operators and observers run**
3. **Dispose of Workers when done to prevent memory leaks**
4. **Use `TestScheduler` for deterministic testing with virtual time**
5. **Avoid blocking operations on computation scheduler threads**
6. **Prefer I/O scheduler for blocking operations like network and database calls**
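
Practices 5 and 6 follow from the computation scheduler using a fixed number of threads: once every thread is parked in a blocking call, CPU work queued behind it cannot start. The sketch below illustrates the effect with a plain two-thread `ExecutorService` as a stand-in for a fixed-size pool. The class name, pool size, and timeouts are assumptions chosen for the illustration and say nothing about how RxJava sizes or implements `Schedulers.computation()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class; uses only java.util.concurrent.
final class BlockedPoolSketch {

    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2); // stand-in for a fixed-size pool
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Two simulated blocking calls occupy both pool threads.
            for (int i = 0; i < 2; i++) {
                pool.submit(() -> {
                    release.await();
                    return null;
                });
            }
            // A short CPU task is queued behind them.
            Future<Integer> cpuTask = pool.submit(() -> 6 * 7);

            String first;
            try {
                cpuTask.get(200, TimeUnit.MILLISECONDS);
                first = "ran";
            } catch (TimeoutException e) {
                first = "starved"; // no free thread, so the task never started
            }

            release.countDown(); // the blocking calls return and free the threads
            return first + ", then " + cpuTask.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // starved, then 42
    }
}
```

A pool that grows on demand, which is how the text above describes `Schedulers.io()`, would have started another thread for the third task instead of leaving it queued.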