RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
—
Control execution context and threading for reactive streams. Schedulers abstract away threading details and provide a way to specify where operations should run and where observers should be notified.
RxJava provides several built-in schedulers for common use cases.
/**
* Factory class for built-in schedulers
* All accessors listed here are static; the class itself is final
*/
public final class Schedulers {
/**
* IO scheduler for blocking I/O operations (file, network, database)
* Uses unbounded thread pool that grows as needed
* @return the scheduler intended for blocking I/O work
*/
public static Scheduler io();
/**
* Computation scheduler for CPU-intensive work
* Thread pool size equals number of available processors
* @return the scheduler intended for CPU-bound work
*/
public static Scheduler computation();
/**
* Creates a new thread for each scheduled task
* @return the scheduler that spawns a thread per task
*/
public static Scheduler newThread();
/**
* Single-threaded scheduler with FIFO execution
* Useful for event loops and sequential processing
* @return the single-threaded scheduler
*/
public static Scheduler single();
/**
* Trampoline scheduler that queues work on current thread
* Executes immediately if no other work is queued
* @return the trampoline scheduler
*/
public static Scheduler trampoline();
/**
* Creates scheduler from custom Executor
* @param executor the Executor that will run the scheduled tasks
* @return a scheduler backed by the given executor
*/
public static Scheduler from(Executor executor);
/**
* Test scheduler for testing with virtual time
* NOTE(review): the testing example in this document builds the scheduler with
* "new TestScheduler()" instead of calling this factory, and the io.reactivex (2.x)
* Schedulers class does not appear to declare test() - confirm against the targeted version
*/
public static TestScheduler test();
}Abstract base class for all schedulers.
/**
 * Abstract scheduler that coordinates scheduling across time.
 *
 * Work can be scheduled either directly on the scheduler (the *Direct methods)
 * or through a Worker obtained from createWorker().
 */
public abstract class Scheduler {
    /**
     * Creates a Worker for scheduling tasks.
     * Each Worker operates on a single thread.
     *
     * @return a new Worker; dispose it when it is no longer needed
     */
    public abstract Worker createWorker();

    /**
     * Schedules a task to run immediately.
     *
     * @param run the task to execute
     * @return a Disposable that cancels the task
     */
    public Disposable scheduleDirect(Runnable run);

