or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.md · connection-management.md · content-streaming.md · core-io.md · index.md · selector-management.md · ssl-support.md

docs/content-streaming.md

0

# Content Streaming

1

2

Jetty IO's content streaming system provides a demand-driven, backpressure-aware approach to handling data streams. It supports both synchronous and asynchronous operations and integrates with reactive streams through the `java.util.concurrent.Flow` API.

3

4

## Capabilities

5

6

### Content.Source Interface

7

8

The Source interface provides a demand-based content reading model with support for chunked data processing.

9

10

```java { .api }

11

/**

12

* Content source with read/demand model

13

*/

14

interface Content.Source {

15

/**

16

* Read next chunk of content (may return null if no data available)

17

* @return Content.Chunk or null if no data currently available

18

*/

19

Chunk read();

20

21

/**

22

* Request async notification when content becomes available

23

* @param demandCallback callback to invoke when content is available for reading

24

*/

25

void demand(Runnable demandCallback);

26

27

/**

28

* Fail the source with an error

29

* @param failure the error that caused the failure

30

*/

31

void fail(Throwable failure);

32

33

/**

34

* Get content length if known

35

* @return content length in bytes, or -1 if unknown

36

*/

37

default long getLength() {

38

return -1;

39

}

40

41

// Static factory methods

42

static Source from(ByteBuffer... buffers);

43

static Source from(Path path);

44

static Source from(InputStream inputStream);

45

static Source from(String string);

46

static Source from(String string, Charset charset);

47

48

// Static utility methods

49

static CompletableFuture<ByteBuffer> asByteBuffer(Source source);

50

static CompletableFuture<String> asString(Source source);

51

static CompletableFuture<String> asString(Source source, Charset charset);

52

static InputStream asInputStream(Source source);

53

static Flow.Publisher<Chunk> asPublisher(Source source);

54

static CompletableFuture<Void> consumeAll(Source source);

55

56

interface Factory {

57

Source newSource();

58

}

59

}

60

```

61

62

**Usage Examples:**

63

64

```java

65

// Reading from source synchronously

66

Content.Source source = Content.Source.from("Hello World");

67

Content.Chunk chunk;

68

while ((chunk = source.read()) != null) {

69

if (chunk.hasRemaining()) {

70

ByteBuffer data = chunk.getByteBuffer();

71

// Process data

72

System.out.println(StandardCharsets.UTF_8.decode(data.duplicate()));

73

}

74

if (chunk.isLast()) {

75

break;

76

}

77

}

78

79

// Reading asynchronously with demand

80

Content.Source asyncSource = Content.Source.from(inputStream);

81

readAsync(asyncSource);

82

83

void readAsync(Content.Source source) {

84

Content.Chunk chunk = source.read();

85

if (chunk == null) {

86

// No data available, request notification

87

source.demand(() -> readAsync(source));

88

return;

89

}

90

91

// Process chunk

92

processChunk(chunk);

93

94

if (!chunk.isLast()) {

95

// Continue reading

96

readAsync(source);

97

}

98

}

99

100

// Converting source to other formats

101

Content.Source source = Content.Source.from(path);

102

103

// As ByteBuffer

104

CompletableFuture<ByteBuffer> bufferFuture = Content.Source.asByteBuffer(source);

105

bufferFuture.thenAccept(buffer -> {

106

// Process complete buffer

107

});

108

109

// As String

110

CompletableFuture<String> stringFuture = Content.Source.asString(source, StandardCharsets.UTF_8);

111

stringFuture.thenAccept(content -> {

112

System.out.println("Content: " + content);

113

});

114

115

// As InputStream

116

InputStream inputStream = Content.Source.asInputStream(source);

117

// Use as regular InputStream

118

119

// As Publisher (reactive streams)

120

Flow.Publisher<Content.Chunk> publisher = Content.Source.asPublisher(source);

121

publisher.subscribe(new Flow.Subscriber<Content.Chunk>() {

122

@Override

123

public void onNext(Content.Chunk chunk) {

124

// Process chunk

125

}

126

// ... other methods

127

});

128

```

129

130

### Content.Sink Interface

131

132

The Sink interface provides asynchronous content writing: each write is passed a callback that is notified when the write completes or fails.

133

134

