CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-helidon-common--helidon-common-reactive

Helidon Common Reactive Library - A reactive programming library providing Multi and Single abstractions

Pending
Overview
Eval results
Files

single.mddocs/

Single Reactive Values

Single represents reactive streams that emit at most one item, then complete, or signal an error. It implements CompletionStage and provides blocking await operations for integration with existing synchronous code.

Capabilities

Core Single Interface

The main reactive interface for 0-1 item sequences with CompletionStage compatibility.

/**
 * Represents a Flow.Publisher that may: signal one item then completes, complete without
 * an item or signal an error.
 * @param <T> item type
 */
public interface Single<T> extends Subscribable<T>, CompletionStage<T>, Awaitable<T> {
}

Factory Methods - Source Creation

Create Single instances from various sources.

/**
 * Create empty Single that completes immediately
 * @param <T> item type
 * @return empty Single
 */
static <T> Single<T> empty();

/**
 * Create Single that signals error immediately
 * @param <T> item type
 * @param error the error to signal
 * @return error Single
 * @throws NullPointerException if error is null
 */
static <T> Single<T> error(Throwable error);

/**
 * Create Single that never completes
 * @param <T> item type
 * @return never-completing Single
 */
static <T> Single<T> never();

/**
 * Create Single with item
 * @param <T> item type
 * @param item item to emit
 * @return Single emitting the item
 */
static <T> Single<T> just(T item);

/**
 * Create Single from supplier
 * @param <T> item type
 * @param supplier supplier of the item
 * @return Single from supplier
 * @throws NullPointerException if supplier is null
 */
static <T> Single<T> create(Supplier<? extends T> supplier);

/**
 * Deferred Single creation per subscriber
 * @param <T> item type
 * @param supplier supplier function called for each subscriber
 * @return deferred Single
 * @throws NullPointerException if supplier is null
 */
static <T> Single<T> defer(Supplier<? extends Single<? extends T>> supplier);

Factory Methods - From Existing Sources

Convert existing data sources to Single instances.

/**
 * Create Single from CompletionStage
 * @param <T> item type
 * @param completionStage completion stage to convert
 * @return Single from CompletionStage
 * @throws NullPointerException if completionStage is null
 */
static <T> Single<T> create(CompletionStage<? extends T> completionStage);

/**
 * Create Single from CompletionStage with null handling
 * @param <T> item type
 * @param completionStage completion stage to convert
 * @param nullMeansEmpty if true, null result means empty Single
 * @return Single from CompletionStage
 * @throws NullPointerException if completionStage is null
 */
static <T> Single<T> create(CompletionStage<? extends T> completionStage, boolean nullMeansEmpty);

/**
 * Create Single from Publisher (expects 0-1 items)
 * @param <T> item type
 * @param publisher publisher to convert
 * @return Single from Publisher
 * @throws NullPointerException if publisher is null
 */
static <T> Single<T> create(Flow.Publisher<? extends T> publisher);

/**
 * Copy constructor for Single
 * @param <T> item type
 * @param single single to copy
 * @return copied Single
 * @throws NullPointerException if single is null
 */
static <T> Single<T> create(Single<? extends T> single);

Factory Methods - Timing

Create time-based Single emissions.

/**
 * Single delayed emission
 * @param time delay time
 * @param unit time unit
 * @param executor scheduled executor for timing
 * @return Single emitting after delay
 * @throws NullPointerException if unit or executor is null
 */
static Single<Long> timer(long time, TimeUnit unit, ScheduledExecutorService executor);

Transformation Operators - Mapping & Transformation

Transform Single values and convert to other reactive types.

/**
 * Transform the item
 * @param <U> result type
 * @param mapper transformation function
 * @return transformed Single
 * @throws NullPointerException if mapper is null
 */
<U> Single<U> map(Function<? super T, ? extends U> mapper);

/**
 * Transform to Multi
 * @param <U> result type
 * @param mapper function producing publishers
 * @return Multi from transformation
 * @throws NullPointerException if mapper is null
 */
<U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper);

/**
 * Transform to another Single
 * @param <U> result type
 * @param mapper function producing Single
 * @return flattened Single
 * @throws NullPointerException if mapper is null
 */
<U> Single<U> flatMapSingle(Function<? super T, ? extends Single<? extends U>> mapper);

/**
 * Transform to CompletionStage
 * @param <U> result type
 * @param mapper function producing CompletionStage
 * @return Single from CompletionStage
 * @throws NullPointerException if mapper is null
 */
