CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-io-reactivex-rxjava3--rxjava

Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Pending
Overview
Eval results
Files

docs/observable.md

Observable API

Observable is RxJava's implementation of the non-backpressured reactive stream pattern, designed for handling sequences of 0 to N items. It provides comprehensive operator support for transformation, filtering, combination, and error handling without built-in flow control.

Capabilities

Observable Creation

Factory methods for creating Observable instances from various sources.

// Required imports:
// import java.util.concurrent.Callable;
// import java.util.concurrent.Future;
// import java.util.concurrent.TimeUnit;
// import io.reactivex.rxjava3.functions.Consumer;
// import io.reactivex.rxjava3.functions.BiConsumer;
// import io.reactivex.rxjava3.functions.Function;
// import io.reactivex.rxjava3.functions.Supplier;
// import org.reactivestreams.Publisher;
/**
 * Creates an Observable that emits a single item
 * @param item the item to emit
 * @return Observable that emits the single item
 */
public static <T> Observable<T> just(T item);

/**
 * Creates an Observable that emits two items
 * @param item1 first item to emit
 * @param item2 second item to emit
 * @return Observable that emits the two items
 */
public static <T> Observable<T> just(T item1, T item2);

/**
 * Creates an Observable that emits three items
 * @param item1 first item to emit
 * @param item2 second item to emit
 * @param item3 third item to emit
 * @return Observable that emits the three items
 */
public static <T> Observable<T> just(T item1, T item2, T item3);

/**
 * Creates an Observable that emits ten items
 * (the overloads for four through nine items follow the same pattern)
 * @param item1 first item to emit
 * @param item2 second item to emit
 * @param item3 third item to emit
 * @param item4 fourth item to emit
 * @param item5 fifth item to emit
 * @param item6 sixth item to emit
 * @param item7 seventh item to emit
 * @param item8 eighth item to emit
 * @param item9 ninth item to emit
 * @param item10 tenth item to emit
 * @return Observable that emits the ten items
 */
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10);

/**
 * Creates an Observable from an Iterable source
 * @param source the Iterable to convert
 * @return Observable that emits items from the Iterable
 */
public static <T> Observable<T> fromIterable(Iterable<? extends T> source);

/**
 * Creates an Observable from an array
 * @param array the array to convert
 * @return Observable that emits items from the array
 */
public static <T> Observable<T> fromArray(T... array);

/**
 * Creates an Observable using a custom emitter function
 * @param source the ObservableOnSubscribe function
 * @return Observable created from the custom emitter
 */
public static <T> Observable<T> create(ObservableOnSubscribe<T> source);

/**
 * Creates an Observable that emits sequential integers
 * @param start the starting value
 * @param count the number of items to emit
 * @return Observable emitting integers from start to start+count-1
 */
public static Observable<Integer> range(int start, int count);

/**
 * Creates an Observable that emits at specified intervals
 * @param period the emission interval
 * @param unit the time unit
 * @return Observable emitting sequential longs at intervals
 */
public static Observable<Long> interval(long period, TimeUnit unit);

/**
 * Creates an Observable that emits after a delay
 * @param delay the delay duration
 * @param unit the time unit
 * @return Observable that emits 0L after the delay
 */
public static Observable<Long> timer(long delay, TimeUnit unit);

/**
 * Creates an empty Observable that only calls onComplete
 * @return Observable that completes immediately
 */
public static <T> Observable<T> empty();

/**
 * Creates an Observable that never emits anything
 * @return Observable that never calls any observer methods
 */
public static <T> Observable<T> never();

/**
 * Creates an Observable that only calls onError
 * @param error the error to emit
 * @return Observable that emits the error
 */
public static <T> Observable<T> error(Throwable error);

/**
 * Creates an Observable from a Callable
 * @param callable the Callable to invoke for each subscription
 * @return Observable that emits the result of the Callable
 */
public static <T> Observable<T> fromCallable(Callable<? extends T> callable);

/**
 * Creates an Observable from a CompletableSource
 * @param completableSource the CompletableSource to convert
 * @return Observable that completes when the CompletableSource completes
 */
public static <T> Observable<T> fromCompletable(CompletableSource completableSource);

/**
 * Creates an Observable from a Future
 * @param future the Future to convert
 * @return Observable that emits the Future result
 */
public static <T> Observable<T> fromFuture(Future<? extends T> future);

/**
 * Creates an Observable from a MaybeSource
 * @param maybe the MaybeSource to convert
 * @return Observable that emits the Maybe result or completes
 */
public static <T> Observable<T> fromMaybe(MaybeSource<T> maybe);

/**
 * Creates an Observable from a Publisher (Reactive Streams)
 * @param publisher the Publisher to convert
 * @return Observable that emits items from the Publisher
 */
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher);

