RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
—
Cold observable sequences for 0-N items without backpressure support. Observable is the most commonly used reactive type in RxJava, ideal for UI events, HTTP requests, and general reactive programming patterns.
Factory methods for creating Observable instances from various sources.
/**
* Creates an Observable that emits the provided items then completes
*/
public static <T> Observable<T> just(T item);
public static <T> Observable<T> just(T item1, T item2);
public static <T> Observable<T> just(T item1, T item2, T item3);
// ... up to 10 items
/**
* Creates an Observable that emits all items from an array
*/
public static <T> Observable<T> fromArray(T... array);
/**
* Creates an Observable that emits all items from an Iterable
*/
public static <T> Observable<T> fromIterable(Iterable<? extends T> source);
/**
* Creates an Observable from a Callable that will be called for each observer
*/
public static <T> Observable<T> fromCallable(Callable<? extends T> callable);
/**
* Creates an Observable from a Future
*/
public static <T> Observable<T> fromFuture(Future<? extends T> future);
/**
* Creates an Observable using the provided ObservableOnSubscribe function
*/
public static <T> Observable<T> create(ObservableOnSubscribe<T> source);
/**
* Creates an Observable that emits sequential numbers every specified interval
*/
public static Observable<Long> interval(long period, TimeUnit unit);
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit);
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler);
/**
* Creates an Observable that emits a range of sequential integers
*/
public static Observable<Integer> range(int start, int count);
/**
* Creates an Observable that emits a single 0L after a delay
*/
public static Observable<Long> timer(long delay, TimeUnit unit);
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler);
/**
* Creates an empty Observable that only calls onComplete
*/
public static <T> Observable<T> empty();
/**
* Creates an Observable that never emits any items and never terminates
*/
public static <T> Observable<T> never();
/**
* Creates an Observable that only calls onError
*/
public static <T> Observable<T> error(Throwable exception);
public static <T> Observable<T> error(Callable<? extends Throwable> errorSupplier);
/**
* Defers the creation of the Observable until subscription
*/
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> observableSupplier);
Transform items emitted by an Observable.
/**
* Transforms items by applying a function to each item
*/
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper);
/**
* Transforms items into Observables and flattens them into a single Observable
*/
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int maxConcurrency);
/**
* Similar to flatMap but maintains the order of the original items
*/
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
/**
* Similar to flatMap but only subscribes to the most recent inner Observable
*/
public final <R> Observable<R> switchMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
/**
* Casts each item to the specified type
*/
public final <U> Observable<U> cast(Class<U> clazz);
/**
* Applies an accumulator function to each item and emits every intermediate result
*/
public final Observable<T> scan(BiFunction<T, T, T> accumulator);
public final <R> Observable<R> scan(R initialValue, BiFunction<R, ? super T, R> accumulator);
/**
* Groups items by a key selector function
*/
public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector);
/**
* Collects items into buffers
*/
public final Observable<List<T>> buffer(int count);
public final Observable<List<T>> buffer(long timespan, TimeUnit unit);
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler);
/**
* Creates non-overlapping windows of items
*/
public final Observable<Observable<T>> window(int count);
public final Observable<Observable<T>> window(long timespan, TimeUnit unit);
Filter items emitted by an Observable.
/**
* Filters items based on a predicate
*/
public final Observable<T> filter(Predicate<? super T> predicate);
/**
* Emits only the first n items
*/
public final Observable<T> take(long count);
/**
* Emits items for a specified duration
*/
public final Observable<T> take(long time, TimeUnit unit);
/**
* Skips the first n items
*/
public final Observable<T> skip(long count);
/**
* Skips items for a specified duration
*/
public final Observable<T> skip(long time, TimeUnit unit);
/**
* Emits only distinct items
*/
public final Observable<T> distinct();
public final <K> Observable<T> distinct(Function<? super T, K> keySelector);
/**
* Emits only items that are different from the previous item
*/
public final Observable<T> distinctUntilChanged();
public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector);
/**
* Emits items while a predicate is true, then completes at the first item that fails it
*/
public final Observable<T> takeWhile(Predicate<? super T> predicate);
/**
* Skips items while a predicate is true
*/
public final Observable<T> skipWhile(Predicate<? super T> predicate);
/**
* Emits only the first item, or the default item if the source is empty
*/
public final Single<T> first(T defaultItem);
/**
* Emits only the last item, or the default item if the source is empty
*/
public final Single<T> last(T defaultItem);
/**
* Emits the single item, or the default item if the source is empty; signals IllegalArgumentException if there is more than one item
*/
public final Single<T> single(T defaultItem);
/**
* Ignores all items and only emits completion
*/
public final Completable ignoreElements();
Combine multiple Observables.
/**
* Combines two Observables by emitting an item when either emits
*/
public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2);
public static <T> Observable<T> merge(ObservableSource<? extends ObservableSource<? extends T>> sources);
/**
* Concatenates Observables sequentially
*/
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2);
public static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources);
/**
* Combines latest values from multiple Observables
*/
public static <T1, T2, R> Observable<R> combineLatest(
ObservableSource<T1> source1,
ObservableSource<T2> source2,
BiFunction<? super T1, ? super T2, ? extends R> combiner
);
/**
* Zips items from multiple Observables together
*/
public static <T1, T2, R> Observable<R> zip(
ObservableSource<T1> source1,
ObservableSource<T2> source2,
BiFunction<? super T1, ? super T2, ? extends R> zipper
);
/**
* Mirrors whichever source Observable is the first to emit an item or terminate
*/
public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources);
/**
* Prepends items to the beginning of an Observable
*/
public final Observable<T> startWith(T item);
public final Observable<T> startWith(Iterable<? extends T> items);
public final Observable<T> startWithArray(T... items);
/**
* Appends items to the end of an Observable
*/
public final Observable<T> concatWith(ObservableSource<? extends T> other);
Control threading and execution context.
/**
* Specifies the Scheduler on which the source Observable will operate
*/
public final Observable<T> subscribeOn(Scheduler scheduler);
/**
* Specifies the Scheduler on which observers will be notified
*/
public final Observable<T> observeOn(Scheduler scheduler);
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError);
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize);
Perform side effects without modifying the stream.
/**
* Invokes an action for each item emitted
*/
public final Observable<T> doOnNext(Consumer<? super T> onNext);
/**
* Invokes an action when an error is emitted
*/
public final Observable<T> doOnError(Consumer<? super Throwable> onError);
/**
* Invokes an action when the Observable completes normally
*/
public final Observable<T> doOnComplete(Action onComplete);
/**
* Invokes an action when a subscription occurs
*/
public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe);
/**
* Invokes an action when the Observable terminates (completes or errors)
*/
public final Observable<T> doOnTerminate(Action onTerminate);
/**
* Invokes an action after the Observable terminates
*/
public final Observable<T> doAfterTerminate(Action onFinally);
/**
* Invokes an action when the subscription is disposed
*/
public final Observable<T> doOnDispose(Action onDispose);
Control timing and temporal behavior of streams.
/**
* Only emit items if no other item was emitted within a time window
*/
public final Observable<T> debounce(long timeout, TimeUnit unit);
public final Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler);
/**
* Emit only the first item in each time window
*/
public final Observable<T> throttleFirst(long windowDuration, TimeUnit unit);
public final Observable<T> throttleFirst(long windowDuration, TimeUnit unit, Scheduler scheduler);
/**
* Emit only the last item in each time window
*/
public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit);
public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler);
/**
* Shift emissions forward in time by a specified delay
*/
public final Observable<T> delay(long delay, TimeUnit unit);
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler);
/**
* Sample items periodically
*/
public final Observable<T> sample(long period, TimeUnit unit);
public final Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler);
Utility operations for various purposes.
/**
* Mirror source Observable, but terminate with TimeoutException if no item is emitted within timeout
*/
public final Observable<T> timeout(long timeout, TimeUnit timeUnit);
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler);
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> fallback);
/**
* Cache emissions for replay to all future subscribers
*/
public final Observable<T> cache();
public final Observable<T> cacheWithInitialCapacity(int initialCapacity);
/**
* Convert notifications into Notification objects
*/
public final Observable<Notification<T>> materialize();
/**
* Emit the specified value if source is empty
*/
public final Observable<T> defaultIfEmpty(T defaultItem);
/**
* Count the number of items emitted
*/
public final Single<Long> count();
/**
* Invokes an action after the Observable terminates (completes or errors) or the subscription is disposed
*/
public final Observable<T> doFinally(Action onFinally);
Subscribe to an Observable and consume emitted items.
/**
* Subscribes with separate callbacks for each event type
*/
public final Disposable subscribe();
public final Disposable subscribe(Consumer<? super T> onNext);
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete);
/**
* Subscribes with an Observer
*/
public final void subscribe(Observer<? super T> observer);
/**
* Blocking operations - use with caution
*/
public final T blockingFirst();
public final T blockingFirst(T defaultItem);
public final T blockingLast();
public final T blockingLast(T defaultItem);
public final T blockingSingle();
public final T blockingSingle(T defaultItem);
public final Iterable<T> blockingIterable();
public final void blockingSubscribe(Consumer<? super T> onNext);
Basic Observable Creation and Subscription:
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
// Simple Observable
Observable<String> source = Observable.just("Hello", "World");
Disposable disposable = source.subscribe(
item -> System.out.println("Item: " + item),
error -> System.err.println("Error: " + error),
() -> System.out.println("Complete")
);
// Remember to dispose when done
disposable.dispose();
Transformation Chain:
Observable.fromArray(1, 2, 3, 4, 5)
.filter(x -> x % 2 == 0) // Keep even numbers
.map(x -> x * x) // Square them
.subscribe(result -> System.out.println("Result: " + result));
Async Operations with Threading:
import io.reactivex.schedulers.Schedulers;
Observable<String> asyncSource = Observable.fromCallable(() -> {
// Simulate expensive operation
Thread.sleep(1000);
return "Async Result";
}).subscribeOn(Schedulers.io()) // Execute on IO thread
.observeOn(Schedulers.computation()); // Observe on computation thread
asyncSource.subscribe(
result -> System.out.println("Got: " + result),
error -> error.printStackTrace()
);
Creating Custom Observable:
Observable<Integer> customObservable = Observable.create(emitter -> {
try {
for (int i = 1; i <= 5; i++) {
if (emitter.isDisposed()) {
return;
}
emitter.onNext(i);
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
Combining Multiple Observables:
Observable<String> obs1 = Observable.just("A", "B");
Observable<String> obs2 = Observable.just("1", "2");
Observable.zip(obs1, obs2, (s1, s2) -> s1 + s2)
.subscribe(combined -> System.out.println(combined)); // A1, B2
Side Effects and Debugging:
Observable.fromArray(1, 2, 3, 4, 5)
.doOnNext(item -> System.out.println("Processing: " + item))
.filter(x -> x % 2 == 0)
.doOnNext(item -> System.out.println("After filter: " + item))
.map(x -> x * x)
.doOnComplete(() -> System.out.println("Stream completed"))
.subscribe(result -> System.out.println("Final result: " + result));
Temporal Operations:
import java.util.concurrent.TimeUnit;
// Debounce - only emit if 300ms pass without another emission
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(10)
.debounce(300, TimeUnit.MILLISECONDS)
.subscribe(item -> System.out.println("Debounced: " + item));
// Delay emissions by 1 second
Observable.just("Delayed", "Message")
.delay(1, TimeUnit.SECONDS)
.subscribe(item -> System.out.println("Got: " + item));
Utility Operations:
// Timeout and fallback
Observable.timer(2, TimeUnit.SECONDS)
.map(tick -> "Primary") // timer emits Long; map to String so the String fallback type-checks
.timeout(1, TimeUnit.SECONDS, Observable.just("Fallback"))
.subscribe(
result -> System.out.println("Result: " + result),
error -> System.out.println("Timeout occurred")
);
// Cache for replay
Observable<String> cached = Observable.fromCallable(() -> {
System.out.println("Expensive operation executed");
return "Expensive Result";
}).cache();
// Multiple subscriptions will reuse cached result
cached.subscribe(result -> System.out.println("First: " + result));
cached.subscribe(result -> System.out.println("Second: " + result));
Install with Tessl CLI
npx tessl i tessl/maven-io-reactivex-rxjava2--rxjava