or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

content-streaming.mdhttp-clients.mdhttp-messages.mdindex.mdmetrics.mdservice-discovery.mdtls-configuration.md

content-streaming.mddocs/

0

# Content Streaming

1

2

Flexible content stream providers and reactive streams publishers for handling HTTP request and response bodies. Supports various input sources, asynchronous streaming patterns, and provides utilities for stream management and cancellation.

3

4

## Capabilities

5

6

### ContentStreamProvider

7

8

Functional interface for providing content streams for HTTP request bodies. Supports multiple creation patterns for different data sources.

9

10

```java { .api }

11

/**

12

* Functional interface for providing content streams for HTTP requests.

13

* Implementations should create new streams on each call to newStream().

14

*/

15

@FunctionalInterface

16

public interface ContentStreamProvider {

17

/**

18

* Create a new content stream. Must return a new stream on each invocation.

19

* @return New input stream containing the content data

20

*/

21

InputStream newStream();

22

23

/**

24

* @return Implementation-specific name for debugging/logging (default: "Unknown")

25

*/

26

default String name() {

27

return "Unknown";

28

}

29

30

// Static factory methods for common content sources

31

32

/**

33

* Create provider from byte array (array is copied for safety)

34

* @param bytes Source byte array

35

* @return ContentStreamProvider that creates streams from the byte array

36

*/

37

static ContentStreamProvider fromByteArray(byte[] bytes);

38

39

/**

40

* Create provider from byte array (array is NOT copied - use with caution)

41

* @param bytes Source byte array (must not be modified after this call)

42

* @return ContentStreamProvider that creates streams from the byte array

43

*/

44

static ContentStreamProvider fromByteArrayUnsafe(byte[] bytes);

45

46

/**

47

* Create provider from string with specified charset

48

* @param string Source string

49

* @param charset Character encoding to use

50

* @return ContentStreamProvider that creates streams from the string

51

*/

52

static ContentStreamProvider fromString(String string, Charset charset);

53

54

/**

55

* Create provider from UTF-8 encoded string

56

* @param string Source string

57

* @return ContentStreamProvider that creates UTF-8 encoded streams

58

*/

59

static ContentStreamProvider fromUtf8String(String string);

60

61

/**

62

* Create provider from input stream (stream will be read once and cached)

63

* @param inputStream Source stream (will be consumed during creation)

64

* @return ContentStreamProvider that creates streams from cached data

65

*/

66

static ContentStreamProvider fromInputStream(InputStream inputStream);

67

68

/**

69

* Create provider from input stream supplier (supplier called for each new stream)

70

* @param supplier Function that provides new input streams

71

* @return ContentStreamProvider that delegates to the supplier

72

*/

73

static ContentStreamProvider fromInputStreamSupplier(Supplier<InputStream> supplier);

74

}

75

```

76

77

**Usage Examples:**

78

79

```java

80

// From string content

81

ContentStreamProvider jsonProvider = ContentStreamProvider.fromUtf8String(

82

"{\"message\":\"Hello, World!\"}"

83

);

84

85

// From byte array

86

byte[] imageData = Files.readAllBytes(Paths.get("image.jpg"));

87

ContentStreamProvider imageProvider = ContentStreamProvider.fromByteArray(imageData);

88

89

// From file input stream

90

ContentStreamProvider fileProvider = ContentStreamProvider.fromInputStreamSupplier(() -> {

91

try {

92

return Files.newInputStream(Paths.get("large-file.dat"));

93

} catch (IOException e) {

94

throw new UncheckedIOException(e);

95

}

96

});

97

98

// Custom implementation

99

ContentStreamProvider customProvider = new ContentStreamProvider() {

100

@Override

101

public InputStream newStream() {

102

// Generate dynamic content

103

String timestamp = Instant.now().toString();

104

return new ByteArrayInputStream(timestamp.getBytes(StandardCharsets.UTF_8));

105

}

106

107

@Override

108

public String name() {

109

return "TimestampProvider";

110

}

111

};

112

113

// Using with HTTP request

114

SdkHttpFullRequest request = SdkHttpFullRequest.builder()

115

.method(SdkHttpMethod.POST)

116

.protocol("https")

117

.host("api.example.com")

118

.encodedPath("/upload")

119

.contentStreamProvider(fileProvider)

120

.build();

121

```

