or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.mdbuffered-client.mdclient-management.mddead-letter-queues.mdindex.mdmessage-operations.mdmessage-visibility.mdqueue-operations.mdqueue-permissions.mdqueue-tagging.md

buffered-client.mddocs/

0

# Buffered Client

1

2

The Amazon SQS Buffered Async Client provides automatic batching and prefetching capabilities to significantly improve throughput and reduce costs. It optimizes send, delete, and change visibility operations through client-side batching, while implementing intelligent message prefetching for receive operations.

3

4

## Buffered Client Overview

5

6

### AmazonSQSBufferedAsyncClient

7

8

High-performance client implementation with automatic optimization features.

9

10

```java { .api }

11

class AmazonSQSBufferedAsyncClient implements AmazonSQSAsync {

12

// Inherits all AmazonSQSAsync methods with optimized implementations

13

14

// Standard constructors

15

AmazonSQSBufferedAsyncClient();

16

AmazonSQSBufferedAsyncClient(AmazonSQSAsync realSQS);

17

AmazonSQSBufferedAsyncClient(AmazonSQSAsync realSQS, QueueBufferConfig config);

18

19

// Configuration-based constructors

20

AmazonSQSBufferedAsyncClient(AWSCredentialsProvider credentialsProvider);

21

AmazonSQSBufferedAsyncClient(AWSCredentialsProvider credentialsProvider,

22

ClientConfiguration clientConfiguration, ExecutorService executorService,

23

QueueBufferConfig queueBufferConfig);

24

}

25

```

26

27

## Configuration

28

29

### QueueBufferConfig

30

31

Configuration class for customizing buffered client behavior.

32

33

```java { .api }

34

class QueueBufferConfig {

35

// Default configuration

36

QueueBufferConfig();

37

38

// Batch configuration

39

int getMaxBatchSize();

40

QueueBufferConfig withMaxBatchSize(int maxBatchSize);

41

42

// Concurrency configuration

43

int getMaxInflightOutboundBatches();

44

QueueBufferConfig withMaxInflightOutboundBatches(int maxInflightOutboundBatches);

45

46

int getMaxInflightReceiveBatches();

47

QueueBufferConfig withMaxInflightReceiveBatches(int maxInflightReceiveBatches);

48

49

// Timing configuration

50

long getMaxBatchOpenMs();

51

QueueBufferConfig withMaxBatchOpenMs(long maxBatchOpenMs);

52

53

int getVisibilityTimeoutSeconds();

54

QueueBufferConfig withVisibilityTimeoutSeconds(int visibilityTimeoutSeconds);

55

56

// Polling configuration

57

boolean isLongPoll();

58

QueueBufferConfig withLongPoll(boolean longPoll);

59

60

int getMaxDoneReceiveBatches();

61

QueueBufferConfig withMaxDoneReceiveBatches(int maxDoneReceiveBatches);

62

}

63

```

64

65

**Usage Example:**

66

67

```java

68

// Create buffered client with default configuration

69

AmazonSQSBufferedAsyncClient bufferedClient = new AmazonSQSBufferedAsyncClient();

70

71

// Create buffered client with custom configuration

72

QueueBufferConfig config = new QueueBufferConfig()

73

.withMaxBatchSize(25) // Larger batches

74

.withMaxInflightOutboundBatches(10) // More concurrent batches

75

.withMaxBatchOpenMs(5000) // 5 second batch timeout

76

.withVisibilityTimeoutSeconds(120) // 2 minute visibility

77

.withLongPoll(true) // Enable long polling

78

.withMaxDoneReceiveBatches(50); // Larger receive buffer

79

80

AmazonSQSBufferedAsyncClient customBufferedClient =

81

new AmazonSQSBufferedAsyncClient(

82

AmazonSQSAsyncClientBuilder.defaultClient(),

83

config

84

);

85

```

86

87

## Batching Behavior

88

89

### Send Message Batching

90

91

Automatic batching of individual send operations into batch requests.

92

93

**Usage Example:**