/**
 * Creates an Observable from a Runnable
 * @param run the Runnable to execute
 * @return Observable that completes after running the Runnable
 */
public static <T> Observable<T> fromRunnable(Runnable run);

/**
 * Creates an Observable from a SingleSource
 * @param source the SingleSource to convert
 * @return Observable that emits the Single result
 */
public static <T> Observable<T> fromSingle(SingleSource<T> source);

/**
 * Creates an Observable from a Supplier
 * @param supplier the Supplier to invoke for each subscription
 * @return Observable that emits the result of the Supplier
 */
public static <T> Observable<T> fromSupplier(Supplier<? extends T> supplier);

/**
 * Creates an Observable that defers creation until subscription
 * @param supplier function that returns an ObservableSource
 * @return Observable that creates the actual source on subscription
 */
public static <T> Observable<T> defer(Supplier<? extends ObservableSource<? extends T>> supplier);

/**
 * Creates an Observable that emits sequential long values in a range
 * @param start the starting value
 * @param count the number of items to emit
 * @return Observable emitting longs from start to start+count-1
 */
public static Observable<Long> rangeLong(long start, long count);

/**
 * Creates an Observable that emits at intervals starting after an initial delay
 * @param initialDelay the initial delay before the first emission
 * @param period the emission interval
 * @param unit the time unit
 * @return Observable emitting sequential longs at intervals
 */
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit);

/**
 * Creates an Observable that emits sequential longs over time in a range
 * @param start the starting value
 * @param count the number of items to emit
 * @param initialDelay the initial delay before the first emission
 * @param period the emission interval
 * @param unit the time unit
 * @return Observable emitting sequential longs in range over time
 */
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit);

/**
 * Mirrors the first Observable to emit or complete from multiple sources
 * @param sources the Iterable of ObservableSource instances
 * @return Observable that mirrors the first source to emit
 */
public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources);

/**
 * Mirrors the first Observable to emit or complete from multiple sources
 * @param sources the array of ObservableSource instances
 * @return Observable that mirrors the first source to emit
 */
public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources);

/**
 * Generates values on demand using a generator function
 * @param generator the generator function that emits values
 * @return Observable that generates values on demand
 */
public static <T> Observable<T> generate(Consumer<Emitter<T>> generator);

/**
 * Generates values on demand with state
 * @param initialState the initial state supplier
 * @param generator the generator function with state
 * @return Observable that generates values with state
 */
public static <T, S> Observable<T> generate(Supplier<S> initialState, BiConsumer<S, Emitter<T>> generator);

/**
 * Compares two Observable sequences for equality
 * @param source1 the first Observable sequence
 * @param source2 the second Observable sequence
 * @return Single that emits true if sequences are equal
 */
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2);

/**
 * Switches to new inner Observables as they arrive
 * @param sources Observable of Observable sources
 * @return Observable that switches to the latest inner Observable
 */
public static <T> Observable<T> switchOnNext(ObservableSource<? extends ObservableSource<? extends T>> sources);

/**
 * Creates an Observable with resource management
 * @param resourceSupplier supplier for the resource
 * @param sourceSupplier function that creates the Observable from the resource
 * @param resourceCleanup function to clean up the resource
 * @return Observable with automatic resource management
 */
public static <T, D> Observable<T> using(
    Supplier<? extends D> resourceSupplier,
    Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier,
    Consumer<? super D> resourceCleanup
);

/**
 * Wraps an ObservableSource to make it an Observable
 * @param source the ObservableSource to wrap
 * @return Observable wrapping the source
 */
public static <T> Observable<T> wrap(ObservableSource<T> source);

Java 8+ Integration

Factory methods for integrating with Java 8+ features.

// Required imports:
// import java.util.Optional;
// import java.util.concurrent.CompletionStage; 
// import java.util.stream.Stream;
/**
 * Creates an Observable from an Optional (Java 8+)
 * @param optional the Optional to convert
 * @return Observable that emits the Optional value or completes if empty
 */
public static <T> Observable<T> fromOptional(Optional<T> optional);

/**
 * Creates an Observable from a CompletionStage (Java 8+)
 * @param stage the CompletionStage to convert
 * @return Observable that emits the CompletionStage result
 */
public static <T> Observable<T> fromCompletionStage(CompletionStage<T> stage);

/**
 * Creates an Observable from a Stream (Java 8+)
 * @param stream the Stream to convert
 * @return Observable that emits the Stream elements
 */
public static <T> Observable<T> fromStream(Stream<T> stream);

Transformation Operators

Transform and filter emitted items using mapping functions, predicates, and count-based limits.

/**
 * Transform items using a mapping function
 * @param mapper function to transform each item
 * @return Observable with transformed items
 */
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper);

