or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md io-integration.md multi.md single.md supporting-types.md

docs/io-integration.md

0

# I/O Integration

1

2

Reactive I/O utilities for integrating with Java I/O streams, channels, and file operations. Enables reactive processing of I/O data with proper backpressure handling and asynchronous execution.

3

4

## Capabilities

5

6

### IoMulti Utility Class

7

8

Factory class providing I/O integration utilities for creating reactive streams from I/O sources.

9

10

```java { .api }

11

/**

12

* Factory class for I/O integration utilities

13

*/

14

public final class IoMulti {

15

}

16

```

17

18

## InputStream Integration

19

20

Convert InputStreams to reactive Multi streams with configurable buffering and execution.

21

22

### Basic InputStream to Multi

23

24

```java { .api }

25

/**

26

* Create Multi<ByteBuffer> from InputStream

27

* @param inputStream input stream to read from

28

* @return Multi emitting ByteBuffers from stream

29

* @throws NullPointerException if inputStream is null

30

*/

31

static Multi<ByteBuffer> multiFromStream(InputStream inputStream);

32

33

/**

34

* Advanced builder for InputStream to Multi conversion

35

* @param inputStream input stream to read from

36

* @return builder for configuration

37

* @throws NullPointerException if inputStream is null

38

*/

39

static MultiFromInputStreamBuilder multiFromStreamBuilder(InputStream inputStream);

40

```

41

42

### MultiFromInputStreamBuilder Configuration

43

44

```java { .api }

45

/**

46

* Builder for advanced InputStream to Multi configuration

47

*/

48

public static final class MultiFromInputStreamBuilder {

49

/**

50

* Set buffer size for reads

51

* @param size buffer size in bytes

52

* @return builder for chaining

53

*/

54

MultiFromInputStreamBuilder byteBufferSize(int size);

55

56

/**

57

* Set executor for blocking reads

58

* @param executor executor service for I/O operations

59

* @return builder for chaining

60

* @throws NullPointerException if executor is null

61

*/

62

MultiFromInputStreamBuilder executor(ExecutorService executor);

63

64

/**

65

* Build the configured Multi

66

* @return Multi<ByteBuffer> from InputStream

67

*/

68

Multi<ByteBuffer> build();

69

}

70

```

71

72

## OutputStream Integration

73

74

Create reactive OutputStreams that publish written data as Multi streams.

75

76

### Basic OutputStream Multi

77

78

```java { .api }

79

/**

80

* Create OutputStream that publishes written data as Multi<ByteBuffer>

81

* @return OutputStreamMulti instance

82

*/

83

static OutputStreamMulti outputStreamMulti();

84

85

/**

86

* Advanced builder for OutputStream Multi creation

87

* @return builder for configuration

88

*/

89

static OutputStreamMultiBuilder outputStreamMultiBuilder();

90

```

91

92

### OutputStreamMultiBuilder Configuration

93

94

```java { .api }

95

/**

96

* Builder for advanced OutputStream Multi configuration

97

*/

98

public static final class OutputStreamMultiBuilder {

99

/**

100

* Set the timeout for write calls that block while there is no downstream demand

101

* @param timeout timeout duration

102

* @return builder for chaining

103

* @throws NullPointerException if timeout is null

104

*/

105

OutputStreamMultiBuilder timeout(Duration timeout);

106

107

/**

108

* Set callback for demand notifications

109

* @param onRequest callback receiving (requested, current_demand)

110

* @return builder for chaining

111

* @throws NullPointerException if onRequest is null

112

*/

113

OutputStreamMultiBuilder onRequest(BiConsumer<Long, Long> onRequest);

114

115

/**

116

* Build the configured OutputStreamMulti

117

* @return OutputStreamMulti instance

118

*/

119

OutputStreamMulti build();

120

}

121

```

122

123

### OutputStreamMulti Class

124

125

