CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-helidon-common--helidon-common-reactive

Helidon Common Reactive Library - A reactive programming library providing Multi and Single abstractions

Pending
Overview
Eval results
Files

multi.mddocs/

Multi Reactive Streams

Multi represents reactive streams that emit zero or more items, optionally followed by an error or completion. It provides comprehensive operators for transformation, filtering, error handling, and collection operations with full backpressure support.

Capabilities

Core Multi Interface

The main reactive streams interface for 0-N item sequences.

/**
 * Represents a Flow.Publisher emitting zero or more items, optionally followed by
 * an error or completion.
 * @param <T> item type
 */
public interface Multi<T> extends Subscribable<T> {
}

Factory Methods - Source Creation

Create Multi instances from various sources and generators.

/**
 * Create empty Multi that completes immediately
 * @param <T> item type
 * @return empty Multi
 */
static <T> Multi<T> empty();

/**
 * Create Multi that signals error immediately
 * @param <T> item type
 * @param error the error to signal
 * @return error Multi
 * @throws NullPointerException if error is null
 */
static <T> Multi<T> error(Throwable error);

/**
 * Create Multi that never completes
 * @param <T> item type
 * @return never-completing Multi
 */
static <T> Multi<T> never();

/**
 * Create Multi from varargs items
 * @param <T> item type
 * @param items items to emit
 * @return Multi emitting the items
 */
static <T> Multi<T> just(T... items);

/**
 * Create Multi from collection items
 * @param <T> item type
 * @param items collection of items to emit
 * @return Multi emitting the items
 */
static <T> Multi<T> just(Collection<? extends T> items);

/**
 * Create Multi with single item
 * @param <T> item type
 * @param item single item to emit
 * @return Multi emitting the single item
 */
static <T> Multi<T> singleton(T item);

/**
 * Deferred Multi creation per subscriber
 * @param <T> item type
 * @param supplier supplier function called for each subscriber
 * @return deferred Multi
 * @throws NullPointerException if supplier is null
 */
static <T> Multi<T> defer(Supplier<? extends Flow.Publisher<? extends T>> supplier);

Factory Methods - From Existing Sources

Convert existing data sources to Multi streams.

/**
 * Wrap existing Publisher as Multi
 * @param <T> item type
 * @param publisher publisher to wrap
 * @return Multi wrapping the publisher
 */
static <T> Multi<T> create(Flow.Publisher<? extends T> publisher);

/**
 * Convert Single to Multi
 * @param <T> item type
 * @param single single to convert
 * @return Multi from Single
 */
static <T> Multi<T> create(Single<? extends T> single);

/**
 * Convert CompletionStage to Multi
 * @param <T> item type
 * @param completionStage completion stage to convert
 * @return Multi from CompletionStage
 */
static <T> Multi<T> create(CompletionStage<? extends T> completionStage);

/**
 * Convert CompletionStage to Multi with null handling control
 * @param <T> item type
 * @param completionStage completion stage to convert
 * @param nullMeansEmpty if true, null result means empty Multi
 * @return Multi from CompletionStage
 */
static <T> Multi<T> create(CompletionStage<? extends T> completionStage, boolean nullMeansEmpty);

/**
 * Create Multi from Iterable
 * @param <T> item type
 * @param iterable iterable to convert
 * @return Multi emitting iterable items
 */
static <T> Multi<T> create(Iterable<? extends T> iterable);

/**
 * Create Multi from Stream (closes stream when done)
 * @param <T> item type
 * @param stream stream to convert
 * @return Multi emitting stream items
 */
static <T> Multi<T> create(Stream<? extends T> stream);

Factory Methods - Sequence Generation

Generate numeric sequences and timed emissions.

/**
 * Generate integer sequence
 * @param start starting value (inclusive)
 * @param count number of items to generate
 * @return Multi emitting integer sequence
 */
static Multi<Integer> range(int start, int count);

/**
 * Generate long sequence
 * @param start starting value (inclusive)
 * @param count number of items to generate
 * @return Multi emitting long sequence
 */