<U> Single<U> flatMapCompletionStage(Function<? super T, ? extends CompletionStage<? extends U>> mapper);

/**
 * Transform to Multi via Iterable
 * @param <U> result type
 * @param mapper function producing Iterable
 * @return Multi from Iterable
 * @throws NullPointerException if mapper is null
 */
<U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper);

/**
 * Transform via Optional
 * @param <U> result type
 * @param mapper function producing Optional
 * @return Single from Optional (empty if Optional is empty)
 * @throws NullPointerException if mapper is null
 */
<U> Single<U> flatMapOptional(Function<? super T, ? extends Optional<? extends U>> mapper);

Flow Control & Default Value Operators

Control Single behavior and provide fallback values.

/**
 * Provide default item if empty
 * @param defaultItem default item to emit
 * @return Single with default value
 */
Single<T> defaultIfEmpty(T defaultItem);

/**
 * Provide default item via supplier if empty
 * @param supplier supplier of default item
 * @return Single with default value from supplier
 * @throws NullPointerException if supplier is null
 */
Single<T> defaultIfEmpty(Supplier<? extends T> supplier);

/**
 * Switch to other Single if empty
 * @param other alternative Single
 * @return Single switching to alternative if empty
 * @throws NullPointerException if other is null
 */
Single<T> switchIfEmpty(Single<? extends T> other);

/**
 * Execute action if empty
 * @param emptyAction action to execute
 * @return Single executing action if empty
 * @throws NullPointerException if emptyAction is null
 */
Single<T> ifEmpty(Runnable emptyAction);

Error Handling Operators

Handle errors and implement retry logic for Single values.

/**
 * Resume with item on error
 * @param resumeFunction function providing resume value
 * @return Single with error handling
 * @throws NullPointerException if resumeFunction is null
 */
Single<T> onErrorResume(Function<? super Throwable, ? extends T> resumeFunction);

/**
 * Resume with Single on error
 * @param resumeFunction function providing resume Single
 * @return Single with error handling
 * @throws NullPointerException if resumeFunction is null
 */
Single<T> onErrorResumeWithSingle(Function<? super Throwable, ? extends Single<? extends T>> resumeFunction);

/**
 * Resume with Publisher on error (returns Multi)
 * @param resumeFunction function providing resume Publisher
 * @return Multi with error handling
 * @throws NullPointerException if resumeFunction is null
 */
Multi<T> onErrorResumeWith(Function<? super Throwable, ? extends Flow.Publisher<? extends T>> resumeFunction);

/**
 * Retry N times
 * @param count number of retries
 * @return Single with retry logic
 */
Single<T> retry(long count);

/**
 * Conditional retry
 * @param retryPredicate predicate testing if retry should occur
 * @return Single with conditional retry
 * @throws NullPointerException if retryPredicate is null
 */
Single<T> retry(BiPredicate<? super Throwable, ? super Long> retryPredicate);

/**
 * Advanced retry control
 * @param <U> signal type
 * @param whenRetryFunction function controlling retry timing
 * @return Single with advanced retry control
 * @throws NullPointerException if whenRetryFunction is null
 */
<U> Single<T> retryWhen(BiFunction<? super Throwable, ? super Long, ? extends Flow.Publisher<U>> whenRetryFunction);

Completion Handling Operators

Handle completion events and append additional items or streams.

/**
 * Append item after completion (returns Multi)
 * @param resumeValue item to append
 * @return Multi with appended item
 */
Multi<T> onCompleteResume(T resumeValue);

/**
 * Append publisher after completion (returns Multi)
 * @param resumePublisher publisher to append
 * @return Multi with appended publisher
 * @throws NullPointerException if resumePublisher is null
 */
Multi<T> onCompleteResumeWith(Flow.Publisher<? extends T> resumePublisher);

/**
 * Resume with Single based on completion state
 * @param resumeFunction function receiving Optional of completed value
 * @return Single with conditional resume
 * @throws NullPointerException if resumeFunction is null
 */
Single<T> onCompleteResumeWithSingle(Function<Optional<T>, Single<T>> resumeFunction);

Side Effect Operators

Observe and react to Single events without modifying the value.

/**
 * Observe item without modification
 * @param consumer observer function
 * @return Single with side effect
 * @throws NullPointerException if consumer is null
 */
Single<T> peek(Consumer<? super T> consumer);

/**
 * Execute on cancellation
 * @param onCancel action to execute
 * @return Single with cancel handler
 * @throws NullPointerException if onCancel is null
 */
