Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
—
Observable is RxJava's implementation of the non-backpressured reactive stream pattern, designed for handling sequences of 0 to N items. It provides comprehensive operator support for transformation, filtering, combination, and error handling without built-in flow control.
Factory methods for creating Observable instances from various sources.
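// Required imports:
// import java.util.concurrent.Callable;
// import java.util.concurrent.Future;
// import java.util.concurrent.TimeUnit;
// import io.reactivex.rxjava3.functions.Consumer;
// import io.reactivex.rxjava3.functions.BiConsumer;
// import io.reactivex.rxjava3.functions.Function;
// import io.reactivex.rxjava3.functions.Supplier;
// import org.reactivestreams.Publisher;
// Note: Consumer, BiConsumer, Function and Supplier are RxJava's own functional
// interfaces (their methods may throw), not the java.util.function types.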
/**
* Creates an Observable that emits a single item
* @param item the item to emit
* @return Observable that emits the single item
*/
public static <T> Observable<T> just(T item);
/**
* Creates an Observable that emits two items
* @param item1 first item to emit
* @param item2 second item to emit
* @return Observable that emits the two items
*/
public static <T> Observable<T> just(T item1, T item2);
/**
* Creates an Observable that emits three items
* @param item1 first item to emit
* @param item2 second item to emit
* @param item3 third item to emit
* @return Observable that emits the three items
*/
public static <T> Observable<T> just(T item1, T item2, T item3);
/**
* Creates an Observable that emits ten items, in the order given.
* Per the RxJava API, overloads of this method exist for every arity from one
* to ten items; only some of them are listed in this reference.
* @param item1 first item to emit
* @param item2 second item to emit
* @param item3 third item to emit
* @param item4 fourth item to emit
* @param item5 fifth item to emit
* @param item6 sixth item to emit
* @param item7 seventh item to emit
* @param item8 eighth item to emit
* @param item9 ninth item to emit
* @param item10 tenth item to emit
* @return Observable that emits all ten provided items
*/
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10);
/**
* Creates an Observable from an Iterable source
* @param source the Iterable to convert
* @return Observable that emits items from the Iterable
*/
public static <T> Observable<T> fromIterable(Iterable<? extends T> source);
/**
* Creates an Observable from an array
* @param array the array to convert
* @return Observable that emits items from the array
*/
public static <T> Observable<T> fromArray(T... array);
/**
* Creates an Observable using a custom emitter function
* @param source the ObservableOnSubscribe function
* @return Observable created from the custom emitter
*/
public static <T> Observable<T> create(ObservableOnSubscribe<T> source);
/**
* Creates an Observable that emits sequential integers
* @param start the starting value
* @param count the number of items to emit
* @return Observable emitting integers from start to start+count-1
*/
public static Observable<Integer> range(int start, int count);
/**
* Creates an Observable that emits at specified intervals
* @param period the emission interval
* @param unit the time unit
* @return Observable emitting sequential longs at intervals
*/
public static Observable<Long> interval(long period, TimeUnit unit);
/**
* Creates an Observable that emits after a delay
* @param delay the delay duration
* @param unit the time unit
* @return Observable that emits 0L after the delay
*/
public static Observable<Long> timer(long delay, TimeUnit unit);
/**
* Creates an empty Observable that only calls onComplete
* @return Observable that completes immediately
*/
public static <T> Observable<T> empty();
/**
* Creates an Observable that never emits anything
* @return Observable that never calls any observer methods
*/
public static <T> Observable<T> never();
/**
* Creates an Observable that only calls onError
* @param error the error to emit
* @return Observable that emits the error
*/
public static <T> Observable<T> error(Throwable error);
/**
* Creates an Observable from a Callable
* @param callable the Callable to invoke for each subscription
* @return Observable that emits the result of the Callable
*/
public static <T> Observable<T> fromCallable(Callable<? extends T> callable);
/**
* Creates an Observable from a CompletableSource
* @param completableSource the CompletableSource to convert
* @return Observable that completes when the CompletableSource completes
*/
public static <T> Observable<T> fromCompletable(CompletableSource completableSource);
/**
* Creates an Observable from a Future
* @param future the Future to convert
* @return Observable that emits the Future result
*/
public static <T> Observable<T> fromFuture(Future<? extends T> future);
/**
* Creates an Observable from a MaybeSource
* @param maybe the MaybeSource to convert
* @return Observable that emits the Maybe result or completes
*/
public static <T> Observable<T> fromMaybe(MaybeSource<T> maybe);
/**
* Creates an Observable from a Publisher (Reactive Streams)
* @param publisher the Publisher to convert
* @return Observable that emits items from the Publisher
*/
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher);
/**
* Creates an Observable from a Runnable
* @param run the Runnable to execute
* @return Observable that completes after running the Runnable
*/
public static <T> Observable<T> fromRunnable(Runnable run);
/**
* Creates an Observable from a SingleSource
* @param source the SingleSource to convert
* @return Observable that emits the Single result
*/
public static <T> Observable<T> fromSingle(SingleSource<T> source);
/**
* Creates an Observable from a Supplier
* @param supplier the Supplier to invoke for each subscription
* @return Observable that emits the result of the Supplier
*/
public static <T> Observable<T> fromSupplier(Supplier<? extends T> supplier);
/**
* Creates an Observable that defers creation until subscription
* @param supplier function that returns an ObservableSource
* @return Observable that creates the actual source on subscription
*/
public static <T> Observable<T> defer(Supplier<? extends ObservableSource<? extends T>> supplier);
/**
* Creates an Observable that emits sequential long values in a range
* @param start the starting value
* @param count the number of items to emit
* @return Observable emitting longs from start to start+count-1
*/
public static Observable<Long> rangeLong(long start, long count);
/**
* Creates an Observable that emits at intervals starting after an initial delay
* @param initialDelay the initial delay before the first emission
* @param period the emission interval
* @param unit the time unit
* @return Observable emitting sequential longs at intervals
*/
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit);
/**
* Creates an Observable that emits sequential longs over time in a range
* @param start the starting value
* @param count the number of items to emit
* @param initialDelay the initial delay before the first emission
* @param period the emission interval
* @param unit the time unit
* @return Observable emitting sequential longs in range over time
*/
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit);
/**
* Mirrors the first Observable to emit or complete from multiple sources
* @param sources the Iterable of ObservableSource instances
* @return Observable that mirrors the first source to emit
*/
public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources);
/**
* Mirrors the first Observable to emit or complete from multiple sources
* @param sources the array of ObservableSource instances
* @return Observable that mirrors the first source to emit
*/
public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources);
/**
* Generates values on demand using a generator function
* @param generator the generator function that emits values
* @return Observable that generates values on demand
*/
public static <T> Observable<T> generate(Consumer<Emitter<T>> generator);
/**
* Generates values on demand with state
* @param initialState the initial state supplier
* @param generator the generator function with state
* @return Observable that generates values with state
*/
public static <T, S> Observable<T> generate(Supplier<S> initialState, BiConsumer<S, Emitter<T>> generator);
/**
* Compares two Observable sequences for equality
* @param source1 the first Observable sequence
* @param source2 the second Observable sequence
* @return Single that emits true if sequences are equal
*/
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2);
/**
* Switches to new inner Observables as they arrive
* @param sources Observable of Observable sources
* @return Observable that switches to the latest inner Observable
*/
public static <T> Observable<T> switchOnNext(ObservableSource<? extends ObservableSource<? extends T>> sources);
/**
* Creates an Observable with resource management
* @param resourceSupplier supplier for the resource
* @param sourceSupplier function that creates the Observable from the resource
* @param resourceCleanup function to clean up the resource
* @return Observable with automatic resource management
*/
public static <T, D> Observable<T> using(
Supplier<? extends D> resourceSupplier,
Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup
);
/**
* Wraps an ObservableSource to make it an Observable
* @param source the ObservableSource to wrap
* @return Observable wrapping the source
*/
public static <T> Observable<T> wrap(ObservableSource<T> source);
Factory methods for integrating with Java 8+ features.
// Required imports:
// import java.util.Optional;
// import java.util.concurrent.CompletionStage;
// import java.util.stream.Stream;
/**
* Creates an Observable from an Optional (Java 8+)
* @param optional the Optional to convert
* @return Observable that emits the Optional value or completes if empty
*/
public static <T> Observable<T> fromOptional(Optional<T> optional);
/**
* Creates an Observable from a CompletionStage (Java 8+)
* @param stage the CompletionStage to convert
* @return Observable that emits the CompletionStage result
*/
public static <T> Observable<T> fromCompletionStage(CompletionStage<T> stage);
/**
* Creates an Observable from a Stream (Java 8+)
* @param stream the Stream to convert
* @return Observable that emits the Stream elements
*/
public static <T> Observable<T> fromStream(Stream<T> stream);
Transform emitted items using various mapping functions.
/**
* Transform items using a mapping function
* @param mapper function to transform each item
* @return Observable with transformed items
*/
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper);
/**
* Transform items to Observables and flatten the results
* @param mapper function returning Observable for each item
* @return Observable with flattened results
*/
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
/**
* Transform items to Observables and concatenate them in order
* @param mapper function returning Observable for each item
* @return Observable with concatenated results in order
*/
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
/**
* Transform items to Singles and merge the results
* @param mapper function returning Single for each item
* @return Observable with Single results
*/
public final <R> Observable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper);
/**
* Emit only items that pass a predicate test
* @param predicate function to test each item
* @return Observable with filtered items
*/
public final Observable<T> filter(Predicate<? super T> predicate);
/**
* Emit only items that differ from the item emitted immediately before them,
* i.e. suppress consecutive duplicates (non-adjacent repeats are still emitted)
* @return Observable with distinct consecutive items
*/
public final Observable<T> distinctUntilChanged();
/**
* Skip the first n items
* @param count number of items to skip
* @return Observable skipping the first count items
*/
public final Observable<T> skip(long count);
/**
* Take only the first n items
* @param count number of items to take
* @return Observable emitting only the first count items
*/
public final Observable<T> take(long count);
Combine multiple Observable sources.
/**
* Merge an array (or varargs list) of Observables into one.
* In RxJava 3 the varargs form is named {@code mergeArray}; {@code merge} itself
* only has fixed-arity (2 to 4 sources), Iterable and nested-source overloads.
* @param sources array of Observable sources
* @return Observable merging all source emissions
*/
public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources);
/**
* Concatenate an array (or varargs list) of Observables in sequence.
* In RxJava 3 the varargs form is named {@code concatArray}; {@code concat} itself
* only has fixed-arity (2 to 4 sources), Iterable and nested-source overloads.
* @param sources array of Observable sources
* @return Observable concatenating all sources in order
*/
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources);
/**
* Combine the latest values from two sources.
* The source parameters are covariant ({@code ? extends T1} / {@code ? extends T2}),
* matching the RxJava 3 signature, so sources of subtypes are accepted.
* @param source1 first Observable source
* @param source2 second Observable source
* @param combiner function to combine the latest values
* @return Observable emitting combined values
*/
public static <T1, T2, R> Observable<R> combineLatest(
    ObservableSource<? extends T1> source1,
    ObservableSource<? extends T2> source2,
    BiFunction<? super T1, ? super T2, ? extends R> combiner
);
/**
* Zip two sources together, pairing their items by position.
* The source parameters are covariant ({@code ? extends T1} / {@code ? extends T2}),
* matching the RxJava 3 signature, so sources of subtypes are accepted.
* @param source1 first Observable source
* @param source2 second Observable source
* @param zipper function to zip values together
* @return Observable emitting zipped values
*/
public static <T1, T2, R> Observable<R> zip(
    ObservableSource<? extends T1> source1,
    ObservableSource<? extends T2> source2,
    BiFunction<? super T1, ? super T2, ? extends R> zipper
);
/**
* Start with additional items before the source emissions.
* In RxJava 3 the varargs form is named {@code startWithArray}; {@code startWith}
* takes another source (e.g. an ObservableSource), and {@code startWithItem}
* takes a single item.
* @param items items to emit first
* @return Observable starting with the specified items
*/
public final Observable<T> startWithArray(T... items);
/**
* Concatenate with another Observable
* @param other Observable to concatenate after this one
* @return Observable concatenating this and other
*/
public final Observable<T> concatWith(ObservableSource<? extends T> other);
Control when and how Observable emissions are observed.
/**
* Subscribe with a simple onNext callback
* @param onNext function called for each emitted item
* @return Disposable for managing the subscription
*/
public final Disposable subscribe(Consumer<? super T> onNext);
/**
* Subscribe with onNext and onError callbacks
* @param onNext function called for each emitted item
* @param onError function called on error
* @return Disposable for managing the subscription
*/
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
/**
* Subscribe with full Observer interface
* @param observer the Observer to receive emissions
*/
public final void subscribe(Observer<? super T> observer);
/**
* Specify the Scheduler for subscription (source) operations
* @param scheduler the Scheduler to use for subscriptions
* @return Observable operating on the specified scheduler
*/
public final Observable<T> subscribeOn(Scheduler scheduler);
/**
* Specify the Scheduler for observation (downstream) operations
* @param scheduler the Scheduler to use for observations
* @return Observable observing on the specified scheduler
*/
public final Observable<T> observeOn(Scheduler scheduler);
Manage errors in the Observable stream.
/**
* Return a default item when an error occurs, then complete.
* In RxJava 3 the constant-item form is named {@code onErrorReturnItem};
* {@code onErrorReturn} instead takes a
* {@code Function<? super Throwable, ? extends T>} that computes the item from the error.
* @param item the item to emit on error
* @return Observable that emits item on error
*/
public final Observable<T> onErrorReturnItem(T item);
/**
* Resume with another Observable when an error occurs.
* In RxJava 3 the form taking a source directly is named {@code onErrorResumeWith};
* {@code onErrorResumeNext} instead takes a
* {@code Function<? super Throwable, ? extends ObservableSource<? extends T>>}
* that selects the fallback from the error.
* @param fallback Observable to switch to on error
* @return Observable that switches to fallback on error
*/
public final Observable<T> onErrorResumeWith(ObservableSource<? extends T> fallback);
/**
* Retry the subscription when an error occurs
* @param times maximum number of retry attempts
* @return Observable that retries up to the specified times
*/
public final Observable<T> retry(long times);
/**
* Perform side-effect action when an error occurs
* @param onError action to perform on error
* @return Observable that performs the action on error
*/
public final Observable<T> doOnError(Consumer<? super Throwable> onError);
Perform side-effect actions without modifying the stream.
/**
* Perform an action for each emitted item
* @param onNext action to perform for each item
* @return Observable that performs the action for each item
*/
public final Observable<T> doOnNext(Consumer<? super T> onNext);
/**
* Perform an action when the Observable completes
* @param onComplete action to perform on completion
* @return Observable that performs the action on completion
*/
public final Observable<T> doOnComplete(Action onComplete);
/**
* Perform an action when subscription occurs
* @param onSubscribe action to perform on subscription
* @return Observable that performs the action on subscription
*/
public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe);
/**
* Perform an action when disposal occurs
* @param onDispose action to perform on disposal
* @return Observable that performs the action on disposal
*/
public final Observable<T> doOnDispose(Action onDispose);
/**
* Interface for creating custom Observable sources
*/
public interface ObservableOnSubscribe<T> {
/**
* Receives the emitter through which the custom source pushes its signals.
* This is the function type accepted by {@code Observable.create}.
* @param emitter the emitter used to signal items, an error or completion
* @throws Throwable the implementation is allowed to throw any exception
*/
void subscribe(ObservableEmitter<T> emitter) throws Throwable;
}
/**
* Emitter interface for custom Observable creation.
* An instance is handed to {@code ObservableOnSubscribe.subscribe}; the producer
* uses it to push signals and to check whether it has been disposed.
* NOTE(review): onNext/onError/onComplete appear to be re-listed here from the
* Emitter supertype for readability - confirm against the RxJava Javadoc.
*/
public interface ObservableEmitter<T> extends Emitter<T> {
/** Signals a value. */
void onNext(T value);
/** Signals an error. */
void onError(Throwable error);
/** Signals completion. */
void onComplete();
/** @return whether this emitter has been disposed */
boolean isDisposed();
}
/**
* Base interface for Observable sources.
* This is the parameter type used throughout this reference wherever another
* source can be supplied (for example defer, amb, flatMap and concatWith).
*/
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this source.
* @param observer the Observer to receive emissions
*/
void subscribe(Observer<? super T> observer);
}
Usage Examples:
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
// Basic Observable creation and subscription: prints "Hello" then "World"
Observable.just("Hello", "World")
    .subscribe(System.out::println);
// Complex transformation chain: even numbers of 1..10, squared, first three only.
// Prints 4, 16, 36.
Observable.range(1, 10)
    .filter(x -> x % 2 == 0)
    .map(x -> x * x)
    .take(3)
    .subscribe(System.out::println);
// Async operations with scheduling: the Callable runs on the io scheduler and the
// callbacks run on the single scheduler.
// NOTE: RxJava's standard schedulers use daemon threads, so in a plain main() the JVM
// may exit before the result is delivered; block (e.g. blockingSubscribe or a sleep)
// if the program has nothing else keeping it alive.
Observable.fromCallable(() -> {
        Thread.sleep(1000);
        return "Computed result";
    })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.single())
    .subscribe(
        result -> System.out.println("Result: " + result),
        error -> System.err.println("Error: " + error)
    );
// Error handling: 10 / 0 throws ArithmeticException on the third item, which is
// replaced by the fallback item. Prints 10, 5, -1 (the 4 is never processed).
// onErrorReturnItem takes the constant item; onErrorReturn would need a Function.
Observable.just(1, 2, 0, 4)
    .map(x -> 10 / x)
    .onErrorReturnItem(-1)
    .subscribe(System.out::println);
Install with Tessl CLI
npx tessl i tessl/maven-io-reactivex-rxjava3--rxjava