Helidon Common Reactive Library - A reactive programming library providing Multi and Single abstractions
—
Core interfaces and utility classes that support reactive operations including subscription management, awaitable operations, retry strategies, and completion handling.
Base interface providing functional subscription methods and common reactive operations for both Multi and Single.
/**
* Base interface providing functional subscription methods and common reactive operations
* @param <T> item type
*/
public interface Subscribable<T> extends Flow.Publisher<T> {
}

Convenient subscription methods that use functional interfaces instead of requiring full Subscriber implementations.
/**
* Subscribe with onNext only
* @param onNext consumer for items
* @throws NullPointerException if onNext is null
*/
void subscribe(Consumer<? super T> onNext);
/**
* Subscribe with onNext and onError
* @param onNext consumer for items
* @param onError consumer for errors
* @throws NullPointerException if onNext or onError is null
*/
void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
/**
* Subscribe with onNext, onError, and onComplete
* @param onNext consumer for items
* @param onError consumer for errors
* @param onComplete action for completion
* @throws NullPointerException if any parameter is null
*/
void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete);
/**
* Subscribe with full control over subscription
* @param onNext consumer for items
* @param onError consumer for errors
* @param onComplete action for completion
* @param onSubscribe consumer for subscription
* @throws NullPointerException if any parameter is null
*/
void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Runnable onComplete, Consumer<? super Flow.Subscription> onSubscribe);

Provides convenient blocking operations for CompletionStage-based types, allowing reactive types to be used in synchronous contexts.
/**
* Provides convenient blocking operations for CompletionStage-based types
* @param <T> result type
*/
public interface Awaitable<T> {
}

/**
* Convert to CompletableFuture for compatibility
* @return CompletableFuture representation
*/
CompletableFuture<T> toCompletableFuture();

/**
* Block until completion (unchecked exceptions only)
* Checked exceptions are wrapped in RuntimeException
* @return the result value
* @throws RuntimeException if the operation fails; checked exceptions are wrapped (this overload takes no timeout)
*/
T await();
/**
* Block with timeout (unchecked exceptions)
* @param timeout timeout duration
* @return the result value
* @throws RuntimeException for timeout or checked exceptions
* @throws NullPointerException if timeout is null
*/
T await(Duration timeout);
/**
* Block with timeout using TimeUnit (deprecated)
* @param timeout timeout value
* @param unit time unit
* @return the result value
* @throws RuntimeException for timeout or checked exceptions
* @throws NullPointerException if unit is null
* @deprecated Use await(Duration) instead
*/
@Deprecated
T await(long timeout, TimeUnit unit);

CompletionStage wrapper that also implements Awaitable for convenient blocking operations and enhanced chaining.
/**
* CompletionStage wrapper that also implements Awaitable for convenient blocking
* @param <T> result type
*/
public final class CompletionAwaitable<T> implements CompletionStage<T>, Awaitable<T> {
}

