CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-eclipse-jetty--jetty-io

Core I/O components for Eclipse Jetty providing essential I/O utilities, buffer management, and network connection handling

Pending
Overview
Eval results
Files

content-streaming.mddocs/

Content Streaming

Jetty IO's content streaming system provides a demand-driven, backpressure-aware approach to handling data streams. It supports both synchronous and asynchronous operations with integration to reactive streams.

Capabilities

Content.Source Interface

The Source interface provides a demand-based content reading model with support for chunked data processing.

/**
 * Content source with read/demand model
 */
interface Content.Source {
    /** 
     * Read next chunk of content (may return null if no data available)
     * @return Content.Chunk or null if no data currently available
     */
    Chunk read();
    
    /** 
     * Request async notification when content becomes available
     * @param demandCallback callback to invoke when content is available for reading
     */
    void demand(Runnable demandCallback);
    
    /** 
     * Fail the source with an error
     * @param failure the error that caused the failure
     */
    void fail(Throwable failure);
    
    /** 
     * Get content length if known
     * @return content length in bytes, or -1 if unknown
     */
    default long getLength() {
        return -1;
    }
    
    // Static factory methods
    static Source from(ByteBuffer... buffers);
    static Source from(Path path);
    static Source from(InputStream inputStream);
    static Source from(String string);
    static Source from(String string, Charset charset);
    
    // Static utility methods
    static CompletableFuture<ByteBuffer> asByteBuffer(Source source);
    static CompletableFuture<String> asString(Source source);
    static CompletableFuture<String> asString(Source source, Charset charset);
    static InputStream asInputStream(Source source);
    static Flow.Publisher<Chunk> asPublisher(Source source);
    static CompletableFuture<Void> consumeAll(Source source);
    
    interface Factory {
        Source newSource();
    }
}

Usage Examples:

// Reading from source synchronously
Content.Source source = Content.Source.from("Hello World");
Content.Chunk chunk;
while ((chunk = source.read()) != null) {
    if (chunk.hasRemaining()) {
        ByteBuffer data = chunk.getByteBuffer();
        // Process data
        System.out.println(StandardCharsets.UTF_8.decode(data.duplicate()));
    }
    if (chunk.isLast()) {
        break;
    }
}

// Reading asynchronously with demand
Content.Source asyncSource = Content.Source.from(inputStream);
readAsync(asyncSource);

void readAsync(Content.Source source) {
    Content.Chunk chunk = source.read();
    if (chunk == null) {
        // No data available, request notification
        source.demand(() -> readAsync(source));
        return;
    }
    
    // Process chunk
    processChunk(chunk);
    
    if (!chunk.isLast()) {
        // Continue reading
        readAsync(source);
    }
}

// Converting source to other formats
Content.Source source = Content.Source.from(path);

// As ByteBuffer
CompletableFuture<ByteBuffer> bufferFuture = Content.Source.asByteBuffer(source);
bufferFuture.thenAccept(buffer -> {
    // Process complete buffer
});

// As String
CompletableFuture<String> stringFuture = Content.Source.asString(source, StandardCharsets.UTF_8);
stringFuture.thenAccept(content -> {
    System.out.println("Content: " + content);
});

// As InputStream
InputStream inputStream = Content.Source.asInputStream(source);
// Use as regular InputStream

// As Publisher (reactive streams)
Flow.Publisher<Content.Chunk> publisher = Content.Source.asPublisher(source);
publisher.subscribe(new Flow.Subscriber<Content.Chunk>() {
    @Override
    public void onNext(Content.Chunk chunk) {
        // Process chunk
    }
    // ... other methods
});

Content.Sink Interface

The Sink interface provides async content writing capabilities.

/**
 * Content sink for writing content
 */
interface Content.Sink {
    /** 
     * Write content chunk asynchronously
     * @param last true if this is the last chunk
     * @param byteBuffer data to write
     * @param callback callback for completion notification
     */
    void write(boolean last, ByteBuffer byteBuffer, Callback callback);
    
    // Static factory methods
    static Sink asBuffered(Sink sink);
    static Sink asBuffered(Sink sink, ByteBufferPool pool, boolean direct, int size, int maxSize);
    static OutputStream asOutputStream(Sink sink);
    static Flow.Subscriber<Chunk> asSubscriber(Sink sink, Callback callback);
    
    // Static utility methods
    static void write(Sink sink, boolean last, ByteBuffer byteBuffer) throws IOException;
}

Usage Examples:

// Basic sink writing
Content.Sink sink = createSink(); // Implementation specific
ByteBuffer data = ByteBuffer.wrap("Hello World".getBytes());

sink.write(true, data, new Callback() {
    @Override
    public void succeeded() {
        System.out.println("Write completed successfully");
    }
    
    @Override
    public void failed(Throwable x) {
        System.err.println("Write failed: " + x.getMessage());
    }
});