Single<T> onCancel(Runnable onCancel);

/**
 * Execute on completion
 * @param onComplete action to execute
 * @return Single with completion handler
 * @throws NullPointerException if onComplete is null
 */
Single<T> onComplete(Runnable onComplete);

/**
 * Execute on error
 * @param onError action to execute
 * @return Single with error handler
 * @throws NullPointerException if onError is null
 */
Single<T> onError(Consumer<? super Throwable> onError);

/**
 * Execute on any termination (complete/error/cancel)
 * @param onTerminate action to execute
 * @return Single with termination handler
 * @throws NullPointerException if onTerminate is null
 */
Single<T> onTerminate(Runnable onTerminate);

Threading & Timing Operators

Control execution context and implement timeout behavior.

/**
 * Switch execution context
 * @param executor executor for downstream operations
 * @return Single executing on specified executor
 * @throws NullPointerException if executor is null
 */
Single<T> observeOn(Executor executor);

/**
 * Take until other publisher signals
 * @param <U> signal type
 * @param other publisher to signal stop
 * @return Single taking until signal
 * @throws NullPointerException if other is null
 */
<U> Single<T> takeUntil(Flow.Publisher<U> other);

/**
 * Timeout with error
 * @param timeout timeout duration
 * @param unit time unit
 * @param executor scheduled executor for timeout
 * @return Single with timeout
 * @throws NullPointerException if unit or executor is null
 */
Single<T> timeout(long timeout, TimeUnit unit, ScheduledExecutorService executor);

/**
 * Timeout with fallback
 * @param timeout timeout duration
 * @param unit time unit
 * @param executor scheduled executor for timeout
 * @param fallback fallback Single
 * @return Single with timeout and fallback
 * @throws NullPointerException if unit, executor, or fallback is null
 */
Single<T> timeout(long timeout, TimeUnit unit, ScheduledExecutorService executor, Single<? extends T> fallback);

Utility Operators

Debug, compose, and transform Single instances.

/**
 * Apply custom composition
 * @param <U> result type
 * @param composer composition function
 * @return result of composition
 * @throws NullPointerException if composer is null
 */
<U> U compose(Function<? super Single<T>, ? extends U> composer);

/**
 * Terminal transformation
 * @param <U> result type
 * @param converter conversion function
 * @return converted result
 * @throws NullPointerException if converter is null
 */
<U> U to(Function<? super Single<T>, ? extends U> converter);

/**
 * Log all reactive signals
 * @return Single with logging
 */
Single<T> log();

/**
 * Log with specific level
 * @param level logging level
 * @return Single with logging at level
 * @throws NullPointerException if level is null
 */
Single<T> log(Level level);

/**
 * Log with custom logger
 * @param level logging level
 * @param loggerName logger name
 * @return Single with custom logging
 * @throws NullPointerException if level or loggerName is null
 */
Single<T> log(Level level, String loggerName);

/**
 * Log with trace information
 * @param level logging level
 * @param trace include trace information
 * @return Single with trace logging
 * @throws NullPointerException if level is null
 */
Single<T> log(Level level, boolean trace);

CompletionStage Integration

Single implements CompletionStage, providing full async composition capabilities.

Standard CompletionStage Methods

All standard CompletionStage methods return CompletionAwaitable<T> for enhanced functionality.

/**
 * CompletionStage transformation methods
 */
<U> CompletionAwaitable<U> thenApply(Function<? super T, ? extends U> fn);
CompletionAwaitable<Void> thenAccept(Consumer<? super T> action);
CompletionAwaitable<Void> thenRun(Runnable action);

<U> CompletionAwaitable<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

<U, V> CompletionAwaitable<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn);
<U> CompletionAwaitable<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
CompletionAwaitable<Void> runAfterBoth(CompletionStage<?> other, Runnable action);

<U> CompletionAwaitable<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
CompletionAwaitable<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
CompletionAwaitable<Void> runAfterEither(CompletionStage<?> other, Runnable action);

CompletionAwaitable<T> exceptionally(Function<Throwable, ? extends T> fn);
CompletionAwaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
<U> CompletionAwaitable<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

Additional CompletionStage Methods

Enhanced error handling method specific to CompletionAwaitable.

/**
 * Handle exceptions with consumer (does not transform result)
 * @param exceptionConsumer consumer for exceptions
 * @return CompletionAwaitable with exception handling
 * @throws NullPointerException if exceptionConsumer is null
 */
