0
# Content Streaming
1
2
Flexible content stream providers and reactive streams publishers for handling HTTP request and response bodies. Supports various input sources, asynchronous streaming patterns, and provides utilities for stream management and cancellation.
3
4
## Capabilities
5
6
### ContentStreamProvider
7
8
Functional interface for providing content streams for HTTP request bodies. Supports multiple creation patterns for different data sources.
9
10
```java { .api }
11
/**
12
* Functional interface for providing content streams for HTTP requests.
13
* Implementations should create new streams on each call to newStream().
14
*/
15
@FunctionalInterface
16
public interface ContentStreamProvider {
17
/**
18
* Create a new content stream. Must return a new stream on each invocation.
19
* @return New input stream containing the content data
20
*/
21
InputStream newStream();
22
23
/**
24
* @return Implementation-specific name for debugging/logging (default: "Unknown")
25
*/
26
default String name() {
27
return "Unknown";
28
}
29
30
// Static factory methods for common content sources
31
32
/**
33
* Create provider from byte array (array is copied for safety)
34
* @param bytes Source byte array
35
* @return ContentStreamProvider that creates streams from the byte array
36
*/
37
static ContentStreamProvider fromByteArray(byte[] bytes);
38
39
/**
40
* Create provider from byte array (array is NOT copied - use with caution)
41
* @param bytes Source byte array (must not be modified after this call)
42
* @return ContentStreamProvider that creates streams from the byte array
43
*/
44
static ContentStreamProvider fromByteArrayUnsafe(byte[] bytes);
45
46
/**
47
* Create provider from string with specified charset
48
* @param string Source string
49
* @param charset Character encoding to use
50
* @return ContentStreamProvider that creates streams from the string
51
*/
52
static ContentStreamProvider fromString(String string, Charset charset);
53
54
/**
55
* Create provider from UTF-8 encoded string
56
* @param string Source string
57
* @return ContentStreamProvider that creates UTF-8 encoded streams
58
*/
59
static ContentStreamProvider fromUtf8String(String string);
60
61
/**
62
* Create provider from input stream (stream will be read once and cached)
63
* @param inputStream Source stream (will be consumed during creation)
64
* @return ContentStreamProvider that creates streams from cached data
65
*/
66
static ContentStreamProvider fromInputStream(InputStream inputStream);
67
68
/**
69
* Create provider from input stream supplier (supplier called for each new stream)
70
* @param supplier Function that provides new input streams
71
* @return ContentStreamProvider that delegates to the supplier
72
*/
73
static ContentStreamProvider fromInputStreamSupplier(Supplier<InputStream> supplier);
74
}
75
```
76
77
**Usage Examples:**
78
79
```java
80
// From string content
81
ContentStreamProvider jsonProvider = ContentStreamProvider.fromUtf8String(
82
"{\"message\":\"Hello, World!\"}"
83
);
84
85
// From byte array
86
byte[] imageData = Files.readAllBytes(Paths.get("image.jpg"));
87
ContentStreamProvider imageProvider = ContentStreamProvider.fromByteArray(imageData);
88
89
// From file input stream
90
ContentStreamProvider fileProvider = ContentStreamProvider.fromInputStreamSupplier(() -> {
91
try {
92
return Files.newInputStream(Paths.get("large-file.dat"));
93
} catch (IOException e) {
94
throw new UncheckedIOException(e);
95
}
96
});
97
98
// Custom implementation
99
ContentStreamProvider customProvider = new ContentStreamProvider() {
100
@Override
101
public InputStream newStream() {
102
// Generate dynamic content
103
String timestamp = Instant.now().toString();
104
return new ByteArrayInputStream(timestamp.getBytes(StandardCharsets.UTF_8));
105
}
106
107
@Override
108
public String name() {
109
return "TimestampProvider";
110
}
111
};
112
113
// Using with HTTP request
114
SdkHttpFullRequest request = SdkHttpFullRequest.builder()
115
.method(SdkHttpMethod.POST)
116
.protocol("https")
117
.host("api.example.com")
118
.encodedPath("/upload")
119
.contentStreamProvider(fileProvider)
120
.build();
121
```
122
123
### SdkHttpContentPublisher
124
125
Publisher interface for HTTP content in reactive streams-based asynchronous operations. Extends the standard reactive streams Publisher interface.
126
127
```java { .api }
128
/**
129
* Publisher for HTTP content data in streaming operations.
130
* Implements reactive streams Publisher interface for ByteBuffer content.
131
*/
132
public interface SdkHttpContentPublisher extends Publisher<ByteBuffer> {
133
/**
134
* Get the content length of data being produced, if known
135
* @return Optional content length in bytes, empty if unknown
136
*/
137
Optional<Long> contentLength();
138
139
/**
140
* Subscribe to the content stream
141
* @param subscriber Subscriber that will receive ByteBuffer chunks
142
*/
143
@Override
144
void subscribe(Subscriber<? super ByteBuffer> subscriber);
145
}
146
```
147
148
**Usage Example:**
149
150
```java
151
// Custom content publisher implementation
152
public class FileContentPublisher implements SdkHttpContentPublisher {
153
private final Path filePath;
154
private final long contentLength;
155
156
public FileContentPublisher(Path filePath) throws IOException {
157
this.filePath = filePath;
158
this.contentLength = Files.size(filePath);
159
}
160
161
@Override
162
public Optional<Long> contentLength() {
163
return Optional.of(contentLength);
164
}
165
166
@Override
167
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
168
subscriber.onSubscribe(new FileSubscription(filePath, subscriber));
169
}
170
}
171
172
// Using with async HTTP request
173
AsyncExecuteRequest asyncRequest = AsyncExecuteRequest.builder()
174
.request(httpRequest)
175
.requestContentPublisher(new FileContentPublisher(Paths.get("upload.dat")))
176
.responseHandler(responseHandler)
177
.build();
178
```
179
180
### AbortableInputStream
181
182
Input stream that can be aborted, useful for response body streams that may need to be cancelled.
183
184
```java { .api }
185
/**
186
* Input stream that can be aborted. Used for response body streams
187
* that may need to be cancelled before completion.
188
*/
189
public class AbortableInputStream extends FilterInputStream implements Abortable {
190
/**
191
* Construct an abortable input stream wrapping another stream
192
* @param inputStream The underlying input stream
193
*/
194
public AbortableInputStream(InputStream inputStream);
195
196
/**
197
* Abort the input stream, causing subsequent reads to fail
198
*/
199
@Override
200
public void abort();
201
202
// Standard InputStream methods
203
@Override
204
public int read() throws IOException;
205
206
@Override
207
public int read(byte[] b, int off, int len) throws IOException;
208
209
@Override
210
public void close() throws IOException;
211
}
212
```
213
214
**Usage Example:**
215
216
```java
217
// Processing response body with abort capability
218
try (AbortableInputStream responseBody = httpResponse.responseBody().orElse(null)) {
219
if (responseBody != null) {
220
// Set up abort condition (e.g., timeout)
221
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
222
ScheduledFuture<?> abortTimer = scheduler.schedule(() -> {
223
responseBody.abort();
224
}, 30, TimeUnit.SECONDS);
225
226
try {
227
// Process the stream
228
byte[] buffer = new byte[8192];
229
int bytesRead;
230
while ((bytesRead = responseBody.read(buffer)) != -1) {
231
// Process data
232
processData(buffer, 0, bytesRead);
233
}
234
abortTimer.cancel(false);
235
} catch (IOException e) {
236
// Stream may have been aborted
237
if (!abortTimer.isDone()) {
238
// Not aborted, real I/O error
239
throw e;
240
}
241
}
242
}
243
}
244
```
245
246
### Abortable Interface
247
248
Interface for operations that can be aborted or cancelled.
249
250
```java { .api }
251
/**
252
* Interface for operations that can be aborted
253
*/
254
public interface Abortable {
255
/**
256
* Abort the operation, causing it to fail or terminate early
257
*/
258
void abort();
259
}
260
```
261
262
### AbortableInputStreamSubscriber
263
264
Subscriber that converts a reactive ByteBuffer stream into an AbortableInputStream for synchronous processing.
265
266
```java { .api }
267
/**
268
* Subscriber that converts ByteBuffer stream to AbortableInputStream.
269
* Bridges reactive streams (async) to InputStream (sync) patterns.
270
*/
271
public class AbortableInputStreamSubscriber implements Subscriber<ByteBuffer> {
272
/**
273
* Get the future input stream that will contain the subscribed data
274
* @return CompletableFuture that resolves to AbortableInputStream
275
*/
276
public CompletableFuture<AbortableInputStream> futureInputStream();
277
278
/**
279
* Called when subscription is established
280
* @param subscription Subscription for controlling data flow
281
*/
282
@Override
283
public void onSubscribe(Subscription subscription);
284
285
/**
286
* Called when new data is available
287
* @param byteBuffer Next chunk of data
288
*/
289
@Override
290
public void onNext(ByteBuffer byteBuffer);
291
292
/**
293
* Called when an error occurs
294
* @param error The error that occurred
295
*/
296
@Override
297
public void onError(Throwable error);
298
299
/**
300
* Called when stream is complete
301
*/
302
@Override
303
public void onComplete();
304
}
305
```
306
307
**Usage Example:**
308
309
```java
310
// Converting async stream to sync input stream
311
public void processAsyncResponse(Publisher<ByteBuffer> contentPublisher) {
312
AbortableInputStreamSubscriber subscriber = new AbortableInputStreamSubscriber();
313
contentPublisher.subscribe(subscriber);
314
315
try {
316
// Get the input stream (this may block until data is available)
317
AbortableInputStream inputStream = subscriber.futureInputStream().get(30, TimeUnit.SECONDS);
318
319
// Process as normal input stream
320
try (inputStream) {
321
byte[] buffer = new byte[8192];
322
int bytesRead;
323
while ((bytesRead = inputStream.read(buffer)) != -1) {
324
processData(buffer, 0, bytesRead);
325
}
326
}
327
} catch (TimeoutException e) {
328
// Abort if taking too long
329
subscriber.futureInputStream().cancel(true);
330
}
331
}
332
```
333
334
### SimpleSubscriber
335
336
Simplified subscriber interface that provides default implementations for error handling and completion.
337
338
```java { .api }
339
/**
340
* Simplified subscriber interface with sensible defaults
341
*/
342
public interface SimpleSubscriber<T> extends Subscriber<T> {
343
/**
344
* Process the next item (required implementation)
345
* @param t Next item from the stream
346
*/
347
void onNext(T t);
348
349
/**
350
* Handle errors (default: empty implementation)
351
* @param error Error that occurred
352
*/
353
default void onError(Throwable error) {
354
// Default: do nothing
355
}
356
357
/**
358
* Handle completion (default: empty implementation)
359
*/
360
default void onComplete() {
361
// Default: do nothing
362
}
363
364
/**
365
* Create a simple subscriber from a consumer function
366
* @param onNext Function to handle each item
367
* @return SimpleSubscriber that delegates to the function
368
*/
369
static <T> SimpleSubscriber<T> create(Consumer<T> onNext);
370
}
371
```
372
373
**Usage Example:**
374
375
```java
376
// Simple data processing
377
SimpleSubscriber<ByteBuffer> dataProcessor = SimpleSubscriber.create(buffer -> {
378
// Process each buffer
379
byte[] data = new byte[buffer.remaining()];
380
buffer.get(data);
381
processChunk(data);
382
});
383
384
// Subscribe to content stream
385
contentPublisher.subscribe(dataProcessor);
386
387
// With error handling
388
SimpleSubscriber<ByteBuffer> robustProcessor = new SimpleSubscriber<ByteBuffer>() {
389
@Override
390
public void onNext(ByteBuffer buffer) {
391
processBuffer(buffer);
392
}
393
394
@Override
395
public void onError(Throwable error) {
396
logger.error("Stream processing failed", error);
397
notifyFailure(error);
398
}
399
400
@Override
401
public void onComplete() {
402
logger.info("Stream processing completed successfully");
403
notifySuccess();
404
}
405
};
406
```
407
408
## Async Response Handling
409
410
### SdkAsyncHttpResponseHandler
411
412
Handler interface for processing asynchronous HTTP responses with reactive streams.
413
414
```java { .api }
415
/**
416
* Handler for asynchronous HTTP responses using reactive streams
417
*/
418
public interface SdkAsyncHttpResponseHandler {
419
/**
420
* Called when response headers are received
421
* @param headers HTTP response headers and status
422
*/
423
void onHeaders(SdkHttpResponse headers);
424
425
/**
426
* Called when response body stream is ready
427
* @param stream Publisher of response body data
428
*/
429
void onStream(Publisher<ByteBuffer> stream);
430
431
/**
432
* Called when an error occurs during request or response processing
433
* @param error The error that occurred
434
*/
435
void onError(Throwable error);
436
}
437
```
438
439
### SdkHttpResponseHandler
440
441
Alternative response handler interface with different method signatures.
442
443
```java { .api }
444
/**
445
* Alternative response handler interface
446
*/
447
public interface SdkHttpResponseHandler {
448
/**
449
* Called when response headers are received
450
* @param response HTTP response headers and status
451
*/
452
void headersReceived(SdkHttpResponse response);
453
454
/**
455
* Called when response body stream is ready
456
* @param publisher Publisher of response body data
457
*/
458
void onStream(SdkHttpContentPublisher publisher);
459
460
/**
461
* Called when an error occurs
462
* @param exception The exception that occurred
463
*/
464
void exceptionOccurred(Exception exception);
465
}
466
```
467
468
**Complete Async Example:**
469
470
```java
471
// Complete async HTTP request with streaming response
472
public class StreamingDownloader {
473
public CompletableFuture<Void> downloadFile(SdkHttpRequest request, Path outputPath) {
474
CompletableFuture<Void> result = new CompletableFuture<>();
475
476
AsyncExecuteRequest asyncRequest = AsyncExecuteRequest.builder()
477
.request(request)
478
.responseHandler(new SdkAsyncHttpResponseHandler() {
479
@Override
480
public void onHeaders(SdkHttpResponse headers) {
481
if (!headers.isSuccessful()) {
482
result.completeExceptionally(
483
new IOException("HTTP " + headers.statusCode())
484
);
485
return;
486
}
487
488
// Headers look good, ready for body
489
}
490
491
@Override
492
public void onStream(Publisher<ByteBuffer> stream) {
493
try {
494
FileChannel fileChannel = FileChannel.open(outputPath,
495
StandardOpenOption.CREATE,
496
StandardOpenOption.WRITE,
497
StandardOpenOption.TRUNCATE_EXISTING);
498
499
stream.subscribe(new SimpleSubscriber<ByteBuffer>() {
500
@Override
501
public void onNext(ByteBuffer buffer) {
502
try {
503
fileChannel.write(buffer);
504
} catch (IOException e) {
505
result.completeExceptionally(e);
506
}
507
}
508
509
@Override
510
public void onError(Throwable error) {
511
try { fileChannel.close(); } catch (IOException e) { /* ignore */ }
512
result.completeExceptionally(error);
513
}
514
515
@Override
516
public void onComplete() {
517
try {
518
fileChannel.close();
519
result.complete(null);
520
} catch (IOException e) {
521
result.completeExceptionally(e);
522
}
523
}
524
});
525
} catch (IOException e) {
526
result.completeExceptionally(e);
527
}
528
}
529
530
@Override
531
public void onError(Throwable error) {
532
result.completeExceptionally(error);
533
}
534
})
535
.build();
536
537
// Execute the async request
538
httpClient.execute(asyncRequest).whenComplete((unused, throwable) -> {
539
if (throwable != null && !result.isDone()) {
540
result.completeExceptionally(throwable);
541
}
542
});
543
544
return result;
545
}
546
}
547
```