/**
 * Transform items to Observables and flatten the results
 * @param mapper function returning Observable for each item
 * @return Observable with flattened results
 */
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

/**
 * Transform items to Observables and concatenate them in order
 * @param mapper function returning Observable for each item
 * @return Observable with concatenated results in order
 */
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

/**
 * Transform items to Singles and merge the results
 * @param mapper function returning Single for each item
 * @return Observable with Single results
 */
public final <R> Observable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper);

/**
 * Emit only items that pass a predicate test
 * @param predicate function to test each item
 * @return Observable with filtered items
 */
public final Observable<T> filter(Predicate<? super T> predicate);

/**
 * Suppress items that are equal to the item emitted immediately before them
 * (only consecutive duplicates are dropped; a value may reappear later in the sequence)
 * @return Observable with distinct consecutive items
 */
public final Observable<T> distinctUntilChanged();

/**
 * Skip the first n items
 * @param count number of items to skip
 * @return Observable skipping the first count items
 */
public final Observable<T> skip(long count);

/**
 * Take only the first n items
 * @param count number of items to take
 * @return Observable emitting only the first count items
 */
public final Observable<T> take(long count);

Combination Operators

Combine multiple Observable sources.

/**
 * Merge an array of Observables into one
 * (RxJava 3 names the varargs factory mergeArray; merge itself accepts an Iterable,
 * an ObservableSource of sources, or two to four explicit sources)
 * @param sources array of Observable sources
 * @return Observable merging all source emissions
 */
public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources);

/**
 * Concatenate an array of Observables in sequence
 * (RxJava 3 names the varargs factory concatArray; concat itself accepts an Iterable,
 * an ObservableSource of sources, or two to four explicit sources)
 * @param sources array of Observable sources
 * @return Observable concatenating all sources in order
 */
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources);

/**
 * Combine the latest values from two sources
 * (the sources are declared with "? extends" wildcards so that sources of
 * subtypes of T1/T2 are accepted as well)
 * @param source1 first Observable source
 * @param source2 second Observable source
 * @param combiner function to combine the latest values
 * @return Observable emitting combined values
 */
public static <T1, T2, R> Observable<R> combineLatest(
    ObservableSource<? extends T1> source1,
    ObservableSource<? extends T2> source2,
    BiFunction<? super T1, ? super T2, ? extends R> combiner
);

/**
 * Zip two sources together
 * (the sources are declared with "? extends" wildcards so that sources of
 * subtypes of T1/T2 are accepted as well)
 * @param source1 first Observable source
 * @param source2 second Observable source
 * @param zipper function to zip values together
 * @return Observable emitting zipped values
 */
public static <T1, T2, R> Observable<R> zip(
    ObservableSource<? extends T1> source1,
    ObservableSource<? extends T2> source2,
    BiFunction<? super T1, ? super T2, ? extends R> zipper
);

/**
 * Start with additional items before the source emissions
 * (RxJava 3 names the varargs form startWithArray; startWith itself takes
 * another reactive source rather than plain items)
 * @param items items to emit first
 * @return Observable starting with the specified items
 */
public final Observable<T> startWithArray(T... items);

/**
 * Concatenate with another Observable
 * @param other Observable to concatenate after this one
 * @return Observable concatenating this and other
 */
public final Observable<T> concatWith(ObservableSource<? extends T> other);

Subscription and Scheduling

Control when and how Observable emissions are observed.

/**
 * Subscribe with a simple onNext callback
 * @param onNext function called for each emitted item
 * @return Disposable for managing the subscription
 */
public final Disposable subscribe(Consumer<? super T> onNext);

/**
 * Subscribe with onNext and onError callbacks
 * @param onNext function called for each emitted item
 * @param onError function called on error
 * @return Disposable for managing the subscription
 */
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);

/**
 * Subscribe with full Observer interface
 * @param observer the Observer to receive emissions
 */
public final void subscribe(Observer<? super T> observer);

/**
 * Specify the Scheduler for subscription (source) operations
 * @param scheduler the Scheduler to use for subscriptions
 * @return Observable operating on the specified scheduler
 */
public final Observable<T> subscribeOn(Scheduler scheduler);

/**
 * Specify the Scheduler for observation (downstream) operations
 * @param scheduler the Scheduler to use for observations
 * @return Observable observing on the specified scheduler
 */
public final Observable<T> observeOn(Scheduler scheduler);

Error Handling

Manage errors in the Observable stream.

/**
 * Return a default item when an error occurs
 * (RxJava 3 names the item-taking form onErrorReturnItem; onErrorReturn itself
 * takes a Function that maps the Throwable to the replacement item)
 * @param defaultItem the item to emit on error
 * @return Observable that emits defaultItem on error
 */
