0
# Content Streaming
1
2
Jetty IO's content streaming system provides a demand-driven, backpressure-aware approach to handling data streams. It supports both synchronous and asynchronous operations with integration to reactive streams.
3
4
## Capabilities
5
6
### Content.Source Interface
7
8
The Source interface provides a demand-based content reading model with support for chunked data processing.
9
10
```java { .api }
11
/**
12
* Content source with read/demand model
13
*/
14
interface Content.Source {
15
/**
16
* Read next chunk of content (may return null if no data available)
17
* @return Content.Chunk or null if no data currently available
18
*/
19
Chunk read();
20
21
/**
22
* Request async notification when content becomes available
23
* @param demandCallback callback to invoke when content is available for reading
24
*/
25
void demand(Runnable demandCallback);
26
27
/**
28
* Fail the source with an error
29
* @param failure the error that caused the failure
30
*/
31
void fail(Throwable failure);
32
33
/**
34
* Get content length if known
35
* @return content length in bytes, or -1 if unknown
36
*/
37
default long getLength() {
38
return -1;
39
}
40
41
// Static factory methods
42
static Source from(ByteBuffer... buffers);
43
static Source from(Path path);
44
static Source from(InputStream inputStream);
45
static Source from(String string);
46
static Source from(String string, Charset charset);
47
48
// Static utility methods
49
static CompletableFuture<ByteBuffer> asByteBuffer(Source source);
50
static CompletableFuture<String> asString(Source source);
51
static CompletableFuture<String> asString(Source source, Charset charset);
52
static InputStream asInputStream(Source source);
53
static Flow.Publisher<Chunk> asPublisher(Source source);
54
static CompletableFuture<Void> consumeAll(Source source);
55
56
interface Factory {
57
Source newSource();
58
}
59
}
60
```
61
62
**Usage Examples:**
63
64
```java
65
// Reading from source synchronously
66
Content.Source source = Content.Source.from("Hello World");
67
Content.Chunk chunk;
68
while ((chunk = source.read()) != null) {
69
if (chunk.hasRemaining()) {
70
ByteBuffer data = chunk.getByteBuffer();
71
// Process data
72
System.out.println(StandardCharsets.UTF_8.decode(data.duplicate()));
73
}
74
if (chunk.isLast()) {
75
break;
76
}
77
}
78
79
// Reading asynchronously with demand
80
Content.Source asyncSource = Content.Source.from(inputStream);
81
readAsync(asyncSource);
82
83
void readAsync(Content.Source source) {
84
Content.Chunk chunk = source.read();
85
if (chunk == null) {
86
// No data available, request notification
87
source.demand(() -> readAsync(source));
88
return;
89
}
90
91
// Process chunk
92
processChunk(chunk);
93
94
if (!chunk.isLast()) {
95
// Continue reading
96
readAsync(source);
97
}
98
}
99
100
// Converting source to other formats
101
Content.Source source = Content.Source.from(path);
102
103
// As ByteBuffer
104
CompletableFuture<ByteBuffer> bufferFuture = Content.Source.asByteBuffer(source);
105
bufferFuture.thenAccept(buffer -> {
106
// Process complete buffer
107
});
108
109
// As String
110
CompletableFuture<String> stringFuture = Content.Source.asString(source, StandardCharsets.UTF_8);
111
stringFuture.thenAccept(content -> {
112
System.out.println("Content: " + content);
113
});
114
115
// As InputStream
116
InputStream inputStream = Content.Source.asInputStream(source);
117
// Use as regular InputStream
118
119
// As Publisher (reactive streams)
120
Flow.Publisher<Content.Chunk> publisher = Content.Source.asPublisher(source);
121
publisher.subscribe(new Flow.Subscriber<Content.Chunk>() {
122
@Override
123
public void onNext(Content.Chunk chunk) {
124
// Process chunk
125
}
126
// ... other methods
127
});
128
```
129
130
### Content.Sink Interface
131
132
The Sink interface provides async content writing capabilities.
133
134
```java { .api }
135
/**
136
* Content sink for writing content
137
*/
138
interface Content.Sink {
139
/**
140
* Write content chunk asynchronously
141
* @param last true if this is the last chunk
142
* @param byteBuffer data to write
143
* @param callback callback for completion notification
144
*/
145
void write(boolean last, ByteBuffer byteBuffer, Callback callback);
146
147
// Static factory methods
148
static Sink asBuffered(Sink sink);
149
static Sink asBuffered(Sink sink, ByteBufferPool pool, boolean direct, int size, int maxSize);
150
static OutputStream asOutputStream(Sink sink);
151
static Flow.Subscriber<Chunk> asSubscriber(Sink sink, Callback callback);
152
153
// Static utility methods
154
static void write(Sink sink, boolean last, ByteBuffer byteBuffer) throws IOException;
155
}
156
```
157
158
**Usage Examples:**
159
160
```java
161
// Basic sink writing
162
Content.Sink sink = createSink(); // Implementation specific
163
ByteBuffer data = ByteBuffer.wrap("Hello World".getBytes());
164
165
sink.write(true, data, new Callback() {
166
@Override
167
public void succeeded() {
168
System.out.println("Write completed successfully");
169
}
170
171
@Override
172
public void failed(Throwable x) {
173
System.err.println("Write failed: " + x.getMessage());
174
}
175
});
176
177
// Buffered sink for small writes
178
Content.Sink bufferedSink = Content.Sink.asBuffered(sink);
179
bufferedSink.write(false, ByteBuffer.wrap("Part 1".getBytes()), Callback.NOOP);
180
bufferedSink.write(false, ByteBuffer.wrap("Part 2".getBytes()), Callback.NOOP);
181
bufferedSink.write(true, ByteBuffer.wrap("Part 3".getBytes()), Callback.NOOP);
182
183
// As OutputStream
184
OutputStream outputStream = Content.Sink.asOutputStream(sink);
185
try {
186
outputStream.write("Hello World".getBytes());
187
outputStream.close(); // Writes final chunk with last=true
188
} catch (IOException e) {
189
// Handle error
190
}
191
192
// As Subscriber (reactive streams)
193
Flow.Subscriber<Content.Chunk> subscriber = Content.Sink.asSubscriber(sink, new Callback() {
194
@Override
195
public void succeeded() {
196
System.out.println("All chunks written successfully");
197
}
198
199
@Override
200
public void failed(Throwable x) {
201
System.err.println("Writing failed: " + x.getMessage());
202
}
203
});
204
205
// Use subscriber with publisher
206
Content.Source source = Content.Source.from(data);
207
Flow.Publisher<Content.Chunk> publisher = Content.Source.asPublisher(source);
208
publisher.subscribe(subscriber);
209
```
210
211
### Content.Chunk Interface
212
213
Represents a chunk of content with metadata about position in stream and optional release semantics.
214
215
```java { .api }
216
/**
217
* Content chunk with last-chunk indication and optional release function
218
*/
219
interface Content.Chunk extends Retainable {
220
/** Get chunk data as ByteBuffer */
221
ByteBuffer getByteBuffer();
222
223
/** Check if this is the last chunk in the stream */
224
boolean isLast();
225
226
/** Get failure information if chunk represents an error */
227
default Throwable getFailure() {
228
return null;
229
}
230
231
/** Check if chunk has remaining bytes */
232
default boolean hasRemaining() {
233
return getByteBuffer().hasRemaining();
234
}
235
236
// Static factory methods
237
static Chunk from(ByteBuffer buffer, boolean last);
238
static Chunk from(ByteBuffer buffer, boolean last, Runnable releaser);
239
static Chunk from(Throwable failure);
240
static Chunk from(Throwable failure, boolean last);
241
static Chunk asChunk(ByteBuffer buffer, boolean last, Retainable retainable);
242
243
// Static utility methods
244
static boolean isFailure(Chunk chunk);
245
static Chunk next(Chunk chunk);
246
247
// Constants
248
Chunk EMPTY = new EmptyChunk(false);
249
Chunk EOF = new EmptyChunk(true);
250
251
interface Processor {
252
void process(Chunk chunk, Callback callback);
253
}
254
}
255
```
256
257
**Usage Examples:**
258
259
```java
260
// Creating chunks
261
ByteBuffer data = ByteBuffer.wrap("Hello".getBytes());
262
Content.Chunk chunk = Content.Chunk.from(data, false);
263
264
// Processing chunk data
265
if (chunk.hasRemaining()) {
266
ByteBuffer buffer = chunk.getByteBuffer();
267
byte[] data = new byte[buffer.remaining()];
268
buffer.get(data);
269
String content = new String(data);
270
System.out.println("Chunk content: " + content);
271
}
272
273
// Creating chunk with release callback
274
Content.Chunk chunkWithReleaser = Content.Chunk.from(data, false, () -> {
275
System.out.println("Chunk data released");
276
// Perform cleanup
277
});
278
279
// Check for errors
280
if (Content.Chunk.isFailure(chunk)) {
281
Throwable error = chunk.getFailure();
282
System.err.println("Chunk contains error: " + error.getMessage());
283
}
284
285
// Retaining chunks for async processing
286
if (chunk.canRetain()) {
287
chunk.retain();
288
processAsync(chunk); // Will call release() when done
289
}
290
291
// Working with ByteBuffer directly for data manipulation
292
Content.Chunk dataChunk = Content.Chunk.from(ByteBuffer.wrap("0123456789".getBytes()), false);
293
ByteBuffer buffer = dataChunk.getByteBuffer();
294
buffer.position(5); // Skip first 5 bytes
295
String remaining = StandardCharsets.UTF_8.decode(buffer.slice()).toString();
296
// Remaining data is "56789"
297
```
298
299
### Content Copy Operations
300
301
The Content class provides utilities for copying data between sources and sinks.
302
303
```java { .api }
304
/**
305
* Content copying utilities
306
*/
307
class Content {
308
/**
309
* Copy content from source to sink asynchronously
310
* @param source content source to read from
311
* @param sink content sink to write to
312
* @param callback callback for completion notification
313
*/
314
static void copy(Source source, Sink sink, Callback callback);
315
}
316
```
317
318
**Usage Example:**
319
320
```java
321
// Copy from file to output
322
Content.Source fileSource = Content.Source.from(Paths.get("input.txt"));
323
Content.Sink outputSink = createOutputSink();
324
325
Content.copy(fileSource, outputSink, new Callback() {
326
@Override
327
public void succeeded() {
328
System.out.println("Copy completed successfully");
329
}
330
331
@Override
332
public void failed(Throwable x) {
333
System.err.println("Copy failed: " + x.getMessage());
334
}
335
});
336
```
337
338
### Content Source Implementations
339
340
#### AsyncContent
341
342
Bidirectional content buffer that can act as both source and sink.
343
344
```java { .api }
345
/**
346
* Async content buffer that can be both written to and read from
347
*/
348
class AsyncContent implements Content.Sink, Content.Source, Closeable {
349
public AsyncContent();
350
public AsyncContent(ByteBufferPool pool);
351
352
// Source methods
353
public Chunk read();
354
public void demand(Runnable demandCallback);
355
public void fail(Throwable failure);
356
public long getLength();
357
358
// Sink methods
359
public void write(boolean last, ByteBuffer byteBuffer, Callback callback);
360
361
// Management
362
public void close();
363
public boolean isClosed();
364
public boolean isEOF();
365
}
366
```
367
368
#### ByteBufferContentSource
369
370
Content source backed by ByteBuffer arrays.
371
372
```java { .api }
373
/**
374
* Content source backed by ByteBuffer array
375
*/
376
class ByteBufferContentSource implements Content.Source {
377
public ByteBufferContentSource(ByteBuffer... buffers);
378
public ByteBufferContentSource(Collection<ByteBuffer> buffers);
379
380
public Chunk read();
381
public void demand(Runnable demandCallback);
382
public long getLength();
383
public boolean rewind();
384
}
385
```
386
387
#### InputStreamContentSource
388
389
Content source that reads from an InputStream.
390
391
```java { .api }
392
/**
393
* Content source backed by InputStream
394
*/
395
class InputStreamContentSource implements Content.Source {
396
public InputStreamContentSource(InputStream inputStream);
397
public InputStreamContentSource(InputStream inputStream, ByteBufferPool pool);
398
399
public Chunk read();
400
public void demand(Runnable demandCallback);
401
public void fail(Throwable failure);
402
public long getLength();
403
}
404
```
405
406
#### PathContentSource
407
408
Content source that reads from a file Path.
409
410
```java { .api }
411
/**
412
* Content source backed by file Path
413
*/
414
class PathContentSource implements Content.Source {
415
public PathContentSource(Path path);
416
public PathContentSource(Path path, ByteBufferPool pool);
417
418
public Chunk read();
419
public void demand(Runnable demandCallback);
420
public long getLength();
421
public boolean rewind();
422
}
423
```
424
425
**Implementation Usage Examples:**
426
427
```java
428
// AsyncContent for producer-consumer pattern
429
AsyncContent buffer = new AsyncContent();
430
431
// Producer writes data
432
CompletableFuture.runAsync(() -> {
433
buffer.write(false, ByteBuffer.wrap("Hello ".getBytes()), Callback.NOOP);
434
buffer.write(false, ByteBuffer.wrap("World".getBytes()), Callback.NOOP);
435
buffer.write(true, ByteBuffer.allocate(0), Callback.NOOP); // EOF
436
});
437
438
// Consumer reads data
439
Content.Chunk chunk;
440
while ((chunk = buffer.read()) != null) {
441
// Process chunk
442
if (chunk.isLast()) break;
443
}
444
445
// ByteBuffer source
446
ByteBuffer[] buffers = {
447
ByteBuffer.wrap("Hello ".getBytes()),
448
ByteBuffer.wrap("World".getBytes())
449
};
450
ByteBufferContentSource source = new ByteBufferContentSource(buffers);
451
452
// File source
453
PathContentSource fileSource = new PathContentSource(Paths.get("data.txt"));
454
System.out.println("File size: " + fileSource.getLength());
455
456
// InputStream source with custom pool
457
InputStream input = new FileInputStream("data.txt");
458
InputStreamContentSource streamSource = new InputStreamContentSource(input, customPool);
459
```
460
461
### Reactive Streams Integration
462
463
#### ContentSourcePublisher
464
465
Adapts Content.Source to Flow.Publisher for reactive streams integration.
466
467
```java { .api }
468
/**
469
* Adapts Content.Source to reactive streams Publisher
470
*/
471
class ContentSourcePublisher implements Flow.Publisher<Content.Chunk> {
472
public ContentSourcePublisher(Content.Source source);
473
474
public void subscribe(Flow.Subscriber<? super Content.Chunk> subscriber);
475
}
476
```
477
478
#### ContentSinkSubscriber
479
480
Adapts Content.Sink to Flow.Subscriber for reactive streams integration.
481
482
```java { .api }
483
/**
484
* Adapts Content.Sink to reactive streams Subscriber
485
*/
486
class ContentSinkSubscriber implements Flow.Subscriber<Content.Chunk> {
487
public ContentSinkSubscriber(Content.Sink sink, Callback callback);
488
489
public void onSubscribe(Flow.Subscription subscription);
490
public void onNext(Content.Chunk chunk);
491
public void onError(Throwable throwable);
492
public void onComplete();
493
}
494
```
495
496
**Reactive Streams Examples:**
497
498
```java
499
// Publisher from source
500
Content.Source source = Content.Source.from(largeFile);
501
ContentSourcePublisher publisher = new ContentSourcePublisher(source);
502
503
// Subscribe with backpressure handling
504
publisher.subscribe(new Flow.Subscriber<Content.Chunk>() {
505
private Flow.Subscription subscription;
506
507
@Override
508
public void onSubscribe(Flow.Subscription subscription) {
509
this.subscription = subscription;
510
subscription.request(1); // Request first chunk
511
}
512
513
@Override
514
public void onNext(Content.Chunk chunk) {
515
// Process chunk
516
processChunk(chunk);
517
518
// Request next chunk
519
subscription.request(1);
520
}
521
522
@Override
523
public void onError(Throwable throwable) {
524
System.err.println("Stream error: " + throwable.getMessage());
525
}
526
527
@Override
528
public void onComplete() {
529
System.out.println("Stream completed");
530
}
531
});
532
533
// Subscriber to sink
534
Content.Sink sink = createOutputSink();
535
ContentSinkSubscriber subscriber = new ContentSinkSubscriber(sink, new Callback() {
536
@Override
537
public void succeeded() {
538
System.out.println("All data written to sink");
539
}
540
541
@Override
542
public void failed(Throwable x) {
543
System.err.println("Sink writing failed: " + x.getMessage());
544
}
545
});
546
547
// Connect publisher to subscriber
548
publisher.subscribe(subscriber);
549
```