# Schedulers and Concurrency

RxJava's Scheduler system provides abstraction over different execution contexts and threading models. Schedulers control where and when reactive streams execute, enabling proper concurrency management and thread switching.

## Capabilities

### Built-in Schedulers

Pre-configured schedulers for common execution contexts.

```java { .api }
/**
 * Scheduler for computation-intensive work on a fixed thread pool
 * @return Scheduler optimized for CPU-intensive tasks
 */
public static Scheduler computation();

/**
 * Scheduler for I/O-bound work on a dynamically-sized thread pool
 * @return Scheduler optimized for I/O operations
 */
public static Scheduler io();

/**
 * Single-threaded scheduler for sequential execution
 * @return Scheduler that executes tasks sequentially on one thread
 */
public static Scheduler single();

/**
 * Scheduler that queues work on the current thread and runs it
 * after the currently executing task completes
 * @return Scheduler for queued, in-order execution on the calling thread
 */
public static Scheduler trampoline();

/**
 * Scheduler that creates a new thread for each task
 * @return Scheduler that spawns new threads
 */
public static Scheduler newThread();

/**
 * Create a scheduler from an existing Executor
 * @param executor the Executor to wrap
 * @return Scheduler backed by the provided Executor
 */
public static Scheduler from(Executor executor);

/**
 * Create a scheduler from an Executor with optional task interruption
 * @param executor the Executor (for example an ExecutorService) to wrap
 * @param interruptibleWorker whether to support interruption
 * @return Scheduler backed by the provided Executor
 */
public static Scheduler from(Executor executor, boolean interruptibleWorker);
```
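As a minimal sketch of `Schedulers.from(Executor)`, the example below wraps a single-thread `ExecutorService` whose thread carries a recognizable name, then checks that a task scheduled through the resulting `Scheduler` really runs on that thread. The class name `ExecutorSchedulerSketch`, the method `runOnNamedPool`, and the thread name are hypothetical, chosen only for illustration.

```java
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper class, not part of RxJava.
public class ExecutorSchedulerSketch {

    /** Runs one task through Schedulers.from(...) and returns the name of the thread that ran it. */
    public static String runOnNamedPool(String threadName) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        try {
            Scheduler scheduler = Schedulers.from(pool);
            AtomicReference<String> seen = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(1);

            scheduler.scheduleDirect(() -> {
                seen.set(Thread.currentThread().getName());
                done.countDown();
            });

            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run in time");
            }
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // The wrapped executor stays owned by the caller and must be shut down by it.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnNamedPool("my-pool-thread")); // prints "my-pool-thread"
    }
}
```

The point of the design is ownership: the `Scheduler` only delegates to the executor, so the executor's lifecycle remains the caller's responsibility.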

### Scheduler Operations

Core operations available on all Schedulers.

```java { .api }
/**
 * Abstract base class for all Schedulers
 */
public abstract class Scheduler {

    /**
     * Create a Worker for this Scheduler
     * @return Worker instance for sequential task execution
     */
    public abstract Worker createWorker();

    /**
     * Schedule a task for immediate execution
     * @param run the Runnable to execute
     * @return Disposable for canceling the scheduled task
     */
    public Disposable scheduleDirect(Runnable run);

    /**
     * Schedule a task with a delay
     * @param run the Runnable to execute
     * @param delay the delay before execution
     * @param unit the time unit for the delay
     * @return Disposable for canceling the scheduled task
     */
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit);

    /**
     * Schedule a task periodically
     * @param run the Runnable to execute repeatedly
     * @param initialDelay delay before first execution
     * @param period period between subsequent executions
     * @param unit the time unit for delays and period
     * @return Disposable for canceling the scheduled task
     */
    public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit);

    /**
     * Get the current time in the requested unit
     * @param unit the time unit to return
     * @return current time in the specified unit
     */
    public long now(TimeUnit unit);

    /**
     * Start the Scheduler (if applicable)
     */
    public void start();

    /**
     * Shutdown the Scheduler and clean up resources
     */
    public void shutdown();
}
```
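The following sketch illustrates how the `Disposable` returned by `scheduleDirect` can cancel a delayed task before it runs. It schedules two tasks on `Schedulers.single()`, disposes the second immediately, and counts how many ran. The class `ScheduleDirectSketch` and the method `runAndCancel` are hypothetical names, and the delays are arbitrary.

```java
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, not part of RxJava.
public class ScheduleDirectSketch {

    /** Schedules two delayed tasks, cancels the second, and returns how many tasks ran. */
    public static int runAndCancel() {
        Scheduler scheduler = Schedulers.single();
        AtomicInteger ran = new AtomicInteger();
        CountDownLatch firstDone = new CountDownLatch(1);

        // This task is left alone and runs after about 50 ms.
        scheduler.scheduleDirect(() -> {
            ran.incrementAndGet();
            firstDone.countDown();
        }, 50, TimeUnit.MILLISECONDS);

        // This task is cancelled long before its 10 s delay elapses.
        Disposable cancelled = scheduler.scheduleDirect(ran::incrementAndGet, 10, TimeUnit.SECONDS);
        cancelled.dispose();

        try {
            if (!firstDone.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("first task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCancel()); // prints 1
    }
}
```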

### Worker Operations

Workers provide sequential task execution within a Scheduler.

```java { .api }
/**
 * Abstract base class for Scheduler Workers (nested in Scheduler as Scheduler.Worker)
 */
public abstract static class Worker implements Disposable {

    /**
     * Schedule a task for immediate execution
     * @param run the Runnable to execute
     * @return Disposable for canceling the scheduled task
     */
    public Disposable schedule(Runnable run);

    /**
     * Schedule a task with a delay
     * @param run the Runnable to execute
     * @param delay the delay before execution
     * @param unit the time unit for the delay
     * @return Disposable for canceling the scheduled task
     */
    public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);

    /**
     * Schedule a task periodically
     * @param run the Runnable to execute repeatedly
     * @param initialDelay delay before first execution
     * @param period period between subsequent executions
     * @param unit the time unit for delays and period
     * @return Disposable for canceling the scheduled task
     */
    public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit);

    /**
     * Get the current time in the requested unit
     * @param unit the time unit to return
     * @return current time in the specified unit
     */
    public long now(TimeUnit unit);

    /**
     * Dispose of this Worker and cancel all scheduled tasks
     */
    public abstract void dispose();

    /**
     * Check if this Worker has been disposed
     * @return true if disposed, false otherwise
     */
    public abstract boolean isDisposed();
}
```
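To make the sequential guarantee concrete, the sketch below schedules three undelayed tasks on one `Worker` and records the order in which they run. It assumes only the `Scheduler.Worker` API shown above; `WorkerOrderSketch` and `runInOrder` are hypothetical names.

```java
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of RxJava.
public class WorkerOrderSketch {

    /** Schedules tasks 1, 2, 3 on a single Worker and returns the order in which they ran. */
    public static List<Integer> runInOrder() {
        Scheduler.Worker worker = Schedulers.io().createWorker();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(3);
        try {
            for (int i = 1; i <= 3; i++) {
                final int id = i;
                worker.schedule(() -> {
                    order.add(id);
                    done.countDown();
                });
            }
            // Wait for the tasks before disposing: dispose() cancels anything still pending.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return new ArrayList<>(order);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInOrder()); // prints [1, 2, 3]
    }
}
```

Tasks submitted to the same `Worker` never overlap, which is why operators such as `observeOn` create one worker per subscriber rather than calling `scheduleDirect` for each item.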

### Scheduler Usage Patterns

Common patterns for using schedulers with reactive streams.

```java { .api }
/**
 * Apply scheduler to subscription (where subscription happens)
 * Available on all reactive types (Observable, Flowable, Single, Maybe, Completable)
 */
public final ReactiveType<T> subscribeOn(Scheduler scheduler);

/**
 * Apply scheduler to observation (where results are observed)
 * Available on all reactive types (Observable, Flowable, Single, Maybe, Completable)
 */
public final ReactiveType<T> observeOn(Scheduler scheduler);

/**
 * Apply scheduler with buffer size for observeOn
 * Available on Observable and Flowable
 */
public final ReactiveType<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize);
```
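The difference between the two operators is easiest to see by recording thread names. The sketch below is an illustration under stated assumptions: it uses two single-thread executors with hypothetical thread names (`source-thread`, `observer-thread`) so that the result is predictable, and the class `ThreadSwitchSketch` is not part of RxJava.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper class, not part of RxJava.
public class ThreadSwitchSketch {

    /** Returns [thread that ran the source, thread that ran the observer]. */
    public static List<String> threadsSeen() {
        ExecutorService sourcePool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "source-thread"));
        ExecutorService observerPool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "observer-thread"));
        AtomicReference<String> sourceThread = new AtomicReference<>();
        AtomicReference<String> observerThread = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            Observable.fromCallable(() -> {
                    // Runs where subscribeOn() placed the subscription.
                    sourceThread.set(Thread.currentThread().getName());
                    return 42;
                })
                .subscribeOn(Schedulers.from(sourcePool))
                .observeOn(Schedulers.from(observerPool))
                .subscribe(
                    // Runs where observeOn() delivers downstream signals.
                    value -> observerThread.set(Thread.currentThread().getName()),
                    Throwable::printStackTrace,
                    done::countDown);

            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
            return Arrays.asList(sourceThread.get(), observerThread.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            sourcePool.shutdown();
            observerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsSeen()); // prints [source-thread, observer-thread]
    }
}
```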

### Custom Scheduler Creation

Create custom schedulers for specific use cases.

```java { .api }
/**
 * Factory methods for creating custom Schedulers
 */
public final class Schedulers {
    /**
     * Create a Scheduler backed by an existing Executor
     * @param executor the Executor (for example an ExecutorService) to wrap
     * @return custom Scheduler instance
     */
    public static Scheduler from(Executor executor);
}

public final class RxJavaPlugins {
    /**
     * Create a single-threaded Scheduler
     * @param threadFactory factory for the single thread
     * @return single-threaded Scheduler
     */
    public static Scheduler createSingleScheduler(ThreadFactory threadFactory);
}

/**
 * Test Scheduler for unit testing with virtual time
 */
public final class TestScheduler extends Scheduler {
    /**
     * Create a TestScheduler instance
     */
    public TestScheduler();

    /**
     * Advance virtual time by the given amount and run all tasks that become due
     * @param delayTime amount of time to advance by
     * @param unit time unit
     */
    public void advanceTimeBy(long delayTime, TimeUnit unit);

    /**
     * Run all tasks that are due at or before the current virtual time
     */
    public void triggerActions();
}
```
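A short sketch of virtual time with `TestScheduler`: an `Observable.interval` driven by the test scheduler emits nothing until the clock is advanced, and `advanceTimeBy` runs exactly the ticks that fall inside the advanced window. The class `VirtualTimeSketch` and the method `snapshots` are hypothetical names used for illustration.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of RxJava.
public class VirtualTimeSketch {

    /** Returns the items received before advancing, after 2 virtual seconds, and after 3. */
    public static List<List<Long>> snapshots() {
        TestScheduler scheduler = new TestScheduler();
        List<Long> received = new ArrayList<>();

        // One tick per virtual second, limited to three ticks.
        Observable.interval(1, TimeUnit.SECONDS, scheduler)
            .take(3)
            .subscribe(received::add);

        List<Long> before = new ArrayList<>(received);      // nothing yet: the clock has not moved

        scheduler.advanceTimeBy(2, TimeUnit.SECONDS);       // ticks at t=1s and t=2s run synchronously
        List<Long> afterTwo = new ArrayList<>(received);

        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);       // the tick at t=3s runs, then take(3) completes
        List<Long> afterThree = new ArrayList<>(received);

        return Arrays.asList(before, afterTwo, afterThree);
    }

    public static void main(String[] args) {
        System.out.println(snapshots()); // prints [[], [0, 1], [0, 1, 2]]
    }
}
```

Because everything runs on the calling thread, no latches or sleeps are needed, which keeps such tests fast and deterministic.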

## Types

```java { .api }
/**
 * Exposes the Runnable wrapped by a scheduled task, for introspection
 */
public interface SchedulerRunnableIntrospection {
    /**
     * Get the wrapped Runnable
     * @return the underlying Runnable
     */
    Runnable getWrappedRunnable();
}

/**
 * Hook interface for Scheduler plugins
 */
public interface SchedulerSupplier extends Supplier<Scheduler> {
    /**
     * Supply a Scheduler instance
     * @return Scheduler instance
     */
    Scheduler get();
}
```

**Usage Examples:**

```java
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import java.util.concurrent.*;

// Basic scheduler usage
Observable.just("Hello")
    .subscribeOn(Schedulers.io())      // Subscribe on I/O thread
    .observeOn(Schedulers.single())    // Observe on single thread
    .subscribe(System.out::println);

// Computation-intensive work
Observable.range(1, 1000000)
    .subscribeOn(Schedulers.computation())
    .map(x -> x * x)                   // Stand-in for a CPU-intensive operation (int overflows for large x)
    .observeOn(Schedulers.single())
    .subscribe(System.out::println);

// I/O operations
Single.fromCallable(() -> {
        // Simulate database call
        Thread.sleep(1000);
        return "Database result";
    })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.single())
    .subscribe(System.out::println);

// Custom executor integration
ExecutorService customExecutor = Executors.newFixedThreadPool(4);
Scheduler customScheduler = Schedulers.from(customExecutor);

Observable.range(1, 10)
    .subscribeOn(customScheduler)
    .subscribe(System.out::println);
// Call customExecutor.shutdown() once the scheduler is no longer needed

// Direct scheduling: runs the task once, one second from now
Scheduler scheduler = Schedulers.io();
Disposable task = scheduler.scheduleDirect(() -> {
    System.out.println("Scheduled task executed");
}, 1, TimeUnit.SECONDS);

// Worker usage for sequential tasks
Scheduler.Worker worker = Schedulers.computation().createWorker();
worker.schedule(() -> System.out.println("Task 1"));
worker.schedule(() -> System.out.println("Task 2"), 100, TimeUnit.MILLISECONDS);

// Clean up after the tasks have run; dispose() cancels anything still pending,
// so disposing immediately would cancel the delayed "Task 2"
Thread.sleep(200);
worker.dispose();

// Test scheduling for unit tests
TestScheduler testScheduler = new TestScheduler();
Observable.interval(1, TimeUnit.SECONDS, testScheduler)
    .take(3)
    .subscribe(System.out::println);

// Advance virtual time by 3 seconds; the three ticks are emitted synchronously
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);

// Periodic scheduling
Disposable periodicTask = Schedulers.single()
    .schedulePeriodicallyDirect(
        () -> System.out.println("Periodic task"),
        0, // Initial delay
        1, // Period
        TimeUnit.SECONDS
    );

// Cancel after some time
Thread.sleep(5000);
periodicTask.dispose();
```

## Scheduler Guidelines

### When to Use Each Scheduler

- **`Schedulers.computation()`**: CPU-intensive work, mathematical computations, image processing
- **`Schedulers.io()`**: Blocking I/O such as network calls, file operations, database access
- **`Schedulers.single()`**: Sequential background work on one shared thread, maintaining order (it is not a UI thread)
- **`Schedulers.trampoline()`**: Queued execution on the current thread, testing, avoiding stack overflow from recursive scheduling
- **`Schedulers.newThread()`**: Long-running operations that need dedicated threads
- **Custom schedulers**: Specialized threading requirements, integration with existing thread pools

### Best Practices

1. **Use `subscribeOn()` to control where the source operates**
2. **Use `observeOn()` to control where downstream operators and observers run**
3. **Dispose of Workers when done to prevent memory leaks**
4. **Use `TestScheduler` for deterministic testing with virtual time**
5. **Avoid blocking operations on computation scheduler threads**
6. **Prefer I/O scheduler for blocking operations like network and database calls**
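
Practices 5 and 6 can be sketched as follows: a blocking call is wrapped in `Single.fromCallable` and moved off the calling thread with `subscribeOn(Schedulers.io())`. The class `BlockingOffloadSketch`, the method `slowLookup`, and its 100 ms sleep are hypothetical stand-ins for a real network or database call; `blockingGet()` is used here only so the demo can return a value.

```java
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

// Hypothetical helper class, not part of RxJava.
public class BlockingOffloadSketch {

    /** Stand-in for a blocking call; returns the name of the thread it ran on. */
    static String slowLookup() throws InterruptedException {
        Thread.sleep(100);
        return Thread.currentThread().getName();
    }

    /** Returns true if the blocking call ran on a thread other than the caller's. */
    public static boolean ranOffCallerThread() {
        String caller = Thread.currentThread().getName();
        String worker = Single.fromCallable(BlockingOffloadSketch::slowLookup)
            .subscribeOn(Schedulers.io())   // blocking work belongs on the I/O pool
            .blockingGet();                 // demo only: waits for the result on the caller
        return !caller.equals(worker);
    }

    public static void main(String[] args) {
        System.out.println(ranOffCallerThread()); // prints true
    }
}
```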