CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-amazonaws--aws-java-sdk-sqs

The AWS Java SDK for Amazon SQS module provides client classes for communicating with Amazon Simple Queue Service

Pending
Overview
Eval results
Files

buffered-client.mddocs/

Buffered Client

The Amazon SQS Buffered Async Client provides automatic batching and prefetching capabilities to significantly improve throughput and reduce costs. It optimizes send, delete, and change visibility operations through client-side batching, while implementing intelligent message prefetching for receive operations.

Buffered Client Overview

AmazonSQSBufferedAsyncClient

High-performance client implementation with automatic optimization features.

class AmazonSQSBufferedAsyncClient implements AmazonSQSAsync {
    // Inherits all AmazonSQSAsync methods with optimized implementations
    
    // Standard constructors
    AmazonSQSBufferedAsyncClient();
    AmazonSQSBufferedAsyncClient(AmazonSQSAsync realSQS);
    AmazonSQSBufferedAsyncClient(AmazonSQSAsync realSQS, QueueBufferConfig config);
    
    // Configuration-based constructors
    AmazonSQSBufferedAsyncClient(AWSCredentialsProvider credentialsProvider);
    AmazonSQSBufferedAsyncClient(AWSCredentialsProvider credentialsProvider, 
        ClientConfiguration clientConfiguration, ExecutorService executorService, 
        QueueBufferConfig queueBufferConfig);
}

Configuration

QueueBufferConfig

Configuration class for customizing buffered client behavior.

class QueueBufferConfig {
    // Default configuration
    QueueBufferConfig();
    
    // Batch configuration
    int getMaxBatchSize();
    QueueBufferConfig withMaxBatchSize(int maxBatchSize);
    
    // Concurrency configuration  
    int getMaxInflightOutboundBatches();
    QueueBufferConfig withMaxInflightOutboundBatches(int maxInflightOutboundBatches);
    
    int getMaxInflightReceiveBatches();
    QueueBufferConfig withMaxInflightReceiveBatches(int maxInflightReceiveBatches);
    
    // Timing configuration
    long getMaxBatchOpenMs();
    QueueBufferConfig withMaxBatchOpenMs(long maxBatchOpenMs);
    
    int getVisibilityTimeoutSeconds();
    QueueBufferConfig withVisibilityTimeoutSeconds(int visibilityTimeoutSeconds);
    
    // Polling configuration
    boolean isLongPoll();
    QueueBufferConfig withLongPoll(boolean longPoll);
    
    int getMaxDoneReceiveBatches();
    QueueBufferConfig withMaxDoneReceiveBatches(int maxDoneReceiveBatches);
}

Usage Example:

// Create buffered client with default configuration
AmazonSQSBufferedAsyncClient bufferedClient = new AmazonSQSBufferedAsyncClient();

// Create buffered client with custom configuration
QueueBufferConfig config = new QueueBufferConfig()
    .withMaxBatchSize(25)                    // Larger batches
    .withMaxInflightOutboundBatches(10)     // More concurrent batches
    .withMaxBatchOpenMs(5000)                // 5 second batch timeout
    .withVisibilityTimeoutSeconds(120)       // 2 minute visibility
    .withLongPoll(true)                      // Enable long polling
    .withMaxDoneReceiveBatches(50);          // Larger receive buffer

AmazonSQSBufferedAsyncClient customBufferedClient = 
    new AmazonSQSBufferedAsyncClient(
        AmazonSQSAsyncClientBuilder.defaultClient(),
        config
    );

Batching Behavior

Send Message Batching

Automatic batching of individual send operations into batch requests.

Usage Example:

AmazonSQSBufferedAsyncClient bufferedClient = new AmazonSQSBufferedAsyncClient();

// These individual sends will be automatically batched
List<Future<SendMessageResult>> futures = new ArrayList<>();

for (int i = 0; i < 50; i++) {
    Future<SendMessageResult> future = bufferedClient.sendMessageAsync(
        new SendMessageRequest()
            .withQueueUrl(queueUrl)
            .withMessageBody("Message " + i)
    );
    futures.add(future);
}

// Wait for all sends to complete (they were sent in batches behind the scenes)
for (Future<SendMessageResult> future : futures) {
    try {
        SendMessageResult result = future.get();
        System.out.println("Sent message: " + result.getMessageId());
    } catch (ExecutionException e) {
        System.err.println("Send failed: " + e.getCause().getMessage());
    }
}

Delete Message Batching

Automatic batching of delete operations for improved efficiency.

// Receive messages
ReceiveMessageResult receiveResult = bufferedClient.receiveMessage(
    new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10)
);

// Delete messages individually - will be batched automatically
List<Future<DeleteMessageResult>> deleteFutures = new ArrayList<>();

