The AWS Java SDK for Amazon SQS module provides client classes for communicating with Amazon Simple Queue Service
—
The Amazon SQS Buffered Async Client provides automatic batching and prefetching capabilities to significantly improve throughput and reduce costs. It optimizes send, delete, and change visibility operations through client-side batching, while implementing intelligent message prefetching for receive operations.
High-performance client implementation with automatic optimization features.
class AmazonSQSBufferedAsyncClient implements AmazonSQSAsync {
// Inherits all AmazonSQSAsync methods with optimized implementations
// Standard constructors
AmazonSQSBufferedAsyncClient();
AmazonSQSBufferedAsyncClient(AmazonSQSAsync realSQS);
AmazonSQSBufferedAsyncClient(AmazonSQSAsync realSQS, QueueBufferConfig config);
// Configuration-based constructors
AmazonSQSBufferedAsyncClient(AWSCredentialsProvider credentialsProvider);
AmazonSQSBufferedAsyncClient(AWSCredentialsProvider credentialsProvider,
ClientConfiguration clientConfiguration, ExecutorService executorService,
QueueBufferConfig queueBufferConfig);
}Configuration class for customizing buffered client behavior.
class QueueBufferConfig {
// Default configuration
QueueBufferConfig();
// Batch configuration
int getMaxBatchSize();
QueueBufferConfig withMaxBatchSize(int maxBatchSize);
// Concurrency configuration
int getMaxInflightOutboundBatches();
QueueBufferConfig withMaxInflightOutboundBatches(int maxInflightOutboundBatches);
int getMaxInflightReceiveBatches();
QueueBufferConfig withMaxInflightReceiveBatches(int maxInflightReceiveBatches);
// Timing configuration
long getMaxBatchOpenMs();
QueueBufferConfig withMaxBatchOpenMs(long maxBatchOpenMs);
int getVisibilityTimeoutSeconds();
QueueBufferConfig withVisibilityTimeoutSeconds(int visibilityTimeoutSeconds);
// Polling configuration
boolean isLongPoll();
QueueBufferConfig withLongPoll(boolean longPoll);
int getMaxDoneReceiveBatches();
QueueBufferConfig withMaxDoneReceiveBatches(int maxDoneReceiveBatches);
}Usage Example:
// Create buffered client with default configuration
AmazonSQSBufferedAsyncClient bufferedClient = new AmazonSQSBufferedAsyncClient();
// Create buffered client with custom configuration
QueueBufferConfig config = new QueueBufferConfig()
.withMaxBatchSize(25) // Larger batches
.withMaxInflightOutboundBatches(10) // More concurrent batches
.withMaxBatchOpenMs(5000) // 5 second batch timeout
.withVisibilityTimeoutSeconds(120) // 2 minute visibility
.withLongPoll(true) // Enable long polling
.withMaxDoneReceiveBatches(50); // Larger receive buffer
AmazonSQSBufferedAsyncClient customBufferedClient =
new AmazonSQSBufferedAsyncClient(
AmazonSQSAsyncClientBuilder.defaultClient(),
config
);Automatic batching of individual send operations into batch requests.
Usage Example:
AmazonSQSBufferedAsyncClient bufferedClient = new AmazonSQSBufferedAsyncClient();
// These individual sends will be automatically batched
List<Future<SendMessageResult>> futures = new ArrayList<>();
for (int i = 0; i < 50; i++) {
Future<SendMessageResult> future = bufferedClient.sendMessageAsync(
new SendMessageRequest()
.withQueueUrl(queueUrl)
.withMessageBody("Message " + i)
);
futures.add(future);
}
// Wait for all sends to complete (they were sent in batches behind the scenes)
for (Future<SendMessageResult> future : futures) {
try {
SendMessageResult result = future.get();
System.out.println("Sent message: " + result.getMessageId());
} catch (ExecutionException e) {
System.err.println("Send failed: " + e.getCause().getMessage());
}
}Automatic batching of delete operations for improved efficiency.
// Receive messages
ReceiveMessageResult receiveResult = bufferedClient.receiveMessage(
new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10)
);
// Delete messages individually - will be batched automatically
List<Future<DeleteMessageResult>> deleteFutures = new ArrayList<>();
for (Message message : receiveResult.getMessages()) {
// Process message
processMessage(message);
// Delete (will be batched with other deletes)
Future<DeleteMessageResult> deleteFuture = bufferedClient.deleteMessageAsync(
new DeleteMessageRequest()
.withQueueUrl(queueUrl)
.withReceiptHandle(message.getReceiptHandle())
);
deleteFutures.add(deleteFuture);
}
// Ensure all deletes complete
for (Future<DeleteMessageResult> future : deleteFutures) {
future.get();
}Intelligent prefetching to reduce receive operation latency.
public class PrefetchingConsumer {
private final AmazonSQSBufferedAsyncClient bufferedClient;
private final String queueUrl;
public PrefetchingConsumer(String queueUrl) {
// Configure for aggressive prefetching
QueueBufferConfig config = new QueueBufferConfig()
.withMaxInflightReceiveBatches(20) // More prefetch requests
.withMaxDoneReceiveBatches(100) // Larger buffer
.withLongPoll(true) // Long polling
.withVisibilityTimeoutSeconds(300); // 5 minute processing time
this.bufferedClient = new AmazonSQSBufferedAsyncClient(
AmazonSQSAsyncClientBuilder.defaultClient(), config);
this.queueUrl = queueUrl;
}
public void startConsuming() {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
// This receive will be served from prefetched messages when available
ReceiveMessageResult result = bufferedClient.receiveMessage(
new ReceiveMessageRequest(queueUrl)
.withMaxNumberOfMessages(10)
);
for (Message message : result.getMessages()) {
processMessage(message);
// Delete will be batched
bufferedClient.deleteMessage(queueUrl, message.getReceiptHandle());
}
} catch (Exception e) {
System.err.println("Consumer error: " + e.getMessage());
}
}
});
}
}
private void processMessage(Message message) {
// Message processing logic
System.out.println("Processing: " + message.getBody());
try {
Thread.sleep(100); // Simulate processing time
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}Optimize configuration based on usage patterns and requirements.
public class BufferedClientTuning {
public static QueueBufferConfig forHighThroughputSend() {
return new QueueBufferConfig()
.withMaxBatchSize(25) // Max batch size for cost efficiency
.withMaxInflightOutboundBatches(50) // High concurrency
.withMaxBatchOpenMs(1000) // 1 second batch timeout for low latency
.withLongPoll(false); // Not needed for send-heavy workload
}
public static QueueBufferConfig forHighThroughputReceive() {
return new QueueBufferConfig()
.withMaxInflightReceiveBatches(100) // Aggressive prefetching
.withMaxDoneReceiveBatches(200) // Large buffer
.withLongPoll(true) // Reduce empty receives
.withVisibilityTimeoutSeconds(600); // 10 minutes for processing
}
public static QueueBufferConfig forLowLatency() {
return new QueueBufferConfig()
.withMaxBatchSize(5) // Smaller batches
.withMaxBatchOpenMs(100) // 100ms batch timeout for very low latency
.withMaxInflightOutboundBatches(10) // Moderate concurrency
.withLongPoll(false); // Immediate response
}
public static QueueBufferConfig forCostOptimization() {
return new QueueBufferConfig()
.withMaxBatchSize(25) // Maximum batch size
.withMaxBatchOpenMs(10000) // 10 second batch timeout to maximize batching
.withMaxInflightOutboundBatches(5) // Lower concurrency
.withLongPoll(true) // Reduce API calls
.withMaxInflightReceiveBatches(5); // Conservative prefetching
}
}Track buffered client performance and behavior.
public class BufferedClientMonitor {
private final AmazonSQSBufferedAsyncClient bufferedClient;
private final ScheduledExecutorService monitor;
public BufferedClientMonitor(AmazonSQSBufferedAsyncClient bufferedClient) {
this.bufferedClient = bufferedClient;
this.monitor = Executors.newSingleThreadScheduledExecutor();
}
public void startMonitoring() {
monitor.scheduleAtFixedRate(this::logMetrics, 0, 30, TimeUnit.SECONDS);
}
private void logMetrics() {
// Note: Actual metrics would require instrumentation
// This is conceptual - real implementation would track:
System.out.println("=== Buffered Client Metrics ===");
// - Batch fill ratios
// - Average batch size
// - Flush timeout occurrences
// - Prefetch buffer hit rate
// - API call reduction percentage
// - Latency improvements
System.out.println("Monitoring buffered client performance...");
}
public void shutdown() {
monitor.shutdown();
}
}Guidelines for maximizing buffered client benefits.
public class BufferedClientBestPractices {
// DO: Use for high-volume operations
public void goodHighVolumePattern() {
AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient();
// Sending many messages - batching provides significant benefits
for (int i = 0; i < 1000; i++) {
client.sendMessageAsync(new SendMessageRequest(queueUrl, "Message " + i));
}
}
// DO: Configure appropriately for workload
public void goodConfigurationPattern() {
QueueBufferConfig config = new QueueBufferConfig();
if (isHighThroughputWorkload()) {
config.withMaxBatchSize(25)
.withMaxInflightOutboundBatches(20);
} else if (isLowLatencyWorkload()) {
config.withMaxBatchOpenMs(100)
.withMaxBatchSize(5);
}
AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient(
AmazonSQSAsyncClientBuilder.defaultClient(), config);
}
// DON'T: Use for single operations
public void poorSingleOperationPattern() {
AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient();
// Only sending one message - no batching benefit
client.sendMessage(new SendMessageRequest(queueUrl, "Single message"));
// Better to use regular client for single operations
AmazonSQS regularClient = AmazonSQSClientBuilder.defaultClient();
regularClient.sendMessage(new SendMessageRequest(queueUrl, "Single message"));
}
// DON'T: Create multiple buffered clients for same queue
public void poorMultipleClientPattern() {
// Creates separate buffers - reduces batching efficiency
AmazonSQSBufferedAsyncClient client1 = new AmazonSQSBufferedAsyncClient();
AmazonSQSBufferedAsyncClient client2 = new AmazonSQSBufferedAsyncClient();
// Better: Share single buffered client
AmazonSQSBufferedAsyncClient sharedClient = new AmazonSQSBufferedAsyncClient();
// Use sharedClient in multiple threads/components
}
private boolean isHighThroughputWorkload() {
return true; // Your logic here
}
private boolean isLowLatencyWorkload() {
return false; // Your logic here
}
}Handle buffered client specific considerations.
// Buffered client preserves individual operation results
List<Future<SendMessageResult>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
futures.add(bufferedClient.sendMessageAsync(
new SendMessageRequest(queueUrl, "Message " + i)));
}
// Each future represents individual operation result, even though batched
for (int i = 0; i < futures.size(); i++) {
try {
SendMessageResult result = futures.get(i).get();
System.out.println("Message " + i + " sent: " + result.getMessageId());
} catch (ExecutionException e) {
System.err.println("Message " + i + " failed: " + e.getCause().getMessage());
// Individual message failure doesn't affect others in batch
}
}Straightforward migration from standard to buffered client.
// Before: Standard client
AmazonSQS standardClient = AmazonSQSClientBuilder.defaultClient();
// After: Buffered client (drop-in replacement)
AmazonSQSBufferedAsyncClient bufferedClient = new AmazonSQSBufferedAsyncClient();
// All existing code works unchanged
SendMessageResult result = bufferedClient.sendMessage(
new SendMessageRequest(queueUrl, "message"));
// Async variants also available
Future<SendMessageResult> future = bufferedClient.sendMessageAsync(
new SendMessageRequest(queueUrl, "async message"));Approach for safely migrating production systems.
public class GradualMigration {
private final AmazonSQS standardClient;
private final AmazonSQSBufferedAsyncClient bufferedClient;
private final double bufferedClientRatio = 0.1; // Start with 10%
public GradualMigration() {
this.standardClient = AmazonSQSClientBuilder.defaultClient();
this.bufferedClient = new AmazonSQSBufferedAsyncClient();
}
public SendMessageResult sendMessage(SendMessageRequest request) {
if (Math.random() < bufferedClientRatio) {
// Use buffered client for percentage of traffic
return bufferedClient.sendMessage(request);
} else {
// Use standard client for majority of traffic
return standardClient.sendMessage(request);
}
}
}Install with Tessl CLI
npx tessl i tessl/maven-com-amazonaws--aws-java-sdk-sqs