or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.mdbuffered-client.mdclient-management.mddead-letter-queues.mdindex.mdmessage-operations.mdmessage-visibility.mdqueue-operations.mdqueue-permissions.mdqueue-tagging.md

async-operations.mddocs/

0

# Asynchronous Operations

1

2

Asynchronous operations provide non-blocking execution of SQS operations using Future-based return types and optional callback handlers. This enables high-performance applications to efficiently manage multiple concurrent SQS operations without blocking threads.

3

4

## Async Client Interface

5

6

### AmazonSQSAsync Interface

7

8

Extended interface providing asynchronous versions of all SQS operations.

9

10

```java { .api }

11

interface AmazonSQSAsync extends AmazonSQS {

12

// Every synchronous operation has async equivalents

13

Future<CreateQueueResult> createQueueAsync(CreateQueueRequest request);

14

Future<CreateQueueResult> createQueueAsync(CreateQueueRequest request,

15

AsyncHandler<CreateQueueRequest, CreateQueueResult> asyncHandler);

16

17

Future<SendMessageResult> sendMessageAsync(SendMessageRequest request);

18

Future<SendMessageResult> sendMessageAsync(SendMessageRequest request,

19

AsyncHandler<SendMessageRequest, SendMessageResult> asyncHandler);

20

21

Future<SendMessageBatchResult> sendMessageBatchAsync(SendMessageBatchRequest request);

22

Future<SendMessageBatchResult> sendMessageBatchAsync(SendMessageBatchRequest request,

23

AsyncHandler<SendMessageBatchRequest, SendMessageBatchResult> asyncHandler);

24

25

Future<ReceiveMessageResult> receiveMessageAsync(ReceiveMessageRequest request);

26

Future<ReceiveMessageResult> receiveMessageAsync(ReceiveMessageRequest request,

27

AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult> asyncHandler);

28

29

Future<DeleteMessageResult> deleteMessageAsync(DeleteMessageRequest request);

30

Future<DeleteMessageResult> deleteMessageAsync(DeleteMessageRequest request,

31

AsyncHandler<DeleteMessageRequest, DeleteMessageResult> asyncHandler);

32

33

Future<DeleteMessageBatchResult> deleteMessageBatchAsync(DeleteMessageBatchRequest request);

34

Future<DeleteMessageBatchResult> deleteMessageBatchAsync(DeleteMessageBatchRequest request,

35

AsyncHandler<DeleteMessageBatchRequest, DeleteMessageBatchResult> asyncHandler);

36

37

// All other operations follow the same pattern...

38

}

39

40

// Callback interface for async operations

41

interface AsyncHandler<REQUEST extends AmazonWebServiceRequest, RESULT> {

42

void onError(Exception exception);

43

void onSuccess(REQUEST request, RESULT result);

44

}

45

```

46

47

## Async Client Creation

48

49

### Build Async Clients

50

51

Create asynchronous SQS clients with custom configuration.

52

53

```java { .api }

54

class AmazonSQSAsyncClientBuilder extends AwsAsyncClientBuilder<AmazonSQSAsyncClientBuilder, AmazonSQSAsync> {

55

static AmazonSQSAsyncClientBuilder standard();

56

static AmazonSQSAsync defaultClient();

57

AmazonSQSAsync build();

58

}

59

```

60

61

**Usage Example:**

62

63

```java

64

// Default async client

65

AmazonSQSAsync asyncClient = AmazonSQSAsyncClientBuilder.defaultClient();

66

67

// Custom async client with thread pool

68

ExecutorService executor = Executors.newFixedThreadPool(20);

69

70

AmazonSQSAsync customAsyncClient = AmazonSQSAsyncClientBuilder.standard()

71

.withRegion(Regions.US_WEST_2)

72

.withExecutorFactory(() -> executor)

73

.withCredentials(new ProfileCredentialsProvider())

74

.build();

75

```

76

77

## Future-Based Operations

78

79

### Working with Futures

80

81

Use Future objects to manage asynchronous operation completion.

82

83

**Usage Example:**

84

85

