tessl/maven-com-amazonaws--aws-java-sdk-sqs

The AWS Java SDK for Amazon SQS module provides client classes for communicating with Amazon Simple Queue Service

Asynchronous Operations

Asynchronous operations run SQS calls without blocking the calling thread. Each call returns a Future and can optionally take a callback handler, so an application can keep many SQS requests in flight at once. The requests themselves run on the client's executor threads.

Async Client Interface

AmazonSQSAsync Interface

Extended interface providing asynchronous versions of all SQS operations.

interface AmazonSQSAsync extends AmazonSQS {
    // Each synchronous operation has two async variants:
    // one returning only a Future, and one that also takes a callback handler
    Future<CreateQueueResult> createQueueAsync(CreateQueueRequest request);
    Future<CreateQueueResult> createQueueAsync(CreateQueueRequest request, 
        AsyncHandler<CreateQueueRequest, CreateQueueResult> asyncHandler);
    
    Future<SendMessageResult> sendMessageAsync(SendMessageRequest request);
    Future<SendMessageResult> sendMessageAsync(SendMessageRequest request,
        AsyncHandler<SendMessageRequest, SendMessageResult> asyncHandler);
    
    Future<SendMessageBatchResult> sendMessageBatchAsync(SendMessageBatchRequest request);
    Future<SendMessageBatchResult> sendMessageBatchAsync(SendMessageBatchRequest request,
        AsyncHandler<SendMessageBatchRequest, SendMessageBatchResult> asyncHandler);
    
    Future<ReceiveMessageResult> receiveMessageAsync(ReceiveMessageRequest request);
    Future<ReceiveMessageResult> receiveMessageAsync(ReceiveMessageRequest request,
        AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult> asyncHandler);
    
    Future<DeleteMessageResult> deleteMessageAsync(DeleteMessageRequest request);
    Future<DeleteMessageResult> deleteMessageAsync(DeleteMessageRequest request,
        AsyncHandler<DeleteMessageRequest, DeleteMessageResult> asyncHandler);
    
    Future<DeleteMessageBatchResult> deleteMessageBatchAsync(DeleteMessageBatchRequest request);
    Future<DeleteMessageBatchResult> deleteMessageBatchAsync(DeleteMessageBatchRequest request,
        AsyncHandler<DeleteMessageBatchRequest, DeleteMessageBatchResult> asyncHandler);
    
    // All other operations follow the same pattern...
}

// Callback interface for async operations
interface AsyncHandler<REQUEST extends AmazonWebServiceRequest, RESULT> {
    void onError(Exception exception);
    void onSuccess(REQUEST request, RESULT result);
}

Async Client Creation

Build Async Clients

Create asynchronous SQS clients with custom configuration.

class AmazonSQSAsyncClientBuilder extends AwsAsyncClientBuilder<AmazonSQSAsyncClientBuilder, AmazonSQSAsync> {
    static AmazonSQSAsyncClientBuilder standard();
    static AmazonSQSAsync defaultClient();
    AmazonSQSAsync build();
}

Usage Example:

// Default async client
AmazonSQSAsync asyncClient = AmazonSQSAsyncClientBuilder.defaultClient();

// Custom async client with thread pool
ExecutorService executor = Executors.newFixedThreadPool(20);

AmazonSQSAsync customAsyncClient = AmazonSQSAsyncClientBuilder.standard()
    .withRegion(Regions.US_WEST_2)
    .withExecutorFactory(() -> executor)
    .withCredentials(new ProfileCredentialsProvider())
    .build();

Future-Based Operations

Working with Futures

Use Future objects to manage asynchronous operation completion.

Usage Example:

// Send message asynchronously
Future<SendMessageResult> sendFuture = asyncClient.sendMessageAsync(
    new SendMessageRequest(queueUrl, "Async message"));

// Do other work while message is being sent
performOtherWork();

try {
    // Wait for completion and get result
    SendMessageResult result = sendFuture.get(30, TimeUnit.SECONDS);
    System.out.println("Message sent: " + result.getMessageId());
} catch (TimeoutException e) {
    System.err.println("Send operation timed out");
    sendFuture.cancel(true);
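} catch (ExecutionException e) {
    System.err.println("Send failed: " + e.getCause().getMessage());
} catch (InterruptedException e) {
    // get() also throws InterruptedException; restore the flag for callers
    Thread.currentThread().interrupt();
}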

// Multiple concurrent operations
List<Future<SendMessageResult>> sendFutures = new ArrayList<>();

for (int i = 0; i < 10; i++) {
    Future<SendMessageResult> future = asyncClient.sendMessageAsync(
        new SendMessageRequest(queueUrl, "Message " + i));
    sendFutures.add(future);
}

// Wait for all to complete
for (Future<SendMessageResult> future : sendFutures) {
    try {
        SendMessageResult result = future.get();
        System.out.println("Sent: " + result.getMessageId());
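    } catch (ExecutionException e) {
        System.err.println("Send failed: " + e.getCause().getMessage());
    } catch (InterruptedException e) {
        // get() also throws InterruptedException; restore the flag and stop waiting
        Thread.currentThread().interrupt();
        break;
    }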
}
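
The loop above calls `get()` with no timeout, so one slow request can stall the whole wait. A minimal sketch of bounding the total wait with a shared deadline follows. It uses only JDK types so it runs without the SDK, and `DeadlineWaitSketch` and `awaitAll` are hypothetical names, not SDK classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class DeadlineWaitSketch {
    // Hypothetical helper: waits for every future, but never longer than
    // timeoutMillis in total. Futures that fail or time out are skipped.
    static <T> List<T> awaitAll(List<Future<T>> futures, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            // Time left until the shared deadline, never negative.
            long remainingNanos = Math.max(0L, deadline - System.nanoTime());
            try {
                results.add(future.get(remainingNanos, TimeUnit.NANOSECONDS));
            } catch (TimeoutException e) {
                future.cancel(true);
            } catch (ExecutionException e) {
                System.err.println("Task failed: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // stop waiting once interrupted
            }
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                final int n = i;
                futures.add(pool.submit(() -> "Message " + n));
            }
            System.out.println(awaitAll(futures, 5_000));
        } finally {
            pool.shutdown();
        }
    }
}
```

The same helper would accept the `Future<SendMessageResult>` list built above, since the SDK returns plain `java.util.concurrent.Future` objects.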

Callback-Based Operations

AsyncHandler Implementation

Use callbacks for reactive processing of async operation results.

Usage Example:

// Create async handler
AsyncHandler<SendMessageRequest, SendMessageResult> handler = 
    new AsyncHandler<SendMessageRequest, SendMessageResult>() {
        @Override
        public void onSuccess(SendMessageRequest request, SendMessageResult result) {
            System.out.println("Successfully sent message: " + result.getMessageId());
            // Process success...
        }
        
        @Override
        public void onError(Exception exception) {
            System.err.println("Failed to send message: " + exception.getMessage());
            // Handle error...
        }
    };

// Send with callback
asyncClient.sendMessageAsync(new SendMessageRequest(queueUrl, "Callback message"), handler);

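// Nested handlers. AsyncHandler declares two methods, so it is not a functional
// interface and cannot be written as a lambda; only the forEach body below is one.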
asyncClient.receiveMessageAsync(
    new ReceiveMessageRequest(queueUrl),
    new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() {
        @Override
        public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) {
            result.getMessages().forEach(message -> {
                System.out.println("Received: " + message.getBody());
                
                // Delete the message asynchronously once it has been handled
                asyncClient.deleteMessageAsync(
                    new DeleteMessageRequest(queueUrl, message.getReceiptHandle()),
                    new AsyncHandler<DeleteMessageRequest, DeleteMessageResult>() {
                        @Override
                        public void onSuccess(DeleteMessageRequest req, DeleteMessageResult res) {
                            System.out.println("Deleted message: " + message.getMessageId());
                        }
                        
                        @Override
                        public void onError(Exception exception) {
                            System.err.println("Delete failed: " + exception.getMessage());
                        }
                    }
                );
            });
        }
        
        @Override
        public void onError(Exception exception) {
            System.err.println("Receive failed: " + exception.getMessage());
        }
    }
);
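
Nested anonymous handlers like the ones above get hard to follow as steps are added. One option is to adapt the callback to a `CompletableFuture` so that follow-up steps can be chained. The sketch below is an illustration under stated assumptions: it declares a local stand-in `AsyncHandler` interface with the same two methods (without the `AmazonWebServiceRequest` bound) so that it compiles without the SDK, and `FutureHandler` and `HandlerBridgeSketch` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in mirroring the callback shape documented above (assumption: the
// real SDK interface additionally bounds REQUEST by AmazonWebServiceRequest).
interface AsyncHandler<REQUEST, RESULT> {
    void onError(Exception exception);
    void onSuccess(REQUEST request, RESULT result);
}

// Hypothetical adapter: completes a CompletableFuture from the two callbacks.
final class FutureHandler<REQUEST, RESULT> implements AsyncHandler<REQUEST, RESULT> {
    private final CompletableFuture<RESULT> future = new CompletableFuture<>();

    @Override
    public void onSuccess(REQUEST request, RESULT result) {
        future.complete(result);
    }

    @Override
    public void onError(Exception exception) {
        future.completeExceptionally(exception);
    }

    CompletableFuture<RESULT> future() {
        return future;
    }
}

final class HandlerBridgeSketch {
    public static void main(String[] args) {
        FutureHandler<String, Integer> handler = new FutureHandler<>();
        // Simulate the client invoking the success callback.
        handler.onSuccess("request", 42);
        System.out.println(handler.future().join());
    }
}
```

With the real SDK, a version of this adapter implementing `com.amazonaws.handlers.AsyncHandler` would be passed as the second argument of a call such as `sendMessageAsync(request, handler)`, and later steps chained on `handler.future()`.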

Concurrent Processing Patterns

Producer-Consumer Pattern

Implement high-throughput producer-consumer patterns with async operations.

public class AsyncProducerConsumer {
    private final AmazonSQSAsync asyncClient;
    private final String queueUrl;
    private final ExecutorService executorService;
    
    public AsyncProducerConsumer(AmazonSQSAsync asyncClient, String queueUrl) {
        this.asyncClient = asyncClient;
        this.queueUrl = queueUrl;
        this.executorService = Executors.newFixedThreadPool(10);
    }
    
    // High-throughput producer
    public void startProducer(BlockingQueue<String> messageQueue) {
        executorService.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String messageBody = messageQueue.take();
                    
                    asyncClient.sendMessageAsync(
                        new SendMessageRequest(queueUrl, messageBody),
                        new AsyncHandler<SendMessageRequest, SendMessageResult>() {
                            @Override
                            public void onSuccess(SendMessageRequest request, SendMessageResult result) {
                                System.out.println("Produced: " + result.getMessageId());
                            }
                            
                            @Override
                            public void onError(Exception exception) {
                                System.err.println("Production failed: " + exception.getMessage());
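                                // Re-queue message for retry. Caution: there is no retry limit, and
                                // offer() returns false (dropping the message) if a bounded queue is full.
                                messageQueue.offer(messageBody);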
                            }
                        }
                    );
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }
    
    // High-throughput consumer
    public void startConsumer(Consumer<Message> messageProcessor) {
        executorService.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                asyncClient.receiveMessageAsync(
                    new ReceiveMessageRequest(queueUrl)
                        .withMaxNumberOfMessages(10)
                        .withWaitTimeSeconds(20),
                    new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() {
                        @Override
                        public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) {
                            for (Message message : result.getMessages()) {
                                // Process message asynchronously
                                CompletableFuture.runAsync(() -> {
                                    try {
                                        messageProcessor.accept(message);
                                        
                                        // Delete after successful processing
                                        asyncClient.deleteMessageAsync(
                                            new DeleteMessageRequest(queueUrl, message.getReceiptHandle())
                                        );
                                    } catch (Exception e) {
                                        System.err.println("Processing failed: " + e.getMessage());
                                    }
                                }, executorService);
                            }
                        }
                        
                        @Override
                        public void onError(Exception exception) {
                            System.err.println("Receive failed: " + exception.getMessage());
                        }
                    }
                );
                
                try {
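                    // Pause between receive calls. Caution: the loop does not wait for the
                    // previous long poll (up to 20 s) to finish, so several receives can overlap.
                    Thread.sleep(1000);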
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }
}
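
The producer above starts a new send for every queued message, so the number of requests in flight is limited only by the client's executor. A minimal sketch of capping in-flight calls with a `Semaphore` follows. It assumes the async call is exposed as a `CompletableFuture` (the SDK itself returns a plain `Future`, so some adapter is assumed), and `InFlightLimiter` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks until a permit is free, starts the async call, and releases the
    // permit when the call's future completes (normally or exceptionally).
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> asyncCall) {
        permits.acquireUninterruptibly();
        try {
            return asyncCall.get().whenComplete((result, error) -> permits.release());
        } catch (RuntimeException e) {
            permits.release(); // the call never started, so return the permit
            throw e;
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        InFlightLimiter limiter = new InFlightLimiter(2);
        CompletableFuture<String> sent =
            limiter.submit(() -> CompletableFuture.supplyAsync(() -> "sent"));
        System.out.println(sent.join());
    }
}
```

Blocking the producer thread when the limit is reached is a deliberate form of backpressure: messages stay in the local `BlockingQueue` instead of piling up in the executor.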

Batch Operations with CompletableFuture

Combine async operations with CompletableFuture for advanced flow control.

public class AsyncBatchProcessor {
    private final AmazonSQSAsync asyncClient;
    private final String queueUrl;
    
    public AsyncBatchProcessor(AmazonSQSAsync asyncClient, String queueUrl) {
        this.asyncClient = asyncClient;
        this.queueUrl = queueUrl;
    }
    
    public CompletableFuture<List<String>> sendMessageBatch(List<String> messages) {
        // Convert to batch entries (SQS accepts at most 10 entries per batch request,
        // so longer lists must be split by the caller)
        List<SendMessageBatchRequestEntry> entries = IntStream.range(0, messages.size())
            .mapToObj(i -> new SendMessageBatchRequestEntry()
                .withId("msg-" + i)
                .withMessageBody(messages.get(i)))
            .collect(Collectors.toList());
        
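        // Wrap the SDK Future in a CompletableFuture. Note: the blocking get() below
        // occupies a ForkJoinPool.commonPool() thread until the request finishes.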
        CompletableFuture<SendMessageBatchResult> batchFuture = 
            CompletableFuture.supplyAsync(() -> {
                try {
                    return asyncClient.sendMessageBatchAsync(
                        new SendMessageBatchRequest(queueUrl, entries)).get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        
        // Transform result to list of message IDs
        return batchFuture.thenApply(result -> 
            result.getSuccessful().stream()
                .map(SendMessageBatchResultEntry::getMessageId)
                .collect(Collectors.toList())
        );
    }
    
    public CompletableFuture<List<Message>> receiveAndProcessBatch(int maxMessages) {
        CompletableFuture<ReceiveMessageResult> receiveFuture = 
            CompletableFuture.supplyAsync(() -> {
                try {
                    return asyncClient.receiveMessageAsync(
                        new ReceiveMessageRequest(queueUrl)
                            .withMaxNumberOfMessages(maxMessages) // SQS accepts values from 1 to 10
                            .withWaitTimeSeconds(20)
                    ).get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        
        return receiveFuture.thenCompose(result -> {
            List<CompletableFuture<Message>> processingFutures = 
                result.getMessages().stream()
                    .map(this::processMessageAsync)
                    .collect(Collectors.toList());
            
            return CompletableFuture.allOf(
                processingFutures.toArray(new CompletableFuture[0])
            ).thenApply(v -> 
                processingFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList())
            );
        });
    }
    
    private CompletableFuture<Message> processMessageAsync(Message message) {
        return CompletableFuture.supplyAsync(() -> {
            // Simulate processing
            try {
                Thread.sleep(100);
                
                // Delete message after processing
                asyncClient.deleteMessageAsync(
                    new DeleteMessageRequest(queueUrl, message.getReceiptHandle())
                );
                
                return message;
            } catch (Exception e) {
                throw new RuntimeException("Processing failed", e);
            }
        });
    }
}
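
SQS batch requests hold at most 10 entries, so a longer message list has to be split before it is handed to a method like `sendMessageBatch` above. A minimal, JDK-only sketch of that split follows; `BatchPartitioner` and `partition` are hypothetical names, not part of the SDK.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchPartitioner {
    // SQS batch calls accept at most 10 entries per request.
    static final int MAX_BATCH_ENTRIES = 10;

    // Splits items into consecutive sublists of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 23; i++) {
            messages.add("Message " + i);
        }
        for (List<String> batch : partition(messages, MAX_BATCH_ENTRIES)) {
            System.out.println(batch.size()); // prints 10, 10, 3 on separate lines
        }
    }
}
```

Each resulting sublist could then be sent as its own batch request, with the per-batch futures combined using `CompletableFuture.allOf`.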

Error Handling in Async Operations

Exception Handling Patterns

Handle exceptions in both Future-based and callback-based async operations.

// Future-based error handling
Future<SendMessageResult> future = asyncClient.sendMessageAsync(request);

try {
    SendMessageResult result = future.get(30, TimeUnit.SECONDS);
    // Handle success
} catch (TimeoutException e) {
    System.err.println("Operation timed out");
    future.cancel(true);
} catch (ExecutionException e) {
    Throwable cause = e.getCause();
    if (cause instanceof AmazonSQSException) {
        AmazonSQSException sqsException = (AmazonSQSException) cause;
        System.err.println("SQS Error: " + sqsException.getErrorCode());
    } else {
        System.err.println("Unexpected error: " + cause.getMessage());
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.err.println("Operation interrupted");
}

// Callback-based error handling with retry logic
public class RetryingAsyncHandler<REQUEST extends AmazonWebServiceRequest, RESULT> 
        implements AsyncHandler<REQUEST, RESULT> {
    
    private final int maxRetries;
    // onError() does not receive the request, so the handler captures it up front
    private final REQUEST request;
    // Re-issues the call, e.g. (req, handler) -> asyncClient.sendMessageAsync(req, handler)
    private final BiFunction<REQUEST, AsyncHandler<REQUEST, RESULT>, Future<RESULT>> retryFunction;
    private final AtomicInteger attempt = new AtomicInteger();
    
    public RetryingAsyncHandler(int maxRetries, REQUEST request,
            BiFunction<REQUEST, AsyncHandler<REQUEST, RESULT>, Future<RESULT>> retryFunction) {
        this.maxRetries = maxRetries;
        this.request = request;
        this.retryFunction = retryFunction;
    }
    
    @Override
    public void onSuccess(REQUEST request, RESULT result) {
        System.out.println("Operation succeeded on attempt " + (attempt.get() + 1));
        // Handle success
    }
    
    @Override
    public void onError(Exception exception) {
        int failedAttempts = attempt.incrementAndGet();
        
        if (failedAttempts <= maxRetries && isRetryableException(exception)) {
            System.out.println("Retrying operation, attempt " + failedAttempts);
            
            // Exponential backoff: 1 s, 2 s, 4 s, ...
            long delay = (1L << (failedAttempts - 1)) * 1000L;
            
            // CompletableFuture.delayedExecutor requires Java 9+
            CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS)
                .execute(() -> retryFunction.apply(request, this));
        } else {
            System.err.println("Operation failed after " + failedAttempts + " attempts: " + 
                exception.getMessage());
            // Handle permanent failure
        }
    }
    
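    private boolean isRetryableException(Exception exception) {
        // Caution: SDK exceptions, RequestThrottledException included, all derive from
        // AmazonClientException, so this check treats every SDK error as retryable.
        // Narrow it (for example by error code or HTTP status) before relying on it.
        return exception instanceof RequestThrottledException ||
               exception instanceof AmazonClientException;
    }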
}
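
The backoff in the handler above doubles the delay on every attempt with no upper bound and no randomness. A common refinement is to cap the delay and add jitter so that many clients do not retry in lockstep. The sketch below is one way to compute such delays using only the JDK; `BackoffSketch` and its method names are hypothetical, and it assumes a modest positive `baseMillis` and a `maxMillis` well below `Long.MAX_VALUE`.

```java
import java.util.concurrent.ThreadLocalRandom;

final class BackoffSketch {
    // Upper bound for the delay before retry number `attempt` (1-based):
    // baseMillis * 2^(attempt - 1), capped at maxMillis.
    static long cappedDelayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 1 || baseMillis <= 0 || maxMillis <= 0) {
            throw new IllegalArgumentException("attempt, baseMillis and maxMillis must be positive");
        }
        int shift = Math.min(attempt - 1, 30); // limit the shift to keep the product in range
        long exponential = baseMillis * (1L << shift);
        return Math.min(exponential, maxMillis);
    }

    // "Full jitter": a random delay between 0 and the capped bound, inclusive.
    static long jitteredDelayMillis(int attempt, long baseMillis, long maxMillis) {
        long bound = cappedDelayMillis(attempt, baseMillis, maxMillis);
        return ThreadLocalRandom.current().nextLong(bound + 1);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 6; attempt++) {
            System.out.println("attempt " + attempt
                + " bound=" + cappedDelayMillis(attempt, 1000, 20_000)
                + " jittered=" + jitteredDelayMillis(attempt, 1000, 20_000));
        }
    }
}
```

The jittered value could replace the fixed `delay` passed to the delayed executor in a retry handler.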

Performance Considerations

Thread Pool Management

Size the executor for the number of SQS calls you expect to have in flight: each async call occupies one executor thread until its request completes.

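// Custom thread pool configuration
// Note: with a bounded work queue, threads beyond the core size are created only once the queue is full
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
    10,                          // Core pool size
    50,                          // Maximum pool size (reached only when the work queue is full)
    60L, TimeUnit.SECONDS,       // Keep alive time for idle threads above the core size
    new LinkedBlockingQueue<>(1000), // Work queue; tasks are rejected once queue and pool are both full
    new ThreadFactoryBuilder()   // Guava class: com.google.common.util.concurrent.ThreadFactoryBuilder
        .setNameFormat("sqs-async-%d")
        .setDaemon(true)
        .build()
);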

AmazonSQSAsync asyncClient = AmazonSQSAsyncClientBuilder.standard()
    .withExecutorFactory(() -> customExecutor)
    .build();

// Monitor thread pool
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
    System.out.println("Active threads: " + customExecutor.getActiveCount());
    System.out.println("Queue size: " + customExecutor.getQueue().size());
}, 0, 30, TimeUnit.SECONDS);
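
The `ThreadFactoryBuilder` used above comes from Guava. Where that dependency is not available, a small JDK-only factory can produce the same named daemon threads. The following is a sketch under that assumption; `NamedDaemonThreadFactory` is a hypothetical class name.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedDaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    NamedDaemonThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        // Names follow the pattern prefix0, prefix1, ...
        Thread thread = new Thread(task, prefix + counter.getAndIncrement());
        thread.setDaemon(true); // daemon threads do not keep the JVM alive
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadFactory factory = new NamedDaemonThreadFactory("sqs-async-");
        Thread worker = factory.newThread(
            () -> System.out.println("running on " + Thread.currentThread().getName()));
        worker.start();
        worker.join();
    }
}
```

An instance such as `new NamedDaemonThreadFactory("sqs-async-")` could be passed as the last constructor argument of the `ThreadPoolExecutor` in place of the Guava builder.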

Install with Tessl CLI

npx tessl i tessl/maven-com-amazonaws--aws-java-sdk-sqs