CompletionAwaitable<T> exceptionallyAccept(Consumer<? super Throwable> exceptionConsumer);

Blocking Operations & Integration

Single provides convenient blocking operations through the Awaitable interface.

Awaitable Methods

/**
 * Block until completion (unchecked exceptions only)
 * @return the result value
 * @throws RuntimeException for any checked exceptions
 */
T await();

/**
 * Block with timeout (unchecked exceptions)
 * @param timeout timeout duration
 * @return the result value
 * @throws RuntimeException for timeout or checked exceptions
 * @throws NullPointerException if timeout is null
 */
T await(Duration timeout);

/**
 * Block and get result (may throw checked exceptions)
 * @return the result value
 * @throws InterruptedException if interrupted while waiting
 * @throws ExecutionException if computation threw exception
 */
T get() throws InterruptedException, ExecutionException;

/**
 * Block with timeout and get result
 * @param timeout timeout value
 * @param unit timeout unit
 * @return the result value
 * @throws InterruptedException if interrupted while waiting
 * @throws ExecutionException if computation threw exception
 * @throws TimeoutException if timeout elapsed
 * @throws NullPointerException if unit is null
 */
T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

Conversion Methods

/**
 * Convert to CompletionStage
 * @return CompletionStage representation
 */
CompletionStage<T> toStage();

/**
 * Convert to CompletionStage with empty completion control
 * @param completeWithoutValue if true, complete with null for empty Single
 * @return CompletionStage representation
 */
CompletionStage<T> toStage(boolean completeWithoutValue);

/**
 * Convert to Single<Optional<T>>
 * @return Single wrapping result in Optional
 */
Single<Optional<T>> toOptionalSingle();

/**
 * Cancel upstream and return new Single
 * @return cancelled Single
 */
Single<T> cancel();

Terminal Operations

Final operations for consuming Single values.

/**
 * Consume the item if present
 * @param consumer item consumer
 * @throws NullPointerException if consumer is null
 */
void forSingle(Consumer<? super T> consumer);

/**
 * Ignore the item, complete when done
 * @return Single<Void> that completes when original completes
 */
Single<Void> ignoreElement();

Usage Examples

Basic Single Operations

import io.helidon.common.reactive.Single;
import java.util.concurrent.CompletableFuture;

// Create Single from value
Single<String> greeting = Single.just("Hello")
    .map(s -> s + " World!");

String result = greeting.await();
// Result: "Hello World!"

// From CompletionStage
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);
Single<Integer> value = Single.create(future);

Integer number = value.await();
// Result: 42

Error Handling

Single<String> withErrorHandling = Single.error(new RuntimeException("Error"))
    .onErrorResume(error -> "Fallback value");

String result = withErrorHandling.await();
// Result: "Fallback value"

// Retry with backoff
Single<String> withRetry = Single.error(new RuntimeException("Network error"))
    .retry(3)
    .onErrorResume(error -> "Final fallback");

String final_result = withRetry.await();
// Result: "Final fallback" (after 3 retries)

CompletionStage Integration

Single<String> data = Single.just("data");

// Use as CompletionStage
CompletionStage<String> stage = data
    .thenApply(String::toUpperCase)
    .thenApply(s -> s + "!");

String result = stage.toCompletableFuture().join();
// Result: "DATA!"

// Combine with other CompletionStages
CompletableFuture<String> other = CompletableFuture.completedFuture("other");
CompletionStage<String> combined = data.thenCombine(other, (a, b) -> a + "-" + b);

String combinedResult = combined.toCompletableFuture().join();
// Result: "data-other"

Transformation to Multi

Single<List<String>> items = Single.just(Arrays.asList("a", "b", "c"));

// FlatMap to Multi
Multi<String> stream = items.flatMapIterable(Function.identity());

List<String> result = stream.collectList().await();
// Result: ["a", "b", "c"]

Timeout and Fallback

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Single<String> delayed = Single.timer(5, TimeUnit.SECONDS, executor)
    .map(ignored -> "Delayed result");

Single<String> withTimeout = delayed
    .timeout(2, TimeUnit.SECONDS, executor, Single.just("Timeout fallback"));

String result = withTimeout.await();
// Result: "Timeout fallback" (timeout after 2 seconds)

Install with Tessl CLI

npx tessl i tessl/maven-io-helidon-common--helidon-common-reactive

docs

index.md

io-integration.md

multi.md

single.md

supporting-types.md

tile.json