CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-helidon-common--helidon-common-reactive

Helidon Common Reactive Library - A reactive programming library providing Multi and Single abstractions

Pending
Overview
Eval results
Files

supporting-types.mddocs/

Supporting Types

Core interfaces and utility classes that support reactive operations including subscription management, awaitable operations, retry strategies, and completion handling.

Capabilities

Subscribable Interface

Base interface providing functional subscription methods and common reactive operations for both Multi and Single.

/**
 * Base interface providing functional subscription methods and common reactive operations
 * @param <T> item type
 */
public interface Subscribable<T> extends Flow.Publisher<T> {
}

Functional Subscription Methods

Convenient subscription methods that use functional interfaces instead of requiring full Subscriber implementations.

/**
 * Subscribe with onNext only
 * @param onNext consumer for items
 * @throws NullPointerException if onNext is null
 */
void subscribe(Consumer<? super T> onNext);

/**
 * Subscribe with onNext and onError
 * @param onNext consumer for items
 * @param onError consumer for errors
 * @throws NullPointerException if onNext or onError is null
 */
void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);

/**
 * Subscribe with onNext, onError, and onComplete
 * @param onNext consumer for items
 * @param onError consumer for errors
 * @param onComplete action for completion
 * @throws NullPointerException if any parameter is null
 */
void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete);

/**
 * Subscribe with full control over subscription
 * @param onNext consumer for items
 * @param onError consumer for errors
 * @param onComplete action for completion
 * @param onSubscribe consumer for subscription
 * @throws NullPointerException if any parameter is null
 */
void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, 
               Runnable onComplete, Consumer<? super Flow.Subscription> onSubscribe);

Awaitable Interface

Provides convenient blocking operations for CompletionStage-based types, allowing reactive types to be used in synchronous contexts.

/**
 * Provides convenient blocking operations for CompletionStage-based types
 * @param <T> result type
 */
public interface Awaitable<T> {
}

Core Conversion Method

/**
 * Convert to CompletableFuture for compatibility
 * @return CompletableFuture representation
 */
CompletableFuture<T> toCompletableFuture();

Blocking Operations

/**
 * Block until completion (unchecked exceptions only)
 * Checked exceptions are wrapped in RuntimeException
 * @return the result value
 * @throws RuntimeException for any checked exceptions or timeout
 */
T await();

/**
 * Block with timeout (unchecked exceptions)
 * @param timeout timeout duration
 * @return the result value
 * @throws RuntimeException for timeout or checked exceptions
 * @throws NullPointerException if timeout is null
 */
T await(Duration timeout);

/**
 * Block with timeout using TimeUnit (deprecated)
 * @param timeout timeout value
 * @param unit time unit
 * @return the result value
 * @throws RuntimeException for timeout or checked exceptions
 * @throws NullPointerException if unit is null
 * @deprecated Use await(Duration) instead
 */
@Deprecated
T await(long timeout, TimeUnit unit);

CompletionAwaitable Class

CompletionStage wrapper that also implements Awaitable for convenient blocking operations and enhanced chaining.

/**
 * CompletionStage wrapper that also implements Awaitable for convenient blocking
 * @param <T> result type
 */
public final class CompletionAwaitable<T> implements CompletionStage<T>, Awaitable<T> {
}

Enhanced CompletionStage Methods

All standard CompletionStage methods return CompletionAwaitable for seamless chaining.

/**
 * All standard CompletionStage methods enhanced to return CompletionAwaitable
 */
<U> CompletionAwaitable<U> thenApply(Function<? super T, ? extends U> fn);
CompletionAwaitable<Void> thenAccept(Consumer<? super T> action);
CompletionAwaitable<Void> thenRun(Runnable action);

<U> CompletionAwaitable<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

<U, V> CompletionAwaitable<V> thenCombine(CompletionStage<? extends U> other, 
                                          BiFunction<? super T, ? super U, ? extends V> fn);
<U> CompletionAwaitable<Void> thenAcceptBoth(CompletionStage<? extends U> other, 
                                            BiConsumer<? super T, ? super U> action);
CompletionAwaitable<Void> runAfterBoth(CompletionStage<?> other, Runnable action);

<U> CompletionAwaitable<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
CompletionAwaitable<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
CompletionAwaitable<Void> runAfterEither(CompletionStage<?> other, Runnable action);

CompletionAwaitable<T> exceptionally(Function<Throwable, ? extends T> fn);
CompletionAwaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
<U> CompletionAwaitable<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

Additional Error Handling

/**
 * Handle exceptions with consumer (does not transform result)
 * @param exceptionConsumer consumer for exceptions
 * @return CompletionAwaitable with exception handling
 * @throws NullPointerException if exceptionConsumer is null
 */
CompletionAwaitable<T> exceptionallyAccept(Consumer<? super Throwable> exceptionConsumer);

