RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
npx @tessl/cli install tessl/maven-io-reactivex-rxjava2--rxjava@2.2.0

RxJava 2.x is a Java VM implementation of Reactive Extensions that provides a comprehensive library for composing asynchronous and event-based programs using observable sequences. It extends the observer pattern to support data/event sequences with operators for declarative composition while abstracting low-level threading, synchronization, and concurrent data structures.
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.21</version>
</dependency>

implementation 'io.reactivex.rxjava2:rxjava:2.2.21'

import io.reactivex.Observable;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.Maybe;
import io.reactivex.Completable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
// Create and subscribe to an Observable
// Builds a source from two strings, upper-cases each one with map, and selects
// Schedulers.io() via subscribeOn and Schedulers.computation() via observeOn.
Observable<String> observable = Observable.just("Hello", "World")
.map(s -> s.toUpperCase())
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation());
// Subscribe with an anonymous Observer that implements all four callbacks.
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// Handle subscription
}
@Override
public void onNext(String value) {
// Print each value delivered to the observer.
System.out.println(value);
}
@Override
public void onError(Throwable e) {
// Print the stack trace of the reported error.
e.printStackTrace();
}
@Override
public void onComplete() {
// Print a marker when the completion callback is invoked.
System.out.println("Completed");
}
});
// Lambda-style subscription
// Keeps the even values of 1..5, squares them, and subscribes with three
// lambdas: one prints each value, one prints an error's stack trace, and one
// prints "Complete".
Observable.fromArray(1, 2, 3, 4, 5)
.filter(x -> x % 2 == 0)
.map(x -> x * x)
.subscribe(
value -> System.out.println("Value: " + value),
error -> error.printStackTrace(),
() -> System.out.println("Complete")
);

RxJava is built around several key components:
Cold observable sequences for 0-N items without backpressure support. Ideal for UI events, HTTP requests, and general reactive programming patterns.
// Abridged signature listing for Observable<T>; method bodies are omitted.
public abstract class Observable<T> implements ObservableSource<T> {
// Factory methods
// NOTE(review): RxJava 2.x declares just() as fixed-arity overloads (one to ten
// items) rather than varargs; fromArray is the varargs factory. Confirm against
// the Observable Javadoc.
public static <T> Observable<T> just(T... items);
public static <T> Observable<T> fromArray(T... array);
public static <T> Observable<T> fromIterable(Iterable<? extends T> source);
// Produces an Observable of Long; period is expressed in the given TimeUnit.
public static Observable<Long> interval(long period, TimeUnit unit);
public static <T> Observable<T> create(ObservableOnSubscribe<T> source);
// Transformation operators
// map converts T to R directly; flatMap's mapper returns an ObservableSource of R.
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper);
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
public final Observable<T> filter(Predicate<? super T> predicate);
// Threading
// Both operators take a Scheduler and return an Observable of the same element type.
public final Observable<T> subscribeOn(Scheduler scheduler);
public final Observable<T> observeOn(Scheduler scheduler);
// Subscription
// The Consumer overload returns a Disposable; the Observer overload returns nothing.
public final Disposable subscribe(Consumer<? super T> onNext);
public final void subscribe(Observer<? super T> observer);
}

Reactive streams with backpressure support for 0-N items. Compatible with the Reactive Streams specification for handling fast data producers.
// Abridged signature listing for Flowable<T>; method bodies are omitted.
public abstract class Flowable<T> implements Publisher<T> {
// Factory methods with backpressure handling
// NOTE(review): RxJava 2.x declares just() as fixed-arity overloads (one to ten
// items) rather than varargs; fromArray is the varargs factory. Confirm against
// the Flowable Javadoc.
public static <T> Flowable<T> just(T... items);
public static <T> Flowable<T> fromArray(T... array);
// create takes an explicit BackpressureStrategy in addition to the emitter callback.
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode);
// Backpressure operators
// Each takes no arguments and returns a Flowable of the same element type.
public final Flowable<T> onBackpressureBuffer();
public final Flowable<T> onBackpressureDrop();
public final Flowable<T> onBackpressureLatest();
// Standard operators
public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper);
public final Flowable<T> filter(Predicate<? super T> predicate);
// Subscription
// Only the FlowableSubscriber overload is listed here; it returns nothing.
public final void subscribe(FlowableSubscriber<? super T> subscriber);
}