```java

86

// Send message asynchronously

87

Future<SendMessageResult> sendFuture = asyncClient.sendMessageAsync(

88

new SendMessageRequest(queueUrl, "Async message"));

89

90

// Do other work while message is being sent

91

performOtherWork();

92

93

try {

94

// Wait for completion and get result

95

SendMessageResult result = sendFuture.get(30, TimeUnit.SECONDS);

96

System.out.println("Message sent: " + result.getMessageId());

97

} catch (TimeoutException e) {

98

System.err.println("Send operation timed out");

99

sendFuture.cancel(true);

100

} catch (ExecutionException e) {

101

System.err.println("Send failed: " + e.getCause().getMessage());

102

}

103

104

// Multiple concurrent operations

105

List<Future<SendMessageResult>> sendFutures = new ArrayList<>();

106

107

for (int i = 0; i < 10; i++) {

108

Future<SendMessageResult> future = asyncClient.sendMessageAsync(

109

new SendMessageRequest(queueUrl, "Message " + i));

110

sendFutures.add(future);

111

}

112

113

// Wait for all to complete

114

for (Future<SendMessageResult> future : sendFutures) {

115

try {

116

SendMessageResult result = future.get();

117

System.out.println("Sent: " + result.getMessageId());

118

} catch (ExecutionException e) {

119

System.err.println("Send failed: " + e.getCause().getMessage());

120

}

121

}

122

```

123

124

## Callback-Based Operations

125

126

### AsyncHandler Implementation

127

128

Use callbacks for reactive processing of async operation results.

129

130

**Usage Example:**

131

132

```java

133

// Create async handler

134

AsyncHandler<SendMessageRequest, SendMessageResult> handler =

135

new AsyncHandler<SendMessageRequest, SendMessageResult>() {

136

@Override

137

public void onSuccess(SendMessageRequest request, SendMessageResult result) {

138

System.out.println("Successfully sent message: " + result.getMessageId());

139

// Process success...

140

}

141

142

@Override

143

public void onError(Exception exception) {

144

System.err.println("Failed to send message: " + exception.getMessage());

145

// Handle error...

146

}

147

};

148

149

// Send with callback

150

asyncClient.sendMessageAsync(new SendMessageRequest(queueUrl, "Callback message"), handler);

151

152

// Lambda-based handlers (Java 8+)

153

asyncClient.receiveMessageAsync(

154

new ReceiveMessageRequest(queueUrl),

155

new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() {

156

@Override

157

public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) {

158

result.getMessages().forEach(message -> {

159

System.out.println("Received: " + message.getBody());

160

161

// Process message asynchronously

162

asyncClient.deleteMessageAsync(

163

new DeleteMessageRequest(queueUrl, message.getReceiptHandle()),

164

new AsyncHandler<DeleteMessageRequest, DeleteMessageResult>() {

165

@Override

166

public void onSuccess(DeleteMessageRequest req, DeleteMessageResult res) {

167

System.out.println("Deleted message: " + message.getMessageId());

168

}

169

170

@Override

171

public void onError(Exception exception) {

172

System.err.println("Delete failed: " + exception.getMessage());

173

}

174

}

175

);

176

});

177

}

178

179

@Override

180

public void onError(Exception exception) {

181

System.err.println("Receive failed: " + exception.getMessage());

182

}

183

}

184

);

185

```

186

187

## Concurrent Processing Patterns

188

189

### Producer-Consumer Pattern

190

191

Implement high-throughput producer-consumer patterns with async operations.

192

193

