The AWS Java SDK for Amazon SQS module provides client classes for communicating with Amazon Simple Queue Service (Amazon SQS).
—
Asynchronous operations provide non-blocking execution of SQS operations using Future-based return types and optional callback handlers. This enables high-performance applications to efficiently manage multiple concurrent SQS operations without blocking threads.
Extended interface providing asynchronous versions of all SQS operations.
/**
 * Asynchronous extension of {@code AmazonSQS}.
 *
 * <p>Each operation listed here returns a {@code Future} for its result and comes in two
 * overloads: one taking only the request, and one that additionally takes an
 * {@code AsyncHandler} typed to the operation's request/result pair.
 */
interface AmazonSQSAsync extends AmazonSQS {
// Every synchronous operation has async equivalents
// CreateQueue
Future<CreateQueueResult> createQueueAsync(CreateQueueRequest request);
Future<CreateQueueResult> createQueueAsync(CreateQueueRequest request,
AsyncHandler<CreateQueueRequest, CreateQueueResult> asyncHandler);
// SendMessage
Future<SendMessageResult> sendMessageAsync(SendMessageRequest request);
Future<SendMessageResult> sendMessageAsync(SendMessageRequest request,
AsyncHandler<SendMessageRequest, SendMessageResult> asyncHandler);
// SendMessageBatch
Future<SendMessageBatchResult> sendMessageBatchAsync(SendMessageBatchRequest request);
Future<SendMessageBatchResult> sendMessageBatchAsync(SendMessageBatchRequest request,
AsyncHandler<SendMessageBatchRequest, SendMessageBatchResult> asyncHandler);
// ReceiveMessage
Future<ReceiveMessageResult> receiveMessageAsync(ReceiveMessageRequest request);
Future<ReceiveMessageResult> receiveMessageAsync(ReceiveMessageRequest request,
AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult> asyncHandler);
// DeleteMessage
Future<DeleteMessageResult> deleteMessageAsync(DeleteMessageRequest request);
Future<DeleteMessageResult> deleteMessageAsync(DeleteMessageRequest request,
AsyncHandler<DeleteMessageRequest, DeleteMessageResult> asyncHandler);
// DeleteMessageBatch
Future<DeleteMessageBatchResult> deleteMessageBatchAsync(DeleteMessageBatchRequest request);
Future<DeleteMessageBatchResult> deleteMessageBatchAsync(DeleteMessageBatchRequest request,
AsyncHandler<DeleteMessageBatchRequest, DeleteMessageBatchResult> asyncHandler);
// All other operations follow the same pattern...
}
// Callback interface for async operations
/**
 * Callback passed to the {@code ...Async(request, asyncHandler)} overloads.
 *
 * @param <REQUEST> request type of the operation; bounded by {@code AmazonWebServiceRequest}
 * @param <RESULT> result type of the operation
 */
interface AsyncHandler<REQUEST extends AmazonWebServiceRequest, RESULT> {
/** Failure callback; receives only the exception, not the request that failed. */
void onError(Exception exception);
/** Success callback; receives the request together with its result. */
void onSuccess(REQUEST request, RESULT result);
}Create asynchronous SQS clients with custom configuration.
/**
 * Builder producing {@code AmazonSQSAsync} clients; extends {@code AwsAsyncClientBuilder}.
 */
class AmazonSQSAsyncClientBuilder extends AwsAsyncClientBuilder<AmazonSQSAsyncClientBuilder, AmazonSQSAsync> {
/** Returns a builder instance to configure before calling {@code build()}. */
static AmazonSQSAsyncClientBuilder standard();
/** Returns an {@code AmazonSQSAsync} client directly, with no caller-supplied configuration. */
static AmazonSQSAsync defaultClient();
/** Creates the {@code AmazonSQSAsync} client from this builder. */
AmazonSQSAsync build();
}Usage Example:
// Client obtained straight from the builder's defaultClient() factory
AmazonSQSAsync asyncClient = AmazonSQSAsyncClientBuilder.defaultClient();