122

123

### SdkHttpContentPublisher

124

125

Publisher interface for HTTP content in reactive streams-based asynchronous operations. Extends the standard reactive streams Publisher interface.

126

127

```java { .api }

128

/**

129

* Publisher for HTTP content data in streaming operations.

130

* Implements reactive streams Publisher interface for ByteBuffer content.

131

*/

132

public interface SdkHttpContentPublisher extends Publisher<ByteBuffer> {

133

/**

134

* Get the content length of data being produced, if known

135

* @return Optional content length in bytes, empty if unknown

136

*/

137

Optional<Long> contentLength();

138

139

/**

140

* Subscribe to the content stream

141

* @param subscriber Subscriber that will receive ByteBuffer chunks

142

*/

143

@Override

144

void subscribe(Subscriber<? super ByteBuffer> subscriber);

145

}

146

```

147

148

**Usage Example:**

149

150

```java

151

// Custom content publisher implementation

152

public class FileContentPublisher implements SdkHttpContentPublisher {

153

private final Path filePath;

154

private final long contentLength;

155

156

public FileContentPublisher(Path filePath) throws IOException {

157

this.filePath = filePath;

158

this.contentLength = Files.size(filePath);

159

}

160

161

@Override

162

public Optional<Long> contentLength() {

163

return Optional.of(contentLength);

164

}

165

166

@Override

167

public void subscribe(Subscriber<? super ByteBuffer> subscriber) {

168

subscriber.onSubscribe(new FileSubscription(filePath, subscriber));

169

}

170

}

171

172

// Using with async HTTP request

173

AsyncExecuteRequest asyncRequest = AsyncExecuteRequest.builder()

174

.request(httpRequest)

175

.requestContentPublisher(new FileContentPublisher(Paths.get("upload.dat")))

176

.responseHandler(responseHandler)

177

.build();

178

```

179

180

### AbortableInputStream

181

182

Input stream that can be aborted, useful for response body streams that may need to be cancelled.

183

184

```java { .api }

185

/**

186

* Input stream that can be aborted. Used for response body streams

187

* that may need to be cancelled before completion.

188

*/

189

public class AbortableInputStream extends FilterInputStream implements Abortable {

190

/**

191

* Construct an abortable input stream wrapping another stream

192

* @param inputStream The underlying input stream

193

*/

194

public AbortableInputStream(InputStream inputStream);

195

196

/**

197

* Abort the input stream, causing subsequent reads to fail

198

*/

199

@Override

200

public void abort();

201

202

// Standard InputStream methods

203

@Override

204

public int read() throws IOException;

205

206

@Override

207

public int read(byte[] b, int off, int len) throws IOException;

208

209

@Override

210

public void close() throws IOException;

211

}

212

```

213

214

**Usage Example:**

215

216

```java

217

// Processing response body with abort capability

218

try (AbortableInputStream responseBody = httpResponse.responseBody().orElse(null)) {

219

if (responseBody != null) {

220

// Set up abort condition (e.g., timeout)

221

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

222

ScheduledFuture<?> abortTimer = scheduler.schedule(() -> {

223

responseBody.abort();

224

}, 30, TimeUnit.SECONDS);

225

226

try {

227

// Process the stream

228

byte[] buffer = new byte[8192];

229

int bytesRead;

230

while ((bytesRead = responseBody.read(buffer)) != -1) {

231

// Process data

232

processData(buffer, 0, bytesRead);

233

}

234

abortTimer.cancel(false);

235

} catch (IOException e) {

236

// Stream may have been aborted

237

if (!abortTimer.isDone()) {

238

// Not aborted, real I/O error

239

throw e;

240

}

241

}

242

}

243

}

244

```

245

246

### Abortable Interface

247

248

Interface for operations that can be aborted or cancelled.

249

250