    /**
     * Schedules a task to run after a delay.
     *
     * @param run   the task to execute
     * @param delay how long to wait before running the task
     * @param unit  the unit of delay
     * @return a Disposable that cancels the task
     */
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit);

    /**
     * Schedules a task to run periodically.
     *
     * On Scheduler the periodic variant carries the "Direct" suffix, like the
     * other direct-scheduling methods; the un-suffixed schedulePeriodically
     * belongs to Worker.
     *
     * @param run          the task to execute on every period
     * @param initialDelay how long to wait before the first execution
     * @param period       the time between executions
     * @param unit         the unit of initialDelay and period
     * @return a Disposable that stops further executions
     */
    public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit);

    /**
     * Returns the current time expressed in the given unit.
     *
     * @param unit the unit in which the result is expressed
     * @return the scheduler's notion of "now" in that unit
     */
    public long now(TimeUnit unit);

    /**
     * Starts the scheduler (for lifecycle management).
     */
    public void start();

    /**
     * Shuts down the scheduler.
     */
    public void shutdown();
}
Worker represents a sequential scheduler that executes tasks on a single thread.
/**
* Sequential scheduler that executes tasks on a single thread
* Also a Disposable: disposing the worker cancels what it has scheduled
*/
public abstract static class Worker implements Disposable {
/**
* Schedules a task to run immediately
* @param run the task to execute
* @return a Disposable that cancels this task
*/
public abstract Disposable schedule(Runnable run);
/**
* Schedules a task to run after a delay
* @param run the task to execute
* @param delay how long to wait before running the task
* @param unit the unit of delay
* @return a Disposable that cancels this task
*/
public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
/**
* Schedules a task to run periodically
* @param run the task to execute on every period
* @param initialDelay how long to wait before the first execution
* @param period the time between executions
* @param unit the unit of initialDelay and period
* @return a Disposable that stops further executions
*/
public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit);
/**
* Returns the current time expressed in the given unit
* @param unit the unit in which the result is expressed
*/
public long now(TimeUnit unit);
/**
* Cancels all scheduled tasks and cleans up resources
*/
public abstract void dispose();
/**
* Returns true if this worker has been disposed
*/
public abstract boolean isDisposed();
}Control which scheduler reactive streams use for subscription and observation.
/**
* Available on all reactive types (Observable, Flowable, Single, Maybe, Completable)
* In the signatures below, T is a placeholder for the reactive type the operator is called on
*/
/**
* Specifies the Scheduler on which the source will operate
* Affects where the subscription and upstream operations run
* @param scheduler the scheduler used for subscribing to the source
*/
public final T subscribeOn(Scheduler scheduler);
/**
* Specifies the Scheduler on which observers will be notified
* Affects where downstream operations and subscription callbacks run
* @param scheduler the scheduler on which downstream callbacks are invoked
*/
public final T observeOn(Scheduler scheduler);
/**
* Overload of observeOn that additionally takes a delayError flag
* NOTE(review): per the RxJava docs, delayError controls whether an error may overtake
* items still waiting to be delivered; this overload is presumably limited to
* Observable/Flowable rather than all five types - confirm
*/
public final T observeOn(Scheduler scheduler, boolean delayError);
/**
* Overload of observeOn that additionally takes a delayError flag and a buffer size
* NOTE(review): bufferSize is presumably the size of the queue used at the scheduler
* boundary - confirm against the RxJava docs
*/
public final T observeOn(Scheduler scheduler, boolean delayError, int bufferSize);Special scheduler for testing with virtual time control.
/**
* Scheduler for testing that allows manual time control
* Time only moves when one of the advanceTime methods is called
*/
public final class TestScheduler extends Scheduler {
/**
* Advances virtual time by the specified amount
* @param delayTime how far to move the virtual clock forward
* @param unit the unit of delayTime
*/
public void advanceTimeBy(long delayTime, TimeUnit unit);
/**
* Advances virtual time to the specified point
* @param delayTime the target point on the virtual clock (an absolute time, presumably
* measured from the scheduler's start - confirm)
* @param unit the unit of delayTime
*/
public void advanceTimeTo(long delayTime, TimeUnit unit);
/**
* Triggers all tasks scheduled for the current virtual time
*/
public void triggerActions();
/**
* Returns current virtual time
* @param unit the unit in which the result is expressed
*/
public long now(TimeUnit unit);
/**
* Creates a Worker bound to this test scheduler
*/
public Worker createWorker();
}Basic Threading with subscribeOn and observeOn:
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
// Builds a source whose callable is moved to the IO scheduler (subscribeOn) while the
// subscriber callback is moved to the computation scheduler (observeOn).
Observable<String> source = Observable.fromCallable(() -> {
// This runs on IO thread due to subscribeOn
System.out.println("Source thread: " + Thread.currentThread().getName());
Thread.sleep(1000); // Simulate blocking I/O
return "Data from server";
})
.subscribeOn(Schedulers.io()) // Source runs on IO scheduler
.observeOn(Schedulers.computation()); // Observer notified on computation scheduler
// Subscribing starts the work; the two println calls below report the delivery thread.
source.subscribe(data -> {
// This runs on computation thread due to observeOn
System.out.println("Observer thread: " + Thread.currentThread().getName());
System.out.println("Received: " + data);
});Choosing the Right Scheduler:
// NOTE: fetchFromNetwork(), heavyComputation() and updateUI() are placeholders that are
// not defined in this document.
// I/O operations (network, file, database)
Observable<String> networkCall = Observable.fromCallable(() -> fetchFromNetwork())
.subscribeOn(Schedulers.io());
// CPU-intensive computations
// (only assembled here; this snippet never subscribes to "computation")
Observable<Integer> computation = Observable.range(1, 1000000)
.map(i -> heavyComputation(i))
.subscribeOn(Schedulers.computation());
// UI updates (Android example)
// AndroidSchedulers is not declared in this document - presumably from the separate
// RxAndroid library; confirm the dependency before using this snippet.
networkCall
.observeOn(AndroidSchedulers.mainThread()) // Android-specific
.subscribe(data -> updateUI(data));
// Sequential processing
// (only assembled here; this snippet never subscribes to "sequential")
Observable<String> sequential = Observable.just("task1", "task2", "task3")
.subscribeOn(Schedulers.single())
.doOnNext(task -> System.out.println("Processing: " + task));Custom Scheduler from Executor:
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
// Create custom thread pool
ExecutorService customExecutor = Executors.newFixedThreadPool(4);
Scheduler customScheduler = Schedulers.from(customExecutor);
Observable.range(1, 10)
    .subscribeOn(customScheduler)
    // Shut the pool down however the stream ends (completion, error or disposal).
    // Doing this only in the completion callback would leave the pool's threads
    // running after an error.
    .doFinally(customExecutor::shutdown)
    .subscribe(
        value -> System.out.println("Value: " + value + " on " + Thread.currentThread().getName()),
        error -> error.printStackTrace(),
        () -> System.out.println("Completed")
    );