94

95

```java

96

AmazonSQSBufferedAsyncClient bufferedClient = new AmazonSQSBufferedAsyncClient();

97

98

// These individual sends will be automatically batched

99

List<Future<SendMessageResult>> futures = new ArrayList<>();

100

101

for (int i = 0; i < 50; i++) {

102

Future<SendMessageResult> future = bufferedClient.sendMessageAsync(

103

new SendMessageRequest()

104

.withQueueUrl(queueUrl)

105

.withMessageBody("Message " + i)

106

);

107

futures.add(future);

108

}

109

110

// Wait for all sends to complete (they were sent in batches behind the scenes)

111

for (Future<SendMessageResult> future : futures) {

112

try {

113

SendMessageResult result = future.get();

114

System.out.println("Sent message: " + result.getMessageId());

115

} catch (ExecutionException e) {

116

System.err.println("Send failed: " + e.getCause().getMessage());

117

}

118

}

119

```

120

121

### Delete Message Batching

122

123

Automatic batching of delete operations for improved efficiency.

124

125

```java

126

// Receive messages

127

ReceiveMessageResult receiveResult = bufferedClient.receiveMessage(

128

new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10)

129

);

130

131

// Delete messages individually - will be batched automatically

132

List<Future<DeleteMessageResult>> deleteFutures = new ArrayList<>();

133

134

for (Message message : receiveResult.getMessages()) {

135

// Process message

136

processMessage(message);

137

138

// Delete (will be batched with other deletes)

139

Future<DeleteMessageResult> deleteFuture = bufferedClient.deleteMessageAsync(

140

new DeleteMessageRequest()

141

.withQueueUrl(queueUrl)

142

.withReceiptHandle(message.getReceiptHandle())

143

);

144

deleteFutures.add(deleteFuture);

145

}

146

147

// Ensure all deletes complete

148

for (Future<DeleteMessageResult> future : deleteFutures) {

149

future.get();

150

}

151

```

152

153

## Message Prefetching

154

155

### Receive Optimization

156

157

Intelligent prefetching to reduce receive operation latency.

158

159

```java

160

public class PrefetchingConsumer {

161

private final AmazonSQSBufferedAsyncClient bufferedClient;

162

private final String queueUrl;

163

164

public PrefetchingConsumer(String queueUrl) {

165

// Configure for aggressive prefetching

166

QueueBufferConfig config = new QueueBufferConfig()

167

.withMaxInflightReceiveBatches(20) // More prefetch requests

168

.withMaxDoneReceiveBatches(100) // Larger buffer

169

.withLongPoll(true) // Long polling

170

.withVisibilityTimeoutSeconds(300); // 5 minute processing time

171

172

this.bufferedClient = new AmazonSQSBufferedAsyncClient(

173

AmazonSQSAsyncClientBuilder.defaultClient(), config);

174

this.queueUrl = queueUrl;

175

}

176

177

public void startConsuming() {

178

ExecutorService executor = Executors.newFixedThreadPool(10);

179

180

for (int i = 0; i < 10; i++) {

181

executor.submit(() -> {

182

while (!Thread.currentThread().isInterrupted()) {

183

try {

184

// This receive will be served from prefetched messages when available

185

ReceiveMessageResult result = bufferedClient.receiveMessage(

186

new ReceiveMessageRequest(queueUrl)

187

.withMaxNumberOfMessages(10)

188

);

189

190

for (Message message : result.getMessages()) {

191

processMessage(message);

192

193

// Delete will be batched

194

bufferedClient.deleteMessage(queueUrl, message.getReceiptHandle());

195

}

196

197

} catch (Exception e) {

198

System.err.println("Consumer error: " + e.getMessage());

199

}

200

}

201

});

202

}

203

}

204

205

private void processMessage(Message message) {

206

// Message processing logic

207

System.out.println("Processing: " + message.getBody());

208

try {

209

Thread.sleep(100); // Simulate processing time

210

} catch (InterruptedException e) {

211

Thread.currentThread().interrupt();

212

}

213

}

214

}

215

```

