CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-reactivex-rxjava2--rxjava

RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Pending
Overview
Eval results
Files

schedulers.mddocs/

Schedulers and Threading

Control execution context and threading for reactive streams. Schedulers abstract away threading details and provide a way to specify where operations should run and where observers should be notified.

Capabilities

Built-in Schedulers

RxJava provides several built-in schedulers for common use cases.

/**
 * Factory class for built-in schedulers
 */
public final class Schedulers {
    /**
     * IO scheduler for blocking I/O operations (file, network, database)
     * Uses unbounded thread pool that grows as needed
     */
    public static Scheduler io();
    
    /**
     * Computation scheduler for CPU-intensive work
     * Thread pool size equals number of available processors
     */
    public static Scheduler computation();
    
    /**
     * Creates a new thread for each scheduled task
     */
    public static Scheduler newThread();
    
    /**
     * Single-threaded scheduler with FIFO execution
     * Useful for event loops and sequential processing
     */
    public static Scheduler single();
    
    /**
     * Trampoline scheduler that queues work on current thread
     * Executes immediately if no other work is queued
     */
    public static Scheduler trampoline();
    
    /**
     * Creates scheduler from custom Executor
     */
    public static Scheduler from(Executor executor);
    
    /**
     * Test scheduler for testing with virtual time
     */
    public static TestScheduler test();
}

Core Scheduler Interface

Abstract base class for all schedulers.

/**
 * Abstract scheduler that coordinates scheduling across time
 */
public abstract class Scheduler {
    /**
     * Creates a Worker for scheduling tasks
     * Each Worker operates on a single thread
     */
    public abstract Worker createWorker();
    
    /**
     * Schedules a task to run immediately
     */
    public Disposable scheduleDirect(Runnable run);
    
    /**
     * Schedules a task to run after a delay
     */
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit);
    
    /**
     * Schedules a task to run periodically
     */
    public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit);
    
    /**
     * Returns current time in milliseconds
     */
    public long now(TimeUnit unit);
    
    /**
     * Starts the scheduler (for lifecycle management)
     */
    public void start();
    
    /**
     * Shuts down the scheduler
     */
    public void shutdown();
}

Worker Interface

Worker represents a sequential scheduler that executes tasks on a single thread.

/**
 * Sequential scheduler that executes tasks on a single thread
 */
public abstract static class Worker implements Disposable {
    /**
     * Schedules a task to run immediately
     */
    public abstract Disposable schedule(Runnable run);
    
    /**
     * Schedules a task to run after a delay
     */
    public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
    
    /**
     * Schedules a task to run periodically
     */
    public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit);
    
    /**
     * Returns current time in milliseconds
     */
    public long now(TimeUnit unit);
    
    /**
     * Cancels all scheduled tasks and cleans up resources
     */
    public abstract void dispose();
    
    /**
     * Returns true if this worker has been disposed
     */
    public abstract boolean isDisposed();
}

Threading Operators

Control which scheduler reactive streams use for subscription and observation.

/**
 * Available on all reactive types (Observable, Flowable, Single, Maybe, Completable)
 */

/**
 * Specifies the Scheduler on which the source will operate
 * Affects where the subscription and upstream operations run
 */
public final T subscribeOn(Scheduler scheduler);

/**
 * Specifies the Scheduler on which observers will be notified
 * Affects where downstream operations and subscription callbacks run
 */
public final T observeOn(Scheduler scheduler);
public final T observeOn(Scheduler scheduler, boolean delayError);
public final T observeOn(Scheduler scheduler, boolean delayError, int bufferSize);

Test Scheduler

Special scheduler for testing with virtual time control.

/**
 * Scheduler for testing that allows manual time control
 */
public final class TestScheduler extends Scheduler {
    /**
     * Advances virtual time by the specified amount
     */
    public void advanceTimeBy(long delayTime, TimeUnit unit);
    
    /**
     * Advances virtual time to the specified point
     */
    public void advanceTimeTo(long delayTime, TimeUnit unit);
    
    /**
     * Triggers all tasks scheduled for the current virtual time
     */
    public void triggerActions();
    
    /**
     * Returns current virtual time
     */
    public long now(TimeUnit unit);
    
    /**
     * Creates a Worker bound to this test scheduler
     */
    public Worker createWorker();
}

Usage Examples

Basic Threading with subscribeOn and observeOn:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

Observable<String> source = Observable.fromCallable(() -> {
    // This runs on IO thread due to subscribeOn
    System.out.println("Source thread: " + Thread.currentThread().getName());
    Thread.sleep(1000); // Simulate blocking I/O
    return "Data from server";
})
.subscribeOn(Schedulers.io()) // Source runs on IO scheduler
.observeOn(Schedulers.computation()); // Observer notified on computation scheduler

source.subscribe(data -> {
    // This runs on computation thread due to observeOn
    System.out.println("Observer thread: " + Thread.currentThread().getName());
    System.out.println("Received: " + data);
});

Choosing the Right Scheduler:

// I/O operations (network, file, database)
Observable<String> networkCall = Observable.fromCallable(() -> fetchFromNetwork())
    .subscribeOn(Schedulers.io());

// CPU-intensive computations
Observable<Integer> computation = Observable.range(1, 1000000)
    .map(i -> heavyComputation(i))
    .subscribeOn(Schedulers.computation());

// UI updates (Android example)
networkCall
    .observeOn(AndroidSchedulers.mainThread()) // Android-specific
    .subscribe(data -> updateUI(data));

