tessl/maven-io-helidon-common--helidon-common-reactive

Helidon Common Reactive Library - A reactive programming library providing Multi and Single abstractions


I/O Integration

Reactive I/O utilities that bridge Java I/O streams, NIO channels, and files to Multi<ByteBuffer>. Data is read or written only as downstream demand allows (backpressure), and blocking I/O can be moved onto a caller-supplied executor.

Capabilities

IoMulti Utility Class

Factory class providing I/O integration utilities for creating reactive streams from I/O sources.

/**
 * Factory class for I/O integration utilities
 */
public final class IoMulti {
}

InputStream Integration

Convert InputStreams to reactive Multi streams with configurable buffering and execution.

Basic InputStream to Multi

/**
 * Create Multi<ByteBuffer> from InputStream
 * @param inputStream input stream to read from
 * @return Multi emitting ByteBuffers from stream
 * @throws NullPointerException if inputStream is null
 */
static Multi<ByteBuffer> multiFromStream(InputStream inputStream);

/**
 * Advanced builder for InputStream to Multi conversion
 * @param inputStream input stream to read from
 * @return builder for configuration
 * @throws NullPointerException if inputStream is null
 */
static MultiFromInputStreamBuilder multiFromStreamBuilder(InputStream inputStream);

MultiFromInputStreamBuilder Configuration

/**
 * Builder for advanced InputStream to Multi configuration
 */
public static final class MultiFromInputStreamBuilder {
    /**
     * Set buffer size for reads
     * @param size buffer size in bytes
     * @return builder for chaining
     */
    MultiFromInputStreamBuilder byteBufferSize(int size);
    
    /**
     * Set executor for blocking reads
     * @param executor executor service for I/O operations
     * @return builder for chaining
     * @throws NullPointerException if executor is null
     */
    MultiFromInputStreamBuilder executor(ExecutorService executor);
    
    /**
     * Build the configured Multi
     * @return Multi<ByteBuffer> from InputStream
     */
    Multi<ByteBuffer> build();
}
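To make the effect of byteBufferSize concrete, the following JDK-only sketch reads an InputStream into ByteBuffers of at most a given size. It is not Helidon's implementation and it is not reactive (it reads eagerly, with no backpressure); the class and method names (ChunkedReadSketch, readChunks) are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Helidon: shows how a buffer size bounds each emitted chunk.
final class ChunkedReadSketch {

    /** Reads the whole stream into read-ready ByteBuffers of at most bufferSize bytes each. */
    static List<ByteBuffer> readChunks(InputStream in, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        List<ByteBuffer> chunks = new ArrayList<>();
        byte[] scratch = new byte[bufferSize];
        try {
            int n;
            while ((n = in.read(scratch)) != -1) {
                if (n > 0) {
                    // Copy so each chunk owns its bytes; scratch is reused on the next read
                    chunks.add(ByteBuffer.wrap(Arrays.copyOf(scratch, n)));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<ByteBuffer> chunks = readChunks(new ByteArrayInputStream(new byte[10]), 4);
        for (ByteBuffer chunk : chunks) {
            System.out.println(chunk.remaining() + " bytes");
        }
    }
}
```

A smaller buffer means more, smaller items flowing through the pipeline; a larger one means fewer items but more memory held per item.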

OutputStream Integration

Create reactive OutputStreams that publish written data as Multi streams.

Basic OutputStream Multi

/**
 * Create OutputStream that publishes written data as Multi<ByteBuffer>
 * @return OutputStreamMulti instance
 */
static OutputStreamMulti outputStreamMulti();

/**
 * Advanced builder for OutputStream Multi creation
 * @return builder for configuration
 */
static OutputStreamMultiBuilder outputStreamMultiBuilder();

OutputStreamMultiBuilder Configuration

/**
 * Builder for advanced OutputStream Multi configuration
 */
public static final class OutputStreamMultiBuilder {
    /**
     * Set write timeout when no downstream demand
     * @param timeout timeout duration
     * @return builder for chaining
     * @throws NullPointerException if timeout is null
     */
    OutputStreamMultiBuilder timeout(Duration timeout);
    
    /**
     * Set callback for demand notifications
     * @param onRequest callback receiving (requested, current_demand)
     * @return builder for chaining
     * @throws NullPointerException if onRequest is null
     */
    OutputStreamMultiBuilder onRequest(BiConsumer<Long, Long> onRequest);
    
    /**
     * Build the configured OutputStreamMulti
     * @return OutputStreamMulti instance
     */
    OutputStreamMulti build();
}

OutputStreamMulti Class

/**
 * OutputStream implementation that publishes written data as reactive stream
 */
public final class OutputStreamMulti extends OutputStream implements Multi<ByteBuffer> {
    /**
     * Standard OutputStream write methods
     */
    @Override
    void write(int b) throws IOException;
    
    @Override
    void write(byte[] b) throws IOException;
    
    @Override
    void write(byte[] b, int off, int len) throws IOException;
    
    @Override
    void flush() throws IOException;
    
    @Override
    void close() throws IOException;
}
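The general idea of an OutputStream that doubles as a publisher can be sketched with the JDK's own java.util.concurrent.Flow types. This is only an analogy under stated assumptions, not how OutputStreamMulti is implemented: PublishingOutputStreamSketch and roundTrip are hypothetical names, and SubmissionPublisher silently drops items written before anyone subscribes, whereas the timeout option described above suggests OutputStreamMulti waits for demand instead.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only analogue: every write becomes one ByteBuffer item on a Flow.Publisher.
final class PublishingOutputStreamSketch extends OutputStream {
    private final SubmissionPublisher<ByteBuffer> publisher = new SubmissionPublisher<>();

    Flow.Publisher<ByteBuffer> publisher() {
        return publisher;
    }

    @Override
    public void write(int b) {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b) {
        write(b, 0, b.length);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        // submit() blocks when a subscriber's buffer is full, which throttles the writer
        publisher.submit(ByteBuffer.wrap(Arrays.copyOfRange(b, off, off + len)));
    }

    @Override
    public void close() {
        publisher.close(); // signals onComplete once buffered items are delivered
    }

    /** Writes the parts through the stream and returns what a one-at-a-time subscriber received. */
    static String roundTrip(String... parts) {
        PublishingOutputStreamSketch out = new PublishingOutputStreamSketch();
        StringBuilder received = new StringBuilder();
        CountDownLatch done = new CountDownLatch(1);
        // Subscribe before writing: SubmissionPublisher drops items when there is no subscriber
        out.publisher().subscribe(new Flow.Subscriber<ByteBuffer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(ByteBuffer buffer) {
                received.append(StandardCharsets.UTF_8.decode(buffer));
                subscription.request(1); // ask for the next item only after handling this one
            }

            @Override
            public void onError(Throwable t) {
                done.countDown();
            }

            @Override
            public void onComplete() {
                done.countDown();
            }
        });
        for (String part : parts) {
            out.write(part.getBytes(StandardCharsets.UTF_8));
        }
        out.close();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.toString();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Hello ", "reactive ", "world!"));
    }
}
```

The subscriber requests one item at a time, so the writer can never run more than the publisher's buffer ahead of the reader.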

ByteChannel Integration

Reactive integration with Java NIO ByteChannels for both reading and writing operations.

Reading from ByteChannels

/**
 * Create Multi from ReadableByteChannel
 * @param channel readable byte channel
 * @return Multi<ByteBuffer> from channel
 * @throws NullPointerException if channel is null
 */
static Multi<ByteBuffer> multiFromByteChannel(ReadableByteChannel channel);

/**
 * Advanced builder for ByteChannel reading
 * @param channel readable byte channel
 * @return builder for configuration
 * @throws NullPointerException if channel is null
 */
static MultiFromByteChannelBuilder multiFromByteChannelBuilder(ReadableByteChannel channel);

MultiFromByteChannelBuilder Configuration

/**
 * Builder for advanced ByteChannel reading configuration
 */
public static final class MultiFromByteChannelBuilder {
    /**
     * Set executor for async reads
     * @param executor scheduled executor service
     * @return builder for chaining
     * @throws NullPointerException if executor is null
     */
    MultiFromByteChannelBuilder executor(ScheduledExecutorService executor);
    
    /**
     * Set retry delays for failed reads
     * @param retrySchema retry delay strategy
     * @return builder for chaining
     * @throws NullPointerException if retrySchema is null
     */
    MultiFromByteChannelBuilder retrySchema(RetrySchema retrySchema);
    
    /**
     * Set read buffer size
     * @param capacity buffer capacity in bytes
     * @return builder for chaining
     */
    MultiFromByteChannelBuilder bufferCapacity(int capacity);
    
    /**
     * Build the configured Multi
     * @return Multi<ByteBuffer> from channel
     */
    Multi<ByteBuffer> build();
}
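As an illustration of what a linear retry delay strategy might compute, here is a small standalone sketch. It assumes the three arguments seen in RetrySchema.linear(100, 50, 1000) mean first delay, per-retry increment, and maximum delay in milliseconds; that reading is an assumption, and LinearBackoffSketch is a hypothetical class, not the library's RetrySchema.

```java
// Hypothetical illustration of a capped linear backoff; not Helidon's RetrySchema.
final class LinearBackoffSketch {

    /**
     * Delay before the given retry (0-based): firstDelay + retryCount * increment,
     * never exceeding maxDelay. All values are in milliseconds.
     */
    static long delayMillis(int retryCount, long firstDelay, long increment, long maxDelay) {
        long delay = firstDelay + (long) retryCount * increment;
        return Math.min(delay, maxDelay);
    }

    public static void main(String[] args) {
        for (int retry = 0; retry < 5; retry++) {
            System.out.println("retry " + retry + ": " + delayMillis(retry, 100, 50, 1000) + " ms");
        }
    }
}
```

Under these assumptions the delays grow 100, 150, 200, ... and stay at 1000 ms once the cap is reached.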

Writing to ByteChannels

/**
 * Create function to write Multi<ByteBuffer> to WritableByteChannel
 * @param channel writable byte channel
 * @return function that writes Multi data to channel
 * @throws NullPointerException if channel is null
 */
static Function<Multi<ByteBuffer>, CompletionStage<Void>> multiToByteChannel(WritableByteChannel channel);

/**
 * Advanced builder for ByteChannel writing
 * @param channel writable byte channel
 * @return builder for configuration
 * @throws NullPointerException if channel is null
 */
static MultiToByteChannelBuilder multiToByteChannelBuilder(WritableByteChannel channel);

/**
 * Convenience method for writing Multi to file
 * @param path file path to write to
 * @return function that writes Multi data to file
 * @throws NullPointerException if path is null
 */
static Function<Multi<ByteBuffer>, CompletionStage<Void>> writeToFile(Path path);

MultiToByteChannelBuilder Configuration

/**
 * Builder for advanced ByteChannel writing configuration
 */
public static final class MultiToByteChannelBuilder {
    /**
     * Set executor for blocking writes
     * @param executor executor for I/O operations
     * @return builder for chaining
     * @throws NullPointerException if executor is null
     */
    MultiToByteChannelBuilder executor(Executor executor);
    
    /**
     * Build the configured write function
     * @return function to write Multi<ByteBuffer> to channel
     */
    Function<Multi<ByteBuffer>, CompletionStage<Void>> build();
}
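The shape of these write functions (buffers in, CompletionStage<Void> out) can be sketched with plain JDK types. The example below is a simplified stand-in, not the library's code: it takes a List instead of a Multi, so it shows the asynchronous write and the completion signal but not backpressure. ChannelWriteSketch, writeAll, and demo are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

// Hypothetical JDK-only sketch of "write buffers to a channel, report completion as a stage".
final class ChannelWriteSketch {

    /** Writes every buffer fully to the channel on the given executor. */
    static CompletionStage<Void> writeAll(List<ByteBuffer> buffers,
                                          WritableByteChannel channel,
                                          Executor executor) {
        return CompletableFuture.runAsync(() -> {
            try {
                for (ByteBuffer buffer : buffers) {
                    // A single write() may accept only part of the buffer, so loop until drained
                    while (buffer.hasRemaining()) {
                        channel.write(buffer);
                    }
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e); // completes the stage exceptionally
            }
        }, executor);
    }

    /** Writes two buffers into an in-memory sink and returns the sink's content. */
    static String demo() {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        WritableByteChannel channel = Channels.newChannel(sink);
        List<ByteBuffer> buffers = List.of(
            ByteBuffer.wrap("Hello ".getBytes(StandardCharsets.UTF_8)),
            ByteBuffer.wrap("channel".getBytes(StandardCharsets.UTF_8)));
        // Runnable::run executes on the calling thread; real code would pass a thread pool
        writeAll(buffers, channel, Runnable::run).toCompletableFuture().join();
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Passing the executor explicitly mirrors the executor option on the builder: the caller decides which threads are allowed to block on channel writes.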

Usage Examples

Reading Files Reactively

import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.Multi;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Basic file reading
try (FileInputStream fis = new FileInputStream("data.txt")) {
    Multi<ByteBuffer> fileData = IoMulti.multiFromStream(fis);
    
    // Decode each chunk and print it
    fileData
        .map(buffer -> {
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        })
        .forEach(System.out::println)
        .await(); // wait for the read to finish before the stream is closed
} catch (IOException e) {
    e.printStackTrace();
}

// Advanced file reading with a custom buffer size and executor
ExecutorService executor = Executors.newCachedThreadPool();
try (FileInputStream fis = new FileInputStream("large-file.dat")) {
    Multi<ByteBuffer> fileData = IoMulti.multiFromStreamBuilder(fis)
        .byteBufferSize(8192)  // 8 KiB buffer
        .executor(executor)    // blocking reads run on this executor
        .build();
        
    long totalBytes = fileData
        .map(ByteBuffer::remaining)
        .map(Integer::longValue)
        .reduce(() -> 0L, Long::sum) // seeded reduce, so an empty file yields 0
        .await();
        
    System.out.println("Total bytes read: " + totalBytes);
} catch (IOException e) {
    e.printStackTrace();
} finally {
    executor.shutdown(); // release the threads created for the read
}
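The examples here decode each chunk independently, which can corrupt text when a multi-byte UTF-8 character straddles two buffers. One way around that is a stateful decoder that carries incomplete trailing bytes over to the next chunk. The sketch below uses only the JDK's CharsetDecoder; StreamingDecoderSketch is a hypothetical helper, not something the library provides.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes UTF-8 chunk by chunk without breaking split characters.
final class StreamingDecoderSketch {
    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    private byte[] leftover = new byte[0]; // bytes of an incomplete character from the previous chunk

    /** Decodes one chunk; bytes of a trailing incomplete character are kept for the next call. */
    String decode(ByteBuffer chunk) {
        ByteBuffer input = ByteBuffer.allocate(leftover.length + chunk.remaining());
        input.put(leftover).put(chunk).flip();

        // UTF-8 never produces more chars than input bytes, so this capacity is sufficient
        CharBuffer out = CharBuffer.allocate(input.remaining());
        CoderResult result = decoder.decode(input, out, false); // false: more input may follow
        if (result.isError()) {
            throw new IllegalArgumentException("Malformed UTF-8 input: " + result);
        }

        // Whatever the decoder did not consume is the start of a split character
        leftover = new byte[input.remaining()];
        input.get(leftover);

        out.flip();
        return out.toString();
    }

    /** True if the stream ended in the middle of a character. */
    boolean hasIncompleteCharacter() {
        return leftover.length > 0;
    }

    public static void main(String[] args) {
        byte[] bytes = "h\u00e9llo".getBytes(StandardCharsets.UTF_8); // the accented e takes two bytes
        StreamingDecoderSketch decoder = new StreamingDecoderSketch();
        String first = decoder.decode(ByteBuffer.wrap(bytes, 0, 2));   // ends mid-character
        String second = decoder.decode(ByteBuffer.wrap(bytes, 2, bytes.length - 2));
        System.out.println(first + second);
    }
}
```

In a pipeline, one such decoder instance per stream would be called from the map step in place of new String(bytes).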

Writing Data Reactively

import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.OutputStreamMulti;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

// Create reactive OutputStream
OutputStreamMulti outputStream = IoMulti.outputStreamMultiBuilder()
    .timeout(Duration.ofSeconds(5)) // how long a write may wait for downstream demand
    .onRequest((requested, totalDemand) -> 
        System.out.println("Requested: " + requested + ", Total demand: " + totalDemand))
    .build();

// Write data to the stream from another thread
new Thread(() -> {
    try {
        outputStream.write("Hello ".getBytes(StandardCharsets.UTF_8));
        outputStream.write("reactive ".getBytes(StandardCharsets.UTF_8));
        outputStream.write("world!".getBytes(StandardCharsets.UTF_8));
        outputStream.close(); // completes the stream
    } catch (IOException e) {
        e.printStackTrace();
    }
}).start();

// Read the written data reactively
outputStream
    .map(buffer -> {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    })
    .forEach(System.out::print); // Prints: Hello reactive world!

ByteChannel Operations

import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.RetrySchema;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

// Reading from channel with retry
try (FileChannel readChannel = FileChannel.open(Path.of("input.txt"), StandardOpenOption.READ)) {
    Multi<ByteBuffer> data = IoMulti.multiFromByteChannelBuilder(readChannel)
        .executor(executor)
        .retrySchema(RetrySchema.linear(100, 50, 1000)) // Linear backoff
        .bufferCapacity(4096)
        .build();
        
    // Process and write to another file
    Function<Multi<ByteBuffer>, CompletionStage<Void>> writer = 
        IoMulti.writeToFile(Path.of("output.txt"));
        
    CompletionStage<Void> completion = data
        .map(buffer -> {
            // Transform data (e.g., uppercase text)
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            String text = new String(bytes, StandardCharsets.UTF_8).toUpperCase();
            return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        })
        .to(writer);
        
    completion.toCompletableFuture().join(); // Wait before the read channel is closed
    System.out.println("File processing completed");
} catch (Exception e) {
    e.printStackTrace();
} finally {
    executor.shutdown();
}

Piping Data Between Streams

import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.Multi;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Reactive file copy with transformation
try (FileInputStream input = new FileInputStream("source.txt");
     FileOutputStream output = new FileOutputStream("destination.txt")) {
     
    Multi<ByteBuffer> source = IoMulti.multiFromStream(input);
    
    // Transform and pipe data
    source
        .map(buffer -> {
            // Example transformation: add a prefix to each line.
            // (?m)^ also matches at the start of every chunk, so a line that is
            // split across two chunks receives an extra prefix mid-line.
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            String text = new String(bytes, StandardCharsets.UTF_8);
            String transformed = text.replaceAll("(?m)^", ">> ");
            return ByteBuffer.wrap(transformed.getBytes(StandardCharsets.UTF_8));
        })
        .forEach(buffer -> {
            try {
                // The mapped buffer wraps a fresh array, so it can be written directly
                output.write(buffer.array(), buffer.position(), buffer.remaining());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })
        .await(); // wait for the copy to finish before both streams are closed
        
    System.out.println("File transformation completed");
} catch (Exception e) {
    e.printStackTrace();
}

Async File Processing

import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.Multi;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Process multiple files asynchronously
List<String> fileNames = Arrays.asList("file1.txt", "file2.txt", "file3.txt");

CompletionStage<Void> allFiles = Multi.create(fileNames)
    .flatMapCompletionStage(fileName ->
        CompletableFuture.supplyAsync(() -> {
            try (FileInputStream fis = new FileInputStream(fileName)) {
                Multi<ByteBuffer> fileData = IoMulti.multiFromStream(fis);
                
                long size = fileData
                    .map(ByteBuffer::remaining)
                    .map(Integer::longValue)
                    .reduce(() -> 0L, Long::sum)
                    .await();
                    
                // Return a non-null result: reactive streams do not allow null items
                return fileName + ": " + size + " bytes";
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }))
    .forEach(System.out::println)
    .toStage();

allFiles.toCompletableFuture().join();
System.out.println("All files processed");

Error Handling in I/O Operations

import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.Multi;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// File reading with retry and a fallback value
Multi<String> fileContent = Multi.<String>defer(() -> {
    try {
        FileInputStream fis = new FileInputStream("might-not-exist.txt");
        return IoMulti.multiFromStream(fis)
            .map(buffer -> {
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                return new String(bytes, StandardCharsets.UTF_8);
            })
            .onTerminate(() -> {
                // Close the stream on completion, error, or cancellation
                try {
                    fis.close();
                } catch (IOException ignored) {
                    // nothing useful to do if close fails
                }
            });
    } catch (FileNotFoundException e) {
        return Multi.error(e);
    }
})
.retry(3) // Resubscribe up to 3 times; must come before the fallback to have any effect
.onErrorResumeWith(error -> {
    System.err.println("Read failed, using default content: " + error);
    return Multi.just("Default content");
});

fileContent.forEach(System.out::println);

Install with Tessl CLI

npx tessl i tessl/maven-io-helidon-common--helidon-common-reactive