```java { .api }

126

/**

127

* OutputStream implementation that publishes written data as reactive stream

128

*/

129

public final class OutputStreamMulti extends OutputStream implements Multi<ByteBuffer> {

130

/**

131

* Standard OutputStream write methods

132

*/

133

@Override

134

void write(int b) throws IOException;

135

136

@Override

137

void write(byte[] b) throws IOException;

138

139

@Override

140

void write(byte[] b, int off, int len) throws IOException;

141

142

@Override

143

void flush() throws IOException;

144

145

@Override

146

void close() throws IOException;

147

}

148

```

149

150

## ByteChannel Integration

151

152

Reactive integration with Java NIO ByteChannels for both reading and writing operations.

153

154

### Reading from ByteChannels

155

156

```java { .api }

157

/**

158

* Create Multi from ReadableByteChannel

159

* @param channel readable byte channel

160

* @return Multi<ByteBuffer> from channel

161

* @throws NullPointerException if channel is null

162

*/

163

static Multi<ByteBuffer> multiFromByteChannel(ReadableByteChannel channel);

164

165

/**

166

* Advanced builder for ByteChannel reading

167

* @param channel readable byte channel

168

* @return builder for configuration

169

* @throws NullPointerException if channel is null

170

*/

171

static MultiFromByteChannelBuilder multiFromByteChannelBuilder(ReadableByteChannel channel);

172

```

173

174

### MultiFromByteChannelBuilder Configuration

175

176

```java { .api }

177

/**

178

* Builder for advanced ByteChannel reading configuration

179

*/

180

public static final class MultiFromByteChannelBuilder {

181

/**

182

* Set executor for async reads

183

* @param executor scheduled executor service

184

* @return builder for chaining

185

* @throws NullPointerException if executor is null

186

*/

187

MultiFromByteChannelBuilder executor(ScheduledExecutorService executor);

188

189

/**

190

* Set retry delays for failed reads

191

* @param retrySchema retry delay strategy

192

* @return builder for chaining

193

* @throws NullPointerException if retrySchema is null

194

*/

195

MultiFromByteChannelBuilder retrySchema(RetrySchema retrySchema);

196

197

/**

198

* Set read buffer size

199

* @param capacity buffer capacity in bytes

200

* @return builder for chaining

201

*/

202

MultiFromByteChannelBuilder bufferCapacity(int capacity);

203

204

/**

205

* Build the configured Multi

206

* @return Multi<ByteBuffer> from channel

207

*/

208

Multi<ByteBuffer> build();

209

}

210

```

211

212

### Writing to ByteChannels

213

214

```java { .api }

215

/**

216

* Create function to write Multi<ByteBuffer> to WritableByteChannel

217

* @param channel writable byte channel

218

* @return function that writes Multi data to channel

219

* @throws NullPointerException if channel is null

220

*/

221

static Function<Multi<ByteBuffer>, CompletionStage<Void>> multiToByteChannel(WritableByteChannel channel);

222

223

/**

224

* Advanced builder for ByteChannel writing

225

* @param channel writable byte channel

226

* @return builder for configuration

227

* @throws NullPointerException if channel is null

228

*/

229

static MultiToByteChannelBuilder multiToByteChannelBuilder(WritableByteChannel channel);

230

231

/**

232

* Convenience method for writing Multi to file

233

* @param path file path to write to

234

* @return function that writes Multi data to file

235

* @throws NullPointerException if path is null

236

*/

237

static Function<Multi<ByteBuffer>, CompletionStage<Void>> writeToFile(Path path);

238

```

239

240

### MultiToByteChannelBuilder Configuration

241

242

```java { .api }

243

/**

244

* Builder for advanced ByteChannel writing configuration

245

*/

246

public static final class MultiToByteChannelBuilder {

247

/**

248

* Set executor for blocking writes

249

* @param executor executor for I/O operations

250

* @return builder for chaining

251

* @throws NullPointerException if executor is null

252

*/

253

MultiToByteChannelBuilder executor(Executor executor);

254

255

/**

256

* Build the configured write function

257

* @return function to write Multi<ByteBuffer> to channel

258

*/

259

Function<Multi<ByteBuffer>, CompletionStage<Void>> build();

260

}

261

```