// Client configured explicitly: region, a fixed pool of 20 threads, profile credentials
ExecutorService executor = Executors.newFixedThreadPool(20);
AmazonSQSAsyncClientBuilder customBuilder = AmazonSQSAsyncClientBuilder.standard()
    .withRegion(Regions.US_WEST_2)
    .withExecutorFactory(() -> executor)
    .withCredentials(new ProfileCredentialsProvider());
AmazonSQSAsync customAsyncClient = customBuilder.build();Use Future objects to manage asynchronous operation completion.
Usage Example:
// Send message asynchronously
Future<SendMessageResult> sendFuture = asyncClient.sendMessageAsync(
    new SendMessageRequest(queueUrl, "Async message"));
// Do other work while message is being sent
performOtherWork();
try {
    // Wait up to 30 seconds for completion and get result
    SendMessageResult result = sendFuture.get(30, TimeUnit.SECONDS);
    System.out.println("Message sent: " + result.getMessageId());
} catch (TimeoutException e) {
    System.err.println("Send operation timed out");
    sendFuture.cancel(true);
} catch (ExecutionException e) {
    // The operation's own failure is carried as the cause
    System.err.println("Send failed: " + e.getCause().getMessage());
} catch (InterruptedException e) {
    // Future.get is interruptible; restore the flag so the caller can still observe it
    Thread.currentThread().interrupt();
    System.err.println("Send interrupted");
}
// Multiple concurrent operations
List<Future<SendMessageResult>> sendFutures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    Future<SendMessageResult> future = asyncClient.sendMessageAsync(
        new SendMessageRequest(queueUrl, "Message " + i));
    sendFutures.add(future);
}
// Wait for all to complete
for (Future<SendMessageResult> future : sendFutures) {
    try {
        SendMessageResult result = future.get();
        System.out.println("Sent: " + result.getMessageId());
    } catch (ExecutionException e) {
        System.err.println("Send failed: " + e.getCause().getMessage());
    } catch (InterruptedException e) {
        // Stop waiting for the remaining futures once this thread is interrupted
        Thread.currentThread().interrupt();
        System.err.println("Interrupted while waiting for sends");
        break;
    }
}Use callbacks for reactive processing of async operation results.
Usage Example:
// Reusable handler that reports the outcome of a send
AsyncHandler<SendMessageRequest, SendMessageResult> sendHandler =
    new AsyncHandler<SendMessageRequest, SendMessageResult>() {
        @Override
        public void onError(Exception exception) {
            System.err.println("Failed to send message: " + exception.getMessage());
            // Handle error...
        }

        @Override
        public void onSuccess(SendMessageRequest request, SendMessageResult result) {
            System.out.println("Successfully sent message: " + result.getMessageId());
            // Process success...
        }
    };
// Send with callback
SendMessageRequest callbackRequest = new SendMessageRequest(queueUrl, "Callback message");
asyncClient.sendMessageAsync(callbackRequest, sendHandler);

// Inline (anonymous-class) handlers: receive, then delete every received message
// through a nested handler
ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(queueUrl);
asyncClient.receiveMessageAsync(
    receiveRequest,
    new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() {
        @Override
        public void onError(Exception exception) {
            System.err.println("Receive failed: " + exception.getMessage());
        }

        @Override
        public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) {
            for (Message message : result.getMessages()) {
                System.out.println("Received: " + message.getBody());
                // Issue the delete asynchronously
                DeleteMessageRequest deleteRequest =
                    new DeleteMessageRequest(queueUrl, message.getReceiptHandle());
                asyncClient.deleteMessageAsync(
                    deleteRequest,
                    new AsyncHandler<DeleteMessageRequest, DeleteMessageResult>() {
                        @Override
                        public void onError(Exception exception) {
                            System.err.println("Delete failed: " + exception.getMessage());
                        }

                        @Override
                        public void onSuccess(DeleteMessageRequest deleted, DeleteMessageResult outcome) {
                            System.out.println("Deleted message: " + message.getMessageId());
                        }
                    }
                );
            }
        }
    }
);Implement high-throughput producer-consumer patterns with async operations.
/**
 * Producer/consumer pair built on {@code AmazonSQSAsync}.
 *
 * <p>The producer drains a local {@code BlockingQueue} into the SQS queue; the consumer
 * long-polls the SQS queue and hands each message to a caller-supplied processor. Both
 * loops and the message processing run on a 10-thread pool owned by this instance;
 * call {@link #shutdown()} to stop them.
 */