for (Message message : receiveResult.getMessages()) {
    // Process message
    processMessage(message);
    
    // Delete (will be batched with other deletes)
    Future<DeleteMessageResult> deleteFuture = bufferedClient.deleteMessageAsync(
        new DeleteMessageRequest()
            .withQueueUrl(queueUrl)
            .withReceiptHandle(message.getReceiptHandle())
    );
    deleteFutures.add(deleteFuture);
}

// Ensure all deletes complete
for (Future<DeleteMessageResult> future : deleteFutures) {
    future.get();
}

Message Prefetching

Receive Optimization

Intelligent prefetching to reduce receive operation latency.

public class PrefetchingConsumer {
    private final AmazonSQSBufferedAsyncClient bufferedClient;
    private final String queueUrl;
    
    public PrefetchingConsumer(String queueUrl) {
        // Configure for aggressive prefetching
        QueueBufferConfig config = new QueueBufferConfig()
            .withMaxInflightReceiveBatches(20)   // More prefetch requests
            .withMaxDoneReceiveBatches(100)      // Larger buffer
            .withLongPoll(true)                  // Long polling
            .withVisibilityTimeoutSeconds(300);  // 5 minute processing time
        
        this.bufferedClient = new AmazonSQSBufferedAsyncClient(
            AmazonSQSAsyncClientBuilder.defaultClient(), config);
        this.queueUrl = queueUrl;
    }
    
