Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
—
Single is RxJava's reactive type for operations that emit exactly one item or an error. It's ideal for single-value async operations like network requests, database queries, or computations that produce one result.
Factory methods for creating Single instances.
/**
* Creates a Single that emits a single item
* @param item the item to emit
* @return Single that emits the single item
*/
public static <T> Single<T> just(T item);
/**
* Creates a Single from a callable, executed lazily
* @param callable the Callable to execute
* @return Single that emits the callable result
*/
public static <T> Single<T> fromCallable(Callable<? extends T> callable);
/**
* Creates a Single using a custom emitter function
* @param source the SingleOnSubscribe function
* @return Single created from the custom emitter
*/
public static <T> Single<T> create(SingleOnSubscribe<T> source);
/**
* Creates a Single that emits after a delay
* @param delay the delay duration
* @param unit the time unit
* @return Single that emits 0L after the delay
*/
public static Single<Long> timer(long delay, TimeUnit unit);
/**
* Creates a Single that only calls onError
* @param error the error to emit
* @return Single that emits the error
*/
public static <T> Single<T> error(Throwable error);
/**
* Creates a Single that defers creation until subscription
* @param singleSupplier supplier of Single to defer
* @return Single that defers to supplied Single
*/
public static <T> Single<T> defer(Supplier<? extends SingleSource<? extends T>> singleSupplier);
/**
* Creates a Single from a Future
* @param future the Future to convert
* @return Single that emits the Future result
*/
public static <T> Single<T> fromFuture(Future<? extends T> future);
/**
* Creates a Single from an Observable source (takes first item)
* @param source the ObservableSource to convert
* @return Single with the first emitted item
*/
public static <T> Single<T> fromObservable(ObservableSource<T> source);Transform the emitted item or chain with other Singles.
/**
* Transform the item using a mapping function
* @param mapper function to transform the item
* @return Single with transformed item
*/
public final <R> Single<R> map(Function<? super T, ? extends R> mapper);
/**
* Transform the item to another Single and flatten the result
* @param mapper function returning Single for the item
* @return Single from the flattened result
*/
public final <R> Single<R> flatMap(Function<? super T, ? extends SingleSource<? extends R>> mapper);
/**
* Transform the item to an Observable
* @param mapper function returning ObservableSource for the item
* @return Observable from the mapped result
*/
public final <R> Observable<R> flatMapObservable(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
/**
* Transform the item to a Flowable
* @param mapper function returning Publisher for the item
* @return Flowable from the mapped result
*/
public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publisher<? extends R>> mapper);
/**
* Transform the item to a Maybe
* @param mapper function returning MaybeSource for the item
* @return Maybe from the mapped result
*/
public final <R> Maybe<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper);
/**
* Transform the item to a Completable
* @param mapper function returning CompletableSource for the item
* @return Completable from the mapped result
*/
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper);
/**
* Cast the emitted item to a different type
* @param clazz the target class to cast to
* @return Single with cast item
*/
public final <U> Single<U> cast(Class<U> clazz);Combine multiple Single sources.
/**
* Zip two Singles together
* @param other the other Single to zip with
* @param zipper function to combine the two items
* @return Single emitting the zipped result
*/
public final <U, R> Single<R> zipWith(SingleSource<? extends U> other, BiFunction<? super T, ? super U, ? extends R> zipper);
/**
* Merge multiple Singles (emit items as they arrive)
* @param sources array of Single sources
* @return Observable merging all Single emissions
*/
public static <T> Observable<T> merge(SingleSource<? extends T>... sources);
/**
* Concatenate multiple Singles in sequence
* @param sources array of Single sources
* @return Observable concatenating all Singles
*/
public static <T> Observable<T> concat(SingleSource<? extends T>... sources);
/**
* Concatenate this Single with another
* @param other Single to concatenate after this one
* @return Observable concatenating both Singles
*/
public final Observable<T> concatWith(SingleSource<? extends T> other);
/**
* Merge this Single with another (race condition)
* @param other Single to merge with this one
* @return Single emitting the first result
*/
public final Single<T> ambWith(SingleSource<? extends T> other);Control subscription behavior and execution context.
/**
* Subscribe with a simple onSuccess callback
* @param onSuccess function called when item is emitted
* @return Disposable for managing the subscription
*/
public final Disposable subscribe(Consumer<? super T> onSuccess);
/**
* Subscribe with onSuccess and onError callbacks
* @param onSuccess function called when item is emitted
* @param onError function called on error
* @return Disposable for managing the subscription
*/
public final Disposable subscribe(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError);
/**
* Subscribe with full SingleObserver interface
* @param observer the SingleObserver to receive emissions
*/
public final void subscribe(SingleObserver<? super T> observer);
/**
* Subscribe and block until completion, returning the item
* @return the emitted item
*/
public final T blockingGet();
/**
* Specify the Scheduler for subscription operations
* @param scheduler the Scheduler to use for subscriptions
* @return Single operating on the specified scheduler
*/
public final Single<T> subscribeOn(Scheduler scheduler);
/**
* Specify the Scheduler for observation operations
* @param scheduler the Scheduler to use for observations
* @return Single observing on the specified scheduler
*/
public final Single<T> observeOn(Scheduler scheduler);
/**
* Add a delay before emitting the item
* @param delay the delay duration
* @param unit the time unit
* @return Single that emits after the delay
*/
public final Single<T> delay(long delay, TimeUnit unit);Handle errors in the Single stream.
/**
* Return a default item when an error occurs
* @param defaultItem the item to emit on error
* @return Single that emits defaultItem on error
*/
public final Single<T> onErrorReturn(T defaultItem);
/**
* Return a default item using a function when an error occurs
* @param resumeFunction function to generate default item from error
* @return Single that emits result of resumeFunction on error
*/
public final Single<T> onErrorReturn(Function<? super Throwable, ? extends T> resumeFunction);
/**
* Resume with another Single when an error occurs
* @param resumeSource Single to switch to on error
* @return Single that switches to resumeSource on error
*/
public final Single<T> onErrorResumeNext(SingleSource<? extends T> resumeSource);
/**
* Retry the subscription when an error occurs
* @param times maximum number of retry attempts
* @return Single that retries up to the specified times
*/
public final Single<T> retry(long times);
/**
* Perform side-effect action when an error occurs
* @param onError action to perform on error
* @return Single that performs the action on error
*/
public final Single<T> doOnError(Consumer<? super Throwable> onError);Perform side-effect actions without modifying the stream.
/**
* Perform an action when the item is emitted
* @param onSuccess action to perform when item is emitted
* @return Single that performs the action on success
*/
public final Single<T> doOnSuccess(Consumer<? super T> onSuccess);
/**
* Perform an action when subscription occurs
* @param onSubscribe action to perform on subscription
* @return Single that performs the action on subscription
*/
public final Single<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe);
/**
* Perform an action when disposal occurs
* @param onDispose action to perform on disposal
* @return Single that performs the action on disposal
*/
public final Single<T> doOnDispose(Action onDispose);
/**
* Perform actions on lifecycle events
* @param onSubscribe called on subscription
* @param onSuccess called on successful emission
* @param onError called on error
* @param onDispose called on disposal
* @return Single that performs actions on lifecycle events
*/
public final Single<T> doOnEvent(Consumer<? super Disposable> onSubscribe, BiConsumer<? super T, ? super Throwable> onEvent);Convert to other reactive types.
/**
* Convert to Observable
* @return Observable that emits the Single item
*/
public final Observable<T> toObservable();
/**
* Convert to Flowable
* @return Flowable that emits the Single item
*/
public final Flowable<T> toFlowable();
/**
* Convert to Maybe
* @return Maybe that emits the Single item
*/
public final Maybe<T> toMaybe();
/**
* Convert to Completable (ignores the item)
* @return Completable that signals completion or error
*/
public final Completable ignoreElement();/**
* Interface for creating custom Single sources
*/
public interface SingleOnSubscribe<T> {
void subscribe(SingleEmitter<T> emitter) throws Throwable;
}
/**
* Emitter interface for custom Single creation
*/
public interface SingleEmitter<T> {
void onSuccess(T value);
void onError(Throwable error);
boolean isDisposed();
}
/**
* Observer interface for Single
*/
public interface SingleObserver<T> extends Observer<T> {
void onSubscribe(Disposable d);
void onSuccess(T t);
void onError(Throwable e);
}
/**
* Base interface for Single sources
*/
public interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}Usage Examples:
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
// Basic Single creation and subscription
Single.just("Hello World")
.subscribe(System.out::println);
// Async computation
Single.fromCallable(() -> {
Thread.sleep(1000);
return "Computed result";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(
result -> System.out.println("Result: " + result),
error -> System.err.println("Error: " + error)
);
// Chaining Singles
Single.just(5)
.map(x -> x * x)
.flatMap(x -> Single.just(x + 10))
.subscribe(result -> System.out.println("Final: " + result));
// Error handling
Single.fromCallable(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Random error");
}
return "Success";
})
.onErrorReturn("Default value")
.subscribe(System.out::println);
// Combining Singles
Single<String> single1 = Single.just("Hello");
Single<String> single2 = Single.just("World");
single1.zipWith(single2, (s1, s2) -> s1 + " " + s2)
.subscribe(System.out::println);
// Timeout handling
Single.timer(2, TimeUnit.SECONDS)
.map(tick -> "Delayed result")
.timeout(1, TimeUnit.SECONDS)
.onErrorReturn("Timeout occurred")
.subscribe(System.out::println);Install with Tessl CLI
npx tessl i tessl/maven-io-reactivex-rxjava3--rxjava