Reactive type that emits exactly one value or an error. Perfect for async operations that return a single result, such as HTTP requests.
// Abridged signature listing for Single<T>; method bodies are omitted.
public abstract class Single<T> implements SingleSource<T> {
// Factory methods
// just wraps a given item; fromCallable takes a Callable that supplies it.
public static <T> Single<T> just(T item);
public static <T> Single<T> fromCallable(Callable<? extends T> callable);
public static <T> Single<T> create(SingleOnSubscribe<T> source);
// Transformation
// map converts T to R directly; flatMap's mapper returns a SingleSource of R.
public final <R> Single<R> map(Function<? super T, ? extends R> mapper);
public final <R> Single<R> flatMap(Function<? super T, ? extends SingleSource<? extends R>> mapper);
// Error handling
// valueSupplier receives the Throwable and returns a T.
public final Single<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier);
// Subscription
// The Consumer overload returns a Disposable; the SingleObserver overload returns nothing.
public final Disposable subscribe(Consumer<? super T> onSuccess);
public final void subscribe(SingleObserver<? super T> observer);
}

Reactive type that emits 0 or 1 item or an error. Useful for operations that may or may not return a value.
// Abridged signature listing for Maybe<T>; method bodies are omitted.
public abstract class Maybe<T> implements MaybeSource<T> {
// Factory methods
// just takes an item, empty takes nothing, fromCallable takes a Callable.
public static <T> Maybe<T> just(T item);
public static <T> Maybe<T> empty();
public static <T> Maybe<T> fromCallable(Callable<? extends T> callable);
// Transformation
public final <R> Maybe<R> map(Function<? super T, ? extends R> mapper);
public final Maybe<T> filter(Predicate<? super T> predicate);
// Takes a fallback item and, as listed here, still returns a Maybe<T>.
public final Maybe<T> defaultIfEmpty(T defaultItem);
// Subscription
// The Consumer overload returns a Disposable; the MaybeObserver overload returns nothing.
public final Disposable subscribe(Consumer<? super T> onSuccess);
public final void subscribe(MaybeObserver<? super T> observer);
}

Reactive type that only signals completion or error without emitting items. Ideal for fire-and-forget operations.
// Abridged signature listing for Completable; method bodies are omitted.
// The class has no type parameter.
public abstract class Completable implements CompletableSource {
// Factory methods
// fromAction takes an Action; fromRunnable takes a java.lang.Runnable.
public static Completable complete();
public static Completable fromAction(Action action);
public static Completable fromRunnable(Runnable runnable);
// Chaining
// andThen is overloaded on the type of the next source: an ObservableSource<T>
// yields an Observable<T>, a CompletableSource yields another Completable.
public final <T> Observable<T> andThen(ObservableSource<T> next);
public final Completable andThen(CompletableSource next);
// Subscription
// The Action overload returns a Disposable; the CompletableObserver overload returns nothing.
public final Disposable subscribe(Action onComplete);
public final void subscribe(CompletableObserver observer);
}

Control execution context and threading for reactive streams.
// Abridged signature listing for Scheduler; method bodies are omitted.
public abstract class Scheduler {
// The only abstract method in this listing. The Worker type is not declared
// here (presumably a type nested in Scheduler; confirm against the Javadoc).
public abstract Worker createWorker();
// Both overloads take a Runnable and return a Disposable; the second adds a
// delay expressed in the given TimeUnit.
public Disposable scheduleDirect(Runnable run);
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit);
}
// Abridged signature listing for Schedulers: a final class whose listed members
// are all static methods returning a Scheduler.
public final class Schedulers {
public static Scheduler io();
public static Scheduler computation();
public static Scheduler newThread();
public static Scheduler single();
public static Scheduler trampoline();
// Takes a java.util.concurrent.Executor and returns a Scheduler.
public static Scheduler from(Executor executor);
}

