or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdio-integration.mdmulti.mdsingle.mdsupporting-types.md

supporting-types.mddocs/

0

# Supporting Types

1

2

Core interfaces and utility classes that support reactive operations including subscription management, awaitable operations, retry strategies, and completion handling.

3

4

## Capabilities

5

6

### Subscribable Interface

7

8

Base interface providing functional subscription methods and common reactive operations for both Multi and Single.

9

10

```java { .api }

11

/**

12

* Base interface providing functional subscription methods and common reactive operations

13

* @param <T> item type

14

*/

15

public interface Subscribable<T> extends Flow.Publisher<T> {

16

}

17

```

18

19

#### Functional Subscription Methods

20

21

Convenient subscription methods that use functional interfaces instead of requiring full Subscriber implementations.

22

23

```java { .api }

24

/**

25

* Subscribe with onNext only

26

* @param onNext consumer for items

27

* @throws NullPointerException if onNext is null

28

*/

29

void subscribe(Consumer<? super T> onNext);

30

31

/**

32

* Subscribe with onNext and onError

33

* @param onNext consumer for items

34

* @param onError consumer for errors

35

* @throws NullPointerException if onNext or onError is null

36

*/

37

void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);

38

39

/**

40

* Subscribe with onNext, onError, and onComplete

41

* @param onNext consumer for items

42

* @param onError consumer for errors

43

* @param onComplete action for completion

44

* @throws NullPointerException if any parameter is null

45

*/

46

void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete);

47

48

/**

49

* Subscribe with full control over subscription

50

* @param onNext consumer for items

51

* @param onError consumer for errors

52

* @param onComplete action for completion

53

* @param onSubscribe consumer for subscription

54

* @throws NullPointerException if any parameter is null

55

*/

56

void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,

57

Runnable onComplete, Consumer<? super Flow.Subscription> onSubscribe);

58

```

59

60

### Awaitable Interface

61

62

Provides convenient blocking operations for CompletionStage-based types, allowing reactive types to be used in synchronous contexts.

63

64

```java { .api }

65

/**

66

* Provides convenient blocking operations for CompletionStage-based types

67

* @param <T> result type

68

*/

69

public interface Awaitable<T> {

70

}

71

```

72

73

#### Core Conversion Method

74

75

```java { .api }

76

/**

77

* Convert to CompletableFuture for compatibility

78

* @return CompletableFuture representation

79

*/

80

CompletableFuture<T> toCompletableFuture();

81

```

82

83

#### Blocking Operations

84

85

```java { .api }

86

/**

87

* Block until completion (unchecked exceptions only)

88

* Checked exceptions are wrapped in RuntimeException

89

* @return the result value

90

* @throws RuntimeException for any checked exceptions or timeout

91

*/

92

T await();

93

94

/**

95

* Block with timeout (unchecked exceptions)

96

* @param timeout timeout duration

97

* @return the result value

98

* @throws RuntimeException for timeout or checked exceptions

99

* @throws NullPointerException if timeout is null

100

*/

101

T await(Duration timeout);

102

103

/**

104

* Block with timeout using TimeUnit (deprecated)

105

* @param timeout timeout value

106

* @param unit time unit

107

* @return the result value

108

* @throws RuntimeException for timeout or checked exceptions

109

* @throws NullPointerException if unit is null

110

* @deprecated Use await(Duration) instead

111

*/

112

@Deprecated

113

T await(long timeout, TimeUnit unit);

114

```

115

116

### CompletionAwaitable Class

117

118

CompletionStage wrapper that also implements Awaitable for convenient blocking operations and enhanced chaining.

119

120

```java { .api }

121

/**

122

* CompletionStage wrapper that also implements Awaitable for convenient blocking

123

* @param <T> result type

124

*/

125

public final class CompletionAwaitable<T> implements CompletionStage<T>, Awaitable<T> {

126

}

127

```

128

129

#### Enhanced CompletionStage Methods

130

131

All standard CompletionStage methods return CompletionAwaitable for seamless chaining.

132

133

