Helidon Common Reactive Library - A reactive programming library providing Multi and Single abstractions
—
Reactive I/O utilities for integrating with Java I/O streams, channels, and file operations. Enables reactive processing of I/O data with proper backpressure handling and asynchronous execution.
Factory class providing I/O integration utilities for creating reactive streams from I/O sources.
/**
* Factory class for I/O integration utilities
*/
public final class IoMulti {
}Convert InputStreams to reactive Multi streams with configurable buffering and execution.
/**
* Create Multi<ByteBuffer> from InputStream
* @param inputStream input stream to read from
* @return Multi emitting ByteBuffers from stream
* @throws NullPointerException if inputStream is null
*/
static Multi<ByteBuffer> multiFromStream(InputStream inputStream);
/**
* Advanced builder for InputStream to Multi conversion
* @param inputStream input stream to read from
* @return builder for configuration
* @throws NullPointerException if inputStream is null
*/
static MultiFromInputStreamBuilder multiFromStreamBuilder(InputStream inputStream);/**
* Builder for advanced InputStream to Multi configuration
*/
public static final class MultiFromInputStreamBuilder {
/**
* Set buffer size for reads
* @param size buffer size in bytes
* @return builder for chaining
*/
MultiFromInputStreamBuilder byteBufferSize(int size);
/**
* Set executor for blocking reads
* @param executor executor service for I/O operations
* @return builder for chaining
* @throws NullPointerException if executor is null
*/
MultiFromInputStreamBuilder executor(ExecutorService executor);
/**
* Build the configured Multi
* @return Multi<ByteBuffer> from InputStream
*/
Multi<ByteBuffer> build();
}Create reactive OutputStreams that publish written data as Multi streams.
/**
* Create OutputStream that publishes written data as Multi<ByteBuffer>
* @return OutputStreamMulti instance
*/
static OutputStreamMulti outputStreamMulti();
/**
* Advanced builder for OutputStream Multi creation
* @return builder for configuration
*/
static OutputStreamMultiBuilder outputStreamMultiBuilder();/**
* Builder for advanced OutputStream Multi configuration
*/
public static final class OutputStreamMultiBuilder {
/**
* Set write timeout when no downstream demand
* @param timeout timeout duration
* @return builder for chaining
* @throws NullPointerException if timeout is null
*/
OutputStreamMultiBuilder timeout(Duration timeout);
/**
* Set callback for demand notifications
* @param onRequest callback receiving (requested, current_demand)
* @return builder for chaining
* @throws NullPointerException if onRequest is null
*/
OutputStreamMultiBuilder onRequest(BiConsumer<Long, Long> onRequest);
/**
* Build the configured OutputStreamMulti
* @return OutputStreamMulti instance
*/
OutputStreamMulti build();
}/**
* OutputStream implementation that publishes written data as reactive stream
*/
public final class OutputStreamMulti extends OutputStream implements Multi<ByteBuffer> {
/**
* Standard OutputStream write methods
*/
@Override
void write(int b) throws IOException;
@Override
void write(byte[] b) throws IOException;
@Override
void write(byte[] b, int off, int len) throws IOException;
@Override
void flush() throws IOException;
@Override
void close() throws IOException;
}Reactive integration with Java NIO ByteChannels for both reading and writing operations.
/**
* Create Multi from ReadableByteChannel
* @param channel readable byte channel
* @return Multi<ByteBuffer> from channel
* @throws NullPointerException if channel is null
*/
static Multi<ByteBuffer> multiFromByteChannel(ReadableByteChannel channel);
/**
* Advanced builder for ByteChannel reading
* @param channel readable byte channel
* @return builder for configuration
* @throws NullPointerException if channel is null
*/
static MultiFromByteChannelBuilder multiFromByteChannelBuilder(ReadableByteChannel channel);/**
* Builder for advanced ByteChannel reading configuration
*/
public static final class MultiFromByteChannelBuilder {
/**
* Set executor for async reads
* @param executor scheduled executor service
* @return builder for chaining
* @throws NullPointerException if executor is null
*/
MultiFromByteChannelBuilder executor(ScheduledExecutorService executor);
/**
* Set retry delays for failed reads
* @param retrySchema retry delay strategy
* @return builder for chaining
* @throws NullPointerException if retrySchema is null
*/
MultiFromByteChannelBuilder retrySchema(RetrySchema retrySchema);
/**
* Set read buffer size
* @param capacity buffer capacity in bytes
* @return builder for chaining
*/
MultiFromByteChannelBuilder bufferCapacity(int capacity);
/**
* Build the configured Multi
* @return Multi<ByteBuffer> from channel
*/
Multi<ByteBuffer> build();
}/**
* Create function to write Multi<ByteBuffer> to WritableByteChannel
* @param channel writable byte channel
* @return function that writes Multi data to channel
* @throws NullPointerException if channel is null
*/
static Function<Multi<ByteBuffer>, CompletionStage<Void>> multiToByteChannel(WritableByteChannel channel);
/**
* Advanced builder for ByteChannel writing
* @param channel writable byte channel
* @return builder for configuration
* @throws NullPointerException if channel is null
*/
static MultiToByteChannelBuilder multiToByteChannelBuilder(WritableByteChannel channel);
/**
* Convenience method for writing Multi to file
* @param path file path to write to
* @return function that writes Multi data to file
* @throws NullPointerException if path is null
*/
static Function<Multi<ByteBuffer>, CompletionStage<Void>> writeToFile(Path path);/**
* Builder for advanced ByteChannel writing configuration
*/
public static final class MultiToByteChannelBuilder {
/**
* Set executor for blocking writes
* @param executor executor for I/O operations
* @return builder for chaining
* @throws NullPointerException if executor is null
*/
MultiToByteChannelBuilder executor(Executor executor);
/**
* Build the configured write function
* @return function to write Multi<ByteBuffer> to channel
*/
Function<Multi<ByteBuffer>, CompletionStage<Void>> build();
}import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.Multi;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
// Basic file reading
try (FileInputStream fis = new FileInputStream("data.txt")) {
Multi<ByteBuffer> fileData = IoMulti.multiFromStream(fis);
// Process data
fileData
.map(buffer -> {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return new String(bytes);
})
.forEach(System.out::println);
} catch (IOException e) {
e.printStackTrace();
}
// Advanced file reading with custom buffer size
try (FileInputStream fis = new FileInputStream("large-file.dat")) {
Multi<ByteBuffer> fileData = IoMulti.multiFromStreamBuilder(fis)
.byteBufferSize(8192) // 8KB buffer
.executor(Executors.newCachedThreadPool())
.build();
long totalBytes = fileData
.map(ByteBuffer::remaining)
.map(Integer::longValue)
.reduce(0L, Long::sum)
.await();
System.out.println("Total bytes read: " + totalBytes);
} catch (IOException e) {
e.printStackTrace();
}import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.Multi;
import java.nio.ByteBuffer;
import java.time.Duration;
// Create reactive OutputStream
IoMulti.OutputStreamMulti outputStream = IoMulti.outputStreamMultiBuilder()
.timeout(Duration.ofSeconds(5))
.onRequest((requested, totalDemand) ->
System.out.println("Requested: " + requested + ", Total demand: " + totalDemand))
.build();
// Write data to the stream from another thread
new Thread(() -> {
try {
outputStream.write("Hello ".getBytes());
outputStream.write("reactive ".getBytes());
outputStream.write("world!".getBytes());
outputStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// Read the written data reactively
outputStream
.map(buffer -> {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return new String(bytes);
})
.forEach(System.out::print); // Prints: Hello reactive world!import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.RetrySchema;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// Reading from channel with retry
try (FileChannel readChannel = FileChannel.open(Path.of("input.txt"), StandardOpenOption.READ)) {
Multi<ByteBuffer> data = IoMulti.multiFromByteChannelBuilder(readChannel)
.executor(executor)
.retrySchema(RetrySchema.linear(100, 50, 1000)) // Linear backoff
.bufferCapacity(4096)
.build();
// Process and write to another file
Function<Multi<ByteBuffer>, CompletionStage<Void>> writer =
IoMulti.writeToFile(Path.of("output.txt"));
CompletionStage<Void> completion = data
.map(buffer -> {
// Transform data (e.g., uppercase text)
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String text = new String(bytes).toUpperCase();
return ByteBuffer.wrap(text.getBytes());
})
.to(writer);
completion.toCompletableFuture().join(); // Wait for completion
System.out.println("File processing completed");
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdown();
}import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.Multi;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
// Reactive file copy with transformation
try (FileInputStream input = new FileInputStream("source.txt");
FileOutputStream output = new FileOutputStream("destination.txt")) {
// Create reactive streams
Multi<ByteBuffer> source = IoMulti.multiFromStream(input);
IoMulti.OutputStreamMulti destination = IoMulti.outputStreamMulti();
// Transform and pipe data
source
.map(buffer -> {
// Example transformation: add prefix to each line
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String text = new String(bytes);
String transformed = text.replaceAll("(?m)^", ">> ");
return ByteBuffer.wrap(transformed.getBytes());
})
.forEach(buffer -> {
try {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
output.write(bytes);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
System.out.println("File transformation completed");
} catch (Exception e) {
e.printStackTrace();
}import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.Multi;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
// Process multiple files concurrently
List<String> fileNames = Arrays.asList("file1.txt", "file2.txt", "file3.txt");
CompletionStage<Void> allFiles = Multi.create(fileNames)
.flatMapCompletionStage(fileName -> {
return CompletableFuture.supplyAsync(() -> {
try (FileInputStream fis = new FileInputStream(fileName)) {
Multi<ByteBuffer> fileData = IoMulti.multiFromStream(fis);
long size = fileData
.map(ByteBuffer::remaining)
.map(Integer::longValue)
.reduce(0L, Long::sum)
.await();
System.out.println(fileName + ": " + size + " bytes");
return null;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
})
.ignoreElements()
.toStage();
allFiles.toCompletableFuture().join();
System.out.println("All files processed");import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.Multi;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
// Robust file reading with error handling
Multi<String> fileContent = Multi.defer(() -> {
try {
FileInputStream fis = new FileInputStream("might-not-exist.txt");
return IoMulti.multiFromStream(fis)
.map(buffer -> {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return new String(bytes);
});
} catch (FileNotFoundException e) {
return Multi.error(e);
}
})
.onErrorResumeWith(error -> {
System.err.println("File not found, using default content");
return Multi.just("Default content");
})
.retry(3); // Retry up to 3 times
fileContent.forEach(System.out::println);Install with Tessl CLI
npx tessl i tessl/maven-io-helidon-common--helidon-common-reactive