static Multi<Long> rangeLong(long start, long count);

/**
 * Periodic timer sequence
 * @param period time between emissions
 * @param unit time unit
 * @param executor scheduled executor for timing
 * @return Multi emitting periodic signals
 */
static Multi<Long> interval(long period, TimeUnit unit, ScheduledExecutorService executor);

/**
 * Delayed periodic timer sequence
 * @param initialDelay initial delay before first emission
 * @param period time between emissions
 * @param unit time unit
 * @param executor scheduled executor for timing
 * @return Multi emitting delayed periodic signals
 */
static Multi<Long> interval(long initialDelay, long period, TimeUnit unit, ScheduledExecutorService executor);

/**
 * Single delayed emission
 * @param time delay time
 * @param unit time unit
 * @param executor scheduled executor for timing
 * @return Multi emitting single delayed signal
 */
static Multi<Long> timer(long time, TimeUnit unit, ScheduledExecutorService executor);

Factory Methods - Composition

Combine multiple publishers into single streams.

/**
 * Concatenate two publishers sequentially
 * @param <T> item type
 * @param firstMulti first stream
 * @param secondMulti second stream
 * @return concatenated Multi
 */
static <T> Multi<T> concat(Flow.Publisher<? extends T> firstMulti, Flow.Publisher<? extends T> secondMulti);

/**
 * Concatenate multiple publishers sequentially
 * @param <T> item type
 * @param firstPublisher first stream
 * @param secondPublisher second stream
 * @param morePublishers additional publishers
 * @return concatenated Multi
 */
static <T> Multi<T> concat(Flow.Publisher<? extends T> firstPublisher, 
                          Flow.Publisher<? extends T> secondPublisher,
                          Flow.Publisher<? extends T>... morePublishers);

/**
 * Concatenate array of publishers
 * @param <T> item type
 * @param publishers array of publishers to concatenate
 * @return concatenated Multi
 */
static <T> Multi<T> concatArray(Flow.Publisher<? extends T>... publishers);

Transformation Operators - Filtering & Selection

Filter and select items from streams.

/**
 * Filter items based on predicate
 * @param predicate filter predicate
 * @return filtered Multi
 * @throws NullPointerException if predicate is null
 */
Multi<T> filter(Predicate<? super T> predicate);

/**
 * Remove duplicate items
 * @return Multi with duplicates removed
 */
Multi<T> distinct();

/**
 * Take only first N items
 * @param maxSize maximum number of items
 * @return limited Multi
 */
Multi<T> limit(long maxSize);

/**
 * Skip first N items
 * @param count number of items to skip
 * @return Multi with items skipped
 */
Multi<T> skip(long count);

/**
 * Take items while predicate is true
 * @param predicate condition predicate
 * @return Multi taking while condition holds
 * @throws NullPointerException if predicate is null
 */
Multi<T> takeWhile(Predicate<? super T> predicate);

/**
 * Skip items while predicate is true
 * @param predicate condition predicate
 * @return Multi dropping while condition holds
 * @throws NullPointerException if predicate is null
 */
Multi<T> dropWhile(Predicate<? super T> predicate);

/**
 * Take items until other publisher signals
 * @param <U> signal type
 * @param other publisher to signal stop
 * @return Multi taking until signal
 * @throws NullPointerException if other is null
 */
<U> Multi<T> takeUntil(Flow.Publisher<U> other);

/**
 * Get first item as Single
 * @return Single with first item or empty
 */
Single<T> first();

Transformation Operators - Mapping & Transformation

Transform and reshape stream items.

/**
 * Transform each item
 * @param <U> result type
 * @param mapper transformation function
 * @return transformed Multi
 * @throws NullPointerException if mapper is null
 */
<U> Multi<U> map(Function<? super T, ? extends U> mapper);

/**
 * Transform and flatten publishers
 * @param <U> result type
 * @param mapper function producing publishers
 * @return flattened Multi
 * @throws NullPointerException if mapper is null
 */