```java { .api }

134

/**

135

* All standard CompletionStage methods enhanced to return CompletionAwaitable

136

*/

137

<U> CompletionAwaitable<U> thenApply(Function<? super T, ? extends U> fn);

138

CompletionAwaitable<Void> thenAccept(Consumer<? super T> action);

139

CompletionAwaitable<Void> thenRun(Runnable action);

140

141

<U> CompletionAwaitable<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

142

143

<U, V> CompletionAwaitable<V> thenCombine(CompletionStage<? extends U> other,

144

BiFunction<? super T, ? super U, ? extends V> fn);

145

<U> CompletionAwaitable<Void> thenAcceptBoth(CompletionStage<? extends U> other,

146

BiConsumer<? super T, ? super U> action);

147

CompletionAwaitable<Void> runAfterBoth(CompletionStage<?> other, Runnable action);

148

149

<U> CompletionAwaitable<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);

150

CompletionAwaitable<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);

151

CompletionAwaitable<Void> runAfterEither(CompletionStage<?> other, Runnable action);

152

153

CompletionAwaitable<T> exceptionally(Function<Throwable, ? extends T> fn);

154

CompletionAwaitable<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

155

<U> CompletionAwaitable<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

156

```

157

158

#### Additional Error Handling

159

160

```java { .api }

161

/**

162

* Handle exceptions with consumer (does not transform result)

163

* @param exceptionConsumer consumer for exceptions

164

* @return CompletionAwaitable with exception handling

165

* @throws NullPointerException if exceptionConsumer is null

166

*/

167

CompletionAwaitable<T> exceptionallyAccept(Consumer<? super Throwable> exceptionConsumer);

168

```

169

170

### OptionalCompletionStage Interface

171

172

CompletionStage variant optimized for Optional values with convenient empty handling methods.

173

174

```java { .api }

175

/**

176

* CompletionStage variant optimized for Optional values with convenient empty handling

177

* @param <T> wrapped type

178

*/

179

public interface OptionalCompletionStage<T> extends CompletionStage<Optional<T>> {

180

}

181

```

182

183

#### Optional-Specific Methods

184

185

```java { .api }

186

/**

187

* Execute action when Optional is empty

188

* @param emptyAction action to execute for empty Optional

189

* @return same OptionalCompletionStage for chaining

190

* @throws NullPointerException if emptyAction is null

191

*/

192

OptionalCompletionStage<T> onEmpty(Runnable emptyAction);

193

194

/**

195

* Execute action when Optional has value

196

* @param valueConsumer consumer for present values

197

* @return same OptionalCompletionStage for chaining

198

* @throws NullPointerException if valueConsumer is null

199

*/

200

OptionalCompletionStage<T> onValue(Consumer<? super T> valueConsumer);

201

```

202

203

#### Factory Method

204

205

```java { .api }

206

/**

207

* Create OptionalCompletionStage from existing CompletionStage

208

* @param <T> value type

209

* @param stage completion stage containing Optional

210

* @return OptionalCompletionStage wrapper

211

* @throws NullPointerException if stage is null

212

*/

213

static <T> OptionalCompletionStage<T> create(CompletionStage<Optional<T>> stage);

214

```

215

216

### Collector Interface

217

218

Simple collector interface for accumulating stream items into custom data structures.

219

220

```java { .api }

221

/**

222

* Simple collector interface for accumulating stream items

223

* @param <T> item type

224

* @param <U> result type

225

*/

226

public interface Collector<T, U> {

227

}

228

```

229

230

#### Collector Methods

231

232

```java { .api }

233

/**

234

* Add item to collection

235

* @param item item to collect

236

*/

237

void collect(T item);

238

239

/**

240

* Get final collected result

241

* @return collected result

242

*/

243

U value();

244

```

245

246

### RetrySchema Interface

247

248

Defines retry delay strategies for polling operations and retry logic, providing flexible backoff algorithms.

249

250

```java { .api }

251

/**

252

* Defines retry delay strategies for polling operations and retry logic

253

*/

254

@FunctionalInterface

255

public interface RetrySchema {

256

}

257

```

258

259

#### Core Method

260

261

```java { .api }

262

/**

263

* Calculate next retry delay in milliseconds

264

* @param retryCount current retry attempt (0-based)

265

* @param lastDelay previous delay in milliseconds

266

* @return next delay in milliseconds

267

*/

268

long nextDelay(int retryCount, long lastDelay);

269

```

270

271

#### Static Factory Methods

272

273