// Buffered sink for small writes
Content.Sink bufferedSink = Content.Sink.asBuffered(sink);
bufferedSink.write(false, ByteBuffer.wrap("Part 1".getBytes()), Callback.NOOP);
bufferedSink.write(false, ByteBuffer.wrap("Part 2".getBytes()), Callback.NOOP);
bufferedSink.write(true, ByteBuffer.wrap("Part 3".getBytes()), Callback.NOOP);

// As OutputStream
OutputStream outputStream = Content.Sink.asOutputStream(sink);
try {
    outputStream.write("Hello World".getBytes());
    outputStream.close(); // Writes final chunk with last=true
} catch (IOException e) {
    // Handle error
}

// As Subscriber (reactive streams)
Flow.Subscriber<Content.Chunk> subscriber = Content.Sink.asSubscriber(sink, new Callback() {
    @Override
    public void succeeded() {
        System.out.println("All chunks written successfully");
    }
    
    @Override
    public void failed(Throwable x) {
        System.err.println("Writing failed: " + x.getMessage());
    }
});

// Use subscriber with publisher
Content.Source source = Content.Source.from(data);
Flow.Publisher<Content.Chunk> publisher = Content.Source.asPublisher(source);
publisher.subscribe(subscriber);

Content.Chunk Interface

Represents a chunk of content with metadata about position in stream and optional release semantics.

/**
 * Content chunk with last-chunk indication and optional release function
 */
interface Content.Chunk extends Retainable {
    /** Get chunk data as ByteBuffer */
    ByteBuffer getByteBuffer();
    
    /** Check if this is the last chunk in the stream */
    boolean isLast();
    
    /** Get failure information if chunk represents an error */
    default Throwable getFailure() {
        return null;
    }
    
    /** Check if chunk has remaining bytes */
    default boolean hasRemaining() {
        return getByteBuffer().hasRemaining();
    }
    
    // Static factory methods
    static Chunk from(ByteBuffer buffer, boolean last);
    static Chunk from(ByteBuffer buffer, boolean last, Runnable releaser);
    static Chunk from(Throwable failure);
    static Chunk from(Throwable failure, boolean last);
    static Chunk asChunk(ByteBuffer buffer, boolean last, Retainable retainable);
    
    // Static utility methods
    static boolean isFailure(Chunk chunk);
    static Chunk next(Chunk chunk);
    
    // Constants
    Chunk EMPTY = new EmptyChunk(false);
    Chunk EOF = new EmptyChunk(true);
    
    interface Processor {
        void process(Chunk chunk, Callback callback);
    }
}

Usage Examples:

// Creating chunks
ByteBuffer data = ByteBuffer.wrap("Hello".getBytes());
Content.Chunk chunk = Content.Chunk.from(data, false);

// Processing chunk data
if (chunk.hasRemaining()) {
    ByteBuffer buffer = chunk.getByteBuffer();
    byte[] data = new byte[buffer.remaining()];
    buffer.get(data);
    String content = new String(data);
    System.out.println("Chunk content: " + content);
}

// Creating chunk with release callback
Content.Chunk chunkWithReleaser = Content.Chunk.from(data, false, () -> {
    System.out.println("Chunk data released");
    // Perform cleanup
});

// Check for errors
if (Content.Chunk.isFailure(chunk)) {
    Throwable error = chunk.getFailure();
    System.err.println("Chunk contains error: " + error.getMessage());
}

// Retaining chunks for async processing
if (chunk.canRetain()) {
    chunk.retain();
    processAsync(chunk); // Will call release() when done
}

// Working with ByteBuffer directly for data manipulation
Content.Chunk dataChunk = Content.Chunk.from(ByteBuffer.wrap("0123456789".getBytes()), false);
ByteBuffer buffer = dataChunk.getByteBuffer();
buffer.position(5); // Skip first 5 bytes
String remaining = StandardCharsets.UTF_8.decode(buffer.slice()).toString();
// Remaining data is "56789"

Content Copy Operations

The Content class provides utilities for copying data between sources and sinks.

/**
 * Content copying utilities
 */
class Content {
    /** 
     * Copy content from source to sink asynchronously
     * @param source content source to read from
     * @param sink content sink to write to  
     * @param callback callback for completion notification
     */
    static void copy(Source source, Sink sink, Callback callback);
}

Usage Example:

// Copy from file to output
Content.Source fileSource = Content.Source.from(Paths.get("input.txt"));
Content.Sink outputSink = createOutputSink();

Content.copy(fileSource, outputSink, new Callback() {
    @Override
    public void succeeded() {
        System.out.println("Copy completed successfully");
    }
    
    @Override
    public void failed(Throwable x) {
        System.err.println("Copy failed: " + x.getMessage());
    }
});

Content Source Implementations

AsyncContent

Bidirectional content buffer that can act as both source and sink.

/**
 * Async content buffer that can be both written to and read from
 */
class AsyncContent implements Content.Sink, Content.Source, Closeable {
    public AsyncContent();
    public AsyncContent(ByteBufferPool pool);
    
    // Source methods
    public Chunk read();
    public void demand(Runnable demandCallback);
    public void fail(Throwable failure);
    public long getLength();
    
