0
# Single Reactive Values
1
2
Single represents reactive streams that emit at most one item, then complete, or signal an error. It implements CompletionStage and provides blocking await operations for integration with existing synchronous code.
3
4
## Capabilities
5
6
### Core Single Interface
7
8
The main reactive interface for 0-1 item sequences with CompletionStage compatibility.
9
10
```java { .api }
11
/**
12
* Represents a Flow.Publisher that may: signal one item then completes, complete without
13
* an item or signal an error.
14
* @param <T> item type
15
*/
16
public interface Single<T> extends Subscribable<T>, CompletionStage<T>, Awaitable<T> {
17
}
18
```
19
20
### Factory Methods - Source Creation
21
22
Create Single instances from various sources.
23
24
```java { .api }
25
/**
26
* Create empty Single that completes immediately
27
* @param <T> item type
28
* @return empty Single
29
*/
30
static <T> Single<T> empty();
31
32
/**
33
* Create Single that signals error immediately
34
* @param <T> item type
35
* @param error the error to signal
36
* @return error Single
37
* @throws NullPointerException if error is null
38
*/
39
static <T> Single<T> error(Throwable error);
40
41
/**
42
* Create Single that never completes
43
* @param <T> item type
44
* @return never-completing Single
45
*/
46
static <T> Single<T> never();
47
48
/**
49
* Create Single with item
50
* @param <T> item type
51
* @param item item to emit
52
* @return Single emitting the item
53
*/
54
static <T> Single<T> just(T item);
55
56
/**
57
* Create Single from supplier
58
* @param <T> item type
59
* @param supplier supplier of the item
60
* @return Single from supplier
61
* @throws NullPointerException if supplier is null
62
*/
63
static <T> Single<T> create(Supplier<? extends T> supplier);
64
65
/**
66
* Deferred Single creation per subscriber
67
* @param <T> item type
68
* @param supplier supplier function called for each subscriber
69
* @return deferred Single
70
* @throws NullPointerException if supplier is null
71
*/
72
static <T> Single<T> defer(Supplier<? extends Single<? extends T>> supplier);
73
```
74
75
### Factory Methods - From Existing Sources
76
77
Convert existing data sources to Single instances.
78
79
```java { .api }
80
/**
81
* Create Single from CompletionStage
82
* @param <T> item type
83
* @param completionStage completion stage to convert
84
* @return Single from CompletionStage
85
* @throws NullPointerException if completionStage is null
86
*/
87
static <T> Single<T> create(CompletionStage<? extends T> completionStage);
88
89
/**
90
* Create Single from CompletionStage with null handling
91
* @param <T> item type
92
* @param completionStage completion stage to convert
93
* @param nullMeansEmpty if true, null result means empty Single
94
* @return Single from CompletionStage
95
* @throws NullPointerException if completionStage is null
96
*/
97
static <T> Single<T> create(CompletionStage<? extends T> completionStage, boolean nullMeansEmpty);
98
99
/**
100
* Create Single from Publisher (expects 0-1 items)
101
* @param <T> item type
102
* @param publisher publisher to convert
103
* @return Single from Publisher
104
* @throws NullPointerException if publisher is null
105
*/
106
static <T> Single<T> create(Flow.Publisher<? extends T> publisher);
107
108
/**
109
* Copy constructor for Single
110
* @param <T> item type
111
* @param single single to copy
112
* @return copied Single
113
* @throws NullPointerException if single is null
114
*/
115
static <T> Single<T> create(Single<? extends T> single);
116
```
117
118
### Factory Methods - Timing
119
120
Create time-based Single emissions.
121
122
```java { .api }
123
/**
124
* Single delayed emission
125
* @param time delay time
126
* @param unit time unit
127
* @param executor scheduled executor for timing
128
* @return Single emitting after delay
129
* @throws NullPointerException if unit or executor is null
130
*/
131
static Single<Long> timer(long time, TimeUnit unit, ScheduledExecutorService executor);
132
```
133
134
### Transformation Operators - Mapping & Transformation
135
136
Transform Single values and convert to other reactive types.
137
138
```java { .api }
139
/**
140
* Transform the item
141
* @param <U> result type
142
* @param mapper transformation function
143
* @return transformed Single
144
* @throws NullPointerException if mapper is null
145
*/
146
<U> Single<U> map(Function<? super T, ? extends U> mapper);
147
148
/**
149
* Transform to Multi
150
* @param <U> result type
151
* @param mapper function producing publishers
152
* @return Multi from transformation
153
* @throws NullPointerException if mapper is null
154
*/
155
<U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper);
156
157
/**
158
* Transform to another Single
159
* @param <U> result type
160
* @param mapper function producing Single
161
* @return flattened Single
162
* @throws NullPointerException if mapper is null
163
*/
164
<U> Single<U> flatMapSingle(Function<? super T, ? extends Single<? extends U>> mapper);
165
166
/**
167
* Transform to CompletionStage
168
* @param <U> result type
169
* @param mapper function producing CompletionStage
170
* @return Single from CompletionStage
171
* @throws NullPointerException if mapper is null
172
*/
173
<U> Single<U> flatMapCompletionStage(Function<? super T, ? extends CompletionStage<? extends U>> mapper);
174
175
/**
176
* Transform to Multi via Iterable
177
* @param <U> result type
178
* @param mapper function producing Iterable
179
* @return Multi from Iterable
180
* @throws NullPointerException if mapper is null
181
*/
182
<U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper);
183
184
/**
185
* Transform via Optional
186
* @param <U> result type
187
* @param mapper function producing Optional
188
* @return Single from Optional (empty if Optional is empty)
189
* @throws NullPointerException if mapper is null
190
*/
191
<U> Single<U> flatMapOptional(Function<? super T, ? extends Optional<? extends U>> mapper);
192
```
193
194
### Flow Control & Default Value Operators
195
196
Control Single behavior and provide fallback values.
197
198
```java { .api }
199
/**
200
* Provide default item if empty
201
* @param defaultItem default item to emit
202
* @return Single with default value
203
*/
204
Single<T> defaultIfEmpty(T defaultItem);
205
206
/**
207
* Provide default item via supplier if empty
208
* @param supplier supplier of default item
209
* @return Single with default value from supplier
210
* @throws NullPointerException if supplier is null
211
*/
212
Single<T> defaultIfEmpty(Supplier<? extends T> supplier);
213
214
/**
215
* Switch to other Single if empty
216
* @param other alternative Single
217
* @return Single switching to alternative if empty
218
* @throws NullPointerException if other is null
219
*/
220
Single<T> switchIfEmpty(Single<? extends T> other);
221
222
/**
223
* Execute action if empty
224
* @param emptyAction action to execute
225
* @return Single executing action if empty
226
* @throws NullPointerException if emptyAction is null
227
*/
228
Single<T> ifEmpty(Runnable emptyAction);
229
```
230
231
### Error Handling Operators
232
233
Handle errors and implement retry logic for Single values.
234
235
```java { .api }
236
/**
237
* Resume with item on error
238
* @param resumeFunction function providing resume value
239
* @return Single with error handling
240
* @throws NullPointerException if resumeFunction is null
241
*/
242
Single<T> onErrorResume(Function<? super Throwable, ? extends T> resumeFunction);
243
244
/**
245
* Resume with Single on error
246
* @param resumeFunction function providing resume Single
247
* @return Single with error handling
248
* @throws NullPointerException if resumeFunction is null
249
*/
250
Single<T> onErrorResumeWithSingle(Function<? super Throwable, ? extends Single<? extends T>> resumeFunction);
251
252
/**
253
* Resume with Publisher on error (returns Multi)
254
* @param resumeFunction function providing resume Publisher
255
* @return Multi with error handling
256
* @throws NullPointerException if resumeFunction is null
257
*/
258
Multi<T> onErrorResumeWith(Function<? super Throwable, ? extends Flow.Publisher<? extends T>> resumeFunction);
259
260
/**
261
* Retry N times
262
* @param count number of retries
263
* @return Single with retry logic
264
*/
265
Single<T> retry(long count);
266
267
/**
268
* Conditional retry
269
* @param retryPredicate predicate testing if retry should occur
270
* @return Single with conditional retry
271
* @throws NullPointerException if retryPredicate is null
272
*/
273
Single<T> retry(BiPredicate<? super Throwable, ? super Long> retryPredicate);
274
275
/**
276
* Advanced retry control
277
* @param <U> signal type
278
* @param whenRetryFunction function controlling retry timing
279
* @return Single with advanced retry control
280
* @throws NullPointerException if whenRetryFunction is null
281
*/
282
<U> Single<T> retryWhen(BiFunction<? super Throwable, ? super Long, ? extends Flow.Publisher<U>> whenRetryFunction);
283
```
284
285
### Completion Handling Operators
286
287
Handle completion events and append additional items or streams.
288
289
```java { .api }
290
/**
291
* Append item after completion (returns Multi)
292
* @param resumeValue item to append
293
* @return Multi with appended item
294
*/
295
Multi<T> onCompleteResume(T resumeValue);
296
297
/**
298
* Append publisher after completion (returns Multi)
299
* @param resumePublisher publisher to append
300
* @return Multi with appended publisher
301
* @throws NullPointerException if resumePublisher is null
302
*/
303
Multi<T> onCompleteResumeWith(Flow.Publisher<? extends T> resumePublisher);
304
305
/**
306
* Resume with Single based on completion state
307
* @param resumeFunction function receiving Optional of completed value
308
* @return Single with conditional resume
309
* @throws NullPointerException if resumeFunction is null
310
*/
311
Single<T> onCompleteResumeWithSingle(Function<Optional<T>, Single<T>> resumeFunction);
312
```
313
314
### Side Effect Operators
315
316
Observe and react to Single events without modifying the value.
317
318
```java { .api }
319
/**
320
* Observe item without modification
321
* @param consumer observer function
322
* @return Single with side effect
323
* @throws NullPointerException if consumer is null
324
*/
325
Single<T> peek(Consumer<? super T> consumer);
326
327
/**
328
* Execute on cancellation
329
* @param onCancel action to execute
330
* @return Single with cancel handler
331
* @throws NullPointerException if onCancel is null
332
*/
333
Single<T> onCancel(Runnable onCancel);
334
335
/**
336
* Execute on completion
337
* @param onComplete action to execute
338
* @return Single with completion handler
339
* @throws NullPointerException if onComplete is null
340
*/
341
Single<T> onComplete(Runnable onComplete);
342
343
/**
344
* Execute on error
345
* @param onError action to execute
346
* @return Single with error handler
347
* @throws NullPointerException if onError is null
348
*/
349
Single<T> onError(Consumer<? super Throwable> onError);
350
351
/**
352
* Execute on any termination (complete/error/cancel)
353
* @param onTerminate action to execute
354
* @return Single with termination handler
355
* @throws NullPointerException if onTerminate is null
356
*/
357
Single<T> onTerminate(Runnable onTerminate);
358
```
359
360
### Threading & Timing Operators
361
362
Control execution context and implement timeout behavior.
363
364
```java { .api }
365
/**
366
* Switch execution context
367
* @param executor executor for downstream operations
368
* @return Single executing on specified executor
369
* @throws NullPointerException if executor is null
370
*/
371
Single<T> observeOn(Executor executor);
372
373
/**
374
* Take until other publisher signals
375
* @param <U> signal type
376
* @param other publisher to signal stop
377
* @return Single taking until signal
378
* @throws NullPointerException if other is null
379
*/
380
<U> Single<T> takeUntil(Flow.Publisher<U> other);
381
382
/**
383
* Timeout with error
384
* @param timeout timeout duration
385
* @param unit time unit
386
* @param executor scheduled executor for timeout
387
* @return Single with timeout
388
* @throws NullPointerException if unit or executor is null
389
*/
390
Single<T> timeout(long timeout, TimeUnit unit, ScheduledExecutorService executor);
391
392
/**
393
* Timeout with fallback
394
* @param timeout timeout duration
395
* @param unit time unit
396
* @param executor scheduled executor for timeout
397
* @param fallback fallback Single
398
* @return Single with timeout and fallback
399
* @throws NullPointerException if unit, executor, or fallback is null
400
*/
401
Single<T> timeout(long timeout, TimeUnit unit, ScheduledExecutorService executor, Single<? extends T> fallback);
402
```
403
404
### Utility Operators
405
406
Debug, compose, and transform Single instances.
407
408
```java { .api }
409
/**
410
* Apply custom composition
411
* @param <U> result type
412
* @param composer composition function
413
* @return result of composition
414
* @throws NullPointerException if composer is null
415
*/
416
<U> U compose(Function<? super Single<T>, ? extends U> composer);
417
418
/**
419
* Terminal transformation
420
* @param <U> result type
421
* @param converter conversion function
422
* @return converted result
423
* @throws NullPointerException if converter is null
424
*/
425
<U> U to(Function<? super Single<T>, ? extends U> converter);
426
427
/**
428
* Log all reactive signals
429
* @return Single with logging
430
*/
431
Single<T> log();
432
433
/**
434
* Log with specific level
435
* @param level logging level
436
* @return Single with logging at level
437
* @throws NullPointerException if level is null
438
*/
439
Single<T> log(Level level);
440
441
/**
442
* Log with custom logger
443
* @param level logging level
444
* @param loggerName logger name
445
* @return Single with custom logging
446
* @throws NullPointerException if level or loggerName is null
447
*/
448
Single<T> log(Level level, String loggerName);
449
450
/**
451
* Log with trace information
452
* @param level logging level
453
* @param trace include trace information
454
* @return Single with trace logging
455
* @throws NullPointerException if level is null
456
*/
457
Single<T> log(Level level, boolean trace);
458
```
459
460
## CompletionStage Integration
461
462
Single implements CompletionStage, providing full async composition capabilities.
463
464
### Standard CompletionStage Methods
465
466
All standard CompletionStage methods return `CompletionAwaitable<T>` for enhanced functionality.
467
468
```java { .api }
469
/**
470
* CompletionStage transformation methods
471
*/
472
<U> CompletionAwaitable<U> thenApply(Function<? super T, ? extends U> fn);
473
CompletionAwaitable<Void> thenAccept(Consumer<? super T> action);
474
CompletionAwaitable<Void> thenRun(Runnable action);
475
476
<U> CompletionAwaitable<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
477
478
<U, V> CompletionAwaitable<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn);
479
<U> CompletionAwaitable<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
480
CompletionAwaitable<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
481
482
<U> CompletionAwaitable<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
483
CompletionAwaitable<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
484
CompletionAwaitable<Void> runAfterEither(CompletionStage<?> other, Runnable action);
485
486
CompletionAwaitable<T> exceptionally(Function<Throwable, ? extends T> fn);
487
CompletionAwaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
488
<U> CompletionAwaitable<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
489
```
490
491
### Additional CompletionStage Methods
492
493
Enhanced error handling method specific to CompletionAwaitable.
494
495
```java { .api }
496
/**
497
* Handle exceptions with consumer (does not transform result)
498
* @param exceptionConsumer consumer for exceptions
499
* @return CompletionAwaitable with exception handling
500
* @throws NullPointerException if exceptionConsumer is null
501
*/
502
CompletionAwaitable<T> exceptionallyAccept(Consumer<? super Throwable> exceptionConsumer);
503
```
504
505
## Blocking Operations & Integration
506
507
Single provides convenient blocking operations through the Awaitable interface.
508
509
### Awaitable Methods
510
511
```java { .api }
512
/**
513
* Block until completion (unchecked exceptions only)
514
* @return the result value
515
* @throws RuntimeException for any checked exceptions
516
*/
517
T await();
518
519
/**
520
* Block with timeout (unchecked exceptions)
521
* @param timeout timeout duration
522
* @return the result value
523
* @throws RuntimeException for timeout or checked exceptions
524
* @throws NullPointerException if timeout is null
525
*/
526
T await(Duration timeout);
527
528
/**
529
* Block and get result (may throw checked exceptions)
530
* @return the result value
531
* @throws InterruptedException if interrupted while waiting
532
* @throws ExecutionException if computation threw exception
533
*/
534
T get() throws InterruptedException, ExecutionException;
535
536
/**
537
* Block with timeout and get result
538
* @param timeout timeout value
539
* @param unit timeout unit
540
* @return the result value
541
* @throws InterruptedException if interrupted while waiting
542
* @throws ExecutionException if computation threw exception
543
* @throws TimeoutException if timeout elapsed
544
* @throws NullPointerException if unit is null
545
*/
546
T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
547
```
548
549
### Conversion Methods
550
551
```java { .api }
552
/**
553
* Convert to CompletionStage
554
* @return CompletionStage representation
555
*/
556
CompletionStage<T> toStage();
557
558
/**
559
* Convert to CompletionStage with empty completion control
560
* @param completeWithoutValue if true, complete with null for empty Single
561
* @return CompletionStage representation
562
*/
563
CompletionStage<T> toStage(boolean completeWithoutValue);
564
565
/**
566
* Convert to Single<Optional<T>>
567
* @return Single wrapping result in Optional
568
*/
569
Single<Optional<T>> toOptionalSingle();
570
571
/**
572
* Cancel upstream and return new Single
573
* @return cancelled Single
574
*/
575
Single<T> cancel();
576
```
577
578
### Terminal Operations
579
580
Final operations for consuming Single values.
581
582
```java { .api }
583
/**
584
* Consume the item if present
585
* @param consumer item consumer
586
* @throws NullPointerException if consumer is null
587
*/
588
void forSingle(Consumer<? super T> consumer);
589
590
/**
591
* Ignore the item, complete when done
592
* @return Single<Void> that completes when original completes
593
*/
594
Single<Void> ignoreElement();
595
```
596
597
## Usage Examples
598
599
### Basic Single Operations
600
601
```java
602
import io.helidon.common.reactive.Single;
603
import java.util.concurrent.CompletableFuture;
604
605
// Create Single from value
606
Single<String> greeting = Single.just("Hello")
607
.map(s -> s + " World!");
608
609
String result = greeting.await();
610
// Result: "Hello World!"
611
612
// From CompletionStage
613
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);
614
Single<Integer> value = Single.create(future);
615
616
Integer number = value.await();
617
// Result: 42
618
```
619
620
### Error Handling
621
622
```java
623
Single<String> withErrorHandling = Single.error(new RuntimeException("Error"))
624
.onErrorResume(error -> "Fallback value");
625
626
String result = withErrorHandling.await();
627
// Result: "Fallback value"
628
629
// Retry with backoff
630
Single<String> withRetry = Single.error(new RuntimeException("Network error"))
631
.retry(3)
632
.onErrorResume(error -> "Final fallback");
633
634
String final_result = withRetry.await();
635
// Result: "Final fallback" (after 3 retries)
636
```
637
638
### CompletionStage Integration
639
640
```java
641
Single<String> data = Single.just("data");
642
643
// Use as CompletionStage
644
CompletionStage<String> stage = data
645
.thenApply(String::toUpperCase)
646
.thenApply(s -> s + "!");
647
648
String result = stage.toCompletableFuture().join();
649
// Result: "DATA!"
650
651
// Combine with other CompletionStages
652
CompletableFuture<String> other = CompletableFuture.completedFuture("other");
653
CompletionStage<String> combined = data.thenCombine(other, (a, b) -> a + "-" + b);
654
655
String combinedResult = combined.toCompletableFuture().join();
656
// Result: "data-other"
657
```
658
659
### Transformation to Multi
660
661
```java
662
Single<List<String>> items = Single.just(Arrays.asList("a", "b", "c"));
663
664
// FlatMap to Multi
665
Multi<String> stream = items.flatMapIterable(Function.identity());
666
667
List<String> result = stream.collectList().await();
668
// Result: ["a", "b", "c"]
669
```
670
671
### Timeout and Fallback
672
673
```java
674
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
675
676
Single<String> delayed = Single.timer(5, TimeUnit.SECONDS, executor)
677
.map(ignored -> "Delayed result");
678
679
Single<String> withTimeout = delayed
680
.timeout(2, TimeUnit.SECONDS, executor, Single.just("Timeout fallback"));
681
682
String result = withTimeout.await();
683
// Result: "Timeout fallback" (timeout after 2 seconds)
684
```