216

217

## Performance Tuning

218

219

### Configuration Guidelines

220

221

Optimize configuration based on usage patterns and requirements.

222

223

```java

224

public class BufferedClientTuning {

225

226

public static QueueBufferConfig forHighThroughputSend() {

227

return new QueueBufferConfig()

228

.withMaxBatchSize(25) // Max batch size for cost efficiency

229

.withMaxInflightOutboundBatches(50) // High concurrency

230

.withMaxBatchOpenMs(1000) // 1 second batch timeout for low latency

231

.withLongPoll(false); // Not needed for send-heavy workload

232

}

233

234

public static QueueBufferConfig forHighThroughputReceive() {

235

return new QueueBufferConfig()

236

.withMaxInflightReceiveBatches(100) // Aggressive prefetching

237

.withMaxDoneReceiveBatches(200) // Large buffer

238

.withLongPoll(true) // Reduce empty receives

239

.withVisibilityTimeoutSeconds(600); // 10 minutes for processing

240

}

241

242

public static QueueBufferConfig forLowLatency() {

243

return new QueueBufferConfig()

244

.withMaxBatchSize(5) // Smaller batches

245

.withMaxBatchOpenMs(100) // 100ms batch timeout for very low latency

246

.withMaxInflightOutboundBatches(10) // Moderate concurrency

247

.withLongPoll(false); // Immediate response

248

}

249

250

public static QueueBufferConfig forCostOptimization() {

251

return new QueueBufferConfig()

252

.withMaxBatchSize(25) // Maximum batch size

253

.withMaxBatchOpenMs(10000) // 10 second batch timeout to maximize batching

254

.withMaxInflightOutboundBatches(5) // Lower concurrency

255

.withLongPoll(true) // Reduce API calls

256

.withMaxInflightReceiveBatches(5); // Conservative prefetching

257

}

258

}

259

```

260

261

### Monitoring and Metrics

262

263

Track buffered client performance and behavior.

264

265

```java

266

public class BufferedClientMonitor {

267

private final AmazonSQSBufferedAsyncClient bufferedClient;

268

private final ScheduledExecutorService monitor;

269

270

public BufferedClientMonitor(AmazonSQSBufferedAsyncClient bufferedClient) {

271

this.bufferedClient = bufferedClient;

272

this.monitor = Executors.newSingleThreadScheduledExecutor();

273

}

274

275

public void startMonitoring() {

276

monitor.scheduleAtFixedRate(this::logMetrics, 0, 30, TimeUnit.SECONDS);

277

}

278

279

private void logMetrics() {

280

// Note: Actual metrics would require instrumentation

281

// This is conceptual - real implementation would track:

282

283

System.out.println("=== Buffered Client Metrics ===");

284

// - Batch fill ratios

285

// - Average batch size

286

// - Flush timeout occurrences

287

// - Prefetch buffer hit rate

288

// - API call reduction percentage

289

// - Latency improvements

290

System.out.println("Monitoring buffered client performance...");

291

}

292

293

public void shutdown() {

294

monitor.shutdown();

295

}

296

}

297

```

298

299

## Best Practices

300

301

### Optimal Usage Patterns

302

303

Guidelines for maximizing buffered client benefits.

304

305