All standard CompletionStage methods return CompletionAwaitable for seamless chaining.
/**
* All standard CompletionStage methods enhanced to return CompletionAwaitable
*/
<U> CompletionAwaitable<U> thenApply(Function<? super T, ? extends U> fn);
CompletionAwaitable<Void> thenAccept(Consumer<? super T> action);
CompletionAwaitable<Void> thenRun(Runnable action);
<U> CompletionAwaitable<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
<U, V> CompletionAwaitable<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn);
<U> CompletionAwaitable<Void> thenAcceptBoth(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
CompletionAwaitable<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
<U> CompletionAwaitable<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
CompletionAwaitable<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
CompletionAwaitable<Void> runAfterEither(CompletionStage<?> other, Runnable action);
CompletionAwaitable<T> exceptionally(Function<Throwable, ? extends T> fn);
CompletionAwaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
<U> CompletionAwaitable<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

/**
* Handle exceptions with consumer (does not transform result)
* @param exceptionConsumer consumer for exceptions
* @return CompletionAwaitable with exception handling
* @throws NullPointerException if exceptionConsumer is null
*/
CompletionAwaitable<T> exceptionallyAccept(Consumer<? super Throwable> exceptionConsumer);

CompletionStage variant optimized for Optional values with convenient empty handling methods.
/**
* CompletionStage variant optimized for Optional values with convenient empty handling
* @param <T> wrapped type
*/
public interface OptionalCompletionStage<T> extends CompletionStage<Optional<T>> {
}

/**
* Execute action when Optional is empty
* @param emptyAction action to execute for empty Optional
* @return same OptionalCompletionStage for chaining
* @throws NullPointerException if emptyAction is null
*/
OptionalCompletionStage<T> onEmpty(Runnable emptyAction);
/**
* Execute action when Optional has value
* @param valueConsumer consumer for present values
* @return same OptionalCompletionStage for chaining
* @throws NullPointerException if valueConsumer is null
*/
OptionalCompletionStage<T> onValue(Consumer<? super T> valueConsumer);

/**
* Create OptionalCompletionStage from existing CompletionStage
* @param <T> value type
* @param stage completion stage containing Optional
* @return OptionalCompletionStage wrapper
* @throws NullPointerException if stage is null
*/
static <T> OptionalCompletionStage<T> create(CompletionStage<Optional<T>> stage);

Simple collector interface for accumulating stream items into custom data structures.
/**
* Simple collector interface for accumulating stream items
* @param <T> item type
* @param <U> result type
*/
public interface Collector<T, U> {
}

/**
* Add item to collection
* @param item item to collect
*/
void collect(T item);
/**
* Get final collected result
* @return collected result
*/
U value();

Defines retry delay strategies for polling operations and retry logic, providing flexible backoff algorithms.
/**
* Defines retry delay strategies for polling operations and retry logic
*/
@FunctionalInterface
public interface RetrySchema {
}

/**
* Calculate next retry delay in milliseconds
* @param retryCount current retry attempt (0-based)
* @param lastDelay previous delay in milliseconds
* @return next delay in milliseconds
*/
long nextDelay(int retryCount, long lastDelay);

/**
* Always return same delay
* @param delay constant delay in milliseconds
* @return RetrySchema with constant delay
*/
static RetrySchema constant(long delay);
/**
* Linear backoff with maximum limit
* @param firstDelay initial delay in milliseconds
* @param increment delay increment per retry in milliseconds
* @param maxDelay maximum delay in milliseconds
* @return RetrySchema with linear backoff
*/
static RetrySchema linear(long firstDelay, long increment, long maxDelay);
/**
* Exponential backoff with maximum limit
* @param firstDelay initial delay in milliseconds
* @param ratio multiplication ratio for exponential growth
* @param maxDelay maximum delay in milliseconds
* @return RetrySchema with exponential backoff
*/
static RetrySchema geometric(long firstDelay, double ratio, long maxDelay);

import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Subscribable;
Multi<String> data = Multi.just("apple", "banana", "cherry");
// Simple subscription with just onNext
data.subscribe(System.out::println);
// Subscription with error handling
data.subscribe(
item -> System.out.println("Received: " + item),
error -> System.err.println("Error: " + error.getMessage())
);
// Full subscription control
data.subscribe(
item -> System.out.println("Item: " + item),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed"),
subscription -> {
System.out.println("Subscribed, requesting all");
subscription.request(Long.MAX_VALUE);
}
);

import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.Awaitable;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
// Wrap an asynchronous computation in a Single so it can be awaited.
Single<String> asyncValue = Single.create(
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owner of the executing thread can still observe it
            Thread.currentThread().interrupt();
        }
        return "Async result";
    })
);
// Block until completion
String result = asyncValue.await();
System.out.println(result); // "Async result"
// Block with timeout
// NOTE(review): the value was already awaited above, so this call is expected to return
// immediately here; the catch shows where a timeout or failure would surface.
try {
    String quickResult = asyncValue.await(Duration.ofMillis(500));
    System.out.println(quickResult);
} catch (RuntimeException e) {
    System.out.println("Timeout occurred");
}
// Convert to CompletableFuture for integration
CompletableFuture<String> future = asyncValue.toCompletableFuture();
future.thenAccept(System.out::println);

import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.CompletionAwaitable;
Single<Integer> number = Single.just(42);
// Chain operations with enhanced CompletionAwaitable
CompletionAwaitable<String> result = number
.thenApply(n -> n * 2)
.thenApply(n -> "Result: " + n)
.exceptionallyAccept(error -> System.err.println("Error: " + error));
String finalResult = result.await();
System.out.println(finalResult); // "Result: 84"

import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.OptionalCompletionStage;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
CompletableFuture<Optional<String>> optionalFuture =
CompletableFuture.completedFuture(Optional.of("Present value"));
OptionalCompletionStage<String> optionalStage =
OptionalCompletionStage.create(optionalFuture);
optionalStage
.onValue(value -> System.out.println("Got value: " + value))
.onEmpty(() -> System.out.println("No value present"));
// With empty Optional
CompletableFuture<Optional<String>> emptyFuture =
CompletableFuture.completedFuture(Optional.empty());
OptionalCompletionStage.create(emptyFuture)
.onValue(value -> System.out.println("Got: " + value))
.onEmpty(() -> System.out.println("Empty result")); // This will execute

