Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
—
RxJava's Scheduler system provides abstraction over different execution contexts and threading models. Schedulers control where and when reactive streams execute, enabling proper concurrency management and thread switching.
Pre-configured schedulers for common execution contexts.
/**
* Scheduler for computation-intensive work on a fixed thread pool
* @return Scheduler optimized for CPU-intensive tasks
*/
public static Scheduler computation();
/**
* Scheduler for I/O-bound work on a dynamically-sized thread pool
* @return Scheduler optimized for I/O operations
*/
public static Scheduler io();
/**
* Single-threaded scheduler for sequential execution
* @return Scheduler that executes tasks sequentially on one thread
*/
public static Scheduler single();
/**
* Scheduler that executes immediately on the current thread
* @return Scheduler for immediate execution
*/
public static Scheduler trampoline();
/**
* Scheduler that creates a new thread for each task
* @return Scheduler that spawns new threads
*/
public static Scheduler newThread();
/**
* Create a scheduler from an existing Executor
* @param executor the Executor to wrap
* @return Scheduler backed by the provided Executor
*/
public static Scheduler from(Executor executor);
/**
* Create a scheduler from an ExecutorService
* @param executor the ExecutorService to wrap
* @param interruptibleWorker whether to support interruption
* @return Scheduler backed by the provided ExecutorService
*/
public static Scheduler from(ExecutorService executor, boolean interruptibleWorker);Core operations available on all Schedulers.
/**
* Abstract base class for all Schedulers
*/
public abstract class Scheduler {
/**
* Create a Worker for this Scheduler
* @return Worker instance for sequential task execution
*/
public abstract Worker createWorker();
/**
* Schedule a task for immediate execution
* @param run the Runnable to execute
* @return Disposable for canceling the scheduled task
*/
public Disposable scheduleDirect(Runnable run);
/**
* Schedule a task with a delay
* @param run the Runnable to execute
* @param delay the delay before execution
* @param unit the time unit for the delay
* @return Disposable for canceling the scheduled task
*/
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit);
/**
* Schedule a task periodically
* @param run the Runnable to execute repeatedly
* @param initialDelay delay before first execution
* @param period period between subsequent executions
* @param unit the time unit for delays and period
* @return Disposable for canceling the scheduled task
*/
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit);
/**
* Get the current time in milliseconds
* @param unit the time unit to return
* @return current time in the specified unit
*/
public long now(TimeUnit unit);
/**
* Start the Scheduler (if applicable)
*/
public void start();
/**
* Shutdown the Scheduler and clean up resources
*/
public void shutdown();
}Workers provide sequential task execution within a Scheduler.
/**
* Abstract base class for Scheduler Workers
*/
public abstract class Worker implements Disposable {
/**
* Schedule a task for immediate execution
* @param run the Runnable to execute
* @return Disposable for canceling the scheduled task
*/
public abstract Disposable schedule(Runnable run);
/**
* Schedule a task with a delay
* @param run the Runnable to execute
* @param delay the delay before execution
* @param unit the time unit for the delay
* @return Disposable for canceling the scheduled task
*/
public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
/**
* Schedule a task periodically
* @param run the Runnable to execute repeatedly
* @param initialDelay delay before first execution
* @param period period between subsequent executions
* @param unit the time unit for delays and period
* @return Disposable for canceling the scheduled task
*/
public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit);
/**
* Get the current time in milliseconds
* @param unit the time unit to return
* @return current time in the specified unit
*/
public long now(TimeUnit unit);
/**
* Dispose of this Worker and cancel all scheduled tasks
*/
public abstract void dispose();
/**
* Check if this Worker has been disposed
* @return true if disposed, false otherwise
*/
public abstract boolean isDisposed();
}Common patterns for using schedulers with reactive streams.
/**
* Apply scheduler to subscription (where subscription happens)
* Available on all reactive types (Observable, Flowable, Single, Maybe, Completable)
*/
public final ReactiveType<T> subscribeOn(Scheduler scheduler);
/**
* Apply scheduler to observation (where results are observed)
* Available on all reactive types (Observable, Flowable, Single, Maybe, Completable)
*/
public final ReactiveType<T> observeOn(Scheduler scheduler);
/**
* Apply scheduler with buffer size for observeOn
* Available on Observable and Flowable
*/
public final ReactiveType<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize);Create custom schedulers for specific use cases.
/**
* Create a custom Scheduler from scratch
*/
public abstract class Scheduler {
/**
* Create a new Scheduler instance
* @param threadFactory factory for creating threads
* @return custom Scheduler instance
*/
public static Scheduler from(ExecutorService executor);
/**
* Create a single-threaded Scheduler
* @param threadFactory factory for the single thread
* @return single-threaded Scheduler
*/
public static Scheduler single(ThreadFactory threadFactory);
}
/**
* Test Scheduler for unit testing with virtual time
*/
public final class TestScheduler extends Scheduler {
/**
* Create a TestScheduler instance
*/
public TestScheduler();
/**
* Trigger all scheduled tasks up to the specified time
* @param delayTime time to advance to
* @param unit time unit
*/
public void advanceTimeBy(long delayTime, TimeUnit unit);
/**
* Trigger all scheduled tasks
*/
public void triggerActions();
}/**
* Represents a unit of work that can be scheduled
*/
public interface SchedulerRunnableIntrospection {
/**
* Get the wrapped Runnable
* @return the underlying Runnable
*/
Runnable getWrappedRunnable();
}
/**
* Hook interface for Scheduler plugins
*/
public interface SchedulerSupplier extends Supplier<Scheduler> {
/**
* Supply a Scheduler instance
* @return Scheduler instance
*/
Scheduler get();
}Usage Examples:
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.*;
// Basic scheduler usage
Observable.just("Hello")
.subscribeOn(Schedulers.io()) // Subscribe on I/O thread
.observeOn(Schedulers.single()) // Observe on single thread
.subscribe(System.out::println);
// Computation-intensive work
Observable.range(1, 1000000)
.subscribeOn(Schedulers.computation())
.map(x -> x * x) // CPU-intensive operation
.observeOn(Schedulers.single())
.subscribe(System.out::println);
// I/O operations
Single.fromCallable(() -> {
// Simulate database call
Thread.sleep(1000);
return "Database result";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println);
// Custom executor integration
ExecutorService customExecutor = Executors.newFixedThreadPool(4);
Scheduler customScheduler = Schedulers.from(customExecutor);
Observable.range(1, 10)
.subscribeOn(customScheduler)
.subscribe(System.out::println);
// Direct scheduling
Scheduler scheduler = Schedulers.io();
Disposable task = scheduler.scheduleDirect(() -> {
System.out.println("Scheduled task executed");
}, 1, TimeUnit.SECONDS);
// Worker usage for sequential tasks
Scheduler.Worker worker = Schedulers.computation().createWorker();
worker.schedule(() -> System.out.println("Task 1"));
worker.schedule(() -> System.out.println("Task 2"), 100, TimeUnit.MILLISECONDS);
// Clean up
worker.dispose();
// Test scheduling for unit tests
TestScheduler testScheduler = new TestScheduler();
Observable.interval(1, TimeUnit.SECONDS, testScheduler)
.take(3)
.subscribe(System.out::println);
// Advance virtual time
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
// Periodic scheduling
Disposable periodicTask = Schedulers.single()
.schedulePeriodicallyDirect(
() -> System.out.println("Periodic task"),
0, // Initial delay
1, // Period
TimeUnit.SECONDS
);
// Cancel after some time
Thread.sleep(5000);
periodicTask.dispose();Schedulers.computation(): CPU-intensive work, mathematical computations, image processingSchedulers.io(): I/O operations, network calls, file operations, database accessSchedulers.single(): Sequential operations, updating UI, maintaining orderSchedulers.trampoline(): Immediate execution, testing, avoiding stack overflowSchedulers.newThread(): Long-running operations that need dedicated threadssubscribeOn() to control where the source operatesobserveOn() to control where downstream operators and observers runTestScheduler for deterministic testing with virtual timeInstall with Tessl CLI
npx tessl i tessl/maven-io-reactivex-rxjava3--rxjava