```java

306

public class BufferedClientBestPractices {

307

308

// DO: Use for high-volume operations

309

public void goodHighVolumePattern() {

310

AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient();

311

312

// Sending many messages - batching provides significant benefits

313

for (int i = 0; i < 1000; i++) {

314

client.sendMessageAsync(new SendMessageRequest(queueUrl, "Message " + i));

315

}

316

}

317

318

// DO: Configure appropriately for workload

319

public void goodConfigurationPattern() {

320

QueueBufferConfig config = new QueueBufferConfig();

321

322

if (isHighThroughputWorkload()) {

323

config.withMaxBatchSize(25)

324

.withMaxInflightOutboundBatches(20);

325

} else if (isLowLatencyWorkload()) {

326

config.withMaxBatchOpenMs(100)

327

.withMaxBatchSize(5);

328

}

329

330

AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient(

331

AmazonSQSAsyncClientBuilder.defaultClient(), config);

332

}

333

334

// DON'T: Use for single operations

335

public void poorSingleOperationPattern() {

336

AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient();

337

338

// Only sending one message - no batching benefit

339

client.sendMessage(new SendMessageRequest(queueUrl, "Single message"));

340

341

// Better to use regular client for single operations

342

AmazonSQS regularClient = AmazonSQSClientBuilder.defaultClient();

343

regularClient.sendMessage(new SendMessageRequest(queueUrl, "Single message"));

344

}

345

346

// DON'T: Create multiple buffered clients for same queue

347

public void poorMultipleClientPattern() {

348

// Creates separate buffers - reduces batching efficiency

349

AmazonSQSBufferedAsyncClient client1 = new AmazonSQSBufferedAsyncClient();

350

AmazonSQSBufferedAsyncClient client2 = new AmazonSQSBufferedAsyncClient();

351

352

// Better: Share single buffered client

353

AmazonSQSBufferedAsyncClient sharedClient = new AmazonSQSBufferedAsyncClient();

354

// Use sharedClient in multiple threads/components

355

}

356

357

private boolean isHighThroughputWorkload() {

358

return true; // Your logic here

359

}

360

361

private boolean isLowLatencyWorkload() {

362

return false; // Your logic here

363

}

364

}

365

```

366

367

### Error Handling

368

369

Handle buffered client specific considerations.

370

371

```java

372

// Buffered client preserves individual operation results

373

List<Future<SendMessageResult>> futures = new ArrayList<>();

374

375

for (int i = 0; i < 10; i++) {

376

futures.add(bufferedClient.sendMessageAsync(

377

new SendMessageRequest(queueUrl, "Message " + i)));

378

}

379

380

// Each future represents individual operation result, even though batched

381

for (int i = 0; i < futures.size(); i++) {

382

try {

383

SendMessageResult result = futures.get(i).get();

384

System.out.println("Message " + i + " sent: " + result.getMessageId());

385

} catch (ExecutionException e) {

386

System.err.println("Message " + i + " failed: " + e.getCause().getMessage());

387

// Individual message failure doesn't affect others in batch

388

}

389

}

390

```

391

392

## Migration from Standard Client

393

394

### Converting Existing Code

395

396

Straightforward migration from standard to buffered client.

397

398

```java

399

// Before: Standard client

400

AmazonSQS standardClient = AmazonSQSClientBuilder.defaultClient();

401

402

// After: Buffered client (drop-in replacement)

403

AmazonSQSBufferedAsyncClient bufferedClient = new AmazonSQSBufferedAsyncClient();

404

405

// All existing code works unchanged

406

SendMessageResult result = bufferedClient.sendMessage(

407

new SendMessageRequest(queueUrl, "message"));

408

409

// Async variants also available

410

Future<SendMessageResult> future = bufferedClient.sendMessageAsync(

411

new SendMessageRequest(queueUrl, "async message"));

412

```

413

414

### Gradual Migration Strategy

415

416

Approach for safely migrating production systems.

417

418

```java

419

public class GradualMigration {

420

private final AmazonSQS standardClient;

421

private final AmazonSQSBufferedAsyncClient bufferedClient;

422

private final double bufferedClientRatio = 0.1; // Start with 10%

423

424

public GradualMigration() {

425

this.standardClient = AmazonSQSClientBuilder.defaultClient();

426

this.bufferedClient = new AmazonSQSBufferedAsyncClient();

427

}

428

429

public SendMessageResult sendMessage(SendMessageRequest request) {

430

if (Math.random() < bufferedClientRatio) {

431

// Use buffered client for percentage of traffic

432

return bufferedClient.sendMessage(request);

433

} else {

434

// Use standard client for majority of traffic

435

return standardClient.sendMessage(request);

436

}

437

}

438

}

439

```