public class AsyncProducerConsumer {
    /** Pause before polling again after a failed receive, in milliseconds. */
    private static final long RECEIVE_ERROR_BACKOFF_MILLIS = 1000L;

    private final AmazonSQSAsync asyncClient;
    private final String queueUrl;
    private final ExecutorService executorService;

    /**
     * @param asyncClient client used for every send, receive and delete
     * @param queueUrl URL of the SQS queue to produce to and consume from
     */
    public AsyncProducerConsumer(AmazonSQSAsync asyncClient, String queueUrl) {
        this.asyncClient = asyncClient;
        this.queueUrl = queueUrl;
        this.executorService = Executors.newFixedThreadPool(10);
    }

    /**
     * Starts a loop that takes message bodies from {@code messageQueue} and sends each one
     * asynchronously. A body whose send fails is offered back to the queue for another try.
     *
     * @param messageQueue source of message bodies; also receives bodies that failed to send
     */
    public void startProducer(BlockingQueue<String> messageQueue) {
        executorService.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String messageBody = messageQueue.take();
                    asyncClient.sendMessageAsync(
                        new SendMessageRequest(queueUrl, messageBody),
                        new AsyncHandler<SendMessageRequest, SendMessageResult>() {
                            @Override
                            public void onSuccess(SendMessageRequest request, SendMessageResult result) {
                                System.out.println("Produced: " + result.getMessageId());
                            }

                            @Override
                            public void onError(Exception exception) {
                                System.err.println("Production failed: " + exception.getMessage());
                                // Re-queue message for retry; offer() returns false on a full
                                // bounded queue, so report the drop instead of losing it silently
                                if (!messageQueue.offer(messageBody)) {
                                    System.err.println("Retry queue full, message dropped");
                                }
                            }
                        }
                    );
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }

    /**
     * Starts a loop that long-polls the queue (up to 10 messages, 20 second wait) and runs
     * {@code messageProcessor} on each received message; a message is deleted only after
     * the processor returns normally.
     *
     * <p>Only one receive is in flight at a time: the loop waits for each poll to finish
     * before issuing the next, so long polls cannot accumulate.
     *
     * @param messageProcessor invoked once per received message on this instance's pool
     */
    public void startConsumer(Consumer<Message> messageProcessor) {
        executorService.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                Future<ReceiveMessageResult> pendingReceive = asyncClient.receiveMessageAsync(
                    new ReceiveMessageRequest(queueUrl)
                        .withMaxNumberOfMessages(10)
                        .withWaitTimeSeconds(20),
                    new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() {
                        @Override
                        public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) {
                            for (Message message : result.getMessages()) {
                                // Process message asynchronously
                                CompletableFuture.runAsync(
                                    () -> processAndDelete(message, messageProcessor),
                                    executorService);
                            }
                        }

                        @Override
                        public void onError(Exception exception) {
                            System.err.println("Receive failed: " + exception.getMessage());
                        }
                    }
                );
                try {
                    // Wait for this poll to finish before starting the next one
                    pendingReceive.get();
                } catch (ExecutionException e) {
                    // The failure itself is reported by the handler's onError; just back off
                    if (!pause(RECEIVE_ERROR_BACKOFF_MILLIS)) {
                        break;
                    }
                } catch (InterruptedException e) {
                    pendingReceive.cancel(true);
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }

    /**
     * Stops the producer and consumer loops by interrupting the pool's threads. Tasks that
     * have not started yet are discarded.
     */
    public void shutdown() {
        executorService.shutdownNow();
    }

    /** Runs the processor on one message and, if it returns normally, requests its deletion. */
    private void processAndDelete(Message message, Consumer<Message> messageProcessor) {
        try {
            messageProcessor.accept(message);
            // Delete after successful processing
            asyncClient.deleteMessageAsync(
                new DeleteMessageRequest(queueUrl, message.getReceiptHandle()),
                new AsyncHandler<DeleteMessageRequest, DeleteMessageResult>() {
                    @Override
                    public void onSuccess(DeleteMessageRequest request, DeleteMessageResult result) {
                        // Nothing further to do once the message is gone
                    }

                    @Override
                    public void onError(Exception exception) {
                        System.err.println("Delete failed: " + exception.getMessage());
                    }
                }
            );
        } catch (Exception e) {
            System.err.println("Processing failed: " + e.getMessage());
        }
    }

    /** Sleeps for the given time; returns false (with the interrupt flag restored) if interrupted. */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}Combine async operations with CompletableFuture for advanced flow control.
/**
 * Exposes SQS batch send and receive-then-process as {@code CompletableFuture} pipelines.
 *
 * <p>SDK calls are bridged to {@code CompletableFuture} through {@code AsyncHandler}
 * callbacks, so no thread is parked in {@code Future.get()} while a request is in flight
 * and failures arrive as the original exception.
 */
public class AsyncBatchProcessor {
    /** Largest number of entries accepted by a single SQS SendMessageBatch request. */
    private static final int MAX_BATCH_ENTRIES = 10;

    private final AmazonSQSAsync asyncClient;
    private final String queueUrl;

    /**
     * @param asyncClient client used for every send, receive and delete
     * @param queueUrl URL of the SQS queue to operate on
     */
    public AsyncBatchProcessor(AmazonSQSAsync asyncClient, String queueUrl) {
        this.asyncClient = asyncClient;
        this.queueUrl = queueUrl;
    }