```java { .api }

135

/**

136

* Content sink for writing content

137

*/

138

interface Content.Sink {

139

/**

140

* Write content chunk asynchronously

141

* @param last true if this is the last chunk

142

* @param byteBuffer data to write

143

* @param callback callback for completion notification

144

*/

145

void write(boolean last, ByteBuffer byteBuffer, Callback callback);

146

147

// Static factory methods

148

static Sink asBuffered(Sink sink);

149

static Sink asBuffered(Sink sink, ByteBufferPool pool, boolean direct, int size, int maxSize);

150

static OutputStream asOutputStream(Sink sink);

151

static Flow.Subscriber<Chunk> asSubscriber(Sink sink, Callback callback);

152

153

// Static utility methods

154

static void write(Sink sink, boolean last, ByteBuffer byteBuffer) throws IOException;

155

}

156

```

157

158

**Usage Examples:**

159

160

```java

161

// Basic sink writing

162

Content.Sink sink = createSink(); // Implementation specific

163

ByteBuffer data = ByteBuffer.wrap("Hello World".getBytes());

164

165

sink.write(true, data, new Callback() {

166

@Override

167

public void succeeded() {

168

System.out.println("Write completed successfully");

169

}

170

171

@Override

172

public void failed(Throwable x) {

173

System.err.println("Write failed: " + x.getMessage());

174

}

175

});

176

177

// Buffered sink for small writes

178

Content.Sink bufferedSink = Content.Sink.asBuffered(sink);

179

bufferedSink.write(false, ByteBuffer.wrap("Part 1".getBytes()), Callback.NOOP);

180

bufferedSink.write(false, ByteBuffer.wrap("Part 2".getBytes()), Callback.NOOP);

181

bufferedSink.write(true, ByteBuffer.wrap("Part 3".getBytes()), Callback.NOOP);

182

183

// As OutputStream

184

OutputStream outputStream = Content.Sink.asOutputStream(sink);

185

try {

186

outputStream.write("Hello World".getBytes());

187

outputStream.close(); // Writes final chunk with last=true

188

} catch (IOException e) {

189

// Handle error

190

}

191

192

// As Subscriber (reactive streams)

193

Flow.Subscriber<Content.Chunk> subscriber = Content.Sink.asSubscriber(sink, new Callback() {

194

@Override

195

public void succeeded() {

196

System.out.println("All chunks written successfully");

197

}

198

199

@Override

200

public void failed(Throwable x) {

201

System.err.println("Writing failed: " + x.getMessage());

202

}

203

});

204

205

// Use subscriber with publisher

206

Content.Source source = Content.Source.from(data);

207

Flow.Publisher<Content.Chunk> publisher = Content.Source.asPublisher(source);

208

publisher.subscribe(subscriber);

209

```

210

211

### Content.Chunk Interface

212

213

Represents a chunk of content together with a flag indicating whether it is the last chunk in the stream, optional failure information, and optional release semantics.

214

215

```java { .api }

216

/**

217

* Content chunk with last-chunk indication and optional release function

218

*/

219

interface Content.Chunk extends Retainable {

220

/** Get chunk data as ByteBuffer */

221

ByteBuffer getByteBuffer();

222

223

/** Check if this is the last chunk in the stream */

224

boolean isLast();

225

226

/** Get failure information if chunk represents an error */

227

default Throwable getFailure() {

228

return null;

229

}

230

231

/** Check if chunk has remaining bytes */

232

default boolean hasRemaining() {

233

return getByteBuffer().hasRemaining();

234

}

235

236

// Static factory methods

237

static Chunk from(ByteBuffer buffer, boolean last);

238

static Chunk from(ByteBuffer buffer, boolean last, Runnable releaser);

239

static Chunk from(Throwable failure);

240

static Chunk from(Throwable failure, boolean last);

241

static Chunk asChunk(ByteBuffer buffer, boolean last, Retainable retainable);

242

243

// Static utility methods

244

static boolean isFailure(Chunk chunk);

245

static Chunk next(Chunk chunk);

246

247

// Constants

248

Chunk EMPTY = new EmptyChunk(false);

249

Chunk EOF = new EmptyChunk(true);

250

251

interface Processor {

252

void process(Chunk chunk, Callback callback);

253

}

254

}

255

```

256

257

**Usage Examples:**

258

259