```java { .api }

251

/**

252

* Interface for operations that can be aborted

253

*/

254

public interface Abortable {

255

/**

256

* Abort the operation, causing it to fail or terminate early

257

*/

258

void abort();

259

}

260

```

261

262

### AbortableInputStreamSubscriber

263

264

Subscriber that converts a reactive ByteBuffer stream into an AbortableInputStream for synchronous processing.

265

266

```java { .api }

267

/**

268

* Subscriber that converts ByteBuffer stream to AbortableInputStream.

269

* Bridges reactive streams (async) to InputStream (sync) patterns.

270

*/

271

public class AbortableInputStreamSubscriber implements Subscriber<ByteBuffer> {

272

/**

273

* Get the future input stream that will contain the subscribed data

274

* @return CompletableFuture that resolves to AbortableInputStream

275

*/

276

public CompletableFuture<AbortableInputStream> futureInputStream();

277

278

/**

279

* Called when subscription is established

280

* @param subscription Subscription for controlling data flow

281

*/

282

@Override

283

public void onSubscribe(Subscription subscription);

284

285

/**

286

* Called when new data is available

287

* @param byteBuffer Next chunk of data

288

*/

289

@Override

290

public void onNext(ByteBuffer byteBuffer);

291

292

/**

293

* Called when an error occurs

294

* @param error The error that occurred

295

*/

296

@Override

297

public void onError(Throwable error);

298

299

/**

300

* Called when stream is complete

301

*/

302

@Override

303

public void onComplete();

304

}

305

```

306

307

**Usage Example:**

308

309

```java

310

// Converting async stream to sync input stream

311

public void processAsyncResponse(Publisher<ByteBuffer> contentPublisher) {

312

AbortableInputStreamSubscriber subscriber = new AbortableInputStreamSubscriber();

313

contentPublisher.subscribe(subscriber);

314

315

try {

316

// Get the input stream (this may block until data is available)

317

AbortableInputStream inputStream = subscriber.futureInputStream().get(30, TimeUnit.SECONDS);

318

319

// Process as normal input stream

320

try (inputStream) {

321

byte[] buffer = new byte[8192];

322

int bytesRead;

323

while ((bytesRead = inputStream.read(buffer)) != -1) {

324

processData(buffer, 0, bytesRead);

325

}

326

}

327

} catch (TimeoutException e) {

328

// Abort if taking too long

329

subscriber.futureInputStream().cancel(true);

330

}

331

}

332

```

333

334

### SimpleSubscriber

335

336

Simplified subscriber interface that provides default implementations for error handling and completion.

337

338

```java { .api }

339

/**

340

* Simplified subscriber interface with sensible defaults

341

*/

342

public interface SimpleSubscriber<T> extends Subscriber<T> {

343

/**

344

* Process the next item (required implementation)

345

* @param t Next item from the stream

346

*/

347

void onNext(T t);

348

349

/**

350

* Handle errors (default: empty implementation)

351

* @param error Error that occurred

352

*/

353

default void onError(Throwable error) {

354

// Default: do nothing

355

}

356

357

/**

358

* Handle completion (default: empty implementation)

359

*/

360

default void onComplete() {

361

// Default: do nothing

362

}

363

364

/**

365

* Create a simple subscriber from a consumer function

366

* @param onNext Function to handle each item

367

* @return SimpleSubscriber that delegates to the function

368

*/

369

static <T> SimpleSubscriber<T> create(Consumer<T> onNext);

370

}

371

```

372

373

**Usage Example:**

374

375

```java

376

// Simple data processing

377

SimpleSubscriber<ByteBuffer> dataProcessor = SimpleSubscriber.create(buffer -> {

378

// Process each buffer

379

byte[] data = new byte[buffer.remaining()];

380

buffer.get(data);

381

processChunk(data);

382

});

383

384

// Subscribe to content stream

385

contentPublisher.subscribe(dataProcessor);

386

387

// With error handling

388

SimpleSubscriber<ByteBuffer> robustProcessor = new SimpleSubscriber<ByteBuffer>() {

389

@Override

390

public void onNext(ByteBuffer buffer) {

391

processBuffer(buffer);

392

}

393

394

@Override

395

public void onError(Throwable error) {

396

logger.error("Stream processing failed", error);

397

notifyFailure(error);

398

}

399

400

@Override

401

public void onComplete() {

402

logger.info("Stream processing completed successfully");

403

notifySuccess();

404

}

405

};

406

```

