# Asynchronous Operations

Asynchronous operations run SQS calls without blocking the calling thread. Each operation returns a `Future` and optionally accepts a callback handler, so an application can keep many SQS requests in flight at once.

## Async Client Interface

### AmazonSQSAsync Interface

`AmazonSQSAsync` extends `AmazonSQS` with an asynchronous variant of every SQS operation.

```java { .api }
interface AmazonSQSAsync extends AmazonSQS {
    // Every synchronous operation has async equivalents
    Future<CreateQueueResult> createQueueAsync(CreateQueueRequest request);
    Future<CreateQueueResult> createQueueAsync(CreateQueueRequest request,
        AsyncHandler<CreateQueueRequest, CreateQueueResult> asyncHandler);

    Future<SendMessageResult> sendMessageAsync(SendMessageRequest request);
    Future<SendMessageResult> sendMessageAsync(SendMessageRequest request,
        AsyncHandler<SendMessageRequest, SendMessageResult> asyncHandler);

    Future<SendMessageBatchResult> sendMessageBatchAsync(SendMessageBatchRequest request);
    Future<SendMessageBatchResult> sendMessageBatchAsync(SendMessageBatchRequest request,
        AsyncHandler<SendMessageBatchRequest, SendMessageBatchResult> asyncHandler);

    Future<ReceiveMessageResult> receiveMessageAsync(ReceiveMessageRequest request);
    Future<ReceiveMessageResult> receiveMessageAsync(ReceiveMessageRequest request,
        AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult> asyncHandler);

    Future<DeleteMessageResult> deleteMessageAsync(DeleteMessageRequest request);
    Future<DeleteMessageResult> deleteMessageAsync(DeleteMessageRequest request,
        AsyncHandler<DeleteMessageRequest, DeleteMessageResult> asyncHandler);

    Future<DeleteMessageBatchResult> deleteMessageBatchAsync(DeleteMessageBatchRequest request);
    Future<DeleteMessageBatchResult> deleteMessageBatchAsync(DeleteMessageBatchRequest request,
        AsyncHandler<DeleteMessageBatchRequest, DeleteMessageBatchResult> asyncHandler);

    // All other operations follow the same pattern...
}

// Callback interface for async operations
interface AsyncHandler<REQUEST extends AmazonWebServiceRequest, RESULT> {
    void onError(Exception exception);
    void onSuccess(REQUEST request, RESULT result);
}
```

## Async Client Creation

### Build Async Clients

Use `AmazonSQSAsyncClientBuilder` to create asynchronous SQS clients, either with defaults or with custom configuration.

```java { .api }
class AmazonSQSAsyncClientBuilder extends AwsAsyncClientBuilder<AmazonSQSAsyncClientBuilder, AmazonSQSAsync> {
    static AmazonSQSAsyncClientBuilder standard();
    static AmazonSQSAsync defaultClient();
    AmazonSQSAsync build();
}
```

**Usage Example:**

```java
// Default async client
AmazonSQSAsync asyncClient = AmazonSQSAsyncClientBuilder.defaultClient();

// Custom async client backed by a caller-supplied thread pool
ExecutorService executor = Executors.newFixedThreadPool(20);

AmazonSQSAsync customAsyncClient = AmazonSQSAsyncClientBuilder.standard()
    .withRegion(Regions.US_WEST_2)
    .withExecutorFactory(() -> executor)
    .withCredentials(new ProfileCredentialsProvider())
    .build();
```

## Future-Based Operations

### Working with Futures

Each async method returns a `Future` that can be awaited (optionally with a timeout), polled, or cancelled.

**Usage Example:**

```java
// Send message asynchronously
Future<SendMessageResult> sendFuture = asyncClient.sendMessageAsync(
    new SendMessageRequest(queueUrl, "Async message"));

// Do other work while message is being sent
performOtherWork();

try {
    // Wait for completion and get result
    SendMessageResult result = sendFuture.get(30, TimeUnit.SECONDS);
    System.out.println("Message sent: " + result.getMessageId());
} catch (TimeoutException e) {
    System.err.println("Send operation timed out");
    sendFuture.cancel(true);
} catch (ExecutionException e) {
    System.err.println("Send failed: " + e.getCause().getMessage());
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // Restore the interrupt flag
}

// Multiple concurrent operations
List<Future<SendMessageResult>> sendFutures = new ArrayList<>();

for (int i = 0; i < 10; i++) {
    Future<SendMessageResult> future = asyncClient.sendMessageAsync(
        new SendMessageRequest(queueUrl, "Message " + i));
    sendFutures.add(future);
}

// Wait for all to complete
for (Future<SendMessageResult> future : sendFutures) {
    try {
        SendMessageResult result = future.get();
        System.out.println("Sent: " + result.getMessageId());
    } catch (ExecutionException e) {
        System.err.println("Send failed: " + e.getCause().getMessage());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break; // Stop waiting if this thread is interrupted
    }
}
```
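
The final loop above waits on each future with no time limit. As a rough sketch of one alternative, the helper below gives a whole batch of futures a single shared deadline. It uses only `java.util.concurrent`; `FutureBatch` and `awaitAll` are hypothetical names introduced for illustration and are not part of the AWS SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not an SDK class.
final class FutureBatch {
    private FutureBatch() {}

    /**
     * Collects the results of all futures that finish within timeoutMillis
     * (measured once, for the whole batch). Failed futures are logged and
     * skipped; futures still pending at the deadline are cancelled. If the
     * calling thread is interrupted, its interrupt flag is restored and the
     * results gathered so far are returned.
     */
    static <T> List<T> awaitAll(List<? extends Future<T>> futures, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            long remainingNanos = Math.max(0L, deadline - System.nanoTime());
            try {
                results.add(future.get(remainingNanos, TimeUnit.NANOSECONDS));
            } catch (TimeoutException e) {
                future.cancel(true);
            } catch (ExecutionException e) {
                System.err.println("Operation failed: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return results;
    }
}
```

With the futures from the example above, a call such as `FutureBatch.awaitAll(sendFutures, 30_000)` would return the results that arrived within 30 seconds. Whether to cancel late futures or keep waiting for them is a design choice that depends on the application.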
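
## Callback-Based Operations

### AsyncHandler Implementation

Pass an `AsyncHandler` to be notified when an operation completes instead of waiting on the `Future`. The handler is invoked on one of the client's executor threads, so keep it short or hand long-running work off to another pool.

**Usage Example:**
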
```java
// Create async handler
AsyncHandler<SendMessageRequest, SendMessageResult> handler =
    new AsyncHandler<SendMessageRequest, SendMessageResult>() {
        @Override
        public void onSuccess(SendMessageRequest request, SendMessageResult result) {
            System.out.println("Successfully sent message: " + result.getMessageId());
            // Process success...
        }

        @Override
        public void onError(Exception exception) {
            System.err.println("Failed to send message: " + exception.getMessage());
            // Handle error...
        }
    };

// Send with callback
asyncClient.sendMessageAsync(new SendMessageRequest(queueUrl, "Callback message"), handler);

// Nested callbacks: receive messages, then delete each one.
// AsyncHandler declares two methods, so it cannot be written as a single lambda;
// anonymous classes are used for the handlers and a lambda for the forEach body.
asyncClient.receiveMessageAsync(
    new ReceiveMessageRequest(queueUrl),
    new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() {
        @Override
        public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) {
            result.getMessages().forEach(message -> {
                System.out.println("Received: " + message.getBody());

                // Delete the message asynchronously once it has been handled
                asyncClient.deleteMessageAsync(
                    new DeleteMessageRequest(queueUrl, message.getReceiptHandle()),
                    new AsyncHandler<DeleteMessageRequest, DeleteMessageResult>() {
                        @Override
                        public void onSuccess(DeleteMessageRequest req, DeleteMessageResult res) {
                            System.out.println("Deleted message: " + message.getMessageId());
                        }

                        @Override
                        public void onError(Exception exception) {
                            System.err.println("Delete failed: " + exception.getMessage());
                        }
                    }
                );
            });
        }

        @Override
        public void onError(Exception exception) {
            System.err.println("Receive failed: " + exception.getMessage());
        }
    }
);
```
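
Because `AsyncHandler` declares two abstract methods, a handler cannot be written directly as one lambda. A small adapter that builds a handler from two lambdas can cut down on anonymous classes. The sketch below is an assumption-laden illustration: `Handler` is a hypothetical stand-in with the same two-method shape as `AsyncHandler` (so the example compiles without the SDK), and `Handlers.of` is an invented helper name. In real code the adapter would implement `AsyncHandler<REQUEST, RESULT>` instead.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical stand-in with the same two-method shape as the SDK's AsyncHandler. */
interface Handler<REQ, RES> {
    void onError(Exception exception);
    void onSuccess(REQ request, RES result);
}

// Hypothetical helper, not an SDK class.
final class Handlers {
    private Handlers() {}

    /** Builds a Handler that delegates each callback to the matching lambda. */
    static <REQ, RES> Handler<REQ, RES> of(BiConsumer<REQ, RES> successAction,
                                           Consumer<Exception> errorAction) {
        return new Handler<REQ, RES>() {
            @Override
            public void onSuccess(REQ request, RES result) {
                successAction.accept(request, result);
            }

            @Override
            public void onError(Exception exception) {
                errorAction.accept(exception);
            }
        };
    }
}
```

A call site then reads `Handlers.of((request, result) -> ..., error -> ...)`, which keeps the success and error paths next to each other without the anonymous-class boilerplate.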

## Concurrent Processing Patterns

### Producer-Consumer Pattern

A producer-consumer pipeline can be built on the async client: one task sends messages taken from a local queue, and another polls SQS and dispatches each received message to a processor.

```java
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.*;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class AsyncProducerConsumer {
    private final AmazonSQSAsync asyncClient;
    private final String queueUrl;
    private final ExecutorService executorService;

    public AsyncProducerConsumer(AmazonSQSAsync asyncClient, String queueUrl) {
        this.asyncClient = asyncClient;
        this.queueUrl = queueUrl;
        this.executorService = Executors.newFixedThreadPool(10);
    }

    // High-throughput producer
    public void startProducer(BlockingQueue<String> messageQueue) {
        executorService.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String messageBody = messageQueue.take();

                    asyncClient.sendMessageAsync(
                        new SendMessageRequest(queueUrl, messageBody),
                        new AsyncHandler<SendMessageRequest, SendMessageResult>() {
                            @Override
                            public void onSuccess(SendMessageRequest request, SendMessageResult result) {
                                System.out.println("Produced: " + result.getMessageId());
                            }

                            @Override
                            public void onError(Exception exception) {
                                System.err.println("Production failed: " + exception.getMessage());
                                // Re-queue message for retry (no retry limit is applied here)
                                messageQueue.offer(messageBody);
                            }
                        }
                    );
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }

    // High-throughput consumer
    public void startConsumer(Consumer<Message> messageProcessor) {
        executorService.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                asyncClient.receiveMessageAsync(
                    new ReceiveMessageRequest(queueUrl)
                        .withMaxNumberOfMessages(10)
                        .withWaitTimeSeconds(20),
                    new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() {
                        @Override
                        public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) {
                            for (Message message : result.getMessages()) {
                                // Process message asynchronously
                                CompletableFuture.runAsync(() -> {
                                    try {
                                        messageProcessor.accept(message);

                                        // Delete after successful processing
                                        asyncClient.deleteMessageAsync(
                                            new DeleteMessageRequest(queueUrl, message.getReceiptHandle())
                                        );
                                    } catch (Exception e) {
                                        System.err.println("Processing failed: " + e.getMessage());
                                    }
                                }, executorService);
                            }
                        }

                        @Override
                        public void onError(Exception exception) {
                            System.err.println("Receive failed: " + exception.getMessage());
                        }
                    }
                );

                try {
                    // Brief pause between receive operations. This does not wait for the
                    // previous 20-second long poll to finish, so several polls can be in
                    // flight at the same time.
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }
}
```
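
The producer above submits sends as fast as the local queue yields messages, and the consumer issues a new long poll every second, so the number of in-flight requests is not bounded by this class. One common way to bound it is a `Semaphore` that is acquired before each call and released when the call completes. The sketch below uses only the JDK; `InFlightLimiter` is a hypothetical name, and the async operation is represented by a `Supplier<CompletableFuture<T>>` rather than a real SDK call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper, not an SDK class.
final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /**
     * Blocks until a permit is free, starts the operation, and releases the
     * permit when the operation's future completes, whether normally or
     * exceptionally.
     */
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> operation)
            throws InterruptedException {
        permits.acquire();
        CompletableFuture<T> future;
        try {
            future = operation.get();
        } catch (RuntimeException e) {
            permits.release(); // The operation never started, so give the permit back
            throw e;
        }
        future.whenComplete((result, error) -> permits.release());
        return future;
    }

    /** Number of additional operations that could start right now. */
    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Blocking in `submit` is the backpressure: a producer loop that calls it slows down to the rate at which requests complete instead of queueing work without limit.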

### Batch Operations with CompletableFuture

Wrap the SDK's `Future` results in `CompletableFuture` to chain, transform, and combine async operations.

```java
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.*;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AsyncBatchProcessor {
    private final AmazonSQSAsync asyncClient;
    private final String queueUrl;

    public AsyncBatchProcessor(AmazonSQSAsync asyncClient, String queueUrl) {
        this.asyncClient = asyncClient;
        this.queueUrl = queueUrl;
    }

    public CompletableFuture<List<String>> sendMessageBatch(List<String> messages) {
        // Convert to batch entries (SQS accepts at most 10 entries per batch)
        List<SendMessageBatchRequestEntry> entries = IntStream.range(0, messages.size())
            .mapToObj(i -> new SendMessageBatchRequestEntry()
                .withId("msg-" + i)
                .withMessageBody(messages.get(i)))
            .collect(Collectors.toList());

        // Create CompletableFuture from AWS Future.
        // Note: get() blocks a common-pool thread until the call completes.
        CompletableFuture<SendMessageBatchResult> batchFuture =
            CompletableFuture.supplyAsync(() -> {
                try {
                    return asyncClient.sendMessageBatchAsync(
                        new SendMessageBatchRequest(queueUrl, entries)).get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CompletionException(e);
                } catch (ExecutionException e) {
                    throw new CompletionException(e.getCause());
                }
            });

        // Transform result to list of message IDs (entries in getFailed() are ignored here)
        return batchFuture.thenApply(result ->
            result.getSuccessful().stream()
                .map(SendMessageBatchResultEntry::getMessageId)
                .collect(Collectors.toList())
        );
    }

    public CompletableFuture<List<Message>> receiveAndProcessBatch(int maxMessages) {
        CompletableFuture<ReceiveMessageResult> receiveFuture =
            CompletableFuture.supplyAsync(() -> {
                try {
                    return asyncClient.receiveMessageAsync(
                        new ReceiveMessageRequest(queueUrl)
                            .withMaxNumberOfMessages(maxMessages) // 1 to 10
                            .withWaitTimeSeconds(20)
                    ).get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CompletionException(e);
                } catch (ExecutionException e) {
                    throw new CompletionException(e.getCause());
                }
            });

        return receiveFuture.thenCompose(result -> {
            List<CompletableFuture<Message>> processingFutures =
                result.getMessages().stream()
                    .map(this::processMessageAsync)
                    .collect(Collectors.toList());

            // Complete once every message has been processed, then gather the results
            return CompletableFuture.allOf(
                processingFutures.toArray(new CompletableFuture[0])
            ).thenApply(v ->
                processingFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList())
            );
        });
    }

    private CompletableFuture<Message> processMessageAsync(Message message) {
        return CompletableFuture.supplyAsync(() -> {
            // Simulate processing
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException("Processing interrupted", e);
            }

            // Delete message after processing
            asyncClient.deleteMessageAsync(
                new DeleteMessageRequest(queueUrl, message.getReceiptHandle())
            );

            return message;
        });
    }
}
```
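
The `supplyAsync` bridge above calls `get()` inside the supplier, which parks a pool thread for the duration of each request. An alternative is to complete a `CompletableFuture` from the callback so that no thread waits. The sketch below shows the idea with JDK types only: `Callback` is a hypothetical stand-in for `AsyncHandler`, `CallbackBridge` is an invented helper, and the `BiConsumer` parameter represents any two-argument `...Async(request, handler)` method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical stand-in with the same shape as the SDK's AsyncHandler. */
interface Callback<REQ, RES> {
    void onError(Exception exception);
    void onSuccess(REQ request, RES result);
}

// Hypothetical helper, not an SDK class.
final class CallbackBridge {
    private CallbackBridge() {}

    /**
     * Starts a callback-style async call and exposes its outcome as a
     * CompletableFuture, without blocking any thread on Future.get().
     */
    static <REQ, RES> CompletableFuture<RES> toCompletableFuture(
            REQ request, BiConsumer<REQ, Callback<REQ, RES>> asyncCall) {
        CompletableFuture<RES> promise = new CompletableFuture<>();
        asyncCall.accept(request, new Callback<REQ, RES>() {
            @Override
            public void onSuccess(REQ req, RES result) {
                promise.complete(result);
            }

            @Override
            public void onError(Exception exception) {
                promise.completeExceptionally(exception);
            }
        });
        return promise;
    }
}
```

Adapted to the SDK, the handler would implement `AsyncHandler` and the second argument would be along the lines of `(req, handler) -> asyncClient.sendMessageBatchAsync(req, handler)`. The returned future then supports `thenApply` and `thenCompose` exactly as in the class above.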

## Error Handling in Async Operations

### Exception Handling Patterns

With a `Future`, a failed operation surfaces as an `ExecutionException` whose cause is the original exception. With a callback, the exception is delivered to `onError`.

```java
// Future-based error handling
Future<SendMessageResult> future = asyncClient.sendMessageAsync(request);

try {
    SendMessageResult result = future.get(30, TimeUnit.SECONDS);
    // Handle success
} catch (TimeoutException e) {
    System.err.println("Operation timed out");
    future.cancel(true);
} catch (ExecutionException e) {
    Throwable cause = e.getCause();
    if (cause instanceof AmazonSQSException) {
        AmazonSQSException sqsException = (AmazonSQSException) cause;
        System.err.println("SQS Error: " + sqsException.getErrorCode());
    } else {
        System.err.println("Unexpected error: " + cause.getMessage());
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    System.err.println("Operation interrupted");
}

// Callback-based error handling with retry logic.
// onError does not receive the request, so the handler captures it up front.
public class RetryingAsyncHandler<REQUEST extends AmazonWebServiceRequest, RESULT>
        implements AsyncHandler<REQUEST, RESULT> {

    private final REQUEST request;
    private final int maxRetries;
    private final BiFunction<REQUEST, AsyncHandler<REQUEST, RESULT>, Future<RESULT>> retryFunction;
    private final AtomicInteger attempt = new AtomicInteger();

    public RetryingAsyncHandler(REQUEST request, int maxRetries,
            BiFunction<REQUEST, AsyncHandler<REQUEST, RESULT>, Future<RESULT>> retryFunction) {
        this.request = request;
        this.maxRetries = maxRetries;
        this.retryFunction = retryFunction;
    }

    @Override
    public void onSuccess(REQUEST request, RESULT result) {
        System.out.println("Operation succeeded on attempt " + (attempt.get() + 1));
        // Handle success
    }

    @Override
    public void onError(Exception exception) {
        int failedAttempts = attempt.incrementAndGet();

        if (failedAttempts <= maxRetries && isRetryableException(exception)) {
            System.out.println("Retrying operation, attempt " + failedAttempts);

            // Exponential backoff: 1s, 2s, 4s, ...
            long delayMs = 1000L << (failedAttempts - 1);

            // Re-issue the captured request with this handler after the delay
            // (CompletableFuture.delayedExecutor requires Java 9+)
            CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS)
                .execute(() -> retryFunction.apply(request, this));
        } else {
            System.err.println("Operation failed after " + failedAttempts + " attempts: " +
                exception.getMessage());
            // Handle permanent failure
        }
    }

    private boolean isRetryableException(Exception exception) {
        if (exception instanceof RequestThrottledException) {
            return true; // Throttled: retry after backing off
        }
        if (exception instanceof AmazonServiceException) {
            // Retry server-side (5xx) failures, not client-side (4xx) errors
            return ((AmazonServiceException) exception).getStatusCode() >= 500;
        }
        // Remaining SDK client exceptions are typically network or timeout failures
        return exception instanceof AmazonClientException;
    }
}

// Usage: the handler re-sends the same request through the two-argument overload
asyncClient.sendMessageAsync(request,
    new RetryingAsyncHandler<SendMessageRequest, SendMessageResult>(
        request, 3, asyncClient::sendMessageAsync));
```
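
The retry handler above doubles its delay after every failure. Two refinements are often applied to that schedule: a cap, so the delay does not grow without limit, and random jitter, so that many clients do not retry in lockstep. The following is a minimal sketch of capped exponential backoff with "full jitter"; `Backoff`, `baseMillis`, and `capMillis` are hypothetical names chosen for this example, and the code assumes modest base values so the shift cannot overflow.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not an SDK class.
final class Backoff {
    private Backoff() {}

    /**
     * Largest delay allowed before retry number `attempt` (1-based):
     * baseMillis * 2^(attempt - 1), limited to capMillis.
     */
    static long ceilingMillis(int attempt, long baseMillis, long capMillis) {
        if (attempt < 1 || baseMillis <= 0 || capMillis <= 0) {
            throw new IllegalArgumentException("attempt, baseMillis and capMillis must be positive");
        }
        int shift = Math.min(attempt - 1, 30); // Bound the shift so the product stays in range
        return Math.min(capMillis, baseMillis << shift);
    }

    /** Full jitter: a uniformly random delay between 0 and the ceiling, inclusive. */
    static long jitteredMillis(int attempt, long baseMillis, long capMillis) {
        long ceiling = ceilingMillis(attempt, baseMillis, capMillis);
        return ThreadLocalRandom.current().nextLong(ceiling + 1);
    }
}
```

With a base of 1000 ms and a cap of 20000 ms, the ceilings for attempts 1, 2, 3, and 6 are 1000, 2000, 4000, and 20000 ms. A handler like the one above could pass `jitteredMillis(...)` to its delayed executor in place of the fixed doubling.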

## Performance Considerations

### Thread Pool Management

The async client runs each request on its executor, so the pool's size and queue capacity bound how many requests can be in flight. Size them for the expected load and monitor them.

```java
// Custom thread pool configuration.
// ThreadFactoryBuilder is not a JDK class; Guava provides one with this API.
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
    10,                                  // Core pool size
    50,                                  // Maximum pool size (used only once the queue is full)
    60L, TimeUnit.SECONDS,               // Keep alive time
    new LinkedBlockingQueue<>(1000),     // Work queue
    new ThreadFactoryBuilder()
        .setNameFormat("sqs-async-%d")
        .setDaemon(true)
        .build()
);

AmazonSQSAsync asyncClient = AmazonSQSAsyncClientBuilder.standard()
    .withExecutorFactory(() -> customExecutor)
    .build();

// Monitor thread pool
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
    System.out.println("Active threads: " + customExecutor.getActiveCount());
    System.out.println("Queue size: " + customExecutor.getQueue().size());
}, 0, 30, TimeUnit.SECONDS);
```
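
The configuration above depends on a `ThreadFactoryBuilder`, which is not part of the JDK, and it leaves the executor's default rejection behavior in place: `ThreadPoolExecutor` throws `RejectedExecutionException` once both the queue and the pool are full. The sketch below builds a comparable pool using only the JDK and swaps in `CallerRunsPolicy`, so an overloaded pool slows the submitting thread instead of rejecting work. `SqsPools`, `namedDaemonFactory`, and `newBoundedPool` are hypothetical names, and whether running tasks on the caller is acceptable is an assumption to check against the application's needs.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not an SDK class.
final class SqsPools {
    private SqsPools() {}

    /** Thread factory producing daemon threads named prefix-0, prefix-1, ... */
    static ThreadFactory namedDaemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    /**
     * Bounded pool: threads beyond coreSize are created only once the queue is
     * full, and when both are exhausted the submitting thread runs the task.
     */
    static ThreadPoolExecutor newBoundedPool(int coreSize, int maxSize, int queueCapacity) {
        return new ThreadPoolExecutor(
            coreSize, maxSize,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(queueCapacity),
            namedDaemonFactory("sqs-async"),
            new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

A pool created with `SqsPools.newBoundedPool(10, 50, 1000)` could be handed to `withExecutorFactory(() -> pool)` in the same way as `customExecutor` above.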