<U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper);

/**
 * Advanced flatMap with concurrency control
 * @param <U> result type
 * @param mapper function producing publishers
 * @param maxConcurrency maximum concurrent inner publishers
 * @param delayErrors if true, delay errors until all complete
 * @param prefetch prefetch amount for inner publishers
 * @return flattened Multi with concurrency control
 */
<U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper,
                     long maxConcurrency, boolean delayErrors, long prefetch);

/**
 * FlatMap CompletionStages
 * @param <U> result type
 * @param mapper function producing CompletionStages
 * @return flattened Multi from CompletionStages
 * @throws NullPointerException if mapper is null
 */
<U> Multi<U> flatMapCompletionStage(Function<? super T, ? extends CompletionStage<? extends U>> mapper);

/**
 * FlatMap Iterables
 * @param <U> result type
 * @param mapper function producing Iterables
 * @return flattened Multi from Iterables
 * @throws NullPointerException if mapper is null
 */
<U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper);

/**
 * FlatMap Iterables with prefetch control
 * @param <U> result type
 * @param mapper function producing Iterables
 * @param prefetch prefetch amount
 * @return flattened Multi from Iterables
 */
<U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper, int prefetch);

/**
 * FlatMap Optionals
 * @param <U> result type
 * @param mapper function producing Optionals
 * @return flattened Multi from Optionals
 * @throws NullPointerException if mapper is null
 */
<U> Multi<U> flatMapOptional(Function<? super T, ? extends Optional<? extends U>> mapper);

Error Handling Operators

Handle errors and implement retry logic.

/**
 * Resume with single item on error
 * @param resumeFunction function providing resume value
 * @return Multi with error handling
 * @throws NullPointerException if resumeFunction is null
 */
Multi<T> onErrorResume(Function<? super Throwable, ? extends T> resumeFunction);

/**
 * Resume with publisher on error
 * @param resumeFunction function providing resume publisher
 * @return Multi with error handling
 * @throws NullPointerException if resumeFunction is null
 */
Multi<T> onErrorResumeWith(Function<? super Throwable, ? extends Flow.Publisher<? extends T>> resumeFunction);

/**
 * Retry failed stream N times
 * @param count number of retries
 * @return Multi with retry logic
 */
Multi<T> retry(long count);

/**
 * Conditional retry
 * @param retryPredicate predicate testing if retry should occur
 * @return Multi with conditional retry
 * @throws NullPointerException if retryPredicate is null
 */
Multi<T> retry(BiPredicate<? super Throwable, ? super Long> retryPredicate);

/**
 * Advanced retry control
 * @param <U> signal type
 * @param whenRetryFunction function controlling retry timing
 * @return Multi with advanced retry control
 * @throws NullPointerException if whenRetryFunction is null
 */
<U> Multi<T> retryWhen(BiFunction<? super Throwable, ? super Long, ? extends Flow.Publisher<U>> whenRetryFunction);

Flow Control & Default Value Operators

Control stream flow and provide fallback values.

/**
 * Provide default item if stream is empty
 * @param defaultItem default item to emit
 * @return Multi with default value
 */
Multi<T> defaultIfEmpty(T defaultItem);

/**
 * Provide default item via supplier if stream is empty
 * @param supplier supplier of default item
 * @return Multi with default value from supplier
 * @throws NullPointerException if supplier is null
 */
Multi<T> defaultIfEmpty(Supplier<? extends T> supplier);

/**
 * Switch to alternative publisher if empty
 * @param other alternative publisher
 * @return Multi switching to alternative if empty
 * @throws NullPointerException if other is null
 */
Multi<T> switchIfEmpty(Flow.Publisher<? extends T> other);

/**
 * Execute action if stream is empty
 * @param emptyAction action to execute
 * @return Multi executing action if empty
 * @throws NullPointerException if emptyAction is null
 */
Multi<T> ifEmpty(Runnable emptyAction);

Completion Handling Operators

