0
# Multi Reactive Streams
1
2
Multi represents reactive streams that emit zero or more items, optionally followed by an error or completion. It provides comprehensive operators for transformation, filtering, error handling, and collection operations with full backpressure support.
3
4
## Capabilities
5
6
### Core Multi Interface
7
8
The main reactive streams interface for 0-N item sequences.
9
10
```java { .api }
11
/**
12
* Represents a Flow.Publisher emitting zero or more items, optionally followed by
13
* an error or completion.
14
* @param <T> item type
15
*/
16
public interface Multi<T> extends Subscribable<T> {
17
}
18
```
19
20
### Factory Methods - Source Creation
21
22
Create Multi instances from various sources and generators.
23
24
```java { .api }
25
/**
26
* Create empty Multi that completes immediately
27
* @param <T> item type
28
* @return empty Multi
29
*/
30
static <T> Multi<T> empty();
31
32
/**
33
* Create Multi that signals error immediately
34
* @param <T> item type
35
* @param error the error to signal
36
* @return error Multi
37
* @throws NullPointerException if error is null
38
*/
39
static <T> Multi<T> error(Throwable error);
40
41
/**
42
* Create Multi that never completes
43
* @param <T> item type
44
* @return never-completing Multi
45
*/
46
static <T> Multi<T> never();
47
48
/**
49
* Create Multi from varargs items
50
* @param <T> item type
51
* @param items items to emit
52
* @return Multi emitting the items
53
*/
54
static <T> Multi<T> just(T... items);
55
56
/**
57
* Create Multi from collection items
58
* @param <T> item type
59
* @param items collection of items to emit
60
* @return Multi emitting the items
61
*/
62
static <T> Multi<T> just(Collection<? extends T> items);
63
64
/**
65
* Create Multi with single item
66
* @param <T> item type
67
* @param item single item to emit
68
* @return Multi emitting the single item
69
*/
70
static <T> Multi<T> singleton(T item);
71
72
/**
73
* Deferred Multi creation per subscriber
74
* @param <T> item type
75
* @param supplier supplier function called for each subscriber
76
* @return deferred Multi
77
* @throws NullPointerException if supplier is null
78
*/
79
static <T> Multi<T> defer(Supplier<? extends Flow.Publisher<? extends T>> supplier);
80
```
81
82
### Factory Methods - From Existing Sources
83
84
Convert existing data sources to Multi streams.
85
86
```java { .api }
87
/**
88
* Wrap existing Publisher as Multi
89
* @param <T> item type
90
* @param publisher publisher to wrap
91
* @return Multi wrapping the publisher
92
*/
93
static <T> Multi<T> create(Flow.Publisher<? extends T> publisher);
94
95
/**
96
* Convert Single to Multi
97
* @param <T> item type
98
* @param single single to convert
99
* @return Multi from Single
100
*/
101
static <T> Multi<T> create(Single<? extends T> single);
102
103
/**
104
* Convert CompletionStage to Multi
105
* @param <T> item type
106
* @param completionStage completion stage to convert
107
* @return Multi from CompletionStage
108
*/
109
static <T> Multi<T> create(CompletionStage<? extends T> completionStage);
110
111
/**
112
* Convert CompletionStage to Multi with null handling control
113
* @param <T> item type
114
* @param completionStage completion stage to convert
115
* @param nullMeansEmpty if true, null result means empty Multi
116
* @return Multi from CompletionStage
117
*/
118
static <T> Multi<T> create(CompletionStage<? extends T> completionStage, boolean nullMeansEmpty);
119
120
/**
121
* Create Multi from Iterable
122
* @param <T> item type
123
* @param iterable iterable to convert
124
* @return Multi emitting iterable items
125
*/
126
static <T> Multi<T> create(Iterable<? extends T> iterable);
127
128
/**
129
* Create Multi from Stream (closes stream when done)
130
* @param <T> item type
131
* @param stream stream to convert
132
* @return Multi emitting stream items
133
*/
134
static <T> Multi<T> create(Stream<? extends T> stream);
135
```
136
137
### Factory Methods - Sequence Generation
138
139
Generate numeric sequences and timed emissions.
140
141
```java { .api }
142
/**
143
* Generate integer sequence
144
* @param start starting value (inclusive)
145
* @param count number of items to generate
146
* @return Multi emitting integer sequence
147
*/
148
static Multi<Integer> range(int start, int count);
149
150
/**
151
* Generate long sequence
152
* @param start starting value (inclusive)
153
* @param count number of items to generate
154
* @return Multi emitting long sequence
155
*/
156
static Multi<Long> rangeLong(long start, long count);
157
158
/**
159
* Periodic timer sequence
160
* @param period time between emissions
161
* @param unit time unit
162
* @param executor scheduled executor for timing
163
* @return Multi emitting periodic signals
164
*/
165
static Multi<Long> interval(long period, TimeUnit unit, ScheduledExecutorService executor);
166
167
/**
168
* Delayed periodic timer sequence
169
* @param initialDelay initial delay before first emission
170
* @param period time between emissions
171
* @param unit time unit
172
* @param executor scheduled executor for timing
173
* @return Multi emitting delayed periodic signals
174
*/
175
static Multi<Long> interval(long initialDelay, long period, TimeUnit unit, ScheduledExecutorService executor);
176
177
/**
178
* Single delayed emission
179
* @param time delay time
180
* @param unit time unit
181
* @param executor scheduled executor for timing
182
* @return Multi emitting single delayed signal
183
*/
184
static Multi<Long> timer(long time, TimeUnit unit, ScheduledExecutorService executor);
185
```
186
187
### Factory Methods - Composition
188
189
Combine multiple publishers into single streams.
190
191
```java { .api }
192
/**
193
* Concatenate two publishers sequentially
194
* @param <T> item type
195
* @param firstMulti first stream
196
* @param secondMulti second stream
197
* @return concatenated Multi
198
*/
199
static <T> Multi<T> concat(Flow.Publisher<? extends T> firstMulti, Flow.Publisher<? extends T> secondMulti);
200
201
/**
202
* Concatenate multiple publishers sequentially
203
* @param <T> item type
204
* @param firstPublisher first stream
205
* @param secondPublisher second stream
206
* @param morePublishers additional publishers
207
* @return concatenated Multi
208
*/
209
static <T> Multi<T> concat(Flow.Publisher<? extends T> firstPublisher,
210
Flow.Publisher<? extends T> secondPublisher,
211
Flow.Publisher<? extends T>... morePublishers);
212
213
/**
214
* Concatenate array of publishers
215
* @param <T> item type
216
* @param publishers array of publishers to concatenate
217
* @return concatenated Multi
218
*/
219
static <T> Multi<T> concatArray(Flow.Publisher<? extends T>... publishers);
220
```
221
222
### Transformation Operators - Filtering & Selection
223
224
Filter and select items from streams.
225
226
```java { .api }
227
/**
228
* Filter items based on predicate
229
* @param predicate filter predicate
230
* @return filtered Multi
231
* @throws NullPointerException if predicate is null
232
*/
233
Multi<T> filter(Predicate<? super T> predicate);
234
235
/**
236
* Remove duplicate items
237
* @return Multi with duplicates removed
238
*/
239
Multi<T> distinct();
240
241
/**
242
* Take only first N items
243
* @param maxSize maximum number of items
244
* @return limited Multi
245
*/
246
Multi<T> limit(long maxSize);
247
248
/**
249
* Skip first N items
250
* @param count number of items to skip
251
* @return Multi with items skipped
252
*/
253
Multi<T> skip(long count);
254
255
/**
256
* Take items while predicate is true
257
* @param predicate condition predicate
258
* @return Multi taking while condition holds
259
* @throws NullPointerException if predicate is null
260
*/
261
Multi<T> takeWhile(Predicate<? super T> predicate);
262
263
/**
264
* Skip items while predicate is true
265
* @param predicate condition predicate
266
* @return Multi dropping while condition holds
267
* @throws NullPointerException if predicate is null
268
*/
269
Multi<T> dropWhile(Predicate<? super T> predicate);
270
271
/**
272
* Take items until other publisher signals
273
* @param <U> signal type
274
* @param other publisher to signal stop
275
* @return Multi taking until signal
276
* @throws NullPointerException if other is null
277
*/
278
<U> Multi<T> takeUntil(Flow.Publisher<U> other);
279
280
/**
281
* Get first item as Single
282
* @return Single with first item or empty
283
*/
284
Single<T> first();
285
```
286
287
### Transformation Operators - Mapping & Transformation
288
289
Transform and reshape stream items.
290
291
```java { .api }
292
/**
293
* Transform each item
294
* @param <U> result type
295
* @param mapper transformation function
296
* @return transformed Multi
297
* @throws NullPointerException if mapper is null
298
*/
299
<U> Multi<U> map(Function<? super T, ? extends U> mapper);
300
301
/**
302
* Transform and flatten publishers
303
* @param <U> result type
304
* @param mapper function producing publishers
305
* @return flattened Multi
306
* @throws NullPointerException if mapper is null
307
*/
308
<U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper);
309
310
/**
311
* Advanced flatMap with concurrency control
312
* @param <U> result type
313
* @param mapper function producing publishers
314
* @param maxConcurrency maximum concurrent inner publishers
315
* @param delayErrors if true, delay errors until all complete
316
* @param prefetch prefetch amount for inner publishers
317
* @return flattened Multi with concurrency control
318
*/
319
<U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper,
320
long maxConcurrency, boolean delayErrors, long prefetch);
321
322
/**
323
* FlatMap CompletionStages
324
* @param <U> result type
325
* @param mapper function producing CompletionStages
326
* @return flattened Multi from CompletionStages
327
* @throws NullPointerException if mapper is null
328
*/
329
<U> Multi<U> flatMapCompletionStage(Function<? super T, ? extends CompletionStage<? extends U>> mapper);
330
331
/**
332
* FlatMap Iterables
333
* @param <U> result type
334
* @param mapper function producing Iterables
335
* @return flattened Multi from Iterables
336
* @throws NullPointerException if mapper is null
337
*/
338
<U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper);
339
340
/**
341
* FlatMap Iterables with prefetch control
342
* @param <U> result type
343
* @param mapper function producing Iterables
344
* @param prefetch prefetch amount
345
* @return flattened Multi from Iterables
346
*/
347
<U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper, int prefetch);
348
349
/**
350
* FlatMap Optionals
351
* @param <U> result type
352
* @param mapper function producing Optionals
353
* @return flattened Multi from Optionals
354
* @throws NullPointerException if mapper is null
355
*/
356
<U> Multi<U> flatMapOptional(Function<? super T, ? extends Optional<? extends U>> mapper);
357
```
358
359
### Error Handling Operators
360
361
Handle errors and implement retry logic.
362
363
```java { .api }
364
/**
365
* Resume with single item on error
366
* @param resumeFunction function providing resume value
367
* @return Multi with error handling
368
* @throws NullPointerException if resumeFunction is null
369
*/
370
Multi<T> onErrorResume(Function<? super Throwable, ? extends T> resumeFunction);
371
372
/**
373
* Resume with publisher on error
374
* @param resumeFunction function providing resume publisher
375
* @return Multi with error handling
376
* @throws NullPointerException if resumeFunction is null
377
*/
378
Multi<T> onErrorResumeWith(Function<? super Throwable, ? extends Flow.Publisher<? extends T>> resumeFunction);
379
380
/**
381
* Retry failed stream N times
382
* @param count number of retries
383
* @return Multi with retry logic
384
*/
385
Multi<T> retry(long count);
386
387
/**
388
* Conditional retry
389
* @param retryPredicate predicate testing if retry should occur
390
* @return Multi with conditional retry
391
* @throws NullPointerException if retryPredicate is null
392
*/
393
Multi<T> retry(BiPredicate<? super Throwable, ? super Long> retryPredicate);
394
395
/**
396
* Advanced retry control
397
* @param <U> signal type
398
* @param whenRetryFunction function controlling retry timing
399
* @return Multi with advanced retry control
400
* @throws NullPointerException if whenRetryFunction is null
401
*/
402
<U> Multi<T> retryWhen(BiFunction<? super Throwable, ? super Long, ? extends Flow.Publisher<U>> whenRetryFunction);
403
```
404
405
### Flow Control & Default Value Operators
406
407
Control stream flow and provide fallback values.
408
409
```java { .api }
410
/**
411
* Provide default item if stream is empty
412
* @param defaultItem default item to emit
413
* @return Multi with default value
414
*/
415
Multi<T> defaultIfEmpty(T defaultItem);
416
417
/**
418
* Provide default item via supplier if stream is empty
419
* @param supplier supplier of default item
420
* @return Multi with default value from supplier
421
* @throws NullPointerException if supplier is null
422
*/
423
Multi<T> defaultIfEmpty(Supplier<? extends T> supplier);
424
425
/**
426
* Switch to alternative publisher if empty
427
* @param other alternative publisher
428
* @return Multi switching to alternative if empty
429
* @throws NullPointerException if other is null
430
*/
431
Multi<T> switchIfEmpty(Flow.Publisher<? extends T> other);
432
433
/**
434
* Execute action if stream is empty
435
* @param emptyAction action to execute
436
* @return Multi executing action if empty
437
* @throws NullPointerException if emptyAction is null
438
*/
439
Multi<T> ifEmpty(Runnable emptyAction);
440
```
441
442
### Completion Handling Operators
443
444
Handle completion events and append additional items.
445
446
```java { .api }
447
/**
448
* Append item after completion
449
* @param resumeValue item to append
450
* @return Multi with appended item
451
*/
452
Multi<T> onCompleteResume(T resumeValue);
453
454
/**
455
* Append publisher after completion
456
* @param resumePublisher publisher to append
457
* @return Multi with appended publisher
458
* @throws NullPointerException if resumePublisher is null
459
*/
460
Multi<T> onCompleteResumeWith(Flow.Publisher<? extends T> resumePublisher);
461
```
462
463
### Side Effect Operators
464
465
Observe and react to stream events without modifying the stream.
466
467
```java { .api }
468
/**
469
* Observe items without modification
470
* @param consumer observer function
471
* @return Multi with side effect
472
* @throws NullPointerException if consumer is null
473
*/
474
Multi<T> peek(Consumer<? super T> consumer);
475
476
/**
477
* Execute on cancellation
478
* @param onCancel action to execute
479
* @return Multi with cancel handler
480
* @throws NullPointerException if onCancel is null
481
*/
482
Multi<T> onCancel(Runnable onCancel);
483
484
/**
485
* Execute on completion
486
* @param onComplete action to execute
487
* @return Multi with completion handler
488
* @throws NullPointerException if onComplete is null
489
*/
490
Multi<T> onComplete(Runnable onComplete);
491
492
/**
493
* Execute on error
494
* @param onError action to execute
495
* @return Multi with error handler
496
* @throws NullPointerException if onError is null
497
*/
498
Multi<T> onError(Consumer<? super Throwable> onError);
499
500
/**
501
* Execute on any termination (complete/error/cancel)
502
* @param onTerminate action to execute
503
* @return Multi with termination handler
504
* @throws NullPointerException if onTerminate is null
505
*/
506
Multi<T> onTerminate(Runnable onTerminate);
507
```
508
509
### Threading & Timing Operators
510
511
Control execution context and implement timeout behavior.
512
513
```java { .api }
514
/**
515
* Switch execution context
516
* @param executor executor for downstream operations
517
* @return Multi executing on specified executor
518
* @throws NullPointerException if executor is null
519
*/
520
Multi<T> observeOn(Executor executor);
521
522
/**
523
* Advanced threading control
524
* @param executor executor for downstream operations
525
* @param bufferSize buffer size for context switching
526
* @param delayErrors if true, delay errors until buffer is drained
527
* @return Multi with advanced threading control
528
*/
529
Multi<T> observeOn(Executor executor, int bufferSize, boolean delayErrors);
530
531
/**
532
* Timeout with error
533
* @param timeout timeout duration
534
* @param unit time unit
535
* @param executor scheduled executor for timeout
536
* @return Multi with timeout
537
* @throws NullPointerException if unit or executor is null
538
*/
539
Multi<T> timeout(long timeout, TimeUnit unit, ScheduledExecutorService executor);
540
541
/**
542
* Timeout with fallback
543
* @param timeout timeout duration
544
* @param unit time unit
545
* @param executor scheduled executor for timeout
546
* @param fallback fallback publisher
547
* @return Multi with timeout and fallback
548
* @throws NullPointerException if unit, executor, or fallback is null
549
*/
550
Multi<T> timeout(long timeout, TimeUnit unit, ScheduledExecutorService executor, Flow.Publisher<? extends T> fallback);
551
```
552
553
### Utility Operators
554
555
Debug, compose, and transform streams.
556
557
```java { .api }
558
/**
559
* Log all reactive signals
560
* @return Multi with logging
561
*/
562
Multi<T> log();
563
564
/**
565
* Log with specific level
566
* @param level logging level
567
* @return Multi with logging at level
568
* @throws NullPointerException if level is null
569
*/
570
Multi<T> log(Level level);
571
572
/**
573
* Log with custom logger
574
* @param level logging level
575
* @param loggerName logger name
576
* @return Multi with custom logging
577
* @throws NullPointerException if level or loggerName is null
578
*/
579
Multi<T> log(Level level, String loggerName);
580
581
/**
582
* Log with trace information
583
* @param level logging level
584
* @param trace include trace information
585
* @return Multi with trace logging
586
* @throws NullPointerException if level is null
587
*/
588
Multi<T> log(Level level, boolean trace);
589
590
/**
591
* Apply custom composition
592
* @param <U> result type
593
* @param composer composition function
594
* @return result of composition
595
* @throws NullPointerException if composer is null
596
*/
597
<U> U compose(Function<? super Multi<T>, ? extends U> composer);
598
599
/**
600
* Terminal transformation
601
* @param <U> result type
602
* @param converter conversion function
603
* @return converted result
604
* @throws NullPointerException if converter is null
605
*/
606
<U> U to(Function<? super Multi<T>, ? extends U> converter);
607
```
608
609
## Collection & Reduction Operations
610
611
Collect stream items into data structures or reduce to single values.
612
613
### Collector Operations
614
615
```java { .api }
616
/**
617
* Collect using custom collector
618
* @param <A> accumulator type
619
* @param <R> result type
620
* @param collector collector to use
621
* @return Single with collected result
622
* @throws NullPointerException if collector is null
623
*/
624
<A, R> Single<R> collect(Collector<? super T, A, R> collector);
625
626
/**
627
* Collect with supplier and accumulator
628
* @param <R> result type
629
* @param supplier supplier of collection container
630
* @param accumulator accumulator function
631
* @return Single with collected result
632
* @throws NullPointerException if supplier or accumulator is null
633
*/
634
<R> Single<R> collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator);
635
636
/**
637
* Collect to List
638
* @return Single containing List of all items
639
*/
640
Single<List<T>> collectList();
641
642
/**
643
* Use Java Stream collectors
644
* @param <A> accumulator type
645
* @param <R> result type
646
* @param collector Java Stream collector
647
* @return Single with collected result
648
* @throws NullPointerException if collector is null
649
*/
650
<A, R> Single<R> collectStream(java.util.stream.Collector<? super T, A, R> collector);
651
```
652
653
### Reduction Operations
654
655
```java { .api }
656
/**
657
* Reduce to single value
658
* @param accumulator reduction function
659
* @return Single with reduced result or empty
660
* @throws NullPointerException if accumulator is null
661
*/
662
Single<T> reduce(BinaryOperator<T> accumulator);
663
664
/**
665
* Reduce with initial value
666
* @param <U> result type
667
* @param identity initial value
668
* @param accumulator reduction function
669
* @return Single with reduced result
670
* @throws NullPointerException if accumulator is null
671
*/
672
<U> Single<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator);
673
```
674
675
## Terminal Operations
676
677
Final operations that trigger stream execution and consumption.
678
679
### Consumption Operations
680
681
```java { .api }
682
/**
683
* Consume all items (no backpressure)
684
* @param consumer item consumer
685
* @throws NullPointerException if consumer is null
686
*/
687
void forEach(Consumer<? super T> consumer);
688
689
/**
690
* Sequential async consumption
691
* @param mapper function producing CompletionStages
692
* @return CompletionStage completing when all items processed
693
* @throws NullPointerException if mapper is null
694
*/
695
CompletionStage<Void> forEachCompletionStage(Function<? super T, ? extends CompletionStage<Void>> mapper);
696
697
/**
698
* Ignore all items, complete when stream terminates
699
* @return Single that completes when stream terminates
700
*/
701
Single<Void> ignoreElements();
702
```
703
704
## Usage Examples
705
706
### Basic Stream Processing
707
708
```java
709
import io.helidon.common.reactive.Multi;
710
711
// Create and process stream
712
Multi<String> processed = Multi.just("apple", "banana", "cherry")
713
.filter(fruit -> fruit.length() > 5)
714
.map(String::toUpperCase);
715
716
processed.forEach(System.out::println); // Prints: BANANA, CHERRY
717
```
718
719
### Error Handling
720
721
```java
722
Multi<Integer> withErrorHandling = Multi.just(1, 2, 0, 4)
723
.map(n -> 10 / n) // Will throw ArithmeticException for 0
724
.onErrorResumeWith(error -> Multi.just(-1)); // Resume with -1
725
726
List<Integer> result = withErrorHandling.collectList().await();
727
// Result: [10, 5, -1]
728
```
729
730
### Retry Logic
731
732
```java
733
Multi<String> withRetry = Multi.error(new RuntimeException("Network error"))
734
.retry(3) // Retry 3 times
735
.onErrorResumeWith(error -> Multi.just("Fallback value"));
736
737
String result = withRetry.first().await();
738
// Result: "Fallback value" (after 3 retries)
739
```
740
741
### Async Processing
742
743
```java
744
Multi<CompletionStage<String>> asyncTasks = Multi.range(1, 5)
745
.map(i -> CompletableFuture.supplyAsync(() -> "Task " + i));
746
747
// Process async tasks sequentially
748
CompletionStage<Void> completion = Multi.create(asyncTasks)
749
.flatMapCompletionStage(Function.identity())
750
.forEachCompletionStage(result -> {
751
System.out.println(result);
752
return CompletableFuture.completedFuture(null);
753
});
754
```