    // Sink methods  
    public void write(boolean last, ByteBuffer byteBuffer, Callback callback);
    
    // Management
    public void close();
    public boolean isClosed();
    public boolean isEOF();
}

ByteBufferContentSource

Content source backed by ByteBuffer arrays.

/**
 * Content source backed by ByteBuffer array
 */
class ByteBufferContentSource implements Content.Source {
    public ByteBufferContentSource(ByteBuffer... buffers);
    public ByteBufferContentSource(Collection<ByteBuffer> buffers);
    
    public Chunk read();
    public void demand(Runnable demandCallback);
    public long getLength();
    public boolean rewind();
}

InputStreamContentSource

Content source that reads from an InputStream.

/**
 * Content source backed by InputStream
 */
class InputStreamContentSource implements Content.Source {
    public InputStreamContentSource(InputStream inputStream);
    public InputStreamContentSource(InputStream inputStream, ByteBufferPool pool);
    
    public Chunk read();
    public void demand(Runnable demandCallback);
    public void fail(Throwable failure);
    public long getLength();
}

PathContentSource

Content source that reads from a file Path.

/**
 * Content source backed by file Path
 */
class PathContentSource implements Content.Source {
    public PathContentSource(Path path);
    public PathContentSource(Path path, ByteBufferPool pool);
    
    public Chunk read();
    public void demand(Runnable demandCallback);
    public long getLength();
    public boolean rewind();
}

Implementation Usage Examples:

// AsyncContent for producer-consumer pattern
AsyncContent buffer = new AsyncContent();

// Producer writes data
CompletableFuture.runAsync(() -> {
    buffer.write(false, ByteBuffer.wrap("Hello ".getBytes()), Callback.NOOP);
    buffer.write(false, ByteBuffer.wrap("World".getBytes()), Callback.NOOP);
    buffer.write(true, ByteBuffer.allocate(0), Callback.NOOP); // EOF
});

// Consumer reads data
Content.Chunk chunk;
while ((chunk = buffer.read()) != null) {
    // Process chunk
    if (chunk.isLast()) break;
}

// ByteBuffer source
ByteBuffer[] buffers = {
    ByteBuffer.wrap("Hello ".getBytes()),
    ByteBuffer.wrap("World".getBytes())
};
ByteBufferContentSource source = new ByteBufferContentSource(buffers);

// File source
PathContentSource fileSource = new PathContentSource(Paths.get("data.txt"));
System.out.println("File size: " + fileSource.getLength());

// InputStream source with custom pool
InputStream input = new FileInputStream("data.txt");
InputStreamContentSource streamSource = new InputStreamContentSource(input, customPool);

Reactive Streams Integration

ContentSourcePublisher

Adapts Content.Source to Flow.Publisher for reactive streams integration.

/**
 * Adapts Content.Source to reactive streams Publisher
 */
class ContentSourcePublisher implements Flow.Publisher<Content.Chunk> {
    public ContentSourcePublisher(Content.Source source);
    
    public void subscribe(Flow.Subscriber<? super Content.Chunk> subscriber);
}

ContentSinkSubscriber

Adapts Content.Sink to Flow.Subscriber for reactive streams integration.

/**
 * Adapts Content.Sink to reactive streams Subscriber
 */
class ContentSinkSubscriber implements Flow.Subscriber<Content.Chunk> {
    public ContentSinkSubscriber(Content.Sink sink, Callback callback);
    
    public void onSubscribe(Flow.Subscription subscription);
    public void onNext(Content.Chunk chunk);
    public void onError(Throwable throwable);
    public void onComplete();
}

Reactive Streams Examples:

// Publisher from source
Content.Source source = Content.Source.from(largeFile);
ContentSourcePublisher publisher = new ContentSourcePublisher(source);

// Subscribe with backpressure handling
publisher.subscribe(new Flow.Subscriber<Content.Chunk>() {
    private Flow.Subscription subscription;
    
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // Request first chunk
    }
    
    @Override
    public void onNext(Content.Chunk chunk) {
        // Process chunk
        processChunk(chunk);
        
        // Request next chunk
        subscription.request(1);
    }
    
    @Override
    public void onError(Throwable throwable) {
        System.err.println("Stream error: " + throwable.getMessage());
    }
    
    @Override
    public void onComplete() {
        System.out.println("Stream completed");
    }
});

// Subscriber to sink
Content.Sink sink = createOutputSink();
ContentSinkSubscriber subscriber = new ContentSinkSubscriber(sink, new Callback() {
    @Override
    public void succeeded() {
        System.out.println("All data written to sink");
    }
    
    @Override
    public void failed(Throwable x) {
        System.err.println("Sink writing failed: " + x.getMessage());
    }
});

// Connect publisher to subscriber
publisher.subscribe(subscriber);

Install with Tessl CLI

npx tessl i tessl/maven-org-eclipse-jetty--jetty-io

docs

buffer-management.md

connection-management.md

content-streaming.md

core-io.md

index.md

selector-management.md

ssl-support.md

tile.json