    public void startConsuming() {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        // This receive will be served from prefetched messages when available
                        ReceiveMessageResult result = bufferedClient.receiveMessage(
                            new ReceiveMessageRequest(queueUrl)
                                .withMaxNumberOfMessages(10)
                        );
                        
                        for (Message message : result.getMessages()) {
                            processMessage(message);
                            
                            // Delete will be batched
                            bufferedClient.deleteMessage(queueUrl, message.getReceiptHandle());
                        }
                        
                    } catch (Exception e) {
                        System.err.println("Consumer error: " + e.getMessage());
                    }
                }
            });
        }
    }
    
    private void processMessage(Message message) {
        // Message processing logic
        System.out.println("Processing: " + message.getBody());
        try {
            Thread.sleep(100); // Simulate processing time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Performance Tuning

Configuration Guidelines

Optimize configuration based on usage patterns and requirements.

public class BufferedClientTuning {
    
    public static QueueBufferConfig forHighThroughputSend() {
        return new QueueBufferConfig()
            .withMaxBatchSize(25)                    // Max batch size for cost efficiency
            .withMaxInflightOutboundBatches(50)     // High concurrency
            .withMaxBatchOpenMs(1000)                // 1 second batch timeout for low latency
            .withLongPoll(false);                    // Not needed for send-heavy workload
    }
    
    public static QueueBufferConfig forHighThroughputReceive() {
        return new QueueBufferConfig()
            .withMaxInflightReceiveBatches(100)     // Aggressive prefetching
            .withMaxDoneReceiveBatches(200)         // Large buffer
            .withLongPoll(true)                     // Reduce empty receives
            .withVisibilityTimeoutSeconds(600);     // 10 minutes for processing
    }
    
    public static QueueBufferConfig forLowLatency() {
        return new QueueBufferConfig()
            .withMaxBatchSize(5)                    // Smaller batches
            .withMaxBatchOpenMs(100)                 // 100ms batch timeout for very low latency
            .withMaxInflightOutboundBatches(10)     // Moderate concurrency
            .withLongPoll(false);                   // Immediate response
    }
    
    public static QueueBufferConfig forCostOptimization() {
        return new QueueBufferConfig()
            .withMaxBatchSize(25)                   // Maximum batch size
            .withMaxBatchOpenMs(10000)               // 10 second batch timeout to maximize batching
            .withMaxInflightOutboundBatches(5)      // Lower concurrency
            .withLongPoll(true)                     // Reduce API calls
            .withMaxInflightReceiveBatches(5);      // Conservative prefetching
    }
}

Monitoring and Metrics

Track buffered client performance and behavior.

public class BufferedClientMonitor {
    private final AmazonSQSBufferedAsyncClient bufferedClient;
    private final ScheduledExecutorService monitor;
    
    public BufferedClientMonitor(AmazonSQSBufferedAsyncClient bufferedClient) {
        this.bufferedClient = bufferedClient;
        this.monitor = Executors.newSingleThreadScheduledExecutor();
    }
    
    public void startMonitoring() {
        monitor.scheduleAtFixedRate(this::logMetrics, 0, 30, TimeUnit.SECONDS);
    }
    
    private void logMetrics() {
        // Note: Actual metrics would require instrumentation
        // This is conceptual - real implementation would track:
        
        System.out.println("=== Buffered Client Metrics ===");
        // - Batch fill ratios
        // - Average batch size
        // - Flush timeout occurrences
        // - Prefetch buffer hit rate
        // - API call reduction percentage
        // - Latency improvements
        System.out.println("Monitoring buffered client performance...");
    }
    
    public void shutdown() {
        monitor.shutdown();
    }
}

Best Practices

Optimal Usage Patterns

Guidelines for maximizing buffered client benefits.

public class BufferedClientBestPractices {
    
    // DO: Use for high-volume operations
    public void goodHighVolumePattern() {
        AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient();
        
        // Sending many messages - batching provides significant benefits
        for (int i = 0; i < 1000; i++) {
            client.sendMessageAsync(new SendMessageRequest(queueUrl, "Message " + i));
        }
    }
    
    // DO: Configure appropriately for workload
    public void goodConfigurationPattern() {
        QueueBufferConfig config = new QueueBufferConfig();
        
        if (isHighThroughputWorkload()) {
            config.withMaxBatchSize(25)
                  .withMaxInflightOutboundBatches(20);
        } else if (isLowLatencyWorkload()) {
            config.withMaxBatchOpenMs(100)
                  .withMaxBatchSize(5);
        }
        
        AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient(
            AmazonSQSAsyncClientBuilder.defaultClient(), config);
    }
    
    // DON'T: Use for single operations
    public void poorSingleOperationPattern() {
        AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient();
        
        // Only sending one message - no batching benefit
        client.sendMessage(new SendMessageRequest(queueUrl, "Single message"));
        
        // Better to use regular client for single operations
        AmazonSQS regularClient = AmazonSQSClientBuilder.defaultClient();
        regularClient.sendMessage(new SendMessageRequest(queueUrl, "Single message"));
    }
    
    // DON'T: Create multiple buffered clients for same queue
    public void poorMultipleClientPattern() {
        // Creates separate buffers - reduces batching efficiency
        AmazonSQSBufferedAsyncClient client1 = new AmazonSQSBufferedAsyncClient();
        AmazonSQSBufferedAsyncClient client2 = new AmazonSQSBufferedAsyncClient();
        
        // Better: Share single buffered client
        AmazonSQSBufferedAsyncClient sharedClient = new AmazonSQSBufferedAsyncClient();
        // Use sharedClient in multiple threads/components
    }
    
    private boolean isHighThroughputWorkload() {
        return true; // Your logic here
    }
    
    private boolean isLowLatencyWorkload() {
        return false; // Your logic here
    }
}

Error Handling

Handle buffered client specific considerations.

// Buffered client preserves individual operation results
List<Future<SendMessageResult>> futures = new ArrayList<>();

for (int i = 0; i < 10; i++) {
    futures.add(bufferedClient.sendMessageAsync(
        new SendMessageRequest(queueUrl, "Message " + i)));
}

// Each future represents individual operation result, even though batched
for (int i = 0; i < futures.size(); i++) {
    try {
        SendMessageResult result = futures.get(i).get();
        System.out.println("Message " + i + " sent: " + result.getMessageId());
    } catch (ExecutionException e) {
        System.err.println("Message " + i + " failed: " + e.getCause().getMessage());
        // Individual message failure doesn't affect others in batch
    }
}

Migration from Standard Client

Converting Existing Code

Straightforward migration from standard to buffered client.

// Before: Standard client
AmazonSQS standardClient = AmazonSQSClientBuilder.defaultClient();

// After: Buffered client (drop-in replacement)
AmazonSQSBufferedAsyncClient bufferedClient = new AmazonSQSBufferedAsyncClient();

// All existing code works unchanged
SendMessageResult result = bufferedClient.sendMessage(
    new SendMessageRequest(queueUrl, "message"));

// Async variants also available
Future<SendMessageResult> future = bufferedClient.sendMessageAsync(
    new SendMessageRequest(queueUrl, "async message"));

Gradual Migration Strategy

Approach for safely migrating production systems.

public class GradualMigration {
    private final AmazonSQS standardClient;
    private final AmazonSQSBufferedAsyncClient bufferedClient;
    private final double bufferedClientRatio = 0.1; // Start with 10%
    
    public GradualMigration() {
        this.standardClient = AmazonSQSClientBuilder.defaultClient();
        this.bufferedClient = new AmazonSQSBufferedAsyncClient();
    }
    
    public SendMessageResult sendMessage(SendMessageRequest request) {
        if (Math.random() < bufferedClientRatio) {
            // Use buffered client for percentage of traffic
            return bufferedClient.sendMessage(request);
        } else {
            // Use standard client for majority of traffic
            return standardClient.sendMessage(request);
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-com-amazonaws--aws-java-sdk-sqs

docs

async-operations.md

buffered-client.md

client-management.md

dead-letter-queues.md

index.md

message-operations.md

message-visibility.md

queue-operations.md

queue-permissions.md

queue-tagging.md

tile.json