```java { .api }

274

/**

275

* Always return same delay

276

* @param delay constant delay in milliseconds

277

* @return RetrySchema with constant delay

278

*/

279

static RetrySchema constant(long delay);

280

281

/**

282

* Linear backoff with maximum limit

283

* @param firstDelay initial delay in milliseconds

284

* @param increment delay increment per retry in milliseconds

285

* @param maxDelay maximum delay in milliseconds

286

* @return RetrySchema with linear backoff

287

*/

288

static RetrySchema linear(long firstDelay, long increment, long maxDelay);

289

290

/**

291

* Exponential backoff with maximum limit

292

* @param firstDelay initial delay in milliseconds

293

* @param ratio multiplication ratio for exponential growth

294

* @param maxDelay maximum delay in milliseconds

295

* @return RetrySchema with exponential backoff

296

*/

297

static RetrySchema geometric(long firstDelay, double ratio, long maxDelay);

298

```

299

300

## Usage Examples

301

302

### Functional Subscription

303

304

```java

305

import io.helidon.common.reactive.Multi;

306

import io.helidon.common.reactive.Subscribable;

307

308

Multi<String> data = Multi.just("apple", "banana", "cherry");

309

310

// Simple subscription with just onNext

311

data.subscribe(System.out::println);

312

313

// Subscription with error handling

314

data.subscribe(

315

item -> System.out.println("Received: " + item),

316

error -> System.err.println("Error: " + error.getMessage())

317

);

318

319

// Full subscription control

320

data.subscribe(

321

item -> System.out.println("Item: " + item),

322

error -> System.err.println("Error: " + error),

323

() -> System.out.println("Completed"),

324

subscription -> {

325

System.out.println("Subscribed, requesting all");

326

subscription.request(Long.MAX_VALUE);

327

}

328

);

329

```

330

331

### Awaitable Operations

332

333

```java

334

import io.helidon.common.reactive.Single;

335

import io.helidon.common.reactive.Awaitable;

336

import java.time.Duration;

337

import java.util.concurrent.CompletableFuture;

338

339

Single<String> asyncValue = Single.create(

340

CompletableFuture.supplyAsync(() -> {

341

try { Thread.sleep(1000); } catch (InterruptedException e) {}

342

return "Async result";

343

})

344

);

345

346

// Block until completion

347

String result = asyncValue.await();

348

System.out.println(result); // "Async result"

349

350

// Block with timeout

351

try {

352

String quickResult = asyncValue.await(Duration.ofMillis(500));

353

System.out.println(quickResult);

354

} catch (RuntimeException e) {

355

System.out.println("Timeout occurred");

356

}

357

358

// Convert to CompletableFuture for integration

359

CompletableFuture<String> future = asyncValue.toCompletableFuture();

360

future.thenAccept(System.out::println);

361

```

362

363

### CompletionAwaitable Chaining

364

365

```java

366

import io.helidon.common.reactive.Single;

367

import io.helidon.common.reactive.CompletionAwaitable;

368

369

Single<Integer> number = Single.just(42);

370

371

// Chain operations with enhanced CompletionAwaitable

372

CompletionAwaitable<String> result = number

373

.thenApply(n -> n * 2)

374

.thenApply(n -> "Result: " + n)

375

.exceptionallyAccept(error -> System.err.println("Error: " + error));

376

377

String finalResult = result.await();

378

System.out.println(finalResult); // "Result: 84"

379

```

380

381

### OptionalCompletionStage Usage

382

383

```java

384

import io.helidon.common.reactive.Single;

385

import io.helidon.common.reactive.OptionalCompletionStage;

386

import java.util.Optional;

387

import java.util.concurrent.CompletableFuture;

388

389

CompletableFuture<Optional<String>> optionalFuture =

390

CompletableFuture.completedFuture(Optional.of("Present value"));

391

392

OptionalCompletionStage<String> optionalStage =

393

OptionalCompletionStage.create(optionalFuture);

394

395

optionalStage

396

.onValue(value -> System.out.println("Got value: " + value))

397

.onEmpty(() -> System.out.println("No value present"));

398

399

// With empty Optional

400

CompletableFuture<Optional<String>> emptyFuture =

401

CompletableFuture.completedFuture(Optional.empty());

402

403

OptionalCompletionStage.create(emptyFuture)

404

.onValue(value -> System.out.println("Got: " + value))

405

.onEmpty(() -> System.out.println("Empty result")); // This will execute

406

```

407

408

### Custom Collectors

409

410