262

263

## Usage Examples

264

265

### Reading Files Reactively

266

267

```java

268

import io.helidon.common.reactive.IoMulti;

269

import io.helidon.common.reactive.Multi;

270

import java.io.FileInputStream;

271

import java.io.IOException;

272

import java.nio.ByteBuffer;

273

import java.util.concurrent.Executors;

274

275

// Basic file reading

276

try (FileInputStream fis = new FileInputStream("data.txt")) {

277

Multi<ByteBuffer> fileData = IoMulti.multiFromStream(fis);

278

279

// Process data

280

fileData

281

.map(buffer -> {

282

byte[] bytes = new byte[buffer.remaining()];

283

buffer.get(bytes);

284

return new String(bytes);

285

})

286

.forEach(System.out::println);

287

} catch (IOException e) {

288

e.printStackTrace();

289

}

290

291

// Advanced file reading with custom buffer size

292

try (FileInputStream fis = new FileInputStream("large-file.dat")) {

293

Multi<ByteBuffer> fileData = IoMulti.multiFromStreamBuilder(fis)

294

.byteBufferSize(8192) // 8KB buffer

295

.executor(Executors.newCachedThreadPool())

296

.build();

297

298

long totalBytes = fileData

299

.map(ByteBuffer::remaining)

300

.map(Integer::longValue)

301

.reduce(0L, Long::sum)

302

.await();

303

304

System.out.println("Total bytes read: " + totalBytes);

305

} catch (IOException e) {

306

e.printStackTrace();

307

}

308

```

309

310

### Writing Data Reactively

311

312

```java

313

import io.helidon.common.reactive.IoMulti;

314

import io.helidon.common.reactive.Multi;

315

import java.nio.ByteBuffer;

316

import java.time.Duration;

317

318

// Create reactive OutputStream

319

IoMulti.OutputStreamMulti outputStream = IoMulti.outputStreamMultiBuilder()

320

.timeout(Duration.ofSeconds(5))

321

.onRequest((requested, totalDemand) ->

322

System.out.println("Requested: " + requested + ", Total demand: " + totalDemand))

323

.build();

324

325

// Write data to the stream from another thread

326

new Thread(() -> {

327

try {

328

outputStream.write("Hello ".getBytes());

329

outputStream.write("reactive ".getBytes());

330

outputStream.write("world!".getBytes());

331

outputStream.close();

332

} catch (Exception e) {

333

e.printStackTrace();

334

}

335

}).start();

336

337

// Read the written data reactively

338

outputStream

339

.map(buffer -> {

340

byte[] bytes = new byte[buffer.remaining()];

341

buffer.get(bytes);

342

return new String(bytes);

343

})

344

.forEach(System.out::print); // Prints: Hello reactive world!

345

```

346

347

### ByteChannel Operations

348

349

```java

350

import io.helidon.common.reactive.IoMulti;

351

import io.helidon.common.reactive.Multi;

352

import io.helidon.common.reactive.RetrySchema;

353

import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

354

import java.nio.file.Path;

355

import java.nio.file.StandardOpenOption;

356

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;

357

import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;

358

359

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

360

361

// Reading from channel with retry

362

try (FileChannel readChannel = FileChannel.open(Path.of("input.txt"), StandardOpenOption.READ)) {

363

Multi<ByteBuffer> data = IoMulti.multiFromByteChannelBuilder(readChannel)

364

.executor(executor)

365

.retrySchema(RetrySchema.linear(100, 50, 1000)) // Linear backoff

366

.bufferCapacity(4096)

367

.build();

368

369

// Process and write to another file

370

Function<Multi<ByteBuffer>, CompletionStage<Void>> writer =

371

IoMulti.writeToFile(Path.of("output.txt"));

372

373

CompletionStage<Void> completion = data

374

.map(buffer -> {

375

// Transform data (e.g., uppercase text)

376

byte[] bytes = new byte[buffer.remaining()];

377

buffer.get(bytes);

378

String text = new String(bytes).toUpperCase();

379

return ByteBuffer.wrap(text.getBytes());

380

})

381

.to(writer);

382

383

completion.toCompletableFuture().join(); // Wait for completion

384

System.out.println("File processing completed");

385

} catch (Exception e) {

386

e.printStackTrace();

387

} finally {

388

executor.shutdown();

389

}

390

```