    /**
     * Sends the given bodies using batch requests of at most {@value #MAX_BATCH_ENTRIES}
     * entries each. Entry ids are {@code "msg-" + index}, where index is the position in
     * {@code messages}.
     *
     * @param messages message bodies to send; an empty list sends nothing
     * @return future holding the message ids of the entries SQS reported as successful,
     *         in batch order; entries reported as failed are not included. The future
     *         fails if any batch request fails as a whole.
     */
    public CompletableFuture<List<String>> sendMessageBatch(List<String> messages) {
        if (messages.isEmpty()) {
            // A batch request needs at least one entry, so there is nothing to send
            return CompletableFuture.completedFuture(new ArrayList<>());
        }
        List<CompletableFuture<SendMessageBatchResult>> batchFutures = new ArrayList<>();
        for (int start = 0; start < messages.size(); start += MAX_BATCH_ENTRIES) {
            int end = Math.min(start + MAX_BATCH_ENTRIES, messages.size());
            // Convert this slice to batch entries
            List<SendMessageBatchRequestEntry> entries = IntStream.range(start, end)
                .mapToObj(i -> new SendMessageBatchRequestEntry()
                    .withId("msg-" + i)
                    .withMessageBody(messages.get(i)))
                .collect(Collectors.toList());
            CompletableFuture<SendMessageBatchResult> batchFuture = new CompletableFuture<>();
            asyncClient.sendMessageBatchAsync(
                new SendMessageBatchRequest(queueUrl, entries), completing(batchFuture));
            batchFutures.add(batchFuture);
        }
        // Transform results to one list of message IDs once every batch has finished
        return CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> batchFutures.stream()
                .flatMap(done -> done.join().getSuccessful().stream())
                .map(SendMessageBatchResultEntry::getMessageId)
                .collect(Collectors.toList()));
    }

    /**
     * Long-polls the queue once (20 second wait), then processes and deletes every
     * received message.
     *
     * @param maxMessages maximum number of messages to request
     * @return future holding the processed messages; fails if the receive, the processing
     *         or the deletion of any message fails
     */
    public CompletableFuture<List<Message>> receiveAndProcessBatch(int maxMessages) {
        CompletableFuture<ReceiveMessageResult> receiveFuture = new CompletableFuture<>();
        asyncClient.receiveMessageAsync(
            new ReceiveMessageRequest(queueUrl)
                .withMaxNumberOfMessages(maxMessages)
                .withWaitTimeSeconds(20),
            completing(receiveFuture));
        return receiveFuture.thenCompose(result -> {
            List<CompletableFuture<Message>> processingFutures =
                result.getMessages().stream()
                    .map(this::processMessageAsync)
                    .collect(Collectors.toList());
            return CompletableFuture.allOf(
                processingFutures.toArray(new CompletableFuture[0])
            ).thenApply(v ->
                processingFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList())
            );
        });
    }

    /**
     * Processes one message (simulated by a 100 ms sleep) and then deletes it. The returned
     * future completes with the message only after the delete has succeeded.
     */
    private CompletableFuture<Message> processMessageAsync(Message message) {
        return CompletableFuture.supplyAsync(() -> {
            // Simulate processing
            try {
                Thread.sleep(100);
                return message;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Processing failed", e);
            }
        }).thenCompose(processed -> {
            // Delete message after processing, and surface a failed delete to the caller
            CompletableFuture<DeleteMessageResult> deleteFuture = new CompletableFuture<>();
            asyncClient.deleteMessageAsync(
                new DeleteMessageRequest(queueUrl, processed.getReceiptHandle()),
                completing(deleteFuture));
            return deleteFuture.thenApply(ignored -> processed);
        });
    }

    /** Returns a handler that forwards the operation's outcome to {@code target}. */
    private static <REQ extends AmazonWebServiceRequest, RES> AsyncHandler<REQ, RES> completing(
            CompletableFuture<RES> target) {
        return new AsyncHandler<REQ, RES>() {
            @Override
            public void onError(Exception exception) {
                target.completeExceptionally(exception);
            }

            @Override
            public void onSuccess(REQ request, RES result) {
                target.complete(result);
            }
        };
    }
}Handle exceptions in both Future-based and callback-based async operations.
// Future-based error handling
Future<SendMessageResult> future = asyncClient.sendMessageAsync(request);
try {
SendMessageResult result = future.get(30, TimeUnit.SECONDS);
// Handle success
} catch (TimeoutException e) {
System.err.println("Operation timed out");
future.cancel(true);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof AmazonSQSException) {
AmazonSQSException sqsException = (AmazonSQSException) cause;
System.err.println("SQS Error: " + sqsException.getErrorCode());
} else {
System.err.println("Unexpected error: " + cause.getMessage());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Operation interrupted");
}
// Callback-based error handling with retry logic
/**
 * {@code AsyncHandler} that re-issues a failed operation with exponential backoff.
 *
 * <p>{@code onError} does not receive the failed request, so a retry is only possible when
 * the request was supplied to the three-argument constructor. For follow-up failures to be
 * counted, {@code retryFunction} must re-issue the call with this same handler, e.g.
 * {@code req -> client.sendMessageAsync(req, handler)}.
 *
 * @param <REQUEST> request type of the operation
 * @param <RESULT> result type of the operation
 */