import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Collector;
import java.util.ArrayList;
import java.util.List;
// Custom collector that only collects even numbers
/**
 * Collector that retains only the even integers it is offered.
 */
class EvenNumberCollector implements Collector<Integer, List<Integer>> {
    /** Even values accepted so far, in arrival order. */
    private final List<Integer> accepted = new ArrayList<>();

    /**
     * Stores {@code item} when it is divisible by two; odd values are dropped.
     *
     * @param item candidate value
     */
    @Override
    public void collect(Integer item) {
        boolean odd = item % 2 != 0;
        if (odd) {
            return;
        }
        accepted.add(item);
    }

    /**
     * Returns a fresh list so callers cannot modify the collector's internal state.
     *
     * @return copy of the even values collected so far
     */
    @Override
    public List<Integer> value() {
        List<Integer> snapshot = new ArrayList<>(accepted.size());
        snapshot.addAll(accepted);
        return snapshot;
    }
}
Multi<Integer> numbers = Multi.range(1, 10);
List<Integer> evenNumbers = numbers.collect(new EvenNumberCollector()).await();
System.out.println(evenNumbers); // [2, 4, 6, 8, 10]

import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.RetrySchema;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
// Constant delay retry
RetrySchema constantRetry = RetrySchema.constant(1000); // 1 second delay
// Linear backoff retry
RetrySchema linearRetry = RetrySchema.linear(100, 200, 2000);
// Delays: 100ms, 300ms, 500ms, 700ms, 900ms, 1100ms, 1300ms, 1500ms, 1700ms, 1900ms, 2000ms (max)
// Exponential backoff retry
RetrySchema exponentialRetry = RetrySchema.geometric(100, 2.0, 5000);
// Delays: 100ms, 200ms, 400ms, 800ms, 1600ms, 3200ms, 5000ms (max)
// Using retry schema with Multi
AtomicInteger attempts = new AtomicInteger(0);
Multi<String> flakyOperation = Multi.defer(() -> {
int attempt = attempts.incrementAndGet();
if (attempt < 3) {
return Multi.error(new RuntimeException("Attempt " + attempt + " failed"));
}
return Multi.just("Success on attempt " + attempt);
});
// Retry with exponential backoff
String result = flakyOperation
.retryWhen((error, retryCount) -> {
long delay = exponentialRetry.nextDelay(retryCount.intValue(), 0);
System.out.println("Retry " + retryCount + " after " + delay + "ms");
return Multi.timer(delay, TimeUnit.MILLISECONDS, scheduler);
})
.first()
.await();
System.out.println(result); // "Success on attempt 3"

import io.helidon.common.reactive.Multi;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
// Consume a large stream one item at a time, requesting the next item only after
// the current one has been processed.
Multi<Integer> stream = Multi.range(1, 1000000); // Large stream
AtomicReference<Flow.Subscription> subscriptionRef = new AtomicReference<>();
stream.subscribe(
    item -> {
        System.out.println("Processing: " + item);
        Flow.Subscription subscription = subscriptionRef.get();
        // Simulate slow processing
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop consuming instead of silently continuing
            Thread.currentThread().interrupt();
            if (subscription != null) {
                subscription.cancel();
            }
            return;
        }
        // Request next item (backpressure control)
        if (subscription != null) {
            subscription.request(1);
        }
    },
    error -> System.err.println("Stream error: " + error),
    () -> System.out.println("Stream completed"),
    subscription -> {
        // Publish the subscription before requesting so onNext can always see it
        subscriptionRef.set(subscription);
        // Start with requesting just one item
        subscription.request(1);
    }
);

import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.CompletionAwaitable;
Single<String> unreliableService = Single.error(new RuntimeException("Service down"));
// Chain multiple fallback strategies
CompletionAwaitable<String> robustCall = unreliableService
.exceptionally(error -> {
System.out.println("Primary service failed: " + error.getMessage());
throw new RuntimeException("Fallback to secondary service");
})
.exceptionally(error -> {
System.out.println("Secondary service failed: " + error.getMessage());
return "Cached response";
})
.exceptionallyAccept(error -> {
// Log any remaining errors without changing the result
System.err.println("Final error handler: " + error.getMessage());
});
String result = robustCall.await();
System.out.println("Final result: " + result); // "Cached response"

Install with Tessl CLI
npx tessl i tessl/maven-io-helidon-common--helidon-common-reactive