Helidon Common Reactive Library - A reactive programming library providing Multi and Single abstractions
npx @tessl/cli install tessl/maven-io-helidon-common--helidon-common-reactive@4.2.0

Helidon Common Reactive is a comprehensive reactive programming library that implements Multi (0-N items) and Single (0-1 item) reactive streams following the Java Flow API specification. The library offers a rich set of operators for stream transformation, filtering, combining, and error handling, enabling developers to build non-blocking, backpressure-aware applications.

pom.xml:
<dependency>
    <groupId>io.helidon.common</groupId>
    <artifactId>helidon-common-reactive</artifactId>
    <version>4.2.2</version>
</dependency>

For Gradle:
implementation 'io.helidon.common:helidon-common-reactive:4.2.2'

import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.IoMulti;

Import supporting types as needed:
import io.helidon.common.reactive.Subscribable;
import io.helidon.common.reactive.Awaitable;
import io.helidon.common.reactive.CompletionAwaitable;
import io.helidon.common.reactive.RetrySchema;

import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import java.util.concurrent.CompletableFuture;
import java.util.List;

// Create and transform a Multi
Multi<String> names = Multi.just("Alice", "Bob", "Charlie")
    .filter(name -> name.length() > 3)
    .map(String::toUpperCase);

// Collect to List
List<String> result = names.collectList().await();
// Result: ["ALICE", "CHARLIE"]

// Create a Single from CompletionStage
CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
Single<String> greeting = Single.create(future)
    .map(s -> s + " World!");
String message = greeting.await();
// Result: "Hello World!"

// Chain operations
Multi<Integer> numbers = Multi.range(1, 10)
    .filter(n -> n % 2 == 0)
    .map(n -> n * n)
    .limit(3);
numbers.forEach(System.out::println); // Prints: 4, 16, 36

Helidon Common Reactive is built around several key components:
java.util.concurrent.Flow specification

Multi represents reactive streams that emit zero or more items. It provides operators for transformation, filtering, error handling, and collection.
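
Because Multi is ultimately a Flow.Publisher, any Flow.Subscriber can consume it with bounded demand. The sketch below illustrates that demand-driven contract using only the JDK: SubmissionPublisher stands in for a Multi, and the class names BackpressureSketch and OneAtATimeSubscriber are hypothetical. It is a minimal illustration of the Flow protocol, not Helidon code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BackpressureSketch {

    /** Hypothetical subscriber that never has more than one item outstanding. */
    static final class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: exactly one item
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..5 through a JDK publisher and returns what the subscriber saw. */
    static List<Integer> run() {
        OneAtATimeSubscriber<Integer> subscriber = new OneAtATimeSubscriber<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete once buffered items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The subscriber controls the pace: the publisher may only deliver as many items as have been requested, which is the property the backpressure-aware operators rely on.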

public interface Multi<T> extends Subscribable<T> {
    // Factory methods
    static <T> Multi<T> empty();
    static <T> Multi<T> just(T... items);
    static <T> Multi<T> create(Flow.Publisher<T> publisher);
    static <T> Multi<T> range(int start, int count);

    // Transformation operators
    <U> Multi<U> map(Function<? super T, ? extends U> mapper);
    <U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper);
    Multi<T> filter(Predicate<? super T> predicate);
    Multi<T> limit(long maxSize);

    // Terminal operations
    Single<List<T>> collectList();
    void forEach(Consumer<? super T> consumer);
}

Single represents reactive streams that emit at most one item. It implements CompletionStage and provides blocking await operations for integration with existing code.

public interface Single<T> extends Subscribable<T>, CompletionStage<T>, Awaitable<T> {
    // Factory methods
    static <T> Single<T> empty();
    static <T> Single<T> just(T item);
    static <T> Single<T> create(CompletionStage<T> completionStage);
    static <T> Single<T> error(Throwable error);

    // Transformation operators
    <U> Single<U> map(Function<? super T, ? extends U> mapper);
    <U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper);

    // Blocking operations
    T await();
    T await(Duration timeout);
}

IoMulti provides reactive I/O utilities for integrating with Java I/O streams, channels, and file operations. It enables reactive processing of I/O data with backpressure handling.

public final class IoMulti {
    // InputStream integration
    static Multi<ByteBuffer> multiFromStream(InputStream inputStream);
    static MultiFromInputStreamBuilder multiFromStreamBuilder(InputStream inputStream);

    // OutputStream integration
    static OutputStreamMulti outputStreamMulti();
    static OutputStreamMultiBuilder outputStreamMultiBuilder();

    // ByteChannel integration
    static Multi<ByteBuffer> multiFromByteChannel(ReadableByteChannel channel);
    static Function<Multi<ByteBuffer>, CompletionStage<Void>> multiToByteChannel(WritableByteChannel channel);
}

The core interfaces and utility classes below support reactive operations, including subscription management, awaitable operations, and retry strategies.

public interface Subscribable<T> extends Flow.Publisher<T> {
    void subscribe(Consumer<? super T> onNext);
    void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
    void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete);
}

public interface Awaitable<T> {
    CompletableFuture<T> toCompletableFuture();
    T await();
    T await(Duration timeout);
}
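
The Awaitable signatures above pair a toCompletableFuture() accessor with blocking await() calls. One plausible way to relate the two is sketched here with JDK types only. MiniAwaitable and AwaitSketch are hypothetical names, and the choice to wrap failures in CompletionException is an assumption made for the sketch, not documented Helidon behavior.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitSketch {

    /** Hypothetical stand-in for an awaitable type; not the Helidon interface. */
    @FunctionalInterface
    interface MiniAwaitable<T> {
        CompletableFuture<T> toCompletableFuture();

        /** Blocks the calling thread until the future completes. */
        default T await() {
            return toCompletableFuture().join();
        }

        /** Blocks for at most the given duration (exception wrapping is an assumption). */
        default T await(Duration timeout) {
            try {
                return toCompletableFuture().get(timeout.toNanos(), TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            } catch (ExecutionException e) {
                throw new CompletionException(e.getCause());
            } catch (TimeoutException e) {
                throw new CompletionException(e);
            }
        }
    }

    /** Awaits an already completed value. */
    static String demo() {
        MiniAwaitable<String> ready = () -> CompletableFuture.completedFuture("Hello");
        return ready.await(Duration.ofSeconds(1)) + " World!";
    }

    /** Returns true when awaiting a never-completed future fails with a timeout. */
    static boolean timesOut() {
        MiniAwaitable<String> never = () -> new CompletableFuture<>();
        try {
            never.await(Duration.ofMillis(50));
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println(timesOut());
    }
}
```

Blocking calls like these suit tests and integration with synchronous code; inside a reactive pipeline they tie up the calling thread.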

public interface RetrySchema {
    long nextDelay(int retryCount, long lastDelay);
    static RetrySchema constant(long delay);
    static RetrySchema linear(long firstDelay, long increment, long maxDelay);
    static RetrySchema geometric(long firstDelay, double ratio, long maxDelay);
}

The library provides error handling through several patterns:
- onErrorResume() and onErrorResumeWith() allow recovering from errors
- retry() methods with count, predicate, or publisher-based control
- timeout() methods with optional fallback publishers
- onCompleteResume() methods for appending after completion

- Multi.create(Stream<T>) converts Java Streams
- Multi.create(Iterable<T>) and flatMapIterable() methods
- flatMapOptional() and toOptionalSingle() methods
- collectStream(java.util.stream.Collector) uses Stream collectors
- observeOn() methods allow switching execution contexts
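
To make the retry delay strategies more concrete, the sketch below reimplements the nextDelay(retryCount, lastDelay) shape from the RetrySchema signatures with JDK code only. DelaySchema and RetryDelaySketch are hypothetical names. The semantics are assumptions for illustration: retryCount is taken to start at zero, linear and geometric delays are capped at maxDelay, and no time unit is implied. Check the library's Javadoc for the actual behavior.

```java
import java.util.Arrays;

final class RetryDelaySketch {

    /** Hypothetical stand-in mirroring the nextDelay shape; not the Helidon type. */
    @FunctionalInterface
    interface DelaySchema {
        long nextDelay(int retryCount, long lastDelay);

        /** Same delay on every attempt. */
        static DelaySchema constant(long delay) {
            return (retryCount, lastDelay) -> delay;
        }

        /** Delay grows by a fixed increment per retry, capped at maxDelay (assumed semantics). */
        static DelaySchema linear(long firstDelay, long increment, long maxDelay) {
            return (retryCount, lastDelay) ->
                    Math.min(firstDelay + retryCount * increment, maxDelay);
        }

        /** Delay is multiplied by ratio per retry, capped at maxDelay (assumed semantics). */
        static DelaySchema geometric(long firstDelay, double ratio, long maxDelay) {
            return (retryCount, lastDelay) ->
                    retryCount == 0 ? firstDelay : Math.min((long) (lastDelay * ratio), maxDelay);
        }
    }

    /** Computes the delay for each of the first 'attempts' retries. */
    static long[] schedule(DelaySchema schema, int attempts) {
        long[] delays = new long[attempts];
        long last = 0;
        for (int i = 0; i < attempts; i++) {
            last = schema.nextDelay(i, last);
            delays[i] = last;
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(schedule(DelaySchema.constant(100), 3)));
        // [100, 100, 100]
        System.out.println(Arrays.toString(schedule(DelaySchema.linear(100, 50, 250), 5)));
        // [100, 150, 200, 250, 250]
        System.out.println(Arrays.toString(schedule(DelaySchema.geometric(100, 2.0, 500), 5)));
        // [100, 200, 400, 500, 500]
    }
}
```

Passing the previous delay into nextDelay lets a geometric schedule be computed incrementally, while constant and linear schedules depend only on the retry count.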