Direct Scheduling with Scheduler:
import io.reactivex.disposables.Disposable;
import java.util.concurrent.TimeUnit;
Scheduler scheduler = Schedulers.io();
// Schedule immediate task
Disposable task1 = scheduler.scheduleDirect(() -> {
    System.out.println("Immediate task executed");
});
// Schedule delayed task
Disposable task2 = scheduler.scheduleDirect(() -> {
    System.out.println("Delayed task executed");
}, 2, TimeUnit.SECONDS);
// Schedule periodic task: first run after 1 second, then every 3 seconds.
// On Scheduler the periodic method is schedulePeriodicallyDirect;
// schedulePeriodically (without the suffix) is the Worker-level method.
Disposable task3 = scheduler.schedulePeriodicallyDirect(() -> {
    System.out.println("Periodic task: " + System.currentTimeMillis());
}, 1, 3, TimeUnit.SECONDS);
// Let the tasks run for 10 seconds, then stop the periodic one.
// The two one-shot tasks are due within that window, so only task3 still needs cancelling.
Thread.sleep(10000);
task3.dispose();
Using Worker for Sequential Tasks:
// Obtain a dedicated Worker from the IO scheduler; it must be disposed explicitly (see below).
Scheduler.Worker worker = Schedulers.io().createWorker();
// All tasks scheduled on this worker run sequentially on the same thread
worker.schedule(() -> System.out.println("Task 1"));
worker.schedule(() -> System.out.println("Task 2"), 1, TimeUnit.SECONDS);
worker.schedule(() -> System.out.println("Task 3"), 2, TimeUnit.SECONDS);
// Clean up worker when done
// The 5 second sleep outlasts the longest delay above (2 seconds), so all three tasks
// are due before dispose() is called.
Thread.sleep(5000);
worker.dispose();Testing with TestScheduler:
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
// The interval is driven by the TestScheduler, so no real time passes in this test;
// values appear only when the virtual clock is advanced.
TestScheduler testScheduler = new TestScheduler();
Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, testScheduler)
.take(3);
TestObserver<Long> testObserver = source.test();
// Initially no emissions
testObserver.assertValueCount(0);
// Advance time by 1 second
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
testObserver.assertValueCount(1);
testObserver.assertValues(0L);
// Advance time by 2 more seconds
// (virtual time is now 3 seconds, so take(3) has received its third and last value)
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
testObserver.assertValueCount(3);
testObserver.assertValues(0L, 1L, 2L);
testObserver.assertComplete();Complex Threading Example:
Observable<String> pipeline = Observable.fromCallable(() -> {
// Heavy I/O operation
System.out.println("Fetching data on: " + Thread.currentThread().getName());
Thread.sleep(2000);
return "raw-data";
})
.subscribeOn(Schedulers.io()) // I/O operation on IO scheduler
.map(data -> {
// CPU intensive transformation
System.out.println("Processing data on: " + Thread.currentThread().getName());
return data.toUpperCase() + "-processed";
})
.observeOn(Schedulers.computation()) // Switch to computation for CPU work
.flatMap(processed -> {
// Another I/O operation
return Observable.fromCallable(() -> {
System.out.println("Saving data on: " + Thread.currentThread().getName());
Thread.sleep(1000);
return processed + "-saved";
}).subscribeOn(Schedulers.io()); // Back to I/O for saving
})
.observeOn(Schedulers.single()); // Final result on single thread
pipeline.subscribe(
result -> System.out.println("Final result on: " + Thread.currentThread().getName() + " -> " + result),
error -> error.printStackTrace()
);Error Handling with Schedulers:
// Source that fails roughly half of the time (Math.random() > 0.5) to exercise the
// error path across scheduler boundaries.
Observable.fromCallable(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Random error");
}
return "Success";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation(), true) // delayError = true
// retry(3): per the RxJava docs, re-subscribes to everything above on error, up to
// 3 times, before the error reaches the subscriber below.
.retry(3)
.subscribe(
result -> System.out.println("Result: " + result),
error -> System.err.println("Final error: " + error)
);When to use each scheduler:
- Schedulers.io(): File I/O, network calls, database operations, blocking operations
- Schedulers.computation(): CPU-intensive work, mathematical computations, image processing
- Schedulers.newThread(): When you need a guaranteed separate thread (use sparingly)
- Schedulers.single(): Sequential processing, event loops, coordination
- Schedulers.trampoline(): Testing, when you want synchronous execution
- Schedulers.from(executor): Custom thread pools, specific threading requirements

Best Practices:
- Use subscribeOn() to specify where the source Observable does its work
- Use observeOn() to specify where observers receive notifications

/**
* Time unit enumeration
*/
public enum TimeUnit {
    // Constants are declared from the finest to the coarsest granularity.
    NANOSECONDS,
    MICROSECONDS,
    MILLISECONDS,
    SECONDS,
    MINUTES,
    HOURS,
    DAYS
}
/**
 * Introspection hook: lets a caller retrieve the Runnable that an
 * implementing object wraps.
 */
public interface SchedulerRunnableIntrospection {

    /**
     * @return the Runnable wrapped by this object
     */
    Runnable getWrappedRunnable();

}
/**
* Timed value wrapper
* Pairs a value of type T with a time and the unit that time is expressed in
*/
public final class Timed<T> {
/** Returns the time associated with the value (presumably expressed in unit() - confirm) */
public long time();
/** Returns the time unit that accompanies time() */
public TimeUnit unit();
/** Returns the wrapped value */
public T value();
}Install with Tessl CLI
npx tessl i tessl/maven-io-reactivex-rxjava2--rxjava