```java

411

import io.helidon.common.reactive.Multi;

412

import io.helidon.common.reactive.Collector;

413

import java.util.ArrayList;

414

import java.util.List;

415

416

// Custom collector that only collects even numbers

417

class EvenNumberCollector implements Collector<Integer, List<Integer>> {

418

private final List<Integer> evenNumbers = new ArrayList<>();

419

420

@Override

421

public void collect(Integer item) {

422

if (item % 2 == 0) {

423

evenNumbers.add(item);

424

}

425

}

426

427

@Override

428

public List<Integer> value() {

429

return new ArrayList<>(evenNumbers);

430

}

431

}

432

433

Multi<Integer> numbers = Multi.range(1, 10);

434

List<Integer> evenNumbers = numbers.collect(new EvenNumberCollector()).await();

435

System.out.println(evenNumbers); // [2, 4, 6, 8, 10]

436

```

437

438

### RetrySchema Examples

439

440

```java

441

import io.helidon.common.reactive.Multi;

442

import io.helidon.common.reactive.RetrySchema;

443

import java.util.concurrent.atomic.AtomicInteger;

444

445

// Constant delay retry

446

RetrySchema constantRetry = RetrySchema.constant(1000); // 1 second delay

447

448

// Linear backoff retry

449

RetrySchema linearRetry = RetrySchema.linear(100, 200, 2000);

450

// Delays: 100ms, 300ms, 500ms, 700ms, 900ms, 1100ms, 1300ms, 1500ms, 1700ms, 1900ms, 2000ms (max)

451

452

// Exponential backoff retry

453

RetrySchema exponentialRetry = RetrySchema.geometric(100, 2.0, 5000);

454

// Delays: 100ms, 200ms, 400ms, 800ms, 1600ms, 3200ms, 5000ms (max)

455

456

// Using retry schema with Multi

457

AtomicInteger attempts = new AtomicInteger(0);

458

459

Multi<String> flakyOperation = Multi.defer(() -> {

460

int attempt = attempts.incrementAndGet();

461

if (attempt < 3) {

462

return Multi.error(new RuntimeException("Attempt " + attempt + " failed"));

463

}

464

return Multi.just("Success on attempt " + attempt);

465

});

466

467

// Retry with exponential backoff

468

String result = flakyOperation

469

.retryWhen((error, retryCount) -> {

470

long delay = exponentialRetry.nextDelay(retryCount.intValue(), 0);

471

System.out.println("Retry " + retryCount + " after " + delay + "ms");

472

return Multi.timer(delay, TimeUnit.MILLISECONDS, scheduler);

473

})

474

.first()

475

.await();

476

477

System.out.println(result); // "Success on attempt 3"

478

```

479

480

### Advanced Subscription Management

481

482

```java

483

import io.helidon.common.reactive.Multi;

484

import java.util.concurrent.Flow;

485

import java.util.concurrent.atomic.AtomicReference;

486

487

Multi<Integer> stream = Multi.range(1, 1000000); // Large stream

488

489

AtomicReference<Flow.Subscription> subscriptionRef = new AtomicReference<>();

490

491

stream.subscribe(

492

item -> {

493

System.out.println("Processing: " + item);

494

495

// Simulate slow processing

496

try { Thread.sleep(100); } catch (InterruptedException e) {}

497

498

// Request next item (backpressure control)

499

Flow.Subscription subscription = subscriptionRef.get();

500

if (subscription != null) {

501

subscription.request(1);

502

}

503

},

504

error -> System.err.println("Stream error: " + error),

505

() -> System.out.println("Stream completed"),

506

subscription -> {

507

subscriptionRef.set(subscription);

508

// Start with requesting just one item

509

subscription.request(1);

510

}

511

);

512

```

513

514

### Error Recovery Patterns

515

516

```java

517

import io.helidon.common.reactive.Single;

518

import io.helidon.common.reactive.CompletionAwaitable;

519

520

Single<String> unreliableService = Single.error(new RuntimeException("Service down"));

521

522

// Chain multiple fallback strategies

523

CompletionAwaitable<String> robustCall = unreliableService

524

.exceptionally(error -> {

525

System.out.println("Primary service failed: " + error.getMessage());

526

throw new RuntimeException("Fallback to secondary service");

527

})

528

.exceptionally(error -> {

529

System.out.println("Secondary service failed: " + error.getMessage());

530

return "Cached response";

531

})

532

.exceptionallyAccept(error -> {

533

// Log any remaining errors without changing the result

534

System.err.println("Final error handler: " + error.getMessage());

535

});

536

537

String result = robustCall.await();

538

System.out.println("Final result: " + result); // "Cached response"

539

```