407

408

## Async Response Handling

409

410

### SdkAsyncHttpResponseHandler

411

412

Handler interface for processing asynchronous HTTP responses with reactive streams.

413

414

```java { .api }

415

/**

416

* Handler for asynchronous HTTP responses using reactive streams

417

*/

418

public interface SdkAsyncHttpResponseHandler {

419

/**

420

* Called when response headers are received

421

* @param headers HTTP response headers and status

422

*/

423

void onHeaders(SdkHttpResponse headers);

424

425

/**

426

* Called when response body stream is ready

427

* @param stream Publisher of response body data

428

*/

429

void onStream(Publisher<ByteBuffer> stream);

430

431

/**

432

* Called when an error occurs during request or response processing

433

* @param error The error that occurred

434

*/

435

void onError(Throwable error);

436

}

437

```

438

439

### SdkHttpResponseHandler

440

441

Alternative response handler interface with different method signatures.

442

443

```java { .api }

444

/**

445

* Alternative response handler interface

446

*/

447

public interface SdkHttpResponseHandler {

448

/**

449

* Called when response headers are received

450

* @param response HTTP response headers and status

451

*/

452

void headersReceived(SdkHttpResponse response);

453

454

/**

455

* Called when response body stream is ready

456

* @param publisher Publisher of response body data

457

*/

458

void onStream(SdkHttpContentPublisher publisher);

459

460

/**

461

* Called when an error occurs

462

* @param exception The exception that occurred

463

*/

464

void exceptionOccurred(Exception exception);

465

}

466

```

467

468

**Complete Async Example:**

469

470

```java

471

// Complete async HTTP request with streaming response

472

public class StreamingDownloader {

473

public CompletableFuture<Void> downloadFile(SdkHttpRequest request, Path outputPath) {

474

CompletableFuture<Void> result = new CompletableFuture<>();

475

476

AsyncExecuteRequest asyncRequest = AsyncExecuteRequest.builder()

477

.request(request)

478

.responseHandler(new SdkAsyncHttpResponseHandler() {

479

@Override

480

public void onHeaders(SdkHttpResponse headers) {

481

if (!headers.isSuccessful()) {

482

result.completeExceptionally(

483

new IOException("HTTP " + headers.statusCode())

484

);

485

return;

486

}

487

488

// Headers look good, ready for body

489

}

490

491

@Override

492

public void onStream(Publisher<ByteBuffer> stream) {

493

try {

494

FileChannel fileChannel = FileChannel.open(outputPath,

495

StandardOpenOption.CREATE,

496

StandardOpenOption.WRITE,

497

StandardOpenOption.TRUNCATE_EXISTING);

498

499

stream.subscribe(new SimpleSubscriber<ByteBuffer>() {

500

@Override

501

public void onNext(ByteBuffer buffer) {

502

try {

503

fileChannel.write(buffer);

504

} catch (IOException e) {

505

result.completeExceptionally(e);

506

}

507

}

508

509

@Override

510

public void onError(Throwable error) {

511

try { fileChannel.close(); } catch (IOException e) { /* ignore */ }

512

result.completeExceptionally(error);

513

}

514

515

@Override

516

public void onComplete() {

517

try {

518

fileChannel.close();

519

result.complete(null);

520

} catch (IOException e) {

521

result.completeExceptionally(e);

522

}

523

}

524

});

525

} catch (IOException e) {

526

result.completeExceptionally(e);

527

}

528

}

529

530

@Override

531

public void onError(Throwable error) {

532

result.completeExceptionally(error);

533

}

534

})

535

.build();

536

537

// Execute the async request

538

httpClient.execute(asyncRequest).whenComplete((unused, throwable) -> {

539

if (throwable != null && !result.isDone()) {

540

result.completeExceptionally(throwable);

541

}

542

});

543

544

return result;

545

}

546

}

547

```