```java

260

// Creating chunks

261

ByteBuffer data = ByteBuffer.wrap("Hello".getBytes());

262

Content.Chunk chunk = Content.Chunk.from(data, false);

263

264

// Processing chunk data

265

if (chunk.hasRemaining()) {

266

ByteBuffer buffer = chunk.getByteBuffer();

267

byte[] data = new byte[buffer.remaining()];

268

buffer.get(data);

269

String content = new String(data);

270

System.out.println("Chunk content: " + content);

271

}

272

273

// Creating chunk with release callback

274

Content.Chunk chunkWithReleaser = Content.Chunk.from(data, false, () -> {

275

System.out.println("Chunk data released");

276

// Perform cleanup

277

});

278

279

// Check for errors

280

if (Content.Chunk.isFailure(chunk)) {

281

Throwable error = chunk.getFailure();

282

System.err.println("Chunk contains error: " + error.getMessage());

283

}

284

285

// Retaining chunks for async processing

286

if (chunk.canRetain()) {

287

chunk.retain();

288

processAsync(chunk); // Will call release() when done

289

}

290

291

// Working with ByteBuffer directly for data manipulation

292

Content.Chunk dataChunk = Content.Chunk.from(ByteBuffer.wrap("0123456789".getBytes()), false);

293

ByteBuffer buffer = dataChunk.getByteBuffer();

294

buffer.position(5); // Skip first 5 bytes

295

String remaining = StandardCharsets.UTF_8.decode(buffer.slice()).toString();

296

// Remaining data is "56789"

297

```

298

299

### Content Copy Operations

300

301

The Content class provides utilities for copying data between sources and sinks.

302

303

```java { .api }

304

/**

305

* Content copying utilities

306

*/

307

class Content {

308

/**

309

* Copy content from source to sink asynchronously

310

* @param source content source to read from

311

* @param sink content sink to write to

312

* @param callback callback for completion notification

313

*/

314

static void copy(Source source, Sink sink, Callback callback);

315

}

316

```

317

318

**Usage Example:**

319

320

```java

321

// Copy from file to output

322

Content.Source fileSource = Content.Source.from(Paths.get("input.txt"));

323

Content.Sink outputSink = createOutputSink();

324

325

Content.copy(fileSource, outputSink, new Callback() {

326

@Override

327

public void succeeded() {

328

System.out.println("Copy completed successfully");

329

}

330

331

@Override

332

public void failed(Throwable x) {

333

System.err.println("Copy failed: " + x.getMessage());

334

}

335

});

336

```

337

338

### Content Source Implementations

339

340

#### AsyncContent

341

342

Bidirectional content buffer that can act as both source and sink.

343

344

```java { .api }

345

/**

346

* Async content buffer that can be both written to and read from

347

*/

348

class AsyncContent implements Content.Sink, Content.Source, Closeable {

349

public AsyncContent();

350

public AsyncContent(ByteBufferPool pool);

351

352

// Source methods

353

public Chunk read();

354

public void demand(Runnable demandCallback);

355

public void fail(Throwable failure);

356

public long getLength();

357

358

// Sink methods

359

public void write(boolean last, ByteBuffer byteBuffer, Callback callback);

360

361

// Management

362

public void close();

363

public boolean isClosed();

364

public boolean isEOF();

365

}

366

```

367

368

#### ByteBufferContentSource

369

370

Content source backed by ByteBuffer arrays.

371

372

```java { .api }

373

/**

374

* Content source backed by ByteBuffer array

375

*/

376

class ByteBufferContentSource implements Content.Source {

377

public ByteBufferContentSource(ByteBuffer... buffers);

378

public ByteBufferContentSource(Collection<ByteBuffer> buffers);

379

380

public Chunk read();

381

public void demand(Runnable demandCallback);

382

public long getLength();

383

public boolean rewind();

384

}

385

```

386

387

#### InputStreamContentSource

388

389

Content source that reads from an InputStream.

390

391

```java { .api }

392

/**

393

* Content source backed by InputStream

394

*/

395

class InputStreamContentSource implements Content.Source {

396

public InputStreamContentSource(InputStream inputStream);

397

public InputStreamContentSource(InputStream inputStream, ByteBufferPool pool);

398

399

public Chunk read();

400

public void demand(Runnable demandCallback);

401

public void fail(Throwable failure);

402

public long getLength();

403

}

404

```

405

406

#### PathContentSource

407

408

Content source that reads from a file Path.

409

410