public final Observable<T> onErrorReturnItem(T defaultItem);

/**
 * Resume with another Observable when an error occurs
 * (RxJava 3 names the source-taking form onErrorResumeWith; onErrorResumeNext
 * itself takes a Function that maps the Throwable to the fallback source)
 * @param resumeSource Observable to switch to on error
 * @return Observable that switches to resumeSource on error
 */
public final Observable<T> onErrorResumeWith(ObservableSource<? extends T> resumeSource);

/**
 * Retry the subscription when an error occurs
 * @param times maximum number of retry attempts
 * @return Observable that retries up to the specified times
 */
public final Observable<T> retry(long times);

/**
 * Perform side-effect action when an error occurs
 * @param onError action to perform on error
 * @return Observable that performs the action on error
 */
public final Observable<T> doOnError(Consumer<? super Throwable> onError);

Side Effects

Perform side-effect actions without modifying the stream.

/**
 * Perform an action for each emitted item
 * @param onNext action to perform for each item
 * @return Observable that performs the action for each item
 */
public final Observable<T> doOnNext(Consumer<? super T> onNext);

/**
 * Perform an action when the Observable completes
 * @param onComplete action to perform on completion
 * @return Observable that performs the action on completion
 */
public final Observable<T> doOnComplete(Action onComplete);

/**
 * Perform an action when subscription occurs
 * @param onSubscribe action to perform on subscription
 * @return Observable that performs the action on subscription
 */
public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe);

/**
 * Perform an action when disposal occurs
 * @param onDispose action to perform on disposal
 * @return Observable that performs the action on disposal
 */
public final Observable<T> doOnDispose(Action onDispose);

Types

/**
 * Interface for creating custom Observable sources; this is the argument type
 * accepted by Observable.create
 */
public interface ObservableOnSubscribe<T> {
    /**
     * Produces the sequence by signalling through the supplied emitter
     * @param emitter the emitter used to signal items, an error, or completion
     * @throws Throwable declared so that implementations may throw checked exceptions
     */
    void subscribe(ObservableEmitter<T> emitter) throws Throwable;
}

/**
 * Emitter interface for custom Observable creation; an instance is handed to
 * ObservableOnSubscribe.subscribe
 * NOTE(review): this listing is abridged. The RxJava 3 interface inherits the
 * three on* signals from Emitter and additionally declares setDisposable,
 * setCancellable, serialize and tryOnError - confirm against the Javadoc.
 */
public interface ObservableEmitter<T> extends Emitter<T> {
    /** Signals the next item of the sequence. */
    void onNext(T value);
    /** Signals a terminal error. */
    void onError(Throwable error);
    /** Signals normal completion of the sequence. */
    void onComplete();
    /** Reports whether the sequence has been disposed, i.e. whether emitting is still useful. */
    boolean isDisposed();
}

/**
 * Base interface for Observable sources; the parameter type accepted by
 * operators such as wrap, defer, flatMap and concatWith
 */
public interface ObservableSource<T> {
    /**
     * Subscribes the given Observer to this source
     * @param observer the Observer to receive emissions
     */
    void subscribe(Observer<? super T> observer);
}

Usage Examples:

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

// Basic Observable creation and subscription:
// emits "Hello" and then "World", printing each item as it arrives
Observable.just("Hello", "World")
    .subscribe(System.out::println);

// Complex transformation chain:
// keep the even numbers of 1..10, square them, and print the first three results
Observable.range(1, 10)
    .filter(x -> x % 2 == 0)  // 2, 4, 6, 8, 10
    .map(x -> x * x)  // 4, 16, 36, 64, 100
    .take(3)  // 4, 16, 36
    .subscribe(System.out::println);

// Async operations with scheduling:
// subscribeOn moves the source work (the callable) onto the io() scheduler,
// observeOn delivers the result and error callbacks on the single() scheduler
Observable.fromCallable(() -> {
        Thread.sleep(1000);
        return "Computed result";
    })
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.single())
    .subscribe(
        result -> System.out.println("Result: " + result),
        error -> System.err.println("Error: " + error)
    );

// Error handling:
// 10 / 0 throws ArithmeticException inside map; onErrorReturnItem replaces the
// error with -1 and the sequence ends there, so this prints 10, 5, -1
// (the item-taking operator is onErrorReturnItem; onErrorReturn expects a Function)
Observable.just(1, 2, 0, 4)
    .map(x -> 10 / x)
    .onErrorReturnItem(-1)
    .subscribe(System.out::println);

Install with Tessl CLI

npx tessl i tessl/maven-io-reactivex-rxjava3--rxjava

docs

completable.md

disposables.md

flowable.md

index.md

maybe.md

observable.md

schedulers.md

single.md

subjects.md

tile.json