391

392

### Piping Data Between Streams

393

394

```java

395

import io.helidon.common.reactive.IoMulti;

396

import io.helidon.common.reactive.Multi;

397

import java.io.FileInputStream;

398

import java.io.FileOutputStream;

399

import java.nio.ByteBuffer;

400

401

// Reactive file copy with transformation

402

try (FileInputStream input = new FileInputStream("source.txt");

403

FileOutputStream output = new FileOutputStream("destination.txt")) {

404

405

// Create reactive streams

406

Multi<ByteBuffer> source = IoMulti.multiFromStream(input);

407

IoMulti.OutputStreamMulti destination = IoMulti.outputStreamMulti();

408

409

// Transform and pipe data

410

source

411

.map(buffer -> {

412

// Example transformation: add prefix to each line

413

byte[] bytes = new byte[buffer.remaining()];

414

buffer.get(bytes);

415

String text = new String(bytes);

416

String transformed = text.replaceAll("(?m)^", ">> ");

417

return ByteBuffer.wrap(transformed.getBytes());

418

})

419

.forEach(buffer -> {

420

try {

421

byte[] bytes = new byte[buffer.remaining()];

422

buffer.get(bytes);

423

output.write(bytes);

424

} catch (Exception e) {

425

throw new RuntimeException(e);

426

}

427

});

428

429

System.out.println("File transformation completed");

430

} catch (Exception e) {

431

e.printStackTrace();

432

}

433

```

434

435

### Async File Processing

436

437

```java

438

import io.helidon.common.reactive.IoMulti;

439

import io.helidon.common.reactive.Multi;

440

import java.io.FileInputStream;
import java.nio.ByteBuffer;

441

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

442

import java.util.concurrent.CompletionStage;

443

444

// Process multiple files concurrently

445

List<String> fileNames = Arrays.asList("file1.txt", "file2.txt", "file3.txt");

446

447

CompletionStage<Void> allFiles = Multi.create(fileNames)

448

.flatMapCompletionStage(fileName -> {

449

return CompletableFuture.supplyAsync(() -> {

450

try (FileInputStream fis = new FileInputStream(fileName)) {

451

Multi<ByteBuffer> fileData = IoMulti.multiFromStream(fis);

452

453

long size = fileData

454

.map(ByteBuffer::remaining)

455

.map(Integer::longValue)

456

.reduce(0L, Long::sum)

457

.await();

458

459

System.out.println(fileName + ": " + size + " bytes");

460

return null;

461

} catch (Exception e) {

462

throw new RuntimeException(e);

463

}

464

});

465

})

466

.ignoreElements()

467

.toStage();

468

469

allFiles.toCompletableFuture().join();

470

System.out.println("All files processed");

471

```

472

473

## Error Handling in I/O Operations

474

475

```java

476

import io.helidon.common.reactive.IoMulti;

477

import io.helidon.common.reactive.Multi;

478

import java.io.FileInputStream;

479

import java.io.FileNotFoundException;

480

481

// Robust file reading with error handling

482

Multi<String> fileContent = Multi.defer(() -> {

483

try {

484

FileInputStream fis = new FileInputStream("might-not-exist.txt");

485

return IoMulti.multiFromStream(fis)

486

.map(buffer -> {

487

byte[] bytes = new byte[buffer.remaining()];

488

buffer.get(bytes);

489

return new String(bytes);

490

});

491

} catch (FileNotFoundException e) {

492

return Multi.error(e);

493

}

494

})

495

.retry(3) // Retry up to 3 times before falling back

496

.onErrorResumeWith(error -> {

497

System.err.println("File not found, using default content");

498

return Multi.just("Default content");

499

});

500

501

fileContent.forEach(System.out::println);

502

```