OptionalCompletionStage Interface

CompletionStage variant optimized for Optional values with convenient empty handling methods.

/**
 * CompletionStage variant optimized for Optional values with convenient empty handling
 * @param <T> wrapped type
 */
public interface OptionalCompletionStage<T> extends CompletionStage<Optional<T>> {
}

Optional-Specific Methods

/**
 * Execute action when Optional is empty
 * @param emptyAction action to execute for empty Optional
 * @return same OptionalCompletionStage for chaining
 * @throws NullPointerException if emptyAction is null
 */
OptionalCompletionStage<T> onEmpty(Runnable emptyAction);

/**
 * Execute action when Optional has value
 * @param valueConsumer consumer for present values
 * @return same OptionalCompletionStage for chaining
 * @throws NullPointerException if valueConsumer is null
 */
OptionalCompletionStage<T> onValue(Consumer<? super T> valueConsumer);

Factory Method

/**
 * Create OptionalCompletionStage from existing CompletionStage
 * @param <T> value type
 * @param stage completion stage containing Optional
 * @return OptionalCompletionStage wrapper
 * @throws NullPointerException if stage is null
 */
static <T> OptionalCompletionStage<T> create(CompletionStage<Optional<T>> stage);

Collector Interface

Simple collector interface for accumulating stream items into custom data structures.

/**
 * Simple collector interface for accumulating stream items
 * @param <T> item type
 * @param <U> result type
 */
public interface Collector<T, U> {
}

Collector Methods

/**
 * Add item to collection
 * @param item item to collect
 */
void collect(T item);

/**
 * Get final collected result
 * @return collected result
 */
U value();

RetrySchema Interface

Defines retry delay strategies for polling operations and retry logic, providing flexible backoff algorithms.

/**
 * Defines retry delay strategies for polling operations and retry logic
 */
@FunctionalInterface
public interface RetrySchema {
}

Core Method

/**
 * Calculate next retry delay in milliseconds
 * @param retryCount current retry attempt (0-based)
 * @param lastDelay previous delay in milliseconds
 * @return next delay in milliseconds
 */
long nextDelay(int retryCount, long lastDelay);

Static Factory Methods

/**
 * Always return same delay
 * @param delay constant delay in milliseconds
 * @return RetrySchema with constant delay
 */
static RetrySchema constant(long delay);

/**
 * Linear backoff with maximum limit
 * @param firstDelay initial delay in milliseconds
 * @param increment delay increment per retry in milliseconds
 * @param maxDelay maximum delay in milliseconds
 * @return RetrySchema with linear backoff
 */
static RetrySchema linear(long firstDelay, long increment, long maxDelay);

/**
 * Exponential backoff with maximum limit
 * @param firstDelay initial delay in milliseconds
 * @param ratio multiplication ratio for exponential growth
 * @param maxDelay maximum delay in milliseconds
 * @return RetrySchema with exponential backoff
 */
static RetrySchema geometric(long firstDelay, double ratio, long maxDelay);

Usage Examples

Functional Subscription

import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Subscribable;

Multi<String> data = Multi.just("apple", "banana", "cherry");

// Simple subscription with just onNext
data.subscribe(System.out::println);

// Subscription with error handling
data.subscribe(
    item -> System.out.println("Received: " + item),
    error -> System.err.println("Error: " + error.getMessage())
);

// Full subscription control
data.subscribe(
    item -> System.out.println("Item: " + item),
    error -> System.err.println("Error: " + error),
    () -> System.out.println("Completed"),
    subscription -> {
        System.out.println("Subscribed, requesting all");
        subscription.request(Long.MAX_VALUE);
    }
);

Awaitable Operations

import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.Awaitable;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

Single<String> asyncValue = Single.create(
    CompletableFuture.supplyAsync(() -> {
        try { Thread.sleep(1000); } catch (InterruptedException e) {}
        return "Async result";
    })
);

// Block until completion
String result = asyncValue.await();
System.out.println(result); // "Async result"

// Block with timeout
try {
    String quickResult = asyncValue.await(Duration.ofMillis(500));
    System.out.println(quickResult);
} catch (RuntimeException e) {
    System.out.println("Timeout occurred");
}

// Convert to CompletableFuture for integration
CompletableFuture<String> future = asyncValue.toCompletableFuture();
future.thenAccept(System.out::println);

CompletionAwaitable Chaining

import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.CompletionAwaitable;

Single<Integer> number = Single.just(42);

// Chain operations with enhanced CompletionAwaitable
CompletionAwaitable<String> result = number
    .thenApply(n -> n * 2)
    .thenApply(n -> "Result: " + n)
    .exceptionallyAccept(error -> System.err.println("Error: " + error));

String finalResult = result.await();
System.out.println(finalResult); // "Result: 84"

