Core I/O components for Eclipse Jetty providing essential I/O utilities, buffer management, and network connection handling
—
Jetty IO's content streaming system provides a demand-driven, backpressure-aware approach to handling data streams. It supports both synchronous and asynchronous operations with integration to reactive streams.
The Source interface provides a demand-based content reading model with support for chunked data processing.
/**
* Content source with read/demand model
*/
interface Content.Source {
/**
* Read next chunk of content (may return null if no data available)
* @return Content.Chunk or null if no data currently available
*/
Chunk read();
/**
* Request async notification when content becomes available
* @param demandCallback callback to invoke when content is available for reading
*/
void demand(Runnable demandCallback);
/**
* Fail the source with an error
* @param failure the error that caused the failure
*/
void fail(Throwable failure);
/**
* Get content length if known
* @return content length in bytes, or -1 if unknown
*/
default long getLength() {
return -1;
}
// Static factory methods
static Source from(ByteBuffer... buffers);
static Source from(Path path);
static Source from(InputStream inputStream);
static Source from(String string);
static Source from(String string, Charset charset);
// Static utility methods
static CompletableFuture<ByteBuffer> asByteBuffer(Source source);
static CompletableFuture<String> asString(Source source);
static CompletableFuture<String> asString(Source source, Charset charset);
static InputStream asInputStream(Source source);
static Flow.Publisher<Chunk> asPublisher(Source source);
static CompletableFuture<Void> consumeAll(Source source);
interface Factory {
Source newSource();
}
}Usage Examples:
// Reading from source synchronously
Content.Source source = Content.Source.from("Hello World");
Content.Chunk chunk;
while ((chunk = source.read()) != null) {
if (chunk.hasRemaining()) {
ByteBuffer data = chunk.getByteBuffer();
// Process data
System.out.println(StandardCharsets.UTF_8.decode(data.duplicate()));
}
if (chunk.isLast()) {
break;
}
}
// Reading asynchronously with demand
Content.Source asyncSource = Content.Source.from(inputStream);
readAsync(asyncSource);
void readAsync(Content.Source source) {
Content.Chunk chunk = source.read();
if (chunk == null) {
// No data available, request notification
source.demand(() -> readAsync(source));
return;
}
// Process chunk
processChunk(chunk);
if (!chunk.isLast()) {
// Continue reading
readAsync(source);
}
}
// Converting source to other formats
Content.Source source = Content.Source.from(path);
// As ByteBuffer
CompletableFuture<ByteBuffer> bufferFuture = Content.Source.asByteBuffer(source);
bufferFuture.thenAccept(buffer -> {
// Process complete buffer
});
// As String
CompletableFuture<String> stringFuture = Content.Source.asString(source, StandardCharsets.UTF_8);
stringFuture.thenAccept(content -> {
System.out.println("Content: " + content);
});
// As InputStream
InputStream inputStream = Content.Source.asInputStream(source);
// Use as regular InputStream
// As Publisher (reactive streams)
Flow.Publisher<Content.Chunk> publisher = Content.Source.asPublisher(source);
publisher.subscribe(new Flow.Subscriber<Content.Chunk>() {
@Override
public void onNext(Content.Chunk chunk) {
// Process chunk
}
// ... other methods
});The Sink interface provides async content writing capabilities.
/**
* Content sink for writing content
*/
interface Content.Sink {
/**
* Write content chunk asynchronously
* @param last true if this is the last chunk
* @param byteBuffer data to write
* @param callback callback for completion notification
*/
void write(boolean last, ByteBuffer byteBuffer, Callback callback);
// Static factory methods
static Sink asBuffered(Sink sink);
static Sink asBuffered(Sink sink, ByteBufferPool pool, boolean direct, int size, int maxSize);
static OutputStream asOutputStream(Sink sink);
static Flow.Subscriber<Chunk> asSubscriber(Sink sink, Callback callback);
// Static utility methods
static void write(Sink sink, boolean last, ByteBuffer byteBuffer) throws IOException;
}Usage Examples:
// Basic sink writing
Content.Sink sink = createSink(); // Implementation specific
ByteBuffer data = ByteBuffer.wrap("Hello World".getBytes());
sink.write(true, data, new Callback() {
@Override
public void succeeded() {
System.out.println("Write completed successfully");
}
@Override
public void failed(Throwable x) {
System.err.println("Write failed: " + x.getMessage());
}
});
// Buffered sink for small writes
Content.Sink bufferedSink = Content.Sink.asBuffered(sink);
bufferedSink.write(false, ByteBuffer.wrap("Part 1".getBytes()), Callback.NOOP);
bufferedSink.write(false, ByteBuffer.wrap("Part 2".getBytes()), Callback.NOOP);
bufferedSink.write(true, ByteBuffer.wrap("Part 3".getBytes()), Callback.NOOP);
// As OutputStream
OutputStream outputStream = Content.Sink.asOutputStream(sink);
try {
outputStream.write("Hello World".getBytes());
outputStream.close(); // Writes final chunk with last=true
} catch (IOException e) {
// Handle error
}
// As Subscriber (reactive streams)
Flow.Subscriber<Content.Chunk> subscriber = Content.Sink.asSubscriber(sink, new Callback() {
@Override
public void succeeded() {
System.out.println("All chunks written successfully");
}
@Override
public void failed(Throwable x) {
System.err.println("Writing failed: " + x.getMessage());
}
});
// Use subscriber with publisher
Content.Source source = Content.Source.from(data);
Flow.Publisher<Content.Chunk> publisher = Content.Source.asPublisher(source);
publisher.subscribe(subscriber);Represents a chunk of content with metadata about position in stream and optional release semantics.
/**
* Content chunk with last-chunk indication and optional release function
*/
interface Content.Chunk extends Retainable {
/** Get chunk data as ByteBuffer */
ByteBuffer getByteBuffer();
/** Check if this is the last chunk in the stream */
boolean isLast();
/** Get failure information if chunk represents an error */
default Throwable getFailure() {
return null;
}
/** Check if chunk has remaining bytes */
default boolean hasRemaining() {
return getByteBuffer().hasRemaining();
}
// Static factory methods
static Chunk from(ByteBuffer buffer, boolean last);
static Chunk from(ByteBuffer buffer, boolean last, Runnable releaser);
static Chunk from(Throwable failure);
static Chunk from(Throwable failure, boolean last);
static Chunk asChunk(ByteBuffer buffer, boolean last, Retainable retainable);
// Static utility methods
static boolean isFailure(Chunk chunk);
static Chunk next(Chunk chunk);
// Constants
Chunk EMPTY = new EmptyChunk(false);
Chunk EOF = new EmptyChunk(true);
interface Processor {
void process(Chunk chunk, Callback callback);
}
}Usage Examples:
// Creating chunks
ByteBuffer data = ByteBuffer.wrap("Hello".getBytes());
Content.Chunk chunk = Content.Chunk.from(data, false);
// Processing chunk data
if (chunk.hasRemaining()) {
ByteBuffer buffer = chunk.getByteBuffer();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
String content = new String(data);
System.out.println("Chunk content: " + content);
}
// Creating chunk with release callback
Content.Chunk chunkWithReleaser = Content.Chunk.from(data, false, () -> {
System.out.println("Chunk data released");
// Perform cleanup
});
// Check for errors
if (Content.Chunk.isFailure(chunk)) {
Throwable error = chunk.getFailure();
System.err.println("Chunk contains error: " + error.getMessage());
}
// Retaining chunks for async processing
if (chunk.canRetain()) {
chunk.retain();
processAsync(chunk); // Will call release() when done
}
// Working with ByteBuffer directly for data manipulation
Content.Chunk dataChunk = Content.Chunk.from(ByteBuffer.wrap("0123456789".getBytes()), false);
ByteBuffer buffer = dataChunk.getByteBuffer();
buffer.position(5); // Skip first 5 bytes
String remaining = StandardCharsets.UTF_8.decode(buffer.slice()).toString();
// Remaining data is "56789"The Content class provides utilities for copying data between sources and sinks.
/**
* Content copying utilities
*/
class Content {
/**
* Copy content from source to sink asynchronously
* @param source content source to read from
* @param sink content sink to write to
* @param callback callback for completion notification
*/
static void copy(Source source, Sink sink, Callback callback);
}Usage Example:
// Copy from file to output
Content.Source fileSource = Content.Source.from(Paths.get("input.txt"));
Content.Sink outputSink = createOutputSink();
Content.copy(fileSource, outputSink, new Callback() {
@Override
public void succeeded() {
System.out.println("Copy completed successfully");
}
@Override
public void failed(Throwable x) {
System.err.println("Copy failed: " + x.getMessage());
}
});Bidirectional content buffer that can act as both source and sink.
/**
* Async content buffer that can be both written to and read from
*/
class AsyncContent implements Content.Sink, Content.Source, Closeable {
public AsyncContent();
public AsyncContent(ByteBufferPool pool);
// Source methods
public Chunk read();
public void demand(Runnable demandCallback);
public void fail(Throwable failure);
public long getLength();
// Sink methods
public void write(boolean last, ByteBuffer byteBuffer, Callback callback);
// Management
public void close();
public boolean isClosed();
public boolean isEOF();
}Content source backed by ByteBuffer arrays.
/**
* Content source backed by ByteBuffer array
*/
class ByteBufferContentSource implements Content.Source {
public ByteBufferContentSource(ByteBuffer... buffers);
public ByteBufferContentSource(Collection<ByteBuffer> buffers);
public Chunk read();
public void demand(Runnable demandCallback);
public long getLength();
public boolean rewind();
}Content source that reads from an InputStream.
/**
* Content source backed by InputStream
*/
class InputStreamContentSource implements Content.Source {
public InputStreamContentSource(InputStream inputStream);
public InputStreamContentSource(InputStream inputStream, ByteBufferPool pool);
public Chunk read();
public void demand(Runnable demandCallback);
public void fail(Throwable failure);
public long getLength();
}Content source that reads from a file Path.
/**
* Content source backed by file Path
*/
class PathContentSource implements Content.Source {
public PathContentSource(Path path);
public PathContentSource(Path path, ByteBufferPool pool);
public Chunk read();
public void demand(Runnable demandCallback);
public long getLength();
public boolean rewind();
}Implementation Usage Examples:
// AsyncContent for producer-consumer pattern
AsyncContent buffer = new AsyncContent();
// Producer writes data
CompletableFuture.runAsync(() -> {
buffer.write(false, ByteBuffer.wrap("Hello ".getBytes()), Callback.NOOP);
buffer.write(false, ByteBuffer.wrap("World".getBytes()), Callback.NOOP);
buffer.write(true, ByteBuffer.allocate(0), Callback.NOOP); // EOF
});
// Consumer reads data
Content.Chunk chunk;
while ((chunk = buffer.read()) != null) {
// Process chunk
if (chunk.isLast()) break;
}
// ByteBuffer source
ByteBuffer[] buffers = {
ByteBuffer.wrap("Hello ".getBytes()),
ByteBuffer.wrap("World".getBytes())
};
ByteBufferContentSource source = new ByteBufferContentSource(buffers);
// File source
PathContentSource fileSource = new PathContentSource(Paths.get("data.txt"));
System.out.println("File size: " + fileSource.getLength());
// InputStream source with custom pool
InputStream input = new FileInputStream("data.txt");
InputStreamContentSource streamSource = new InputStreamContentSource(input, customPool);Adapts Content.Source to Flow.Publisher for reactive streams integration.
/**
* Adapts Content.Source to reactive streams Publisher
*/
class ContentSourcePublisher implements Flow.Publisher<Content.Chunk> {
public ContentSourcePublisher(Content.Source source);
public void subscribe(Flow.Subscriber<? super Content.Chunk> subscriber);
}Adapts Content.Sink to Flow.Subscriber for reactive streams integration.
/**
* Adapts Content.Sink to reactive streams Subscriber
*/
class ContentSinkSubscriber implements Flow.Subscriber<Content.Chunk> {
public ContentSinkSubscriber(Content.Sink sink, Callback callback);
public void onSubscribe(Flow.Subscription subscription);
public void onNext(Content.Chunk chunk);
public void onError(Throwable throwable);
public void onComplete();
}Reactive Streams Examples:
// Publisher from source
Content.Source source = Content.Source.from(largeFile);
ContentSourcePublisher publisher = new ContentSourcePublisher(source);
// Subscribe with backpressure handling
publisher.subscribe(new Flow.Subscriber<Content.Chunk>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // Request first chunk
}
@Override
public void onNext(Content.Chunk chunk) {
// Process chunk
processChunk(chunk);
// Request next chunk
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println("Stream error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Stream completed");
}
});
// Subscriber to sink
Content.Sink sink = createOutputSink();
ContentSinkSubscriber subscriber = new ContentSinkSubscriber(sink, new Callback() {
@Override
public void succeeded() {
System.out.println("All data written to sink");
}
@Override
public void failed(Throwable x) {
System.err.println("Sink writing failed: " + x.getMessage());
}
});
// Connect publisher to subscriber
publisher.subscribe(subscriber);Install with Tessl CLI
npx tessl i tessl/maven-org-eclipse-jetty--jetty-io