RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
—
Reactive type that emits exactly one value or an error. Single is perfect for async operations that return a single result like HTTP requests, database queries, or any operation that produces one outcome.
Factory methods for creating Single instances.
/**
* Creates a Single that emits the provided item then completes
*/
public static <T> Single<T> just(T item);
/**
* Creates a Single from a Callable that will be called for each observer
*/
public static <T> Single<T> fromCallable(Callable<? extends T> callable);
/**
* Creates a Single from a Future
*/
public static <T> Single<T> fromFuture(Future<? extends T> future);
/**
* Creates a Single using the provided SingleOnSubscribe function
*/
public static <T> Single<T> create(SingleOnSubscribe<T> source);
/**
* Creates a Single that emits after a delay
*/
public static Single<Long> timer(long delay, TimeUnit unit);
public static Single<Long> timer(long delay, TimeUnit unit, Scheduler scheduler);
/**
* Creates a Single that only calls onError
*/
public static <T> Single<T> error(Throwable exception);
public static <T> Single<T> error(Callable<? extends Throwable> errorSupplier);
/**
* Creates a Single that never emits
*/
public static <T> Single<T> never();
/**
* Defers Single creation until subscription
*/
public static <T> Single<T> defer(Callable<? extends SingleSource<? extends T>> singleSupplier);
/**
* Creates a Single from an ObservableSource that emits exactly one item
*/
public static <T> Single<T> fromObservable(ObservableSource<T> observableSource);Transform the value emitted by a Single.
/**
* Transforms the item using a function
*/
public final <R> Single<R> map(Function<? super T, ? extends R> mapper);
/**
* Transforms the item into another Single and flattens the result
*/
public final <R> Single<R> flatMap(Function<? super T, ? extends SingleSource<? extends R>> mapper);
/**
* Transforms the item into an Observable
*/
public final <R> Observable<R> flatMapObservable(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
/**
* Transforms the item into a Flowable
*/
public final <R> Flowable<R> flatMapFlowable(Function<? super T, ? extends Publisher<? extends R>> mapper);
/**
* Transforms the item into a Maybe
*/
public final <R> Maybe<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper);
/**
* Transforms the item into a Completable
*/
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper);
/**
* Casts the item to the specified type
*/
public final <U> Single<U> cast(Class<U> clazz);Apply conditions to Single operations.
/**
* Converts Single to Maybe, filtering based on predicate
*/
public final Maybe<T> filter(Predicate<? super T> predicate);
/**
* Tests whether the Single emits an item that satisfies a condition
*/
public final Single<Boolean> contains(Object item);Handle errors in Single operations.
/**
* Returns a Single that emits a specified item if the source emits an error
*/
public final Single<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier);
public final Single<T> onErrorReturn(T value);
/**
* Returns a Single that switches to another Single if the source emits an error
*/
public final Single<T> onErrorResumeNext(Function<? super Throwable, ? extends SingleSource<? extends T>> resumeFunction);
public final Single<T> onErrorResumeNext(SingleSource<? extends T> resumeSingleSource);
/**
* Retry on error
*/
public final Single<T> retry();
public final Single<T> retry(long times);
public final Single<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate);
public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler);Combine multiple Singles.
/**
* Combines two Singles using a combiner function
*/
public static <T1, T2, R> Single<R> zip(
SingleSource<? extends T1> source1,
SingleSource<? extends T2> source2,
BiFunction<? super T1, ? super T2, ? extends R> zipper
);
/**
* Zips up to 9 Singles
*/
public static <T1, T2, T3, R> Single<R> zip(
SingleSource<? extends T1> source1,
SingleSource<? extends T2> source2,
SingleSource<? extends T3> source3,
Function3<? super T1, ? super T2, ? super T3, ? extends R> zipper
);
/**
* Returns the first Single to emit successfully
*/
public static <T> Single<T> amb(SingleSource<? extends T>... sources);
public static <T> Single<T> amb(Iterable<? extends SingleSource<? extends T>> sources);
/**
* Concatenates Singles sequentially
*/
public static <T> Observable<T> concat(SingleSource<? extends T>... sources);
public static <T> Observable<T> concat(Iterable<? extends SingleSource<? extends T>> sources);
/**
* Merges Singles into an Observable
*/
public static <T> Observable<T> merge(SingleSource<? extends T>... sources);
public static <T> Observable<T> merge(Iterable<? extends SingleSource<? extends T>> sources);Control execution context for Singles.
/**
* Specifies the Scheduler on which the Single will operate
*/
public final Single<T> subscribeOn(Scheduler scheduler);
/**
* Specifies the Scheduler on which observers will be notified
*/
public final Single<T> observeOn(Scheduler scheduler);Add delays and timeouts to Singles.
/**
* Delays the emission of the success signal
*/
public final Single<T> delay(long time, TimeUnit unit);
public final Single<T> delay(long time, TimeUnit unit, Scheduler scheduler);
/**
* Adds a timeout to the Single
*/
public final Single<T> timeout(long timeout, TimeUnit timeUnit);
public final Single<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler);
public final Single<T> timeout(long timeout, TimeUnit timeUnit, SingleSource<? extends T> other);Convert Single to other reactive types.
/**
* Converts Single to Observable
*/
public final Observable<T> toObservable();
/**
* Converts Single to Flowable
*/
public final Flowable<T> toFlowable();
/**
* Converts Single to Maybe
*/
public final Maybe<T> toMaybe();
/**
* Converts Single to Completable (ignoring the value)
*/
public final Completable toCompletable();Subscribe to a Single and consume the emitted value.
/**
* Subscribes with separate callbacks
*/
public final Disposable subscribe();
public final Disposable subscribe(Consumer<? super T> onSuccess);
public final Disposable subscribe(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError);
/**
* Subscribes with a SingleObserver
*/
public final void subscribe(SingleObserver<? super T> observer);
/**
* Blocking operations - use with caution
*/
public final T blockingGet();Additional utility operations for Singles.
/**
* Caches the result of the Single
*/
public final Single<T> cache();
/**
* Performs side-effects without affecting the Single
*/
public final Single<T> doOnSuccess(Consumer<? super T> onSuccess);
public final Single<T> doOnError(Consumer<? super Throwable> onError);
public final Single<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe);
public final Single<T> doOnDispose(Action onDispose);
public final Single<T> doFinally(Action onFinally);
/**
* Repeats the Single subscription
*/
public final Observable<T> repeat();
public final Observable<T> repeat(long times);Basic Single Creation and Subscription:
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.disposables.Disposable;
// Simple Single
Single<String> source = Single.just("Hello Single");
source.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onSuccess(String value) {
System.out.println("Success: " + value);
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
});
// Lambda-style subscription
source.subscribe(
value -> System.out.println("Got: " + value),
error -> error.printStackTrace()
);Async Operations with Single:
import java.util.concurrent.Callable;
// HTTP request simulation
Single<String> httpRequest = Single.fromCallable(() -> {
// Simulate network call
Thread.sleep(1000);
return "Response from server";
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()); // or Schedulers.computation()
httpRequest.subscribe(
response -> System.out.println("HTTP Response: " + response),
error -> System.err.println("HTTP Error: " + error)
);Error Handling with Single:
Single<Integer> riskyOperation = Single.fromCallable(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Random failure");
}
return 42;
});
// Handle errors with fallback value
riskyOperation
.onErrorReturn(throwable -> {
System.out.println("Error occurred: " + throwable.getMessage());
return -1; // Fallback value
})
.subscribe(result -> System.out.println("Result: " + result));
// Retry on failure
riskyOperation
.retry(3) // Retry up to 3 times
.subscribe(
result -> System.out.println("Success: " + result),
error -> System.out.println("Failed after retries: " + error)
);Combining Singles:
Single<String> firstName = Single.just("John");
Single<String> lastName = Single.just("Doe");
// Zip two Singles together
Single<String> fullName = Single.zip(firstName, lastName,
(first, last) -> first + " " + last);
fullName.subscribe(name -> System.out.println("Full name: " + name));
// Chain Singles
Single<String> greeting = firstName
.flatMap(name -> Single.just("Hello, " + name + "!"));
greeting.subscribe(msg -> System.out.println(msg));Transforming Single to Other Types:
Single<Integer> source = Single.just(5);
// Convert to Observable
Observable<Integer> observable = source.toObservable();
observable.subscribe(System.out::println);
// Convert to Maybe
Maybe<Integer> maybe = source.toMaybe();
maybe.subscribe(System.out::println);
// Convert to Completable (ignoring value)
Completable completable = source.toCompletable();
completable.subscribe(() -> System.out.println("Completed"));Custom Single Creation:
Single<String> customSingle = Single.create(emitter -> {
try {
// Simulate some work
String result = performSomeOperation();
emitter.onSuccess(result);
} catch (Exception e) {
emitter.onError(e);
}
});
private String performSomeOperation() {
// Simulate work
return "Operation completed";
}/**
* Observer interface for Singles
*/
public interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T t);
void onError(Throwable e);
}
/**
* Functional interface for creating Singles
*/
public interface SingleOnSubscribe<T> {
void subscribe(SingleEmitter<T> emitter) throws Exception;
}
/**
* Emitter for SingleOnSubscribe
*/
public interface SingleEmitter<T> {
void onSuccess(T t);
void onError(Throwable t);
void setDisposable(Disposable d);
void setCancellable(Cancellable c);
boolean isDisposed();
}
/**
* Base interface for Single sources
*/
public interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}Install with Tessl CLI
npx tessl i tessl/maven-io-reactivex-rxjava2--rxjava