# Multi Reactive Streams

Multi represents reactive streams that emit zero or more items, optionally followed by an error or completion. It provides comprehensive operators for transformation, filtering, error handling, and collection operations with full backpressure support.

## Capabilities

### Core Multi Interface

The main reactive streams interface for 0-N item sequences.

```java { .api }
/**
 * Represents a Flow.Publisher emitting zero or more items, optionally followed by
 * an error or completion.
 * @param <T> item type
 */
public interface Multi<T> extends Subscribable<T> {
}
```
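
Since a `Multi` is a `Flow.Publisher`, the subscriber sets the pace by calling `Subscription.request(n)`. The following is a minimal sketch of that contract and is not Helidon code: it uses the JDK's `SubmissionPublisher` as a stand-in publisher, and the names `BackpressureSketch` and `OneByOneSubscriber` are hypothetical. The subscriber pulls one item at a time and then observes completion.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BackpressureSketch {

    /** Hypothetical subscriber that requests one item at a time and records what it receives. */
    static final class OneByOneSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        volatile Throwable failure;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item only
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1); // ask for the next item once this one is handled
        }

        @Override
        public void onError(Throwable throwable) {
            failure = throwable;
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes three items through a JDK publisher and returns what the subscriber saw. */
    static List<Integer> run() {
        OneByOneSubscriber<Integer> subscriber = new OneByOneSubscriber<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit(1);
            publisher.submit(2);
            publisher.submit(3);
        } // close() signals onComplete once the submitted items have been delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (subscriber.failure != null) {
            throw new IllegalStateException(subscriber.failure);
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The same subscriber shape should work against any `Flow.Publisher`, so it could in principle be subscribed to a `Multi` as well.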

### Factory Methods - Source Creation

Create Multi instances from various sources and generators.

```java { .api }
/**
 * Create empty Multi that completes immediately
 * @param <T> item type
 * @return empty Multi
 */
static <T> Multi<T> empty();

/**
 * Create Multi that signals error immediately
 * @param <T> item type
 * @param error the error to signal
 * @return error Multi
 * @throws NullPointerException if error is null
 */
static <T> Multi<T> error(Throwable error);

/**
 * Create Multi that never completes
 * @param <T> item type
 * @return never-completing Multi
 */
static <T> Multi<T> never();

/**
 * Create Multi from varargs items
 * @param <T> item type
 * @param items items to emit
 * @return Multi emitting the items
 */
static <T> Multi<T> just(T... items);

/**
 * Create Multi from collection items
 * @param <T> item type
 * @param items collection of items to emit
 * @return Multi emitting the items
 */
static <T> Multi<T> just(Collection<? extends T> items);

/**
 * Create Multi with single item
 * @param <T> item type
 * @param item single item to emit
 * @return Multi emitting the single item
 */
static <T> Multi<T> singleton(T item);

/**
 * Deferred Multi creation per subscriber
 * @param <T> item type
 * @param supplier supplier function called for each subscriber
 * @return deferred Multi
 * @throws NullPointerException if supplier is null
 */
static <T> Multi<T> defer(Supplier<? extends Flow.Publisher<? extends T>> supplier);
```

### Factory Methods - From Existing Sources

Convert existing data sources to Multi streams.

```java { .api }
/**
 * Wrap existing Publisher as Multi
 * @param <T> item type
 * @param publisher publisher to wrap
 * @return Multi wrapping the publisher
 */
static <T> Multi<T> create(Flow.Publisher<? extends T> publisher);

/**
 * Convert Single to Multi
 * @param <T> item type
 * @param single single to convert
 * @return Multi from Single
 */
static <T> Multi<T> create(Single<? extends T> single);

/**
 * Convert CompletionStage to Multi
 * @param <T> item type
 * @param completionStage completion stage to convert
 * @return Multi from CompletionStage
 */
static <T> Multi<T> create(CompletionStage<? extends T> completionStage);

/**
 * Convert CompletionStage to Multi with null handling control
 * @param <T> item type
 * @param completionStage completion stage to convert
 * @param nullMeansEmpty if true, null result means empty Multi
 * @return Multi from CompletionStage
 */
static <T> Multi<T> create(CompletionStage<? extends T> completionStage, boolean nullMeansEmpty);

/**
 * Create Multi from Iterable
 * @param <T> item type
 * @param iterable iterable to convert
 * @return Multi emitting iterable items
 */
static <T> Multi<T> create(Iterable<? extends T> iterable);

/**
 * Create Multi from Stream (closes stream when done)
 * @param <T> item type
 * @param stream stream to convert
 * @return Multi emitting stream items
 */
static <T> Multi<T> create(Stream<? extends T> stream);
```

### Factory Methods - Sequence Generation

Generate numeric sequences and timed emissions.

```java { .api }
/**
 * Generate integer sequence
 * @param start starting value (inclusive)
 * @param count number of items to generate
 * @return Multi emitting integer sequence
 */
static Multi<Integer> range(int start, int count);

/**
 * Generate long sequence
 * @param start starting value (inclusive)
 * @param count number of items to generate
 * @return Multi emitting long sequence
 */
static Multi<Long> rangeLong(long start, long count);

/**
 * Periodic timer sequence
 * @param period time between emissions
 * @param unit time unit
 * @param executor scheduled executor for timing
 * @return Multi emitting periodic signals
 */
static Multi<Long> interval(long period, TimeUnit unit, ScheduledExecutorService executor);

/**
 * Delayed periodic timer sequence
 * @param initialDelay initial delay before first emission
 * @param period time between emissions
 * @param unit time unit
 * @param executor scheduled executor for timing
 * @return Multi emitting delayed periodic signals
 */
static Multi<Long> interval(long initialDelay, long period, TimeUnit unit, ScheduledExecutorService executor);

/**
 * Single delayed emission
 * @param time delay time
 * @param unit time unit
 * @param executor scheduled executor for timing
 * @return Multi emitting single delayed signal
 */
static Multi<Long> timer(long time, TimeUnit unit, ScheduledExecutorService executor);
```

### Factory Methods - Composition

Combine multiple publishers into single streams.

```java { .api }
/**
 * Concatenate two publishers sequentially
 * @param <T> item type
 * @param firstMulti first stream
 * @param secondMulti second stream
 * @return concatenated Multi
 */
static <T> Multi<T> concat(Flow.Publisher<? extends T> firstMulti, Flow.Publisher<? extends T> secondMulti);

/**
 * Concatenate multiple publishers sequentially
 * @param <T> item type
 * @param firstPublisher first stream
 * @param secondPublisher second stream
 * @param morePublishers additional publishers
 * @return concatenated Multi
 */
static <T> Multi<T> concat(Flow.Publisher<? extends T> firstPublisher,
                           Flow.Publisher<? extends T> secondPublisher,
                           Flow.Publisher<? extends T>... morePublishers);

/**
 * Concatenate array of publishers
 * @param <T> item type
 * @param publishers array of publishers to concatenate
 * @return concatenated Multi
 */
static <T> Multi<T> concatArray(Flow.Publisher<? extends T>... publishers);
```

### Transformation Operators - Filtering & Selection

Filter and select items from streams.

```java { .api }
/**
 * Filter items based on predicate
 * @param predicate filter predicate
 * @return filtered Multi
 * @throws NullPointerException if predicate is null
 */
Multi<T> filter(Predicate<? super T> predicate);

/**
 * Remove duplicate items
 * @return Multi with duplicates removed
 */
Multi<T> distinct();

/**
 * Take only first N items
 * @param maxSize maximum number of items
 * @return limited Multi
 */
Multi<T> limit(long maxSize);

/**
 * Skip first N items
 * @param count number of items to skip
 * @return Multi with items skipped
 */
Multi<T> skip(long count);

/**
 * Take items while predicate is true
 * @param predicate condition predicate
 * @return Multi taking while condition holds
 * @throws NullPointerException if predicate is null
 */
Multi<T> takeWhile(Predicate<? super T> predicate);

/**
 * Skip items while predicate is true
 * @param predicate condition predicate
 * @return Multi dropping while condition holds
 * @throws NullPointerException if predicate is null
 */
Multi<T> dropWhile(Predicate<? super T> predicate);

/**
 * Take items until other publisher signals
 * @param <U> signal type
 * @param other publisher to signal stop
 * @return Multi taking until signal
 * @throws NullPointerException if other is null
 */
<U> Multi<T> takeUntil(Flow.Publisher<U> other);

/**
 * Get first item as Single
 * @return Single with first item or empty
 */
Single<T> first();
```

### Transformation Operators - Mapping & Transformation

Transform and reshape stream items.

```java { .api }
/**
 * Transform each item
 * @param <U> result type
 * @param mapper transformation function
 * @return transformed Multi
 * @throws NullPointerException if mapper is null
 */
<U> Multi<U> map(Function<? super T, ? extends U> mapper);

/**
 * Transform and flatten publishers
 * @param <U> result type
 * @param mapper function producing publishers
 * @return flattened Multi
 * @throws NullPointerException if mapper is null
 */
<U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper);

/**
 * Advanced flatMap with concurrency control
 * @param <U> result type
 * @param mapper function producing publishers
 * @param maxConcurrency maximum concurrent inner publishers
 * @param delayErrors if true, delay errors until all complete
 * @param prefetch prefetch amount for inner publishers
 * @return flattened Multi with concurrency control
 */
<U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper,
                     long maxConcurrency, boolean delayErrors, long prefetch);

/**
 * FlatMap CompletionStages
 * @param <U> result type
 * @param mapper function producing CompletionStages
 * @return flattened Multi from CompletionStages
 * @throws NullPointerException if mapper is null
 */
<U> Multi<U> flatMapCompletionStage(Function<? super T, ? extends CompletionStage<? extends U>> mapper);

/**
 * FlatMap Iterables
 * @param <U> result type
 * @param mapper function producing Iterables
 * @return flattened Multi from Iterables
 * @throws NullPointerException if mapper is null
 */
<U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper);

/**
 * FlatMap Iterables with prefetch control
 * @param <U> result type
 * @param mapper function producing Iterables
 * @param prefetch prefetch amount
 * @return flattened Multi from Iterables
 */
<U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper, int prefetch);

/**
 * FlatMap Optionals
 * @param <U> result type
 * @param mapper function producing Optionals
 * @return flattened Multi from Optionals
 * @throws NullPointerException if mapper is null
 */
<U> Multi<U> flatMapOptional(Function<? super T, ? extends Optional<? extends U>> mapper);
```

### Error Handling Operators

Handle errors and implement retry logic.

```java { .api }
/**
 * Resume with single item on error
 * @param resumeFunction function providing resume value
 * @return Multi with error handling
 * @throws NullPointerException if resumeFunction is null
 */
Multi<T> onErrorResume(Function<? super Throwable, ? extends T> resumeFunction);

/**
 * Resume with publisher on error
 * @param resumeFunction function providing resume publisher
 * @return Multi with error handling
 * @throws NullPointerException if resumeFunction is null
 */
Multi<T> onErrorResumeWith(Function<? super Throwable, ? extends Flow.Publisher<? extends T>> resumeFunction);

/**
 * Retry failed stream N times
 * @param count number of retries
 * @return Multi with retry logic
 */
Multi<T> retry(long count);

/**
 * Conditional retry
 * @param retryPredicate predicate testing if retry should occur
 * @return Multi with conditional retry
 * @throws NullPointerException if retryPredicate is null
 */
Multi<T> retry(BiPredicate<? super Throwable, ? super Long> retryPredicate);

/**
 * Advanced retry control
 * @param <U> signal type
 * @param whenRetryFunction function controlling retry timing
 * @return Multi with advanced retry control
 * @throws NullPointerException if whenRetryFunction is null
 */
<U> Multi<T> retryWhen(BiFunction<? super Throwable, ? super Long, ? extends Flow.Publisher<U>> whenRetryFunction);
```
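
The conditional `retry` overload takes a `BiPredicate` over the failure and a `Long`. As a rough sketch, such a predicate can be built and checked with the JDK alone. The class `RetryPolicySketch` and the method `ioFailuresUpTo` are hypothetical names, and the code assumes the `Long` argument is the number of retries already performed, which should be confirmed against the library Javadoc.

```java
import java.io.IOException;
import java.util.function.BiPredicate;

final class RetryPolicySketch {

    /** Hypothetical policy: retry only I/O failures, and at most maxRetries times. */
    static BiPredicate<Throwable, Long> ioFailuresUpTo(long maxRetries) {
        // Assumption: the Long argument counts the retries that have already happened.
        return (error, retriesSoFar) -> error instanceof IOException && retriesSoFar < maxRetries;
    }

    public static void main(String[] args) {
        BiPredicate<Throwable, Long> policy = ioFailuresUpTo(3);
        System.out.println(policy.test(new IOException("connection reset"), 0L));  // true: I/O failure, no retries yet
        System.out.println(policy.test(new IOException("connection reset"), 3L));  // false: retry budget used up
        System.out.println(policy.test(new IllegalStateException("bug"), 0L));     // false: not an I/O failure
    }
}
```

A predicate of this shape is type-compatible with the `retry(BiPredicate<? super Throwable, ? super Long>)` signature listed above.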

### Flow Control & Default Value Operators

Control stream flow and provide fallback values.

```java { .api }
/**
 * Provide default item if stream is empty
 * @param defaultItem default item to emit
 * @return Multi with default value
 */
Multi<T> defaultIfEmpty(T defaultItem);

/**
 * Provide default item via supplier if stream is empty
 * @param supplier supplier of default item
 * @return Multi with default value from supplier
 * @throws NullPointerException if supplier is null
 */
Multi<T> defaultIfEmpty(Supplier<? extends T> supplier);

/**
 * Switch to alternative publisher if empty
 * @param other alternative publisher
 * @return Multi switching to alternative if empty
 * @throws NullPointerException if other is null
 */
Multi<T> switchIfEmpty(Flow.Publisher<? extends T> other);

/**
 * Execute action if stream is empty
 * @param emptyAction action to execute
 * @return Multi executing action if empty
 * @throws NullPointerException if emptyAction is null
 */
Multi<T> ifEmpty(Runnable emptyAction);
```

### Completion Handling Operators

Handle completion events and append additional items.

```java { .api }
/**
 * Append item after completion
 * @param resumeValue item to append
 * @return Multi with appended item
 */
Multi<T> onCompleteResume(T resumeValue);

/**
 * Append publisher after completion
 * @param resumePublisher publisher to append
 * @return Multi with appended publisher
 * @throws NullPointerException if resumePublisher is null
 */
Multi<T> onCompleteResumeWith(Flow.Publisher<? extends T> resumePublisher);
```

### Side Effect Operators

Observe and react to stream events without modifying the stream.

```java { .api }
/**
 * Observe items without modification
 * @param consumer observer function
 * @return Multi with side effect
 * @throws NullPointerException if consumer is null
 */
Multi<T> peek(Consumer<? super T> consumer);

/**
 * Execute on cancellation
 * @param onCancel action to execute
 * @return Multi with cancel handler
 * @throws NullPointerException if onCancel is null
 */
Multi<T> onCancel(Runnable onCancel);

/**
 * Execute on completion
 * @param onComplete action to execute
 * @return Multi with completion handler
 * @throws NullPointerException if onComplete is null
 */
Multi<T> onComplete(Runnable onComplete);

/**
 * Execute on error
 * @param onError action to execute
 * @return Multi with error handler
 * @throws NullPointerException if onError is null
 */
Multi<T> onError(Consumer<? super Throwable> onError);

/**
 * Execute on any termination (complete/error/cancel)
 * @param onTerminate action to execute
 * @return Multi with termination handler
 * @throws NullPointerException if onTerminate is null
 */
Multi<T> onTerminate(Runnable onTerminate);
```

### Threading & Timing Operators

Control execution context and implement timeout behavior.

```java { .api }
/**
 * Switch execution context
 * @param executor executor for downstream operations
 * @return Multi executing on specified executor
 * @throws NullPointerException if executor is null
 */
Multi<T> observeOn(Executor executor);

/**
 * Advanced threading control
 * @param executor executor for downstream operations
 * @param bufferSize buffer size for context switching
 * @param delayErrors if true, delay errors until buffer is drained
 * @return Multi with advanced threading control
 */
Multi<T> observeOn(Executor executor, int bufferSize, boolean delayErrors);

/**
 * Timeout with error
 * @param timeout timeout duration
 * @param unit time unit
 * @param executor scheduled executor for timeout
 * @return Multi with timeout
 * @throws NullPointerException if unit or executor is null
 */
Multi<T> timeout(long timeout, TimeUnit unit, ScheduledExecutorService executor);

/**
 * Timeout with fallback
 * @param timeout timeout duration
 * @param unit time unit
 * @param executor scheduled executor for timeout
 * @param fallback fallback publisher
 * @return Multi with timeout and fallback
 * @throws NullPointerException if unit, executor, or fallback is null
 */
Multi<T> timeout(long timeout, TimeUnit unit, ScheduledExecutorService executor, Flow.Publisher<? extends T> fallback);
```

### Utility Operators

Debug, compose, and transform streams.

```java { .api }
/**
 * Log all reactive signals
 * @return Multi with logging
 */
Multi<T> log();

/**
 * Log with specific level
 * @param level logging level
 * @return Multi with logging at level
 * @throws NullPointerException if level is null
 */
Multi<T> log(Level level);

/**
 * Log with custom logger
 * @param level logging level
 * @param loggerName logger name
 * @return Multi with custom logging
 * @throws NullPointerException if level or loggerName is null
 */
Multi<T> log(Level level, String loggerName);

/**
 * Log with trace information
 * @param level logging level
 * @param trace include trace information
 * @return Multi with trace logging
 * @throws NullPointerException if level is null
 */
Multi<T> log(Level level, boolean trace);

/**
 * Apply custom composition
 * @param <U> result type
 * @param composer composition function
 * @return result of composition
 * @throws NullPointerException if composer is null
 */
<U> U compose(Function<? super Multi<T>, ? extends U> composer);

/**
 * Terminal transformation
 * @param <U> result type
 * @param converter conversion function
 * @return converted result
 * @throws NullPointerException if converter is null
 */
<U> U to(Function<? super Multi<T>, ? extends U> converter);
```

## Collection & Reduction Operations

Collect stream items into data structures or reduce to single values.

### Collector Operations

```java { .api }
/**
 * Collect using custom collector
 * @param <A> accumulator type
 * @param <R> result type
 * @param collector collector to use
 * @return Single with collected result
 * @throws NullPointerException if collector is null
 */
<A, R> Single<R> collect(Collector<? super T, A, R> collector);

/**
 * Collect with supplier and accumulator
 * @param <R> result type
 * @param supplier supplier of collection container
 * @param accumulator accumulator function
 * @return Single with collected result
 * @throws NullPointerException if supplier or accumulator is null
 */
<R> Single<R> collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator);

/**
 * Collect to List
 * @return Single containing List of all items
 */
Single<List<T>> collectList();

/**
 * Use Java Stream collectors
 * @param <A> accumulator type
 * @param <R> result type
 * @param collector Java Stream collector
 * @return Single with collected result
 * @throws NullPointerException if collector is null
 */
<A, R> Single<R> collectStream(java.util.stream.Collector<? super T, A, R> collector);
```
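
To make the `collectStream` shape more concrete, here is a JDK-only sketch of how a `java.util.stream.Collector` can be driven one item at a time: create the container, accumulate each item as it arrives, then apply the finisher at the end. It is an illustration of the collector contract, not the library's implementation, and `CollectorSketch` and `collectOneByOne` are hypothetical names.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class CollectorSketch {

    /** Feeds items to a java.util.stream.Collector one at a time, then finishes it. */
    static <T, A, R> R collectOneByOne(Iterable<? extends T> items, Collector<? super T, A, R> collector) {
        A container = collector.supplier().get();                 // fresh mutable container
        BiConsumer<A, ? super T> accumulator = collector.accumulator();
        for (T item : items) {
            accumulator.accept(container, item);                  // one call per arriving item
        }
        return collector.finisher().apply(container);             // final result once the input ends
    }

    public static void main(String[] args) {
        String joined = collectOneByOne(List.of("a", "b", "c"), Collectors.joining(","));
        System.out.println(joined); // a,b,c
    }
}
```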

### Reduction Operations

```java { .api }
/**
 * Reduce to single value
 * @param accumulator reduction function
 * @return Single with reduced result or empty
 * @throws NullPointerException if accumulator is null
 */
Single<T> reduce(BinaryOperator<T> accumulator);

/**
 * Reduce with initial value
 * @param <U> result type
 * @param identity initial value
 * @param accumulator reduction function
 * @return Single with reduced result
 * @throws NullPointerException if accumulator is null
 */
<U> Single<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator);
```
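
The two `reduce` shapes differ in what an empty input produces: without an identity there may be no result at all, while with an identity there is always a value. The sketch below mirrors that distinction over a plain `Iterable` using only the JDK. `ReduceSketch` and its methods are hypothetical names, and `Optional` stands in for the "result or empty" outcome.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

final class ReduceSketch {

    /** No identity: an empty input yields Optional.empty(). */
    static <T> Optional<T> reduceNoIdentity(Iterable<T> items, BinaryOperator<T> accumulator) {
        T result = null;
        boolean seen = false;
        for (T item : items) {
            // The first item seeds the result; later items are folded into it.
            result = seen ? accumulator.apply(result, item) : item;
            seen = true;
        }
        return seen ? Optional.of(result) : Optional.empty();
    }

    /** With identity: the result type may differ from the item type, and empty input returns the identity. */
    static <T, U> U reduceWithIdentity(U identity, Iterable<T> items, BiFunction<U, ? super T, U> accumulator) {
        U result = identity;
        for (T item : items) {
            result = accumulator.apply(result, item);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(reduceNoIdentity(List.of(1, 2, 3, 4), Integer::sum));          // Optional[10]
        System.out.println(reduceNoIdentity(List.<Integer>of(), Integer::sum));           // Optional.empty
        System.out.println(reduceWithIdentity("", List.of(1, 2, 3), (s, n) -> s + n));    // 123
    }
}
```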

## Terminal Operations

Final operations that trigger stream execution and consumption.

### Consumption Operations

```java { .api }
/**
 * Consume all items (no backpressure)
 * @param consumer item consumer
 * @throws NullPointerException if consumer is null
 */
void forEach(Consumer<? super T> consumer);

/**
 * Sequential async consumption
 * @param mapper function producing CompletionStages
 * @return CompletionStage completing when all items processed
 * @throws NullPointerException if mapper is null
 */
CompletionStage<Void> forEachCompletionStage(Function<? super T, ? extends CompletionStage<Void>> mapper);

/**
 * Ignore all items, complete when stream terminates
 * @return Single that completes when stream terminates
 */
Single<Void> ignoreElements();
```

## Usage Examples

### Basic Stream Processing

```java
import io.helidon.common.reactive.Multi;

// Create and process stream
Multi<String> processed = Multi.just("apple", "banana", "cherry")
    .filter(fruit -> fruit.length() > 5)
    .map(String::toUpperCase);

processed.forEach(System.out::println); // Prints: BANANA, CHERRY
```

### Error Handling

```java
Multi<Integer> withErrorHandling = Multi.just(1, 2, 0, 4)
    .map(n -> 10 / n) // Will throw ArithmeticException for 0
    .onErrorResumeWith(error -> Multi.just(-1)); // Resume with -1

List<Integer> result = withErrorHandling.collectList().await();
// Result: [10, 5, -1]
```

### Retry Logic

```java
// The explicit <String> type witness is needed: without it the chain is inferred
// as Multi<Object> and cannot be assigned to Multi<String>.
Multi<String> withRetry = Multi.<String>error(new RuntimeException("Network error"))
    .retry(3) // Retry 3 times
    .onErrorResumeWith(error -> Multi.just("Fallback value"));

String result = withRetry.first().await();
// Result: "Fallback value" (after 3 retries)
```

### Async Processing

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

Multi<CompletionStage<String>> asyncTasks = Multi.range(1, 5)
    .map(i -> CompletableFuture.supplyAsync(() -> "Task " + i));

// Process async tasks sequentially
CompletionStage<Void> completion = Multi.create(asyncTasks)
    .flatMapCompletionStage(Function.identity())
    .forEachCompletionStage(result -> {
        System.out.println(result);
        return CompletableFuture.completedFuture(null);
    });
```