```java

194

public class AsyncProducerConsumer {

195

private final AmazonSQSAsync asyncClient;

196

private final String queueUrl;

197

private final ExecutorService executorService;

198

199

public AsyncProducerConsumer(AmazonSQSAsync asyncClient, String queueUrl) {

200

this.asyncClient = asyncClient;

201

this.queueUrl = queueUrl;

202

this.executorService = Executors.newFixedThreadPool(10);

203

}

204

205

// High-throughput producer

206

public void startProducer(BlockingQueue<String> messageQueue) {

207

executorService.submit(() -> {

208

while (!Thread.currentThread().isInterrupted()) {

209

try {

210

String messageBody = messageQueue.take();

211

212

asyncClient.sendMessageAsync(

213

new SendMessageRequest(queueUrl, messageBody),

214

new AsyncHandler<SendMessageRequest, SendMessageResult>() {

215

@Override

216

public void onSuccess(SendMessageRequest request, SendMessageResult result) {

217

System.out.println("Produced: " + result.getMessageId());

218

}

219

220

@Override

221

public void onError(Exception exception) {

222

System.err.println("Production failed: " + exception.getMessage());

223

// Re-queue message for retry

224

messageQueue.offer(messageBody);

225

}

226

}

227

);

228

} catch (InterruptedException e) {

229

Thread.currentThread().interrupt();

230

break;

231

}

232

}

233

});

234

}

235

236

// High-throughput consumer

237

public void startConsumer(Consumer<Message> messageProcessor) {

238

executorService.submit(() -> {

239

while (!Thread.currentThread().isInterrupted()) {

240

asyncClient.receiveMessageAsync(

241

new ReceiveMessageRequest(queueUrl)

242

.withMaxNumberOfMessages(10)

243

.withWaitTimeSeconds(20),

244

new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() {

245

@Override

246

public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) {

247

for (Message message : result.getMessages()) {

248

// Process message asynchronously

249

CompletableFuture.runAsync(() -> {

250

try {

251

messageProcessor.accept(message);

252

253

// Delete after successful processing

254

asyncClient.deleteMessageAsync(

255

new DeleteMessageRequest(queueUrl, message.getReceiptHandle())

256

);

257

} catch (Exception e) {

258

System.err.println("Processing failed: " + e.getMessage());

259

}

260

}, executorService);

261

}

262

}

263

264

@Override

265

public void onError(Exception exception) {

266

System.err.println("Receive failed: " + exception.getMessage());

267

}

268

}

269

);

270

271

try {

272

Thread.sleep(1000); // Brief pause between receive operations

273

} catch (InterruptedException e) {

274

Thread.currentThread().interrupt();

275

break;

276

}

277

}

278

});

279

}

280

}

281

```

282

283

### Batch Operations with CompletableFuture

284

285

Combine async operations with CompletableFuture for advanced flow control.

286

287

```java

288

public class AsyncBatchProcessor {

289

private final AmazonSQSAsync asyncClient;

290

private final String queueUrl;

291

292

public AsyncBatchProcessor(AmazonSQSAsync asyncClient, String queueUrl) {

293

this.asyncClient = asyncClient;

294

this.queueUrl = queueUrl;

295

}

296

297

public CompletableFuture<List<String>> sendMessageBatch(List<String> messages) {

298

// Convert to batch entries

299

List<SendMessageBatchRequestEntry> entries = IntStream.range(0, messages.size())

300

.mapToObj(i -> new SendMessageBatchRequestEntry()

301

.withId("msg-" + i)

302

.withMessageBody(messages.get(i)))

303

.collect(Collectors.toList());

304

305

// Create CompletableFuture from AWS Future

306

CompletableFuture<SendMessageBatchResult> batchFuture =

307

CompletableFuture.supplyAsync(() -> {

308

try {

309

return asyncClient.sendMessageBatchAsync(

310

new SendMessageBatchRequest(queueUrl, entries)).get();

311

} catch (Exception e) {

312

throw new RuntimeException(e);

313

}

314

});

315

316

// Transform result to list of message IDs

317

return batchFuture.thenApply(result ->

318

result.getSuccessful().stream()

319

.map(SendMessageBatchResultEntry::getMessageId)

320

.collect(Collectors.toList())

321

);

322

}

323

324

public CompletableFuture<List<Message>> receiveAndProcessBatch(int maxMessages) {

325

CompletableFuture<ReceiveMessageResult> receiveFuture =

326

CompletableFuture.supplyAsync(() -> {

327

try {

328

return asyncClient.receiveMessageAsync(

329

new ReceiveMessageRequest(queueUrl)

330

.withMaxNumberOfMessages(maxMessages)

331

.withWaitTimeSeconds(20)

332

).get();

333

} catch (Exception e) {

334

throw new RuntimeException(e);

335

}

336

});

337

338

return receiveFuture.thenCompose(result -> {

339

List<CompletableFuture<Message>> processingFutures =

340

result.getMessages().stream()

341

.map(this::processMessageAsync)

342

.collect(Collectors.toList());

343

344

return CompletableFuture.allOf(

345

processingFutures.toArray(new CompletableFuture[0])

346

).thenApply(v ->

347

processingFutures.stream()

348

.map(CompletableFuture::join)

349

.collect(Collectors.toList())

350

);

351

});

352

}

353

354

private CompletableFuture<Message> processMessageAsync(Message message) {

355

return CompletableFuture.supplyAsync(() -> {

356

// Simulate processing

357

try {

358

Thread.sleep(100);

359

360

// Delete message after processing

361

asyncClient.deleteMessageAsync(

362

new DeleteMessageRequest(queueUrl, message.getReceiptHandle())

363

);

364

365

return message;

366

} catch (Exception e) {

367

throw new RuntimeException("Processing failed", e);

368

}

369

});

370

}

371

}

372

```