OptionalCompletionStage Usage

import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.OptionalCompletionStage;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

CompletableFuture<Optional<String>> optionalFuture = 
    CompletableFuture.completedFuture(Optional.of("Present value"));

OptionalCompletionStage<String> optionalStage = 
    OptionalCompletionStage.create(optionalFuture);

optionalStage
    .onValue(value -> System.out.println("Got value: " + value))
    .onEmpty(() -> System.out.println("No value present"));

// With empty Optional
CompletableFuture<Optional<String>> emptyFuture = 
    CompletableFuture.completedFuture(Optional.empty());

OptionalCompletionStage.create(emptyFuture)
    .onValue(value -> System.out.println("Got: " + value))
    .onEmpty(() -> System.out.println("Empty result")); // This will execute

Custom Collectors

import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Collector;
import java.util.ArrayList;
import java.util.List;

// Custom collector that only collects even numbers
class EvenNumberCollector implements Collector<Integer, List<Integer>> {
    private final List<Integer> evenNumbers = new ArrayList<>();
    
    @Override
    public void collect(Integer item) {
        if (item % 2 == 0) {
            evenNumbers.add(item);
        }
    }
    
    @Override
    public List<Integer> value() {
        return new ArrayList<>(evenNumbers);
    }
}

Multi<Integer> numbers = Multi.range(1, 10);
List<Integer> evenNumbers = numbers.collect(new EvenNumberCollector()).await();
System.out.println(evenNumbers); // [2, 4, 6, 8, 10]

RetrySchema Examples

import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.RetrySchema;
import java.util.concurrent.atomic.AtomicInteger;

// Constant delay retry
RetrySchema constantRetry = RetrySchema.constant(1000); // 1 second delay

// Linear backoff retry
RetrySchema linearRetry = RetrySchema.linear(100, 200, 2000);
// Delays: 100ms, 300ms, 500ms, 700ms, 900ms, 1100ms, 1300ms, 1500ms, 1700ms, 1900ms, 2000ms (max)

// Exponential backoff retry
RetrySchema exponentialRetry = RetrySchema.geometric(100, 2.0, 5000);
// Delays: 100ms, 200ms, 400ms, 800ms, 1600ms, 3200ms, 5000ms (max)

// Using retry schema with Multi
AtomicInteger attempts = new AtomicInteger(0);

Multi<String> flakyOperation = Multi.defer(() -> {
    int attempt = attempts.incrementAndGet();
    if (attempt < 3) {
        return Multi.error(new RuntimeException("Attempt " + attempt + " failed"));
    }
    return Multi.just("Success on attempt " + attempt);
});

// Retry with exponential backoff
String result = flakyOperation
    .retryWhen((error, retryCount) -> {
        long delay = exponentialRetry.nextDelay(retryCount.intValue(), 0);
        System.out.println("Retry " + retryCount + " after " + delay + "ms");
        return Multi.timer(delay, TimeUnit.MILLISECONDS, scheduler);
    })
    .first()
    .await();

System.out.println(result); // "Success on attempt 3"

Advanced Subscription Management

import io.helidon.common.reactive.Multi;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

Multi<Integer> stream = Multi.range(1, 1000000); // Large stream

AtomicReference<Flow.Subscription> subscriptionRef = new AtomicReference<>();

stream.subscribe(
    item -> {
        System.out.println("Processing: " + item);
        
        // Simulate slow processing
        try { Thread.sleep(100); } catch (InterruptedException e) {}
        
        // Request next item (backpressure control)
        Flow.Subscription subscription = subscriptionRef.get();
        if (subscription != null) {
            subscription.request(1);
        }
    },
    error -> System.err.println("Stream error: " + error),
    () -> System.out.println("Stream completed"),
    subscription -> {
        subscriptionRef.set(subscription);
        // Start with requesting just one item
        subscription.request(1);
    }
);

Error Recovery Patterns

import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.CompletionAwaitable;

Single<String> unreliableService = Single.error(new RuntimeException("Service down"));

// Chain multiple fallback strategies
CompletionAwaitable<String> robustCall = unreliableService
    .exceptionally(error -> {
        System.out.println("Primary service failed: " + error.getMessage());
        throw new RuntimeException("Fallback to secondary service");
    })
    .exceptionally(error -> {
        System.out.println("Secondary service failed: " + error.getMessage());
        return "Cached response";
    })
    .exceptionallyAccept(error -> {
        // Log any remaining errors without changing the result
        System.err.println("Final error handler: " + error.getMessage());
    });

String result = robustCall.await();
System.out.println("Final result: " + result); // "Cached response"

Install with Tessl CLI

npx tessl i tessl/maven-io-helidon-common--helidon-common-reactive

docs

index.md

io-integration.md

multi.md

single.md

supporting-types.md

tile.json