0
# I/O Integration
1
2
Reactive I/O utilities that integrate with Java I/O streams, NIO channels, and file operations, enabling reactive processing of I/O data with backpressure handling and asynchronous execution.
3
4
## Capabilities
5
6
### IoMulti Utility Class
7
8
Factory class providing I/O integration utilities for creating reactive streams from I/O sources and for writing reactive streams to I/O sinks.
9
10
```java { .api }
11
/**
12
* Factory class for I/O integration utilities
13
*/
14
public final class IoMulti {
15
}
16
```
17
18
## InputStream Integration
19
20
Convert InputStreams to reactive Multi streams with configurable buffering and execution.
21
22
### Basic InputStream to Multi
23
24
```java { .api }
25
/**
26
* Create Multi<ByteBuffer> from InputStream
27
* @param inputStream input stream to read from
28
* @return Multi emitting ByteBuffers from stream
29
* @throws NullPointerException if inputStream is null
30
*/
31
static Multi<ByteBuffer> multiFromStream(InputStream inputStream);
32
33
/**
34
* Advanced builder for InputStream to Multi conversion
35
* @param inputStream input stream to read from
36
* @return builder for configuration
37
* @throws NullPointerException if inputStream is null
38
*/
39
static MultiFromInputStreamBuilder multiFromStreamBuilder(InputStream inputStream);
40
```
41
42
### MultiFromInputStreamBuilder Configuration
43
44
```java { .api }
45
/**
46
* Builder for advanced InputStream to Multi configuration
47
*/
48
public static final class MultiFromInputStreamBuilder {
49
/**
50
* Set buffer size for reads
51
* @param size buffer size in bytes
52
* @return builder for chaining
53
*/
54
MultiFromInputStreamBuilder byteBufferSize(int size);
55
56
/**
57
* Set executor for blocking reads
58
* @param executor executor service for I/O operations
59
* @return builder for chaining
60
* @throws NullPointerException if executor is null
61
*/
62
MultiFromInputStreamBuilder executor(ExecutorService executor);
63
64
/**
65
* Build the configured Multi
66
* @return Multi<ByteBuffer> from InputStream
67
*/
68
Multi<ByteBuffer> build();
69
}
70
```
71
72
## OutputStream Integration
73
74
Create reactive OutputStreams that publish written data as Multi streams.
75
76
### Basic OutputStream Multi
77
78
```java { .api }
79
/**
80
* Create OutputStream that publishes written data as Multi<ByteBuffer>
81
* @return OutputStreamMulti instance
82
*/
83
static OutputStreamMulti outputStreamMulti();
84
85
/**
86
* Advanced builder for OutputStream Multi creation
87
* @return builder for configuration
88
*/
89
static OutputStreamMultiBuilder outputStreamMultiBuilder();
90
```
91
92
### OutputStreamMultiBuilder Configuration
93
94
```java { .api }
95
/**
96
* Builder for advanced OutputStream Multi configuration
97
*/
98
public static final class OutputStreamMultiBuilder {
99
/**
100
* Set the maximum time a write may block while there is no downstream demand
101
* @param timeout timeout duration
102
* @return builder for chaining
103
* @throws NullPointerException if timeout is null
104
*/
105
OutputStreamMultiBuilder timeout(Duration timeout);
106
107
/**
108
* Set callback for demand notifications
109
* @param onRequest callback receiving (number of items just requested, current total demand)
110
* @return builder for chaining
111
* @throws NullPointerException if onRequest is null
112
*/
113
OutputStreamMultiBuilder onRequest(BiConsumer<Long, Long> onRequest);
114
115
/**
116
* Build the configured OutputStreamMulti
117
* @return OutputStreamMulti instance
118
*/
119
OutputStreamMulti build();
120
}
121
```
122
123
### OutputStreamMulti Class
124
125
```java { .api }
126
/**
127
* OutputStream implementation that publishes written data as reactive stream
128
*/
129
public final class OutputStreamMulti extends OutputStream implements Multi<ByteBuffer> {
130
/**
131
* Standard OutputStream write methods
132
*/
133
@Override
134
void write(int b) throws IOException;
135
136
@Override
137
void write(byte[] b) throws IOException;
138
139
@Override
140
void write(byte[] b, int off, int len) throws IOException;
141
142
@Override
143
void flush() throws IOException;
144
145
@Override
146
void close() throws IOException;
147
}
148
```
149
150
## ByteChannel Integration
151
152
Reactive integration with Java NIO ByteChannels for both reading and writing operations.
153
154
### Reading from ByteChannels
155
156
```java { .api }
157
/**
158
* Create Multi from ReadableByteChannel
159
* @param channel readable byte channel
160
* @return Multi<ByteBuffer> from channel
161
* @throws NullPointerException if channel is null
162
*/
163
static Multi<ByteBuffer> multiFromByteChannel(ReadableByteChannel channel);
164
165
/**
166
* Advanced builder for ByteChannel reading
167
* @param channel readable byte channel
168
* @return builder for configuration
169
* @throws NullPointerException if channel is null
170
*/
171
static MultiFromByteChannelBuilder multiFromByteChannelBuilder(ReadableByteChannel channel);
172
```
173
174
### MultiFromByteChannelBuilder Configuration
175
176
```java { .api }
177
/**
178
* Builder for advanced ByteChannel reading configuration
179
*/
180
public static final class MultiFromByteChannelBuilder {
181
/**
182
* Set executor for async reads
183
* @param executor scheduled executor service
184
* @return builder for chaining
185
* @throws NullPointerException if executor is null
186
*/
187
MultiFromByteChannelBuilder executor(ScheduledExecutorService executor);
188
189
/**
190
* Set retry delays for failed reads
191
* @param retrySchema retry delay strategy
192
* @return builder for chaining
193
* @throws NullPointerException if retrySchema is null
194
*/
195
MultiFromByteChannelBuilder retrySchema(RetrySchema retrySchema);
196
197
/**
198
* Set read buffer size
199
* @param capacity buffer capacity in bytes
200
* @return builder for chaining
201
*/
202
MultiFromByteChannelBuilder bufferCapacity(int capacity);
203
204
/**
205
* Build the configured Multi
206
* @return Multi<ByteBuffer> from channel
207
*/
208
Multi<ByteBuffer> build();
209
}
210
```
211
212
### Writing to ByteChannels
213
214
```java { .api }
215
/**
216
* Create function to write Multi<ByteBuffer> to WritableByteChannel
217
* @param channel writable byte channel
218
* @return function that writes Multi data to channel
219
* @throws NullPointerException if channel is null
220
*/
221
static Function<Multi<ByteBuffer>, CompletionStage<Void>> multiToByteChannel(WritableByteChannel channel);
222
223
/**
224
* Advanced builder for ByteChannel writing
225
* @param channel writable byte channel
226
* @return builder for configuration
227
* @throws NullPointerException if channel is null
228
*/
229
static MultiToByteChannelBuilder multiToByteChannelBuilder(WritableByteChannel channel);
230
231
/**
232
* Convenience method for writing Multi to file
233
* @param path file path to write to
234
* @return function that writes Multi data to file
235
* @throws NullPointerException if path is null
236
*/
237
static Function<Multi<ByteBuffer>, CompletionStage<Void>> writeToFile(Path path);
238
```
239
240
### MultiToByteChannelBuilder Configuration
241
242
```java { .api }
243
/**
244
* Builder for advanced ByteChannel writing configuration
245
*/
246
public static final class MultiToByteChannelBuilder {
247
/**
248
* Set executor for blocking writes
249
* @param executor executor for I/O operations
250
* @return builder for chaining
251
* @throws NullPointerException if executor is null
252
*/
253
MultiToByteChannelBuilder executor(Executor executor);
254
255
/**
256
* Build the configured write function
257
* @return function to write Multi<ByteBuffer> to channel
258
*/
259
Function<Multi<ByteBuffer>, CompletionStage<Void>> build();
260
}
261
```
262
263
## Usage Examples
264
265
### Reading Files Reactively
266
267
```java
268
import io.helidon.common.reactive.IoMulti;
269
import io.helidon.common.reactive.Multi;
270
import java.io.FileInputStream;
271
import java.io.IOException;
272
import java.nio.ByteBuffer;
273
import java.util.concurrent.Executors;
274
275
// Basic file reading
276
try (FileInputStream fis = new FileInputStream("data.txt")) {
277
Multi<ByteBuffer> fileData = IoMulti.multiFromStream(fis);
278
279
// Process data
280
fileData
281
.map(buffer -> {
282
byte[] bytes = new byte[buffer.remaining()];
283
buffer.get(bytes);
284
return new String(bytes);
285
})
286
.forEach(System.out::println);
287
} catch (IOException e) {
288
e.printStackTrace();
289
}
290
291
// Advanced file reading with custom buffer size
292
try (FileInputStream fis = new FileInputStream("large-file.dat")) {
293
Multi<ByteBuffer> fileData = IoMulti.multiFromStreamBuilder(fis)
294
.byteBufferSize(8192) // 8KB buffer
295
.executor(Executors.newCachedThreadPool())
296
.build();
297
298
long totalBytes = fileData
299
.map(ByteBuffer::remaining)
300
.map(Integer::longValue)
301
.reduce(0L, Long::sum)
302
.await();
303
304
System.out.println("Total bytes read: " + totalBytes);
305
} catch (IOException e) {
306
e.printStackTrace();
307
}
308
```
309
310
### Writing Data Reactively
311
312
```java
313
import io.helidon.common.reactive.IoMulti;
314
import io.helidon.common.reactive.Multi;
315
import java.nio.ByteBuffer;
316
import java.time.Duration;
317
318
// Create reactive OutputStream
319
IoMulti.OutputStreamMulti outputStream = IoMulti.outputStreamMultiBuilder()
320
.timeout(Duration.ofSeconds(5))
321
.onRequest((requested, totalDemand) ->
322
System.out.println("Requested: " + requested + ", Total demand: " + totalDemand))
323
.build();
324
325
// Write data to the stream from another thread
326
new Thread(() -> {
327
try {
328
outputStream.write("Hello ".getBytes());
329
outputStream.write("reactive ".getBytes());
330
outputStream.write("world!".getBytes());
331
outputStream.close();
332
} catch (Exception e) {
333
e.printStackTrace();
334
}
335
}).start();
336
337
// Read the written data reactively
338
outputStream
339
.map(buffer -> {
340
byte[] bytes = new byte[buffer.remaining()];
341
buffer.get(bytes);
342
return new String(bytes);
343
})
344
.forEach(System.out::print); // Prints: Hello reactive world!
345
```
346
347
### ByteChannel Operations
348
349
```java
350
import io.helidon.common.reactive.IoMulti;
351
import io.helidon.common.reactive.Multi;
352
import io.helidon.common.reactive.RetrySchema;
353
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
354
import java.nio.file.Path;
355
import java.nio.file.StandardOpenOption;
356
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
357
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
358
359
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
360
361
// Reading from channel with retry
362
try (FileChannel readChannel = FileChannel.open(Path.of("input.txt"), StandardOpenOption.READ)) {
363
Multi<ByteBuffer> data = IoMulti.multiFromByteChannelBuilder(readChannel)
364
.executor(executor)
365
.retrySchema(RetrySchema.linear(100, 50, 1000)) // Linear backoff
366
.bufferCapacity(4096)
367
.build();
368
369
// Process and write to another file
370
Function<Multi<ByteBuffer>, CompletionStage<Void>> writer =
371
IoMulti.writeToFile(Path.of("output.txt"));
372
373
CompletionStage<Void> completion = data
374
.map(buffer -> {
375
// Transform data (e.g., uppercase text)
376
byte[] bytes = new byte[buffer.remaining()];
377
buffer.get(bytes);
378
String text = new String(bytes).toUpperCase();
379
return ByteBuffer.wrap(text.getBytes());
380
})
381
.to(writer);
382
383
completion.toCompletableFuture().join(); // Wait for completion
384
System.out.println("File processing completed");
385
} catch (Exception e) {
386
e.printStackTrace();
387
} finally {
388
executor.shutdown();
389
}
390
```
391
392
### Piping Data Between Streams
393
394
```java
395
import io.helidon.common.reactive.IoMulti;
396
import io.helidon.common.reactive.Multi;
397
import java.io.FileInputStream;
398
import java.io.FileOutputStream;
399
import java.nio.ByteBuffer;
400
401
// Reactive file copy with transformation
402
try (FileInputStream input = new FileInputStream("source.txt");
403
FileOutputStream output = new FileOutputStream("destination.txt")) {
404
405
// Create reactive streams
406
Multi<ByteBuffer> source = IoMulti.multiFromStream(input);
407
IoMulti.OutputStreamMulti destination = IoMulti.outputStreamMulti();
408
409
// Transform and pipe data
410
source
411
.map(buffer -> {
412
// Example transformation: add prefix to each line
413
byte[] bytes = new byte[buffer.remaining()];
414
buffer.get(bytes);
415
String text = new String(bytes);
416
String transformed = text.replaceAll("(?m)^", ">> ");
417
return ByteBuffer.wrap(transformed.getBytes());
418
})
419
.forEach(buffer -> {
420
try {
421
byte[] bytes = new byte[buffer.remaining()];
422
buffer.get(bytes);
423
output.write(bytes);
424
} catch (Exception e) {
425
throw new RuntimeException(e);
426
}
427
});
428
429
System.out.println("File transformation completed");
430
} catch (Exception e) {
431
e.printStackTrace();
432
}
433
```
434
435
### Async File Processing
436
437
```java
438
import io.helidon.common.reactive.IoMulti;
439
import io.helidon.common.reactive.Multi;
440
import java.io.FileInputStream;
import java.nio.ByteBuffer;
441
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
442
import java.util.concurrent.CompletionStage;
443
444
// Process multiple files concurrently
445
List<String> fileNames = Arrays.asList("file1.txt", "file2.txt", "file3.txt");
446
447
CompletionStage<Void> allFiles = Multi.create(fileNames)
448
.flatMapCompletionStage(fileName -> {
449
return CompletableFuture.supplyAsync(() -> {
450
try (FileInputStream fis = new FileInputStream(fileName)) {
451
Multi<ByteBuffer> fileData = IoMulti.multiFromStream(fis);
452
453
long size = fileData
454
.map(ByteBuffer::remaining)
455
.map(Integer::longValue)
456
.reduce(0L, Long::sum)
457
.await();
458
459
System.out.println(fileName + ": " + size + " bytes");
460
return null;
461
} catch (Exception e) {
462
throw new RuntimeException(e);
463
}
464
});
465
})
466
.ignoreElements()
467
.toStage();
468
469
allFiles.toCompletableFuture().join();
470
System.out.println("All files processed");
471
```
472
473
## Error Handling in I/O Operations
474
475
```java
476
import io.helidon.common.reactive.IoMulti;
477
import io.helidon.common.reactive.Multi;
478
import java.io.FileInputStream;
479
import java.io.FileNotFoundException;
480
481
// Robust file reading with error handling
482
Multi<String> fileContent = Multi.defer(() -> {
483
try {
484
FileInputStream fis = new FileInputStream("might-not-exist.txt");
485
return IoMulti.multiFromStream(fis)
486
.map(buffer -> {
487
byte[] bytes = new byte[buffer.remaining()];
488
buffer.get(bytes);
489
return new String(bytes);
490
});
491
} catch (FileNotFoundException e) {
492
return Multi.error(e);
493
}
494
})
495
.onErrorResumeWith(error -> {
496
System.err.println("File not found, using default content");
497
return Multi.just("Default content");
498
})
499
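.retry(3); // NOTE: placed after onErrorResumeWith, so it only retries if the fallback itself fails; move it before onErrorResumeWith to retry the file read up to 3 times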
500
501
fileContent.forEach(System.out::println);
502
```