Handle completion events and append additional items.

/**
 * Append item after completion
 * @param resumeValue item to append
 * @return Multi with appended item
 */
Multi<T> onCompleteResume(T resumeValue);

/**
 * Append publisher after completion
 * @param resumePublisher publisher to append
 * @return Multi with appended publisher
 * @throws NullPointerException if resumePublisher is null
 */
Multi<T> onCompleteResumeWith(Flow.Publisher<? extends T> resumePublisher);

Side Effect Operators

Observe and react to stream events without modifying the stream.

/**
 * Observe items without modification
 * @param consumer observer function
 * @return Multi with side effect
 * @throws NullPointerException if consumer is null
 */
Multi<T> peek(Consumer<? super T> consumer);

/**
 * Execute on cancellation
 * @param onCancel action to execute
 * @return Multi with cancel handler
 * @throws NullPointerException if onCancel is null
 */
Multi<T> onCancel(Runnable onCancel);

/**
 * Execute on completion
 * @param onComplete action to execute
 * @return Multi with completion handler
 * @throws NullPointerException if onComplete is null
 */
Multi<T> onComplete(Runnable onComplete);

/**
 * Execute on error
 * @param onError action to execute
 * @return Multi with error handler
 * @throws NullPointerException if onError is null
 */
Multi<T> onError(Consumer<? super Throwable> onError);

/**
 * Execute on any termination (complete/error/cancel)
 * @param onTerminate action to execute
 * @return Multi with termination handler
 * @throws NullPointerException if onTerminate is null
 */
Multi<T> onTerminate(Runnable onTerminate);

Threading & Timing Operators

Control execution context and implement timeout behavior.

/**
 * Switch execution context
 * @param executor executor for downstream operations
 * @return Multi executing on specified executor
 * @throws NullPointerException if executor is null
 */
Multi<T> observeOn(Executor executor);

/**
 * Advanced threading control
 * @param executor executor for downstream operations
 * @param bufferSize buffer size for context switching
 * @param delayErrors if true, delay errors until buffer is drained
 * @return Multi with advanced threading control
 */
Multi<T> observeOn(Executor executor, int bufferSize, boolean delayErrors);

/**
 * Timeout with error
 * @param timeout timeout duration
 * @param unit time unit
 * @param executor scheduled executor for timeout
 * @return Multi with timeout
 * @throws NullPointerException if unit or executor is null
 */
Multi<T> timeout(long timeout, TimeUnit unit, ScheduledExecutorService executor);

/**
 * Timeout with fallback
 * @param timeout timeout duration
 * @param unit time unit
 * @param executor scheduled executor for timeout
 * @param fallback fallback publisher
 * @return Multi with timeout and fallback
 * @throws NullPointerException if unit, executor, or fallback is null
 */
Multi<T> timeout(long timeout, TimeUnit unit, ScheduledExecutorService executor, Flow.Publisher<? extends T> fallback);

Utility Operators

Debug, compose, and transform streams.

/**
 * Log all reactive signals
 * @return Multi with logging
 */
Multi<T> log();

/**
 * Log with specific level
 * @param level logging level
 * @return Multi with logging at level
 * @throws NullPointerException if level is null
 */
Multi<T> log(Level level);

/**
 * Log with custom logger
 * @param level logging level
 * @param loggerName logger name
 * @return Multi with custom logging
 * @throws NullPointerException if level or loggerName is null
 */
Multi<T> log(Level level, String loggerName);

/**
 * Log with trace information
 * @param level logging level
 * @param trace include trace information
 * @return Multi with trace logging
 * @throws NullPointerException if level is null
 */
Multi<T> log(Level level, boolean trace);

/**
 * Apply custom composition
 * @param <U> result type
 * @param composer composition function
 * @return result of composition
 * @throws NullPointerException if composer is null
 */
<U> U compose(Function<? super Multi<T>, ? extends U> composer);

/**
 * Terminal transformation
 * @param <U> result type
 * @param converter conversion function
 * @return converted result
 * @throws NullPointerException if converter is null
 */