public class RetryingAsyncHandler<REQUEST extends AmazonWebServiceRequest, RESULT>
        implements AsyncHandler<REQUEST, RESULT> {
    /** Delay before the first retry; doubled for each further retry. */
    private static final long BASE_DELAY_MILLIS = 1000L;
    /** Caps the doubling so the delay always fits in a long. */
    private static final int MAX_BACKOFF_SHIFT = 30;

    private final int maxRetries;
    private final REQUEST originalRequest;
    private final Function<REQUEST, Future<RESULT>> retryFunction;
    private int attempt = 0;

    /**
     * Creates a handler that has no request to replay: failures are counted and reported
     * but cannot be retried.
     *
     * @param maxRetries maximum number of retries
     * @param retryFunction function that re-issues the operation
     */
    public RetryingAsyncHandler(int maxRetries, Function<REQUEST, Future<RESULT>> retryFunction) {
        this(maxRetries, null, retryFunction);
    }

    /**
     * @param maxRetries maximum number of retries
     * @param originalRequest request handed to {@code retryFunction} on each retry;
     *        {@code null} disables retrying
     * @param retryFunction function that re-issues the operation
     */
    public RetryingAsyncHandler(int maxRetries, REQUEST originalRequest,
            Function<REQUEST, Future<RESULT>> retryFunction) {
        this.maxRetries = maxRetries;
        this.originalRequest = originalRequest;
        this.retryFunction = retryFunction;
    }

    @Override
    public void onSuccess(REQUEST request, RESULT result) {
        System.out.println("Operation succeeded on attempt " + (attempt + 1));
        // Handle success
    }

    @Override
    public void onError(Exception exception) {
        attempt++;
        if (attempt > maxRetries || !isRetryableException(exception)) {
            System.err.println("Operation failed after " + attempt + " attempts: " +
                exception.getMessage());
            // Handle permanent failure
            return;
        }
        if (originalRequest == null) {
            System.err.println("Cannot retry, no request was supplied to the handler: " +
                exception.getMessage());
            return;
        }
        System.out.println("Retrying operation, attempt " + attempt);
        // Exponential backoff: 1s, 2s, 4s, ... computed in long so it cannot overflow
        long delay = BASE_DELAY_MILLIS << Math.min(attempt - 1, MAX_BACKOFF_SHIFT);
        CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS)
            .execute(() -> retryFunction.apply(originalRequest));
    }

    /**
     * Retries throttling, service-side (5xx) errors and client-side failures that never
     * produced a service response. Other service errors are treated as permanent.
     */
    private boolean isRetryableException(Exception exception) {
        if (exception instanceof RequestThrottledException) {
            return true;
        }
        if (exception instanceof AmazonServiceException) {
            return ((AmazonServiceException) exception).getStatusCode() >= 500;
        }
        return exception instanceof AmazonClientException;
    }
}Configure thread pools appropriately for async operations.
// Executor for the async client: 10 core threads, at most 50, 60 second keep-alive,
// a work queue bounded at 1000 tasks, and daemon threads named "sqs-async-<n>"
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(1000);
ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder()
    .setNameFormat("sqs-async-%d")
    .setDaemon(true);
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
    10,
    50,
    60L, TimeUnit.SECONDS,
    workQueue,
    threadFactoryBuilder.build()
);
AmazonSQSAsyncClientBuilder clientBuilder = AmazonSQSAsyncClientBuilder.standard()
    .withExecutorFactory(() -> customExecutor);
AmazonSQSAsync asyncClient = clientBuilder.build();

// Print the pool's active-thread count and queue length immediately, then every 30 seconds
Runnable reportPoolStats = () -> {
    System.out.println("Active threads: " + customExecutor.getActiveCount());
    System.out.println("Queue size: " + customExecutor.getQueue().size());
};
ScheduledExecutorService poolMonitor = Executors.newSingleThreadScheduledExecutor();
poolMonitor.scheduleAtFixedRate(reportPoolStats, 0, 30, TimeUnit.SECONDS);Install with Tessl CLI
npx tessl i tessl/maven-com-amazonaws--aws-java-sdk-sqs