0
# Supporting Types
1
2
Core interfaces and utility classes that support reactive operations including subscription management, awaitable operations, retry strategies, and completion handling.
3
4
## Capabilities
5
6
### Subscribable Interface
7
8
Base interface providing functional subscription methods and common reactive operations for both Multi and Single.
9
10
```java { .api }
11
/**
12
* Base interface providing functional subscription methods and common reactive operations
13
* @param <T> item type
14
*/
15
public interface Subscribable<T> extends Flow.Publisher<T> {
16
}
17
```
18
19
#### Functional Subscription Methods
20
21
Convenient subscription methods that use functional interfaces instead of requiring full Subscriber implementations.
22
23
```java { .api }
24
/**
25
* Subscribe with onNext only
26
* @param onNext consumer for items
27
* @throws NullPointerException if onNext is null
28
*/
29
void subscribe(Consumer<? super T> onNext);
30
31
/**
32
* Subscribe with onNext and onError
33
* @param onNext consumer for items
34
* @param onError consumer for errors
35
* @throws NullPointerException if onNext or onError is null
36
*/
37
void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
38
39
/**
40
* Subscribe with onNext, onError, and onComplete
41
* @param onNext consumer for items
42
* @param onError consumer for errors
43
* @param onComplete action for completion
44
* @throws NullPointerException if any parameter is null
45
*/
46
void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete);
47
48
/**
49
* Subscribe with full control over subscription
50
* @param onNext consumer for items
51
* @param onError consumer for errors
52
* @param onComplete action for completion
53
* @param onSubscribe consumer for subscription
54
* @throws NullPointerException if any parameter is null
55
*/
56
void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
57
Runnable onComplete, Consumer<? super Flow.Subscription> onSubscribe);
58
```
59
60
### Awaitable Interface
61
62
Provides convenient blocking operations for CompletionStage-based types, allowing reactive types to be used in synchronous contexts.
63
64
```java { .api }
65
/**
66
* Provides convenient blocking operations for CompletionStage-based types
67
* @param <T> result type
68
*/
69
public interface Awaitable<T> {
70
}
71
```
72
73
#### Core Conversion Method
74
75
```java { .api }
76
/**
77
* Convert to CompletableFuture for compatibility
78
* @return CompletableFuture representation
79
*/
80
CompletableFuture<T> toCompletableFuture();
81
```
82
83
#### Blocking Operations
84
85
```java { .api }
86
/**
87
* Block until completion (unchecked exceptions only)
88
* Checked exceptions are wrapped in RuntimeException
89
* @return the result value
90
* @throws RuntimeException for any checked exceptions or timeout
91
*/
92
T await();
93
94
/**
95
* Block with timeout (unchecked exceptions)
96
* @param timeout timeout duration
97
* @return the result value
98
* @throws RuntimeException for timeout or checked exceptions
99
* @throws NullPointerException if timeout is null
100
*/
101
T await(Duration timeout);
102
103
/**
104
* Block with timeout using TimeUnit (deprecated)
105
* @param timeout timeout value
106
* @param unit time unit
107
* @return the result value
108
* @throws RuntimeException for timeout or checked exceptions
109
* @throws NullPointerException if unit is null
110
* @deprecated Use await(Duration) instead
111
*/
112
@Deprecated
113
T await(long timeout, TimeUnit unit);
114
```
115
116
### CompletionAwaitable Class
117
118
CompletionStage wrapper that also implements Awaitable for convenient blocking operations and enhanced chaining.
119
120
```java { .api }
121
/**
122
* CompletionStage wrapper that also implements Awaitable for convenient blocking
123
* @param <T> result type
124
*/
125
public final class CompletionAwaitable<T> implements CompletionStage<T>, Awaitable<T> {
126
}
127
```
128
129
#### Enhanced CompletionStage Methods
130
131
All standard CompletionStage methods return CompletionAwaitable for seamless chaining.
132
133
```java { .api }
134
/**
135
* All standard CompletionStage methods enhanced to return CompletionAwaitable
136
*/
137
<U> CompletionAwaitable<U> thenApply(Function<? super T, ? extends U> fn);
138
CompletionAwaitable<Void> thenAccept(Consumer<? super T> action);
139
CompletionAwaitable<Void> thenRun(Runnable action);
140
141
<U> CompletionAwaitable<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
142
143
<U, V> CompletionAwaitable<V> thenCombine(CompletionStage<? extends U> other,
144
BiFunction<? super T, ? super U, ? extends V> fn);
145
<U> CompletionAwaitable<Void> thenAcceptBoth(CompletionStage<? extends U> other,
146
BiConsumer<? super T, ? super U> action);
147
CompletionAwaitable<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
148
149
<U> CompletionAwaitable<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
150
CompletionAwaitable<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
151
CompletionAwaitable<Void> runAfterEither(CompletionStage<?> other, Runnable action);
152
153
CompletionAwaitable<T> exceptionally(Function<Throwable, ? extends T> fn);
154
CompletionAwaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
155
<U> CompletionAwaitable<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
156
```
157
158
#### Additional Error Handling
159
160
```java { .api }
161
/**
162
* Handle exceptions with consumer (does not transform result)
163
* @param exceptionConsumer consumer for exceptions
164
* @return CompletionAwaitable with exception handling
165
* @throws NullPointerException if exceptionConsumer is null
166
*/
167
CompletionAwaitable<T> exceptionallyAccept(Consumer<? super Throwable> exceptionConsumer);
168
```
169
170
### OptionalCompletionStage Interface
171
172
CompletionStage variant optimized for Optional values with convenient empty handling methods.
173
174
```java { .api }
175
/**
176
* CompletionStage variant optimized for Optional values with convenient empty handling
177
* @param <T> wrapped type
178
*/
179
public interface OptionalCompletionStage<T> extends CompletionStage<Optional<T>> {
180
}
181
```
182
183
#### Optional-Specific Methods
184
185
```java { .api }
186
/**
187
* Execute action when Optional is empty
188
* @param emptyAction action to execute for empty Optional
189
* @return same OptionalCompletionStage for chaining
190
* @throws NullPointerException if emptyAction is null
191
*/
192
OptionalCompletionStage<T> onEmpty(Runnable emptyAction);
193
194
/**
195
* Execute action when Optional has value
196
* @param valueConsumer consumer for present values
197
* @return same OptionalCompletionStage for chaining
198
* @throws NullPointerException if valueConsumer is null
199
*/
200
OptionalCompletionStage<T> onValue(Consumer<? super T> valueConsumer);
201
```
202
203
#### Factory Method
204
205
```java { .api }
206
/**
207
* Create OptionalCompletionStage from existing CompletionStage
208
* @param <T> value type
209
* @param stage completion stage containing Optional
210
* @return OptionalCompletionStage wrapper
211
* @throws NullPointerException if stage is null
212
*/
213
static <T> OptionalCompletionStage<T> create(CompletionStage<Optional<T>> stage);
214
```
215
216
### Collector Interface
217
218
Simple collector interface for accumulating stream items into custom data structures.
219
220
```java { .api }
221
/**
222
* Simple collector interface for accumulating stream items
223
* @param <T> item type
224
* @param <U> result type
225
*/
226
public interface Collector<T, U> {
227
}
228
```
229
230
#### Collector Methods
231
232
```java { .api }
233
/**
234
* Add item to collection
235
* @param item item to collect
236
*/
237
void collect(T item);
238
239
/**
240
* Get final collected result
241
* @return collected result
242
*/
243
U value();
244
```
245
246
### RetrySchema Interface
247
248
Defines retry delay strategies for polling operations and retry logic, providing flexible backoff algorithms.
249
250
```java { .api }
251
/**
252
* Defines retry delay strategies for polling operations and retry logic
253
*/
254
@FunctionalInterface
255
public interface RetrySchema {
256
}
257
```
258
259
#### Core Method
260
261
```java { .api }
262
/**
263
* Calculate next retry delay in milliseconds
264
* @param retryCount current retry attempt (0-based)
265
* @param lastDelay previous delay in milliseconds
266
* @return next delay in milliseconds
267
*/
268
long nextDelay(int retryCount, long lastDelay);
269
```
270
271
#### Static Factory Methods
272
273
```java { .api }
274
/**
275
* Always return same delay
276
* @param delay constant delay in milliseconds
277
* @return RetrySchema with constant delay
278
*/
279
static RetrySchema constant(long delay);
280
281
/**
282
* Linear backoff with maximum limit
283
* @param firstDelay initial delay in milliseconds
284
* @param increment delay increment per retry in milliseconds
285
* @param maxDelay maximum delay in milliseconds
286
* @return RetrySchema with linear backoff
287
*/
288
static RetrySchema linear(long firstDelay, long increment, long maxDelay);
289
290
/**
291
* Exponential backoff with maximum limit
292
* @param firstDelay initial delay in milliseconds
293
* @param ratio multiplication ratio for exponential growth
294
* @param maxDelay maximum delay in milliseconds
295
* @return RetrySchema with exponential backoff
296
*/
297
static RetrySchema geometric(long firstDelay, double ratio, long maxDelay);
298
```
299
300
## Usage Examples
301
302
### Functional Subscription
303
304
```java
305
import io.helidon.common.reactive.Multi;
306
import io.helidon.common.reactive.Subscribable;
307
308
Multi<String> data = Multi.just("apple", "banana", "cherry");
309
310
// Simple subscription with just onNext
311
data.subscribe(System.out::println);
312
313
// Subscription with error handling
314
data.subscribe(
315
item -> System.out.println("Received: " + item),
316
error -> System.err.println("Error: " + error.getMessage())
317
);
318
319
// Full subscription control
320
data.subscribe(
321
item -> System.out.println("Item: " + item),
322
error -> System.err.println("Error: " + error),
323
() -> System.out.println("Completed"),
324
subscription -> {
325
System.out.println("Subscribed, requesting all");
326
subscription.request(Long.MAX_VALUE);
327
}
328
);
329
```
330
331
### Awaitable Operations
332
333
```java
334
import io.helidon.common.reactive.Single;
335
import io.helidon.common.reactive.Awaitable;
336
import java.time.Duration;
337
import java.util.concurrent.CompletableFuture;
338
339
Single<String> asyncValue = Single.create(
340
CompletableFuture.supplyAsync(() -> {
341
try { Thread.sleep(1000); } catch (InterruptedException e) {}
342
return "Async result";
343
})
344
);
345
346
// Block until completion
347
String result = asyncValue.await();
348
System.out.println(result); // "Async result"
349
350
// Block with timeout
351
try {
352
String quickResult = asyncValue.await(Duration.ofMillis(500));
353
System.out.println(quickResult);
354
} catch (RuntimeException e) {
355
System.out.println("Timeout occurred");
356
}
357
358
// Convert to CompletableFuture for integration
359
CompletableFuture<String> future = asyncValue.toCompletableFuture();
360
future.thenAccept(System.out::println);
361
```
362
363
### CompletionAwaitable Chaining
364
365
```java
366
import io.helidon.common.reactive.Single;
367
import io.helidon.common.reactive.CompletionAwaitable;
368
369
Single<Integer> number = Single.just(42);
370
371
// Chain operations with enhanced CompletionAwaitable
372
CompletionAwaitable<String> result = number
373
.thenApply(n -> n * 2)
374
.thenApply(n -> "Result: " + n)
375
.exceptionallyAccept(error -> System.err.println("Error: " + error));
376
377
String finalResult = result.await();
378
System.out.println(finalResult); // "Result: 84"
379
```
380
381
### OptionalCompletionStage Usage
382
383
```java
384
import io.helidon.common.reactive.Single;
385
import io.helidon.common.reactive.OptionalCompletionStage;
386
import java.util.Optional;
387
import java.util.concurrent.CompletableFuture;
388
389
CompletableFuture<Optional<String>> optionalFuture =
390
CompletableFuture.completedFuture(Optional.of("Present value"));
391
392
OptionalCompletionStage<String> optionalStage =
393
OptionalCompletionStage.create(optionalFuture);
394
395
optionalStage
396
.onValue(value -> System.out.println("Got value: " + value))
397
.onEmpty(() -> System.out.println("No value present"));
398
399
// With empty Optional
400
CompletableFuture<Optional<String>> emptyFuture =
401
CompletableFuture.completedFuture(Optional.empty());
402
403
OptionalCompletionStage.create(emptyFuture)
404
.onValue(value -> System.out.println("Got: " + value))
405
.onEmpty(() -> System.out.println("Empty result")); // This will execute
406
```
407
408
### Custom Collectors
409
410
```java
411
import io.helidon.common.reactive.Multi;
412
import io.helidon.common.reactive.Collector;
413
import java.util.ArrayList;
414
import java.util.List;
415
416
// Custom collector that only collects even numbers
417
class EvenNumberCollector implements Collector<Integer, List<Integer>> {
418
private final List<Integer> evenNumbers = new ArrayList<>();
419
420
@Override
421
public void collect(Integer item) {
422
if (item % 2 == 0) {
423
evenNumbers.add(item);
424
}
425
}
426
427
@Override
428
public List<Integer> value() {
429
return new ArrayList<>(evenNumbers);
430
}
431
}
432
433
Multi<Integer> numbers = Multi.range(1, 10);
434
List<Integer> evenNumbers = numbers.collect(new EvenNumberCollector()).await();
435
System.out.println(evenNumbers); // [2, 4, 6, 8, 10]
436
```
437
438
### RetrySchema Examples
439
440
```java
441
import io.helidon.common.reactive.Multi;
442
import io.helidon.common.reactive.RetrySchema;
443
import java.util.concurrent.atomic.AtomicInteger;
444
445
// Constant delay retry
446
RetrySchema constantRetry = RetrySchema.constant(1000); // 1 second delay
447
448
// Linear backoff retry
449
RetrySchema linearRetry = RetrySchema.linear(100, 200, 2000);
450
// Delays: 100ms, 300ms, 500ms, 700ms, 900ms, 1100ms, 1300ms, 1500ms, 1700ms, 1900ms, 2000ms (max)
451
452
// Exponential backoff retry
453
RetrySchema exponentialRetry = RetrySchema.geometric(100, 2.0, 5000);
454
// Delays: 100ms, 200ms, 400ms, 800ms, 1600ms, 3200ms, 5000ms (max)
455
456
// Using retry schema with Multi
457
AtomicInteger attempts = new AtomicInteger(0);
458
459
Multi<String> flakyOperation = Multi.defer(() -> {
460
int attempt = attempts.incrementAndGet();
461
if (attempt < 3) {
462
return Multi.error(new RuntimeException("Attempt " + attempt + " failed"));
463
}
464
return Multi.just("Success on attempt " + attempt);
465
});
466
467
// Retry with exponential backoff
468
String result = flakyOperation
469
.retryWhen((error, retryCount) -> {
470
long delay = exponentialRetry.nextDelay(retryCount.intValue(), 0);
471
System.out.println("Retry " + retryCount + " after " + delay + "ms");
472
return Multi.timer(delay, TimeUnit.MILLISECONDS, scheduler);
473
})
474
.first()
475
.await();
476
477
System.out.println(result); // "Success on attempt 3"
478
```
479
480
### Advanced Subscription Management
481
482
```java
483
import io.helidon.common.reactive.Multi;
484
import java.util.concurrent.Flow;
485
import java.util.concurrent.atomic.AtomicReference;
486
487
Multi<Integer> stream = Multi.range(1, 1000000); // Large stream
488
489
AtomicReference<Flow.Subscription> subscriptionRef = new AtomicReference<>();
490
491
stream.subscribe(
492
item -> {
493
System.out.println("Processing: " + item);
494
495
// Simulate slow processing
496
try { Thread.sleep(100); } catch (InterruptedException e) {}
497
498
// Request next item (backpressure control)
499
Flow.Subscription subscription = subscriptionRef.get();
500
if (subscription != null) {
501
subscription.request(1);
502
}
503
},
504
error -> System.err.println("Stream error: " + error),
505
() -> System.out.println("Stream completed"),
506
subscription -> {
507
subscriptionRef.set(subscription);
508
// Start with requesting just one item
509
subscription.request(1);
510
}
511
);
512
```
513
514
### Error Recovery Patterns
515
516
```java
517
import io.helidon.common.reactive.Single;
518
import io.helidon.common.reactive.CompletionAwaitable;
519
520
Single<String> unreliableService = Single.error(new RuntimeException("Service down"));
521
522
// Chain multiple fallback strategies
523
CompletionAwaitable<String> robustCall = unreliableService
524
.exceptionally(error -> {
525
System.out.println("Primary service failed: " + error.getMessage());
526
throw new RuntimeException("Fallback to secondary service");
527
})
528
.exceptionally(error -> {
529
System.out.println("Secondary service failed: " + error.getMessage());
530
return "Cached response";
531
})
532
.exceptionallyAccept(error -> {
533
// Log any remaining errors without changing the result
534
System.err.println("Final error handler: " + error.getMessage());
535
});
536
537
String result = robustCall.await();
538
System.out.println("Final result: " + result); // "Cached response"
539
```