<U> U to(Function<? super Multi<T>, ? extends U> converter);

Collection & Reduction Operations

Collect stream items into data structures or reduce to single values.

Collector Operations

/**
 * Collect using custom collector
 * @param <A> accumulator type
 * @param <R> result type
 * @param collector collector to use
 * @return Single with collected result
 * @throws NullPointerException if collector is null
 */
<A, R> Single<R> collect(Collector<? super T, A, R> collector);

/**
 * Collect with supplier and accumulator
 * @param <R> result type
 * @param supplier supplier of collection container
 * @param accumulator accumulator function
 * @return Single with collected result
 * @throws NullPointerException if supplier or accumulator is null
 */
<R> Single<R> collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator);

/**
 * Collect to List
 * @return Single containing List of all items
 */
Single<List<T>> collectList();

/**
 * Use Java Stream collectors
 * @param <A> accumulator type
 * @param <R> result type
 * @param collector Java Stream collector
 * @return Single with collected result
 * @throws NullPointerException if collector is null
 */
<A, R> Single<R> collectStream(java.util.stream.Collector<? super T, A, R> collector);

Reduction Operations

/**
 * Reduce to single value
 * @param accumulator reduction function
 * @return Single with reduced result or empty
 * @throws NullPointerException if accumulator is null
 */
Single<T> reduce(BinaryOperator<T> accumulator);

/**
 * Reduce with initial value
 * @param <U> result type
 * @param identity initial value
 * @param accumulator reduction function
 * @return Single with reduced result
 * @throws NullPointerException if accumulator is null
 */
<U> Single<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator);

Terminal Operations

Final operations that trigger stream execution and consumption.

Consumption Operations

/**
 * Consume all items (no backpressure)
 * @param consumer item consumer
 * @throws NullPointerException if consumer is null
 */
void forEach(Consumer<? super T> consumer);

/**
 * Sequential async consumption
 * @param mapper function producing CompletionStages
 * @return CompletionStage completing when all items processed
 * @throws NullPointerException if mapper is null
 */
CompletionStage<Void> forEachCompletionStage(Function<? super T, ? extends CompletionStage<Void>> mapper);

/**
 * Ignore all items, complete when stream terminates
 * @return Single that completes when stream terminates
 */
Single<Void> ignoreElements();

Usage Examples

Basic Stream Processing

import io.helidon.common.reactive.Multi;

// Create and process stream
Multi<String> processed = Multi.just("apple", "banana", "cherry")
    .filter(fruit -> fruit.length() > 5)
    .map(String::toUpperCase);

processed.forEach(System.out::println); // Prints: BANANA, CHERRY

Error Handling

Multi<Integer> withErrorHandling = Multi.just(1, 2, 0, 4)
    .map(n -> 10 / n)  // Will throw ArithmeticException for 0
    .onErrorResumeWith(error -> Multi.just(-1));  // Resume with -1

List<Integer> result = withErrorHandling.collectList().await();
// Result: [10, 5, -1]

Retry Logic

Multi<String> withRetry = Multi.error(new RuntimeException("Network error"))
    .retry(3)  // Retry 3 times
    .onErrorResumeWith(error -> Multi.just("Fallback value"));

String result = withRetry.first().await();
// Result: "Fallback value" (after 3 retries)

Async Processing

Multi<CompletionStage<String>> asyncTasks = Multi.range(1, 5)
    .map(i -> CompletableFuture.supplyAsync(() -> "Task " + i));

// Process async tasks sequentially
CompletionStage<Void> completion = Multi.create(asyncTasks)
    .flatMapCompletionStage(Function.identity())
    .forEachCompletionStage(result -> {
        System.out.println(result);
        return CompletableFuture.completedFuture(null);
    });

Install with Tessl CLI

npx tessl i tessl/maven-io-helidon-common--helidon-common-reactive

docs

index.md

io-integration.md

multi.md

single.md

supporting-types.md

tile.json