```java { .api }

411

/**

412

* Content source backed by file Path

413

*/

414

class PathContentSource implements Content.Source {

415

public PathContentSource(Path path);

416

public PathContentSource(Path path, ByteBufferPool pool);

417

418

public Chunk read();

419

public void demand(Runnable demandCallback);

420

public long getLength();

421

public boolean rewind();

422

}

423

```

424

425

**Implementation Usage Examples:**

426

427

```java

428

// AsyncContent for producer-consumer pattern

429

AsyncContent buffer = new AsyncContent();

430

431

// Producer writes data

432

CompletableFuture.runAsync(() -> {

433

buffer.write(false, ByteBuffer.wrap("Hello ".getBytes()), Callback.NOOP);

434

buffer.write(false, ByteBuffer.wrap("World".getBytes()), Callback.NOOP);

435

buffer.write(true, ByteBuffer.allocate(0), Callback.NOOP); // EOF

436

});

437

438

// Consumer reads data

439

Content.Chunk chunk;

440

while ((chunk = buffer.read()) != null) {

441

// Process chunk

442

if (chunk.isLast()) break;

443

}

444

445

// ByteBuffer source

446

ByteBuffer[] buffers = {

447

ByteBuffer.wrap("Hello ".getBytes()),

448

ByteBuffer.wrap("World".getBytes())

449

};

450

ByteBufferContentSource source = new ByteBufferContentSource(buffers);

451

452

// File source

453

PathContentSource fileSource = new PathContentSource(Paths.get("data.txt"));

454

System.out.println("File size: " + fileSource.getLength());

455

456

// InputStream source with custom pool

457

InputStream input = new FileInputStream("data.txt");

458

InputStreamContentSource streamSource = new InputStreamContentSource(input, customPool);

459

```

460

461

### Reactive Streams Integration

462

463

#### ContentSourcePublisher

464

465

Adapts Content.Source to Flow.Publisher for reactive streams integration.

466

467

```java { .api }

468

/**

469

* Adapts Content.Source to reactive streams Publisher

470

*/

471

class ContentSourcePublisher implements Flow.Publisher<Content.Chunk> {

472

public ContentSourcePublisher(Content.Source source);

473

474

public void subscribe(Flow.Subscriber<? super Content.Chunk> subscriber);

475

}

476

```

477

478

#### ContentSinkSubscriber

479

480

Adapts Content.Sink to Flow.Subscriber for reactive streams integration.

481

482

```java { .api }

483

/**

484

* Adapts Content.Sink to reactive streams Subscriber

485

*/

486

class ContentSinkSubscriber implements Flow.Subscriber<Content.Chunk> {

487

public ContentSinkSubscriber(Content.Sink sink, Callback callback);

488

489

public void onSubscribe(Flow.Subscription subscription);

490

public void onNext(Content.Chunk chunk);

491

public void onError(Throwable throwable);

492

public void onComplete();

493

}

494

```

495

496

**Reactive Streams Examples:**

497

498

```java

499

// Publisher from source

500

Content.Source source = Content.Source.from(largeFile);

501

ContentSourcePublisher publisher = new ContentSourcePublisher(source);

502

503

// Subscribe with backpressure handling

504

publisher.subscribe(new Flow.Subscriber<Content.Chunk>() {

505

private Flow.Subscription subscription;

506

507

@Override

508

public void onSubscribe(Flow.Subscription subscription) {

509

this.subscription = subscription;

510

subscription.request(1); // Request first chunk

511

}

512

513

@Override

514

public void onNext(Content.Chunk chunk) {

515

// Process chunk

516

processChunk(chunk);

517

518

// Request next chunk

519

subscription.request(1);

520

}

521

522

@Override

523

public void onError(Throwable throwable) {

524

System.err.println("Stream error: " + throwable.getMessage());

525

}

526

527

@Override

528

public void onComplete() {

529

System.out.println("Stream completed");

530

}

531

});

532

533

// Subscriber to sink

534

Content.Sink sink = createOutputSink();

535

ContentSinkSubscriber subscriber = new ContentSinkSubscriber(sink, new Callback() {

536

@Override

537

public void succeeded() {

538

System.out.println("All data written to sink");

539

}

540

541

@Override

542

public void failed(Throwable x) {

543

System.err.println("Sink writing failed: " + x.getMessage());

544

}

545

});

546

547

// Connect publisher to subscriber

548

publisher.subscribe(subscriber);

549

```