Hot observables that can multicast to multiple observers and emit items regardless of subscriptions.
// Abridged signature listing for Subject<T>, which is both an Observable<T>
// (superclass) and an Observer<T> (implemented interface).
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
// Three abstract boolean queries; the names suggest they report whether
// observers are attached and whether the subject ended with an error or
// completed. Confirm exact semantics against the Javadoc.
public abstract boolean hasObservers();
public abstract boolean hasThrowable();
public abstract boolean hasComplete();
}
// Main subject types
// Abbreviated declarations: each is a final class extending Subject<T>, and the
// class bodies are omitted (hence the trailing semicolons).
public final class PublishSubject<T> extends Subject<T>;
public final class BehaviorSubject<T> extends Subject<T>;
public final class ReplaySubject<T> extends Subject<T>;
public final class AsyncSubject<T> extends Subject<T>;

Disposable pattern for managing subscriptions and preventing memory leaks.
// Handle with two operations: dispose() and a boolean isDisposed() query.
public interface Disposable {
void dispose();
boolean isDisposed();
}
// Abridged signature listing for CompositeDisposable. It implements Disposable,
// but the Disposable methods themselves are not repeated in this listing.
public final class CompositeDisposable implements Disposable {
// add and remove each take a Disposable and return a boolean; clear takes no
// arguments and returns nothing.
public boolean add(Disposable disposable);
public boolean remove(Disposable disposable);
public void clear();
}

Comprehensive error handling and recovery mechanisms.
// Error handling operators, shown here with their Observable signatures
// NOTE(review): the signatures below all return Observable<T>; the other
// reactive types declare their own variants with different return types, and
// not every operator exists on every type. Confirm per type against the Javadoc.
// onErrorReturn's function maps the Throwable to a T; onErrorResumeNext's
// function maps it to an ObservableSource.
public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier);
public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction);
// retry is overloaded: no arguments, or a long count.
public final Observable<T> retry();
public final Observable<T> retry(long times);
public final Observable<T> retryWhen(Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler);

// Observer interfaces
// Callback interface with four methods: onSubscribe receives a Disposable,
// onNext receives a T, onError receives a Throwable, and onComplete takes no
// arguments.
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
// Callback interface with three methods: onSubscribe receives a Disposable,
// onSuccess receives a T, and onError receives a Throwable. Unlike Observer it
// declares no onNext or onComplete.
public interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T t);
void onError(Throwable e);
}
// Callback interface with four methods: onSubscribe receives a Disposable,
// onSuccess receives a T, onError receives a Throwable, and onComplete takes no
// arguments.
public interface MaybeObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T t);
void onError(Throwable e);
void onComplete();
}
// Callback interface with no type parameter and no value-carrying callback:
// onSubscribe receives a Disposable, onComplete takes no arguments, and onError
// receives a Throwable.
public interface CompletableObserver {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable e);
}
// Functional interfaces
// Single-method interface: accept takes a T, returns nothing, and is declared
// to throw Exception, so implementations may throw checked exceptions.
public interface Consumer<T> {
void accept(T t) throws Exception;
}
// Single-method interface: apply takes a T, returns an R, and is declared to
// throw Exception, so implementations may throw checked exceptions.
public interface Function<T, R> {
R apply(T t) throws Exception;
}
// Single-method interface: test takes a T, returns a boolean, and is declared
// to throw Exception, so implementations may throw checked exceptions.
public interface Predicate<T> {
boolean test(T t) throws Exception;
}
// Single-method interface: run takes no arguments, returns nothing, and is
// declared to throw Exception, so implementations may throw checked exceptions.
public interface Action {
void run() throws Exception;
}
// Backpressure strategies
// Enum with five constants. In this document it appears as the type of the
// `mode` parameter of Flowable.create.
public enum BackpressureStrategy {
MISSING, ERROR, BUFFER, DROP, LATEST
}