0
# Buffered Client
1
2
The Amazon SQS Buffered Async Client provides automatic batching and prefetching capabilities to significantly improve throughput and reduce costs. It optimizes send, delete, and change visibility operations through client-side batching, while implementing intelligent message prefetching for receive operations.
3
4
## Buffered Client Overview
5
6
### AmazonSQSBufferedAsyncClient
7
8
High-performance client implementation with automatic optimization features.
9
10
```java { .api }
11
class AmazonSQSBufferedAsyncClient implements AmazonSQSAsync {
12
// Inherits all AmazonSQSAsync methods with optimized implementations
13
14
// Standard constructors
15
AmazonSQSBufferedAsyncClient();
16
AmazonSQSBufferedAsyncClient(AmazonSQSAsync realSQS);
17
AmazonSQSBufferedAsyncClient(AmazonSQSAsync realSQS, QueueBufferConfig config);
18
19
// Configuration-based constructors
20
AmazonSQSBufferedAsyncClient(AWSCredentialsProvider credentialsProvider);
21
AmazonSQSBufferedAsyncClient(AWSCredentialsProvider credentialsProvider,
22
ClientConfiguration clientConfiguration, ExecutorService executorService,
23
QueueBufferConfig queueBufferConfig);
24
}
25
```
26
27
## Configuration
28
29
### QueueBufferConfig
30
31
Configuration class for customizing buffered client behavior.
32
33
```java { .api }
34
class QueueBufferConfig {
35
// Default configuration
36
QueueBufferConfig();
37
38
// Batch configuration
39
int getMaxBatchSize();
40
QueueBufferConfig withMaxBatchSize(int maxBatchSize);
41
42
// Concurrency configuration
43
int getMaxInflightOutboundBatches();
44
QueueBufferConfig withMaxInflightOutboundBatches(int maxInflightOutboundBatches);
45
46
int getMaxInflightReceiveBatches();
47
QueueBufferConfig withMaxInflightReceiveBatches(int maxInflightReceiveBatches);
48
49
// Timing configuration
50
long getMaxBatchOpenMs();
51
QueueBufferConfig withMaxBatchOpenMs(long maxBatchOpenMs);
52
53
int getVisibilityTimeoutSeconds();
54
QueueBufferConfig withVisibilityTimeoutSeconds(int visibilityTimeoutSeconds);
55
56
// Polling configuration
57
boolean isLongPoll();
58
QueueBufferConfig withLongPoll(boolean longPoll);
59
60
int getMaxDoneReceiveBatches();
61
QueueBufferConfig withMaxDoneReceiveBatches(int maxDoneReceiveBatches);
62
}
63
```
64
65
**Usage Example:**
66
67
```java
68
// Create buffered client with default configuration
69
AmazonSQSBufferedAsyncClient bufferedClient = new AmazonSQSBufferedAsyncClient();
70
71
// Create buffered client with custom configuration
72
QueueBufferConfig config = new QueueBufferConfig()
73
.withMaxBatchSize(25) // Larger batches
74
.withMaxInflightOutboundBatches(10) // More concurrent batches
75
.withMaxBatchOpenMs(5000) // 5 second batch timeout
76
.withVisibilityTimeoutSeconds(120) // 2 minute visibility
77
.withLongPoll(true) // Enable long polling
78
.withMaxDoneReceiveBatches(50); // Larger receive buffer
79
80
AmazonSQSBufferedAsyncClient customBufferedClient =
81
new AmazonSQSBufferedAsyncClient(
82
AmazonSQSAsyncClientBuilder.defaultClient(),
83
config
84
);
85
```
86
87
## Batching Behavior
88
89
### Send Message Batching
90
91
Automatic batching of individual send operations into batch requests.
92
93
**Usage Example:**
94
95
```java
96
AmazonSQSBufferedAsyncClient bufferedClient = new AmazonSQSBufferedAsyncClient();
97
98
// These individual sends will be automatically batched
99
List<Future<SendMessageResult>> futures = new ArrayList<>();
100
101
for (int i = 0; i < 50; i++) {
102
Future<SendMessageResult> future = bufferedClient.sendMessageAsync(
103
new SendMessageRequest()
104
.withQueueUrl(queueUrl)
105
.withMessageBody("Message " + i)
106
);
107
futures.add(future);
108
}
109
110
// Wait for all sends to complete (they were sent in batches behind the scenes)
111
for (Future<SendMessageResult> future : futures) {
112
try {
113
SendMessageResult result = future.get();
114
System.out.println("Sent message: " + result.getMessageId());
115
} catch (ExecutionException e) {
116
System.err.println("Send failed: " + e.getCause().getMessage());
117
}
118
}
119
```
120
121
### Delete Message Batching
122
123
Automatic batching of delete operations for improved efficiency.
124
125
```java
126
// Receive messages
127
ReceiveMessageResult receiveResult = bufferedClient.receiveMessage(
128
new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10)
129
);
130
131
// Delete messages individually - will be batched automatically
132
List<Future<DeleteMessageResult>> deleteFutures = new ArrayList<>();
133
134
for (Message message : receiveResult.getMessages()) {
135
// Process message
136
processMessage(message);
137
138
// Delete (will be batched with other deletes)
139
Future<DeleteMessageResult> deleteFuture = bufferedClient.deleteMessageAsync(
140
new DeleteMessageRequest()
141
.withQueueUrl(queueUrl)
142
.withReceiptHandle(message.getReceiptHandle())
143
);
144
deleteFutures.add(deleteFuture);
145
}
146
147
// Ensure all deletes complete
148
for (Future<DeleteMessageResult> future : deleteFutures) {
149
future.get();
150
}
151
```
152
153
## Message Prefetching
154
155
### Receive Optimization
156
157
Intelligent prefetching to reduce receive operation latency.
158
159
```java
160
public class PrefetchingConsumer {
161
private final AmazonSQSBufferedAsyncClient bufferedClient;
162
private final String queueUrl;
163
164
public PrefetchingConsumer(String queueUrl) {
165
// Configure for aggressive prefetching
166
QueueBufferConfig config = new QueueBufferConfig()
167
.withMaxInflightReceiveBatches(20) // More prefetch requests
168
.withMaxDoneReceiveBatches(100) // Larger buffer
169
.withLongPoll(true) // Long polling
170
.withVisibilityTimeoutSeconds(300); // 5 minute processing time
171
172
this.bufferedClient = new AmazonSQSBufferedAsyncClient(
173
AmazonSQSAsyncClientBuilder.defaultClient(), config);
174
this.queueUrl = queueUrl;
175
}
176
177
public void startConsuming() {
178
ExecutorService executor = Executors.newFixedThreadPool(10);
179
180
for (int i = 0; i < 10; i++) {
181
executor.submit(() -> {
182
while (!Thread.currentThread().isInterrupted()) {
183
try {
184
// This receive will be served from prefetched messages when available
185
ReceiveMessageResult result = bufferedClient.receiveMessage(
186
new ReceiveMessageRequest(queueUrl)
187
.withMaxNumberOfMessages(10)
188
);
189
190
for (Message message : result.getMessages()) {
191
processMessage(message);
192
193
// Delete will be batched
194
bufferedClient.deleteMessage(queueUrl, message.getReceiptHandle());
195
}
196
197
} catch (Exception e) {
198
System.err.println("Consumer error: " + e.getMessage());
199
}
200
}
201
});
202
}
203
}
204
205
private void processMessage(Message message) {
206
// Message processing logic
207
System.out.println("Processing: " + message.getBody());
208
try {
209
Thread.sleep(100); // Simulate processing time
210
} catch (InterruptedException e) {
211
Thread.currentThread().interrupt();
212
}
213
}
214
}
215
```
216
217
## Performance Tuning
218
219
### Configuration Guidelines
220
221
Optimize configuration based on usage patterns and requirements.
222
223
```java
224
public class BufferedClientTuning {
225
226
public static QueueBufferConfig forHighThroughputSend() {
227
return new QueueBufferConfig()
228
.withMaxBatchSize(25) // Max batch size for cost efficiency
229
.withMaxInflightOutboundBatches(50) // High concurrency
230
.withMaxBatchOpenMs(1000) // 1 second batch timeout for low latency
231
.withLongPoll(false); // Not needed for send-heavy workload
232
}
233
234
public static QueueBufferConfig forHighThroughputReceive() {
235
return new QueueBufferConfig()
236
.withMaxInflightReceiveBatches(100) // Aggressive prefetching
237
.withMaxDoneReceiveBatches(200) // Large buffer
238
.withLongPoll(true) // Reduce empty receives
239
.withVisibilityTimeoutSeconds(600); // 10 minutes for processing
240
}
241
242
public static QueueBufferConfig forLowLatency() {
243
return new QueueBufferConfig()
244
.withMaxBatchSize(5) // Smaller batches
245
.withMaxBatchOpenMs(100) // 100ms batch timeout for very low latency
246
.withMaxInflightOutboundBatches(10) // Moderate concurrency
247
.withLongPoll(false); // Immediate response
248
}
249
250
public static QueueBufferConfig forCostOptimization() {
251
return new QueueBufferConfig()
252
.withMaxBatchSize(25) // Maximum batch size
253
.withMaxBatchOpenMs(10000) // 10 second batch timeout to maximize batching
254
.withMaxInflightOutboundBatches(5) // Lower concurrency
255
.withLongPoll(true) // Reduce API calls
256
.withMaxInflightReceiveBatches(5); // Conservative prefetching
257
}
258
}
259
```
260
261
### Monitoring and Metrics
262
263
Track buffered client performance and behavior.
264
265
```java
266
public class BufferedClientMonitor {
267
private final AmazonSQSBufferedAsyncClient bufferedClient;
268
private final ScheduledExecutorService monitor;
269
270
public BufferedClientMonitor(AmazonSQSBufferedAsyncClient bufferedClient) {
271
this.bufferedClient = bufferedClient;
272
this.monitor = Executors.newSingleThreadScheduledExecutor();
273
}
274
275
public void startMonitoring() {
276
monitor.scheduleAtFixedRate(this::logMetrics, 0, 30, TimeUnit.SECONDS);
277
}
278
279
private void logMetrics() {
280
// Note: Actual metrics would require instrumentation
281
// This is conceptual - real implementation would track:
282
283
System.out.println("=== Buffered Client Metrics ===");
284
// - Batch fill ratios
285
// - Average batch size
286
// - Flush timeout occurrences
287
// - Prefetch buffer hit rate
288
// - API call reduction percentage
289
// - Latency improvements
290
System.out.println("Monitoring buffered client performance...");
291
}
292
293
public void shutdown() {
294
monitor.shutdown();
295
}
296
}
297
```
298
299
## Best Practices
300
301
### Optimal Usage Patterns
302
303
Guidelines for maximizing buffered client benefits.
304
305
```java
306
public class BufferedClientBestPractices {
307
308
// DO: Use for high-volume operations
309
public void goodHighVolumePattern() {
310
AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient();
311
312
// Sending many messages - batching provides significant benefits
313
for (int i = 0; i < 1000; i++) {
314
client.sendMessageAsync(new SendMessageRequest(queueUrl, "Message " + i));
315
}
316
}
317
318
// DO: Configure appropriately for workload
319
public void goodConfigurationPattern() {
320
QueueBufferConfig config = new QueueBufferConfig();
321
322
if (isHighThroughputWorkload()) {
323
config.withMaxBatchSize(25)
324
.withMaxInflightOutboundBatches(20);
325
} else if (isLowLatencyWorkload()) {
326
config.withMaxBatchOpenMs(100)
327
.withMaxBatchSize(5);
328
}
329
330
AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient(
331
AmazonSQSAsyncClientBuilder.defaultClient(), config);
332
}
333
334
// DON'T: Use for single operations
335
public void poorSingleOperationPattern() {
336
AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient();
337
338
// Only sending one message - no batching benefit
339
client.sendMessage(new SendMessageRequest(queueUrl, "Single message"));
340
341
// Better to use regular client for single operations
342
AmazonSQS regularClient = AmazonSQSClientBuilder.defaultClient();
343
regularClient.sendMessage(new SendMessageRequest(queueUrl, "Single message"));
344
}
345
346
// DON'T: Create multiple buffered clients for same queue
347
public void poorMultipleClientPattern() {
348
// Creates separate buffers - reduces batching efficiency
349
AmazonSQSBufferedAsyncClient client1 = new AmazonSQSBufferedAsyncClient();
350
AmazonSQSBufferedAsyncClient client2 = new AmazonSQSBufferedAsyncClient();
351
352
// Better: Share single buffered client
353
AmazonSQSBufferedAsyncClient sharedClient = new AmazonSQSBufferedAsyncClient();
354
// Use sharedClient in multiple threads/components
355
}
356
357
private boolean isHighThroughputWorkload() {
358
return true; // Your logic here
359
}
360
361
private boolean isLowLatencyWorkload() {
362
return false; // Your logic here
363
}
364
}
365
```
366
367
### Error Handling
368
369
Handle buffered client specific considerations.
370
371
```java
372
// Buffered client preserves individual operation results
373
List<Future<SendMessageResult>> futures = new ArrayList<>();
374
375
for (int i = 0; i < 10; i++) {
376
futures.add(bufferedClient.sendMessageAsync(
377
new SendMessageRequest(queueUrl, "Message " + i)));
378
}
379
380
// Each future represents individual operation result, even though batched
381
for (int i = 0; i < futures.size(); i++) {
382
try {
383
SendMessageResult result = futures.get(i).get();
384
System.out.println("Message " + i + " sent: " + result.getMessageId());
385
} catch (ExecutionException e) {
386
System.err.println("Message " + i + " failed: " + e.getCause().getMessage());
387
// Individual message failure doesn't affect others in batch
388
}
389
}
390
```
391
392
## Migration from Standard Client
393
394
### Converting Existing Code
395
396
Straightforward migration from standard to buffered client.
397
398
```java
399
// Before: Standard client
400
AmazonSQS standardClient = AmazonSQSClientBuilder.defaultClient();
401
402
// After: Buffered client (drop-in replacement)
403
AmazonSQSBufferedAsyncClient bufferedClient = new AmazonSQSBufferedAsyncClient();
404
405
// All existing code works unchanged
406
SendMessageResult result = bufferedClient.sendMessage(
407
new SendMessageRequest(queueUrl, "message"));
408
409
// Async variants also available
410
Future<SendMessageResult> future = bufferedClient.sendMessageAsync(
411
new SendMessageRequest(queueUrl, "async message"));
412
```
413
414
### Gradual Migration Strategy
415
416
Approach for safely migrating production systems.
417
418
```java
419
public class GradualMigration {
420
private final AmazonSQS standardClient;
421
private final AmazonSQSBufferedAsyncClient bufferedClient;
422
private final double bufferedClientRatio = 0.1; // Start with 10%
423
424
public GradualMigration() {
425
this.standardClient = AmazonSQSClientBuilder.defaultClient();
426
this.bufferedClient = new AmazonSQSBufferedAsyncClient();
427
}
428
429
public SendMessageResult sendMessage(SendMessageRequest request) {
430
if (Math.random() < bufferedClientRatio) {
431
// Use buffered client for percentage of traffic
432
return bufferedClient.sendMessage(request);
433
} else {
434
// Use standard client for majority of traffic
435
return standardClient.sendMessage(request);
436
}
437
}
438
}
439
```