373

374

## Error Handling in Async Operations

375

376

### Exception Handling Patterns

377

378

Handle exceptions in both Future-based and callback-based async operations.

379

380

```java

381

// Future-based error handling

382

Future<SendMessageResult> future = asyncClient.sendMessageAsync(request);

383

384

try {

385

SendMessageResult result = future.get(30, TimeUnit.SECONDS);

386

// Handle success

387

} catch (TimeoutException e) {

388

System.err.println("Operation timed out");

389

future.cancel(true);

390

} catch (ExecutionException e) {

391

Throwable cause = e.getCause();

392

if (cause instanceof AmazonSQSException) {

393

AmazonSQSException sqsException = (AmazonSQSException) cause;

394

System.err.println("SQS Error: " + sqsException.getErrorCode());

395

} else {

396

System.err.println("Unexpected error: " + cause.getMessage());

397

}

398

} catch (InterruptedException e) {

399

Thread.currentThread().interrupt();

400

System.err.println("Operation interrupted");

401

}

402

403

// Callback-based error handling with retry logic

404

public class RetryingAsyncHandler<REQUEST extends AmazonWebServiceRequest, RESULT>

405

implements AsyncHandler<REQUEST, RESULT> {

406

407

private final int maxRetries;

408

private final Function<REQUEST, Future<RESULT>> retryFunction;

409

private int attempt = 0;

410

411

public RetryingAsyncHandler(int maxRetries, Function<REQUEST, Future<RESULT>> retryFunction) {

412

this.maxRetries = maxRetries;

413

this.retryFunction = retryFunction;

414

}

415

416

@Override

417

public void onSuccess(REQUEST request, RESULT result) {

418

System.out.println("Operation succeeded on attempt " + (attempt + 1));

419

// Handle success

420

}

421

422

@Override

423

public void onError(Exception exception) {

424

attempt++;

425

426

if (attempt <= maxRetries && isRetryableException(exception)) {

427

System.out.println("Retrying operation, attempt " + attempt);

428

429

// Exponential backoff

430

int delay = (int) Math.pow(2, attempt - 1) * 1000;

431

432

CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS)

433

.execute(() -> {

434

// This would need the original request, which requires capturing it

435

// In practice, you'd need to design this differently

436

});

437

} else {

438

System.err.println("Operation failed after " + attempt + " attempts: " +

439

exception.getMessage());

440

// Handle permanent failure

441

}

442

}

443

444

private boolean isRetryableException(Exception exception) {

445

return exception instanceof RequestThrottledException ||

446

exception instanceof AmazonClientException;

447

}

448

}

449

```

450

451

## Performance Considerations

452

453

### Thread Pool Management

454

455

Configure thread pools appropriately for async operations.

456

457

```java

458

// Custom thread pool configuration

459

ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(

460

10, // Core pool size

461

50, // Maximum pool size

462

60L, TimeUnit.SECONDS, // Keep alive time

463

new LinkedBlockingQueue<>(1000), // Work queue

464

new ThreadFactoryBuilder()

465

.setNameFormat("sqs-async-%d")

466

.setDaemon(true)

467

.build()

468

);

469

470

AmazonSQSAsync asyncClient = AmazonSQSAsyncClientBuilder.standard()

471

.withExecutorFactory(() -> customExecutor)

472

.build();

473

474

// Monitor thread pool

475

ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();

476

monitor.scheduleAtFixedRate(() -> {

477

System.out.println("Active threads: " + customExecutor.getActiveCount());

478

System.out.println("Queue size: " + customExecutor.getQueue().size());

479

}, 0, 30, TimeUnit.SECONDS);

480

```