// Sequential processing
Observable<String> sequential = Observable.just("task1", "task2", "task3")
    .subscribeOn(Schedulers.single())
    .doOnNext(task -> System.out.println("Processing: " + task));

Custom Scheduler from Executor:

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

// Create custom thread pool
ExecutorService customExecutor = Executors.newFixedThreadPool(4);
Scheduler customScheduler = Schedulers.from(customExecutor);

Observable.range(1, 10)
    .subscribeOn(customScheduler)
    .subscribe(
        value -> System.out.println("Value: " + value + " on " + Thread.currentThread().getName()),
        error -> error.printStackTrace(),
        () -> {
            System.out.println("Completed");
            customExecutor.shutdown(); // Don't forget to shutdown
        }
    );

Direct Scheduling with Scheduler:

import io.reactivex.disposables.Disposable;

Scheduler scheduler = Schedulers.io();

// Schedule immediate task
Disposable task1 = scheduler.scheduleDirect(() -> {
    System.out.println("Immediate task executed");
});

// Schedule delayed task
Disposable task2 = scheduler.scheduleDirect(() -> {
    System.out.println("Delayed task executed");
}, 2, TimeUnit.SECONDS);

// Schedule periodic task
Disposable task3 = scheduler.schedulePeriodically(() -> {
    System.out.println("Periodic task: " + System.currentTimeMillis());
}, 1, 3, TimeUnit.SECONDS);

// Cancel tasks when done
Thread.sleep(10000);
task3.dispose();

Using Worker for Sequential Tasks:

Scheduler.Worker worker = Schedulers.io().createWorker();

// All tasks scheduled on this worker run sequentially on the same thread
worker.schedule(() -> System.out.println("Task 1"));
worker.schedule(() -> System.out.println("Task 2"), 1, TimeUnit.SECONDS);
worker.schedule(() -> System.out.println("Task 3"), 2, TimeUnit.SECONDS);

// Clean up worker when done
Thread.sleep(5000);
worker.dispose();

Testing with TestScheduler:

import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;

TestScheduler testScheduler = new TestScheduler();

Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, testScheduler)
    .take(3);

TestObserver<Long> testObserver = source.test();

// Initially no emissions
testObserver.assertValueCount(0);

// Advance time by 1 second
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
testObserver.assertValueCount(1);
testObserver.assertValues(0L);

// Advance time by 2 more seconds
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
testObserver.assertValueCount(3);
testObserver.assertValues(0L, 1L, 2L);
testObserver.assertComplete();

Complex Threading Example:

Observable<String> pipeline = Observable.fromCallable(() -> {
    // Heavy I/O operation
    System.out.println("Fetching data on: " + Thread.currentThread().getName());
    Thread.sleep(2000);
    return "raw-data";
})
.subscribeOn(Schedulers.io()) // I/O operation on IO scheduler

.map(data -> {
    // CPU intensive transformation
    System.out.println("Processing data on: " + Thread.currentThread().getName());
    return data.toUpperCase() + "-processed";
})
.observeOn(Schedulers.computation()) // Switch to computation for CPU work

.flatMap(processed -> {
    // Another I/O operation
    return Observable.fromCallable(() -> {
        System.out.println("Saving data on: " + Thread.currentThread().getName());
        Thread.sleep(1000);
        return processed + "-saved";
    }).subscribeOn(Schedulers.io()); // Back to I/O for saving
})

.observeOn(Schedulers.single()); // Final result on single thread

pipeline.subscribe(
    result -> System.out.println("Final result on: " + Thread.currentThread().getName() + " -> " + result),
    error -> error.printStackTrace()
);

Error Handling with Schedulers:

Observable.fromCallable(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Random error");
    }
    return "Success";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation(), true) // delayError = true
.retry(3)
.subscribe(
    result -> System.out.println("Result: " + result),
    error -> System.err.println("Final error: " + error)
);

Scheduler Guidelines

When to use each scheduler:

  • Schedulers.io(): File I/O, network calls, database operations, blocking operations
  • Schedulers.computation(): CPU-intensive work, mathematical computations, image processing
  • Schedulers.newThread(): When you need guaranteed separate thread (use sparingly)
  • Schedulers.single(): Sequential processing, event loops, coordination
  • Schedulers.trampoline(): Testing, when you want synchronous execution
  • Schedulers.from(executor): Custom thread pools, specific threading requirements

Best Practices:

  1. Use subscribeOn() to specify where the source Observable does its work
  2. Use observeOn() to specify where observers receive notifications
  3. Avoid blocking operations on the computation scheduler
  4. Don't forget to dispose of custom schedulers and workers
  5. Use TestScheduler for time-based testing
  6. Be careful with thread safety when sharing state between threads

Types

/**
 * Time unit enumeration
 */
public enum TimeUnit {
    NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS
}

/**
 * Interface for runnable introspection
 */
public interface SchedulerRunnableIntrospection {
    Runnable getWrappedRunnable();
}

/**
 * Timed value wrapper
 */
public final class Timed<T> {
    public long time();
    public TimeUnit unit();
    public T value();
}

Install with Tessl CLI

npx tessl i tessl/maven-io-reactivex-rxjava2--rxjava

docs

completable.md

disposables.md

error-handling.md

flowable.md

index.md

maybe.md

observable.md

schedulers.md

single.md

subjects.md

tile.json