CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-software-amazon-awssdk--http-client-spi

Service Provider Interface (SPI) for HTTP client implementations in the AWS SDK for Java v2

Overview
Eval results
Files

docs/content-streaming.md

Content Streaming

Flexible content stream providers and reactive streams publishers for handling HTTP request and response bodies. This module supports various input sources and asynchronous streaming patterns, and provides utilities for stream management and cancellation.

Capabilities

ContentStreamProvider

Functional interface for providing content streams for HTTP request bodies. Supports multiple creation patterns for different data sources.

/**
 * Functional interface for providing content streams for HTTP requests.
 * Implementations should create new streams on each call to newStream().
 * <p>
 * {@link #newStream()} is the only abstract method, so a lambda or method
 * reference can act as a provider; {@link #name()} has a default and the
 * static factories cover common data sources.
 * <p>
 * This is an API listing: the static factory methods are shown as signatures
 * only, with their bodies omitted.
 */
@FunctionalInterface
public interface ContentStreamProvider {
    /**
     * Create a new content stream. Must return a new stream on each invocation.
     * @return New input stream containing the content data
     */
    InputStream newStream();
    
    /**
     * Identify this provider for debugging/logging purposes.
     * @return Implementation-specific name (default: "Unknown")
     */
    default String name() {
        return "Unknown";
    }
    
    // Static factory methods for common content sources
    
    /**
     * Create provider from byte array (array is copied for safety)
     * @param bytes Source byte array
     * @return ContentStreamProvider that creates streams from the byte array
     */
    static ContentStreamProvider fromByteArray(byte[] bytes);
    
    /**
     * Create provider from byte array (array is NOT copied - use with caution).
     * Prefer {@link #fromByteArray(byte[])} unless the caller can guarantee the
     * array is never modified afterwards.
     * @param bytes Source byte array (must not be modified after this call)
     * @return ContentStreamProvider that creates streams from the byte array
     */
    static ContentStreamProvider fromByteArrayUnsafe(byte[] bytes);
    
    /**
     * Create provider from string with specified charset
     * @param string Source string
     * @param charset Character encoding to use
     * @return ContentStreamProvider that creates streams from the string
     */
    static ContentStreamProvider fromString(String string, Charset charset);
    
    /**
     * Create provider from UTF-8 encoded string
     * @param string Source string
     * @return ContentStreamProvider that creates UTF-8 encoded streams
     */
    static ContentStreamProvider fromUtf8String(String string);
    
    /**
     * Create provider from input stream (stream will be read once and cached)
     * <p>
     * NOTE(review): confirm the "read once and cached" wording against the SDK
     * Javadoc for the version in use; the SDK may instead rely on mark/reset of
     * the supplied stream, which would limit how often newStream() can be called.
     * @param inputStream Source stream (will be consumed during creation)
     * @return ContentStreamProvider that creates streams from cached data
     */
    static ContentStreamProvider fromInputStream(InputStream inputStream);
    
    /**
     * Create provider from input stream supplier (supplier called for each new stream)
     * @param supplier Function that provides new input streams
     * @return ContentStreamProvider that delegates to the supplier
     */
    static ContentStreamProvider fromInputStreamSupplier(Supplier<InputStream> supplier);
}

Usage Examples:

// Provider backed by a UTF-8 encoded JSON string
String jsonBody = "{\"message\":\"Hello, World!\"}";
ContentStreamProvider jsonProvider = ContentStreamProvider.fromUtf8String(jsonBody);

// Provider backed by the bytes of a file that was read fully into memory
Path imagePath = Paths.get("image.jpg");
byte[] imageData = Files.readAllBytes(imagePath);
ContentStreamProvider imageProvider = ContentStreamProvider.fromByteArray(imageData);

// Provider that opens the file afresh every time a stream is requested
Path largeFile = Paths.get("large-file.dat");
ContentStreamProvider fileProvider = ContentStreamProvider.fromInputStreamSupplier(() -> {
    try {
        return Files.newInputStream(largeFile);
    } catch (IOException ioFailure) {
        // Supplier.get() cannot throw checked exceptions, so wrap it
        throw new UncheckedIOException(ioFailure);
    }
});

// Hand-written provider that produces different content on each call
ContentStreamProvider customProvider = new ContentStreamProvider() {
    @Override
    public String name() {
        return "TimestampProvider";
    }

    @Override
    public InputStream newStream() {
        // The body is the current instant, encoded as UTF-8
        byte[] nowUtf8 = Instant.now().toString().getBytes(StandardCharsets.UTF_8);
        return new ByteArrayInputStream(nowUtf8);
    }
};

// Attaching a provider to an HTTP request
SdkHttpFullRequest request = SdkHttpFullRequest.builder()
    .method(SdkHttpMethod.POST)
    .protocol("https")
    .host("api.example.com")
    .encodedPath("/upload")
    .contentStreamProvider(fileProvider)
    .build();

SdkHttpContentPublisher

Publisher interface for HTTP content in reactive streams-based asynchronous operations. Extends the standard reactive streams Publisher interface.

/**
 * Publisher for HTTP content data in streaming operations.
 * Implements reactive streams Publisher interface for ByteBuffer content.
 * <p>
 * Compared with a plain {@code Publisher<ByteBuffer>}, this type additionally
 * reports the total body size through {@link #contentLength()} when it is known.
 */
public interface SdkHttpContentPublisher extends Publisher<ByteBuffer> {
    /**
     * Get the content length of data being produced, if known
     * @return Optional content length in bytes, empty if unknown
     */
    Optional<Long> contentLength();
    
    /**
     * Subscribe to the content stream. Redeclared from {@code Publisher} with
     * the same signature.
     * @param subscriber Subscriber that will receive ByteBuffer chunks
     */
    @Override
    void subscribe(Subscriber<? super ByteBuffer> subscriber);
}

Usage Example:

// Custom content publisher implementation
/**
 * Publishes the contents of a file as an HTTP request body.
 * <p>
 * The content length is captured once, at construction time; it is not
 * refreshed if the file changes size afterwards.
 */
public class FileContentPublisher implements SdkHttpContentPublisher {
    private final Path filePath;
    private final long contentLength;

    /**
     * @param filePath File whose bytes will be published; must not be null
     * @throws IOException If the file size cannot be determined
     * @throws NullPointerException If {@code filePath} is null
     */
    public FileContentPublisher(Path filePath) throws IOException {
        if (filePath == null) {
            throw new NullPointerException("filePath");
        }
        this.filePath = filePath;
        this.contentLength = Files.size(filePath);
    }

    /**
     * @return The file size recorded when this publisher was created
     */
    @Override
    public Optional<Long> contentLength() {
        return Optional.of(contentLength);
    }

    /**
     * Hands the subscriber a new {@code FileSubscription} for the file.
     * @param subscriber Receiver of the file's ByteBuffer chunks; must not be null
     * @throws NullPointerException If {@code subscriber} is null
     */
    @Override
    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        // Reject null before building the subscription so that nothing is
        // allocated for a subscriber that can never receive it.
        if (subscriber == null) {
            throw new NullPointerException("subscriber");
        }
        subscriber.onSubscribe(new FileSubscription(filePath, subscriber));
    }
}

// Supplying the publisher as the body of an async HTTP request
SdkHttpContentPublisher uploadBody = new FileContentPublisher(Paths.get("upload.dat"));
AsyncExecuteRequest asyncRequest = AsyncExecuteRequest.builder()
    .request(httpRequest)
    .requestContentPublisher(uploadBody)
    .responseHandler(responseHandler)
    .build();

AbortableInputStream

Input stream that can be aborted, useful for response body streams that may need to be cancelled.

/**
 * Input stream that can be aborted. Used for response body streams 
 * that may need to be cancelled before completion.
 * <p>
 * Wraps another stream via {@code FilterInputStream} and adds
 * {@link #abort()} from {@code Abortable}. This is an API listing: members
 * are shown as signatures only, with their bodies omitted.
 */
public class AbortableInputStream extends FilterInputStream implements Abortable {
    /**
     * Construct an abortable input stream wrapping another stream
     * <p>
     * NOTE(review): confirm against the SDK Javadoc that a public constructor
     * is available in the version in use; the SDK may expose static factory
     * methods instead.
     * @param inputStream The underlying input stream
     */
    public AbortableInputStream(InputStream inputStream);
    
    /**
     * Abort the input stream, causing subsequent reads to fail
     */
    @Override
    public void abort();
    
    // Standard InputStream methods
    /**
     * Read a single byte.
     * @throws IOException If the read fails
     */
    @Override
    public int read() throws IOException;
    
    /**
     * Read up to {@code len} bytes into {@code b} starting at {@code off}.
     * @throws IOException If the read fails
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException;
    
    /**
     * Close the stream.
     * @throws IOException If closing fails
     */
    @Override
    public void close() throws IOException;
}

Usage Example:

// Processing response body with abort capability
try (AbortableInputStream responseBody = httpResponse.responseBody().orElse(null)) {
    if (responseBody != null) {
        // Set up abort condition (e.g., timeout)
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> abortTimer = scheduler.schedule(() -> {
            responseBody.abort();
        }, 30, TimeUnit.SECONDS);

        try {
            // Process the stream
            byte[] buffer = new byte[8192];
            int bytesRead;
            while ((bytesRead = responseBody.read(buffer)) != -1) {
                // Process data
                processData(buffer, 0, bytesRead);
            }
        } catch (IOException e) {
            // The timer has not been cancelled yet at this point, so isDone()
            // is true only when the abort task has already run.
            if (!abortTimer.isDone()) {
                // Not aborted, real I/O error
                throw e;
            }
            // Otherwise the read failed because of the deliberate abort.
        } finally {
            // Always stop the timer and release the scheduler thread; without
            // this the executor's worker thread outlives the request.
            abortTimer.cancel(false);
            scheduler.shutdownNow();
        }
    }
}

Abortable Interface

Interface for operations that can be aborted or cancelled.

/**
 * Interface for operations that can be aborted.
 * <p>
 * {@code AbortableInputStream} implements this interface to make a response
 * body stream cancellable.
 */
public interface Abortable {
    /**
     * Abort the operation, causing it to fail or terminate early
     */
    void abort();
}

AbortableInputStreamSubscriber

Subscriber that converts a reactive ByteBuffer stream into an AbortableInputStream for synchronous processing.

/**
 * Subscriber that converts ByteBuffer stream to AbortableInputStream.
 * Bridges reactive streams (async) to InputStream (sync) patterns.
 * <p>
 * This is an API listing: methods are shown as signatures only, with their
 * bodies omitted, and no constructor is listed.
 * NOTE(review): confirm how instances are created (constructor vs. builder)
 * against the SDK Javadoc for the version in use.
 */
public class AbortableInputStreamSubscriber implements Subscriber<ByteBuffer> {
    /**
     * Get the future input stream that will contain the subscribed data
     * @return CompletableFuture that resolves to AbortableInputStream
     */
    public CompletableFuture<AbortableInputStream> futureInputStream();
    
    /**
     * Called when subscription is established
     * @param subscription Subscription for controlling data flow
     */
    @Override
    public void onSubscribe(Subscription subscription);
    
    /**
     * Called when new data is available
     * @param byteBuffer Next chunk of data
     */
    @Override
    public void onNext(ByteBuffer byteBuffer);
    
    /**
     * Called when an error occurs
     * @param error The error that occurred
     */
    @Override
    public void onError(Throwable error);
    
    /**
     * Called when stream is complete
     */
    @Override
    public void onComplete();
}

Usage Example:

// Converting async stream to sync input stream
/**
 * Subscribes to {@code contentPublisher}, waits up to 30 seconds for the
 * bridging input stream to become available, and then reads it to the end,
 * passing each chunk to {@code processData}.
 * <p>
 * A timeout or an interrupt cancels the pending stream and returns without
 * processing; an interrupt also restores the thread's interrupt flag.
 *
 * @param contentPublisher Source of the response body chunks
 * @throws IllegalStateException If the input stream could not be produced
 * @throws UncheckedIOException If reading or closing the stream fails
 */
public void processAsyncResponse(Publisher<ByteBuffer> contentPublisher) {
    AbortableInputStreamSubscriber subscriber = new AbortableInputStreamSubscriber();
    contentPublisher.subscribe(subscriber);

    CompletableFuture<AbortableInputStream> pendingStream = subscriber.futureInputStream();

    // Get the input stream (this may block until data is available) and make
    // sure it is closed however the read loop ends.
    try (AbortableInputStream inputStream = pendingStream.get(30, TimeUnit.SECONDS)) {
        // Process as normal input stream
        byte[] buffer = new byte[8192];
        int bytesRead;
        while ((bytesRead = inputStream.read(buffer)) != -1) {
            processData(buffer, 0, bytesRead);
        }
    } catch (TimeoutException e) {
        // Abort if taking too long
        pendingStream.cancel(true);
    } catch (InterruptedException e) {
        // Stop waiting, but keep the interrupt visible to the caller
        pendingStream.cancel(true);
        Thread.currentThread().interrupt();
    } catch (java.util.concurrent.ExecutionException e) {
        // Surface the underlying failure rather than the wrapper
        throw new IllegalStateException("Response stream could not be created", e.getCause());
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

SimpleSubscriber

Simplified subscriber interface that provides default implementations for error handling and completion. Both defaults are no-ops, so errors are silently ignored unless `onError` is overridden.

/**
 * Simplified subscriber interface with sensible defaults.
 * <p>
 * Only {@link #onError(Throwable)} and {@link #onComplete()} receive defaults
 * in this listing; {@code onSubscribe} is inherited from {@code Subscriber}
 * without one, so direct implementations still have to provide it.
 * The {@code create} factory is shown as a signature only.
 */
public interface SimpleSubscriber<T> extends Subscriber<T> {
    /**
     * Process the next item (required implementation)
     * @param t Next item from the stream
     */
    void onNext(T t);
    
    /**
     * Handle errors (default: empty implementation).
     * Override this to avoid silently discarding stream failures.
     * @param error Error that occurred
     */
    default void onError(Throwable error) {
        // Default: do nothing
    }
    
    /**
     * Handle completion (default: empty implementation)
     */
    default void onComplete() {
        // Default: do nothing
    }
    
    /**
     * Create a simple subscriber from a consumer function
     * @param onNext Function to handle each item
     * @param <T> Type of item delivered to the subscriber
     * @return SimpleSubscriber that delegates to the function
     */
    static <T> SimpleSubscriber<T> create(Consumer<T> onNext);
}

Usage Example:

// Minimal subscriber: copy each chunk out of its buffer and process it
SimpleSubscriber<ByteBuffer> dataProcessor = SimpleSubscriber.create(chunk -> {
    byte[] payload = new byte[chunk.remaining()];
    chunk.get(payload);
    processChunk(payload);
});

// Attach the subscriber to the content stream
contentPublisher.subscribe(dataProcessor);

// Full subscriber that also reacts to failure and completion
SimpleSubscriber<ByteBuffer> robustProcessor = new SimpleSubscriber<ByteBuffer>() {
    @Override
    public void onComplete() {
        logger.info("Stream processing completed successfully");
        notifySuccess();
    }

    @Override
    public void onError(Throwable failure) {
        logger.error("Stream processing failed", failure);
        notifyFailure(failure);
    }

    @Override
    public void onNext(ByteBuffer chunk) {
        processBuffer(chunk);
    }
};

Async Response Handling

SdkAsyncHttpResponseHandler

Handler interface for processing asynchronous HTTP responses with reactive streams.

/**
 * Handler for asynchronous HTTP responses using reactive streams.
 * <p>
 * Exposes three callbacks: one for the response headers, one for the body
 * publisher, and one for failures.
 * NOTE(review): the callback ordering guarantees (e.g. headers before stream)
 * are not stated in this listing — confirm against the SDK Javadoc.
 */
public interface SdkAsyncHttpResponseHandler {
    /**
     * Called when response headers are received
     * @param headers HTTP response headers and status
     */
    void onHeaders(SdkHttpResponse headers);
    
    /**
     * Called when response body stream is ready
     * @param stream Publisher of response body data
     */
    void onStream(Publisher<ByteBuffer> stream);
    
    /**
     * Called when an error occurs during request or response processing
     * @param error The error that occurred
     */
    void onError(Throwable error);
}

SdkHttpResponseHandler

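Alternative response handler interface. Compared with `SdkAsyncHttpResponseHandler`, headers arrive via `headersReceived` instead of `onHeaders`, failures via `exceptionOccurred(Exception)` instead of `onError(Throwable)`, and `onStream` receives an `SdkHttpContentPublisher` rather than a plain `Publisher<ByteBuffer>`.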

/**
 * Alternative response handler interface.
 * <p>
 * Offers the same three kinds of callback as
 * {@code SdkAsyncHttpResponseHandler} under different names, and delivers the
 * body as an {@code SdkHttpContentPublisher}.
 */
public interface SdkHttpResponseHandler {
    /**
     * Called when response headers are received
     * @param response HTTP response headers and status
     */
    void headersReceived(SdkHttpResponse response);
    
    /**
     * Called when response body stream is ready
     * @param publisher Publisher of response body data
     */
    void onStream(SdkHttpContentPublisher publisher);
    
    /**
     * Called when an error occurs
     * @param exception The exception that occurred
     */
    void exceptionOccurred(Exception exception);
}

Complete Async Example:

// Complete async HTTP request with streaming response
public class StreamingDownloader {
    /**
     * Executes {@code request} asynchronously and streams the response body
     * into {@code outputPath}.
     * <p>
     * The output file is created or truncated only for a successful response;
     * for any other status the body is cancelled and the file is left alone.
     *
     * @param request HTTP request to execute
     * @param outputPath File that receives the response body
     * @return Future that completes once the body is fully written and the
     *         file is closed, or completes exceptionally on a non-successful
     *         status, an I/O failure, or a request failure
     */
    public CompletableFuture<Void> downloadFile(SdkHttpRequest request, Path outputPath) {
        CompletableFuture<Void> result = new CompletableFuture<>();

        AsyncExecuteRequest asyncRequest = AsyncExecuteRequest.builder()
            .request(request)
            .responseHandler(new SdkAsyncHttpResponseHandler() {
                @Override
                public void onHeaders(SdkHttpResponse headers) {
                    if (!headers.isSuccessful()) {
                        // Fail now; the body subscriber checks the future and
                        // cancels instead of writing the error body to disk.
                        result.completeExceptionally(
                            new IOException("HTTP " + headers.statusCode())
                        );
                    }
                }

                @Override
                public void onStream(Publisher<ByteBuffer> stream) {
                    stream.subscribe(new FileWritingSubscriber(outputPath, result));
                }

                @Override
                public void onError(Throwable error) {
                    result.completeExceptionally(error);
                }
            })
            .build();

        // Execute the async request.
        // NOTE(review): httpClient is not declared in this snippet; presumably
        // it is an async HTTP client field of the enclosing class — confirm.
        httpClient.execute(asyncRequest).whenComplete((unused, throwable) -> {
            if (throwable != null && !result.isDone()) {
                result.completeExceptionally(throwable);
            }
        });

        return result;
    }

    /**
     * Writes each received buffer to a file, one buffer at a time, and
     * reports the outcome through the supplied future.
     */
    private static final class FileWritingSubscriber implements SimpleSubscriber<ByteBuffer> {
        private final Path outputPath;
        private final CompletableFuture<Void> result;
        private Subscription subscription;
        private FileChannel fileChannel;

        FileWritingSubscriber(Path outputPath, CompletableFuture<Void> result) {
            this.outputPath = outputPath;
            this.result = result;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            if (result.isDone()) {
                // The download already failed: do not touch the output file.
                subscription.cancel();
                return;
            }
            try {
                fileChannel = FileChannel.open(outputPath,
                    StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE,
                    StandardOpenOption.TRUNCATE_EXISTING);
            } catch (IOException e) {
                fail(e);
                return;
            }
            // Without an explicit request no data is ever delivered.
            subscription.request(1);
        }

        @Override
        public void onNext(ByteBuffer buffer) {
            try {
                // A single write() is not guaranteed to drain the buffer.
                while (buffer.hasRemaining()) {
                    fileChannel.write(buffer);
                }
                subscription.request(1);
            } catch (IOException e) {
                fail(e);
            }
        }

        @Override
        public void onError(Throwable error) {
            closeQuietly();
            result.completeExceptionally(error);
        }

        @Override
        public void onComplete() {
            try {
                if (fileChannel != null) {
                    fileChannel.close();
                }
                result.complete(null);
            } catch (IOException e) {
                result.completeExceptionally(e);
            }
        }

        /** Stops the stream, releases the file, and reports the failure. */
        private void fail(Throwable cause) {
            subscription.cancel();
            closeQuietly();
            result.completeExceptionally(cause);
        }

        /** Closes the channel, if open; a close failure here is secondary to the error being reported. */
        private void closeQuietly() {
            if (fileChannel == null) {
                return;
            }
            try {
                fileChannel.close();
            } catch (IOException ignored) {
                // The primary failure is already being reported via the future.
            }
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-software-amazon-awssdk--http-client-spi

docs

content-streaming.md

http-clients.md

http-messages.md

index.md

metrics.md

service-discovery.md

tls-configuration.md

tile.json