# Async Sink Framework

The Async Sink Framework provides the building blocks for high-throughput, fault-tolerant sinks that write to asynchronous destination APIs. It handles batching, buffering, rate limiting, retry logic, and state management, so a connector only has to supply the destination-specific request logic.

## Core Components

### AsyncSinkBase

The foundation class for all async sink implementations.

```java { .api }
@PublicEvolving
public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
        implements SupportsWriterState<InputT, BufferedRequestState<RequestEntryT>>, Sink<InputT> {

    protected AsyncSinkBase(
        ElementConverter<InputT, RequestEntryT> elementConverter,
        int maxBatchSize,
        int maxInFlightRequests,
        int maxBufferedRequests,
        long maxBatchSizeInBytes,
        long maxTimeInBufferMS,
        long maxRecordSizeInBytes,
        long requestTimeoutMS,
        boolean failOnTimeout)

    protected ElementConverter<InputT, RequestEntryT> getElementConverter()
    protected int getMaxBatchSize()
    protected int getMaxInFlightRequests()
    protected int getMaxBufferedRequests()
    protected long getMaxBatchSizeInBytes()
    protected long getMaxTimeInBufferMS()
    protected long getMaxRecordSizeInBytes()
    protected long getRequestTimeoutMS()
    protected boolean getFailOnTimeout()
}
```

### AsyncSinkWriter

The core writer that handles the async sink logic.

```java { .api }
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {

    // Constructor
    public AsyncSinkWriter(
        ElementConverter<InputT, RequestEntryT> elementConverter,
        WriterInitContext context,
        AsyncSinkWriterConfiguration configuration,
        Collection<BufferedRequestState<RequestEntryT>> states,
        BatchCreator<RequestEntryT> batchCreator,
        RequestBuffer<RequestEntryT> bufferedRequestEntries)

    // Abstract methods to implement
    protected abstract void submitRequestEntries(
        List<RequestEntryT> requestEntries,
        ResultHandler<RequestEntryT> resultHandler)

    protected abstract long getSizeInBytes(RequestEntryT requestEntry)

    // Public interface methods
    public void write(InputT element, Context context) throws IOException, InterruptedException
    public void flush(boolean flush) throws InterruptedException
    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId)
    public void close()

    // Protected helper methods
    protected Consumer<Exception> getFatalExceptionCons()
}
```

### ElementConverter

Transforms stream elements into request entries for the destination.

```java { .api }
@PublicEvolving
public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
    RequestEntryT apply(InputT element, SinkWriter.Context context)

    default void open(WriterInitContext context) {
        // No-op default implementation
    }
}
```

### ResultHandler

Handles the results of async requests with support for retries and fatal errors.

```java { .api }
@PublicEvolving
public interface ResultHandler<RequestEntryT> {
    void complete()
    void completeExceptionally(Exception e)
    void retryForEntries(List<RequestEntryT> requestEntriesToRetry)
}
```

### BatchCreator

Pluggable interface for controlling how request entries are batched.

```java { .api }
@PublicEvolving
public interface BatchCreator<RequestEntryT extends Serializable> {
    Batch<RequestEntryT> createNextBatch(
        RequestInfo requestInfo,
        RequestBuffer<RequestEntryT> bufferedRequestEntries)
}
```

### RequestBuffer

Flexible buffer interface for managing request entries.

```java { .api }
@PublicEvolving
public interface RequestBuffer<RequestEntryT extends Serializable> {
    void add(RequestEntryWrapper<RequestEntryT> entry, boolean insertAtHead)
    RequestEntryWrapper<RequestEntryT> poll()
    RequestEntryWrapper<RequestEntryT> peek()
    boolean isEmpty()
    int size()
    Collection<RequestEntryWrapper<RequestEntryT>> getBufferedState()
    long totalSizeInBytes()
}
```
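
## Implementation Examples

### Complete Async Sink Implementation

```java
import java.io.IOException;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;

public class HttpAsyncSink extends AsyncSinkBase<JsonNode, HttpRequestEntry> {

    public HttpAsyncSink(String endpoint, int maxBatchSize) {
        super(
            new JsonElementConverter(endpoint), // Element converter
            maxBatchSize,                       // Max batch size
            10,                                 // Max in-flight requests
            maxBatchSize * 10,                  // Max buffered requests
            1024 * 1024,                        // Max batch size in bytes (1MB)
            5000,                               // Max time in buffer (5s)
            256 * 1024,                         // Max record size (256KB)
            60000,                              // Request timeout (60s)
            false                               // Don't fail on timeout
        );
    }

    @Override
    public SinkWriter<JsonNode> createWriter(WriterInitContext context) throws IOException {
        return new HttpAsyncSinkWriter(
            getElementConverter(),
            context,
            createWriterConfiguration(),
            Collections.emptyList()
        );
    }

    // restoreWriter(...) and getWriterStateSerializer(), which SupportsWriterState
    // also requires, are omitted here for brevity.

    private AsyncSinkWriterConfiguration createWriterConfiguration() {
        return AsyncSinkWriterConfiguration.builder()
            .setMaxBatchSize(getMaxBatchSize())
            .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
            .setMaxInFlightRequests(getMaxInFlightRequests())
            .setMaxBufferedRequests(getMaxBufferedRequests())
            .setMaxTimeInBufferMS(getMaxTimeInBufferMS())
            .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
            .setRequestTimeoutMS(getRequestTimeoutMS())
            .setFailOnTimeout(getFailOnTimeout())
            .build();
    }
}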

// Element converter implementation
public class JsonElementConverter implements ElementConverter<JsonNode, HttpRequestEntry> {
    private final String endpoint;

    public JsonElementConverter(String endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public HttpRequestEntry apply(JsonNode element, SinkWriter.Context context) {
        // context.timestamp() is null when the record carries no timestamp,
        // so fall back to the current wall-clock time instead of unboxing null
        Long timestamp = context.timestamp();
        return new HttpRequestEntry(
            endpoint,
            element.toString(),
            timestamp != null ? timestamp : System.currentTimeMillis(),
            generateRequestId()
        );
    }

    private String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}

// Request entry implementation
public class HttpRequestEntry implements Serializable {
    private final String endpoint;
    private final String payload;
    private final long timestamp;
    private final String requestId;

    public HttpRequestEntry(String endpoint, String payload, long timestamp, String requestId) {
        this.endpoint = endpoint;
        this.payload = payload;
        this.timestamp = timestamp;
        this.requestId = requestId;
    }

    // Getters
    public String getEndpoint() { return endpoint; }
    public String getPayload() { return payload; }
    public long getTimestamp() { return timestamp; }
    public String getRequestId() { return requestId; }
}

// Async sink writer implementation.
// HttpAsyncClient, BatchHttpRequest, BatchHttpResponse, AuthenticationException and
// AuthorizationException are placeholders for the destination's client library.
public class HttpAsyncSinkWriter extends AsyncSinkWriter<JsonNode, HttpRequestEntry> {
    private final HttpAsyncClient httpClient;
    private final ObjectMapper objectMapper;

    public HttpAsyncSinkWriter(
            ElementConverter<JsonNode, HttpRequestEntry> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<HttpRequestEntry>> states) {
        super(elementConverter, context, configuration, states);
        this.httpClient = new HttpAsyncClient();
        this.objectMapper = new ObjectMapper();
    }

    @Override
    protected void submitRequestEntries(
            List<HttpRequestEntry> requestEntries,
            ResultHandler<HttpRequestEntry> resultHandler) {

        // Create batch request
        BatchHttpRequest batchRequest = createBatchRequest(requestEntries);

        // Submit asynchronously
        CompletableFuture<BatchHttpResponse> future = httpClient.submitBatch(batchRequest);

        // Handle response
        future.whenComplete((response, error) -> {
            if (error != null) {
                if (isFatalError(error)) {
                    resultHandler.completeExceptionally(new RuntimeException(
                        "Fatal error in HTTP request", error));
                } else {
                    // Retry all entries on network error
                    resultHandler.retryForEntries(requestEntries);
                }
            } else if (response.hasFailures()) {
                // Partial failure - retry only failed entries
                List<HttpRequestEntry> failedEntries = extractFailedEntries(requestEntries, response);
                resultHandler.retryForEntries(failedEntries);
            } else {
                // Complete success
                resultHandler.complete();
            }
        });
    }

    @Override
    protected long getSizeInBytes(HttpRequestEntry requestEntry) {
        // Estimate size including headers and metadata
        return requestEntry.getPayload().getBytes(StandardCharsets.UTF_8).length +
               requestEntry.getEndpoint().getBytes(StandardCharsets.UTF_8).length +
               100; // Approximate header overhead
    }

    private BatchHttpRequest createBatchRequest(List<HttpRequestEntry> entries) {
        List<String> payloads = entries.stream()
            .map(HttpRequestEntry::getPayload)
            .collect(Collectors.toList());

        // All entries of one sink share the same endpoint, so the first one is used
        return new BatchHttpRequest(
            entries.get(0).getEndpoint(),
            payloads,
            generateBatchId(),
            System.currentTimeMillis()
        );
    }

    private boolean isFatalError(Throwable error) {
        // Consider authentication, authorization, and configuration errors as fatal
        return error instanceof AuthenticationException ||
               error instanceof AuthorizationException ||
               error instanceof MalformedURLException;
    }

    private List<HttpRequestEntry> extractFailedEntries(
            List<HttpRequestEntry> originalEntries,
            BatchHttpResponse response) {
        List<HttpRequestEntry> failed = new ArrayList<>();
        List<Integer> failedIndices = response.getFailedIndices();

        for (Integer index : failedIndices) {
            // Ignore indices that do not map to an entry of this batch
            if (index >= 0 && index < originalEntries.size()) {
                failed.add(originalEntries.get(index));
            }
        }

        return failed;
    }

    private String generateBatchId() {
        return UUID.randomUUID().toString();
    }
}
```

### Custom Batch Creator

```java
public class SizeLimitedBatchCreator implements BatchCreator<HttpRequestEntry> {
    private final long maxBatchSizeInBytes;

    public SizeLimitedBatchCreator(long maxBatchSizeInBytes) {
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
    }

    @Override
    public Batch<HttpRequestEntry> createNextBatch(
            RequestInfo requestInfo,
            RequestBuffer<HttpRequestEntry> bufferedRequestEntries) {

        List<HttpRequestEntry> batch = new ArrayList<>();
        long totalSize = 0;
        int maxBatchSize = requestInfo.getBatchSize();

        // Add entries until we hit size or count limits
        while (!bufferedRequestEntries.isEmpty() &&
               batch.size() < maxBatchSize) {

            RequestEntryWrapper<HttpRequestEntry> wrapper = bufferedRequestEntries.peek();
            if (wrapper == null) {
                break;
            }

            // Check if adding this entry would exceed size limit
            if (totalSize + wrapper.getSize() > maxBatchSizeInBytes && !batch.isEmpty()) {
                break;
            }

            // Remove from buffer and add to batch
            bufferedRequestEntries.poll();
            batch.add(wrapper.getRequestEntry());
            totalSize += wrapper.getSize();
        }

        return new Batch<>(batch, totalSize);
    }
}
```

### Advanced Error Handling

```java
public class AdvancedHttpAsyncSinkWriter extends AsyncSinkWriter<JsonNode, HttpRequestEntry> {
    // HttpAsyncClient is a placeholder for the destination's async client
    private final HttpAsyncClient httpClient;
    private final FatalExceptionClassifier fatalExceptionClassifier;

    public AdvancedHttpAsyncSinkWriter(
            ElementConverter<JsonNode, HttpRequestEntry> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<HttpRequestEntry>> states) {
        super(elementConverter, context, configuration, states);
        this.httpClient = new HttpAsyncClient();

        // Create chain of fatal exception classifiers
        this.fatalExceptionClassifier = FatalExceptionClassifier.createChain(
            FatalExceptionClassifier.withRootCauseOfType(
                AuthenticationException.class,
                cause -> new RuntimeException("Authentication failed", cause)
            ),
            FatalExceptionClassifier.withRootCauseOfType(
                IllegalArgumentException.class,
                cause -> new RuntimeException("Invalid request configuration", cause)
            )
        );
    }

    // getSizeInBytes(...) and createBatchRequest(...) are the same as in
    // HttpAsyncSinkWriter and are omitted here.

    @Override
    protected void submitRequestEntries(
            List<HttpRequestEntry> requestEntries,
            ResultHandler<HttpRequestEntry> resultHandler) {

        CompletableFuture<BatchHttpResponse> future = httpClient.submitBatch(
            createBatchRequest(requestEntries));

        future.whenComplete((response, error) -> {
            if (error != null) {
                // Use classifier to determine if error is fatal
                if (fatalExceptionClassifier.isFatal(error, getFatalExceptionCons())) {
                    return; // Fatal exception consumer already called
                } else {
                    // Retryable error - retry all entries
                    resultHandler.retryForEntries(requestEntries);
                }
            } else {
                handleResponse(requestEntries, response, resultHandler);
            }
        });
    }

    private void handleResponse(
            List<HttpRequestEntry> requestEntries,
            BatchHttpResponse response,
            ResultHandler<HttpRequestEntry> resultHandler) {

        if (response.isSuccess()) {
            resultHandler.complete();
        } else if (response.hasPartialFailures()) {
            List<HttpRequestEntry> failedEntries = new ArrayList<>();

            for (int i = 0; i < requestEntries.size(); i++) {
                if (response.isFailed(i)) {
                    int statusCode = response.getStatusCode(i);

                    // Check if individual failure is retryable
                    if (isRetryableStatusCode(statusCode)) {
                        failedEntries.add(requestEntries.get(i));
                    }
                    // Non-retryable individual failures are dropped (logged elsewhere)
                }
            }

            if (!failedEntries.isEmpty()) {
                resultHandler.retryForEntries(failedEntries);
            } else {
                resultHandler.complete();
            }
        } else {
            // Complete failure - retry all if retryable
            if (isRetryableStatusCode(response.getOverallStatusCode())) {
                resultHandler.retryForEntries(requestEntries);
            } else {
                // Non-retryable failure - complete (entries lost, logged elsewhere)
                resultHandler.complete();
            }
        }
    }

    private boolean isRetryableStatusCode(int statusCode) {
        // 5xx server errors and some 4xx errors are retryable
        return statusCode >= 500 ||
               statusCode == 408 || // Request Timeout
               statusCode == 429;   // Too Many Requests
    }
}
```

## Configuration Patterns

### Basic Configuration Builder Pattern

```java
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
    .setMaxBatchSize(100)                    // Records per batch
    .setMaxBatchSizeInBytes(1024 * 1024)     // Bytes per batch (1MB)
    .setMaxInFlightRequests(10)              // Concurrent requests
    .setMaxBufferedRequests(1000)            // Queue capacity
    .setMaxTimeInBufferMS(5000)              // Max buffering delay (5s)
    .setMaxRecordSizeInBytes(256 * 1024)     // Max record size (256KB)
    .setRequestTimeoutMS(30000)              // Request timeout (30s)
    .setFailOnTimeout(false)                 // Retry on timeout
    .build();
```
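
The batch-related limits above interact roughly as a single predicate: a batch is sent once the record count, the byte size, or the buffering delay reaches its limit. The following is a simplified sketch of that idea only. `FlushTriggerSketch` and `shouldFlush` are hypothetical names used for illustration, not part of the framework, and the real writer also accounts for in-flight requests and rate limiting.

```java
/** Illustrative only: a stand-in for the flush decision, not framework code. */
final class FlushTriggerSketch {

    private FlushTriggerSketch() {}

    /**
     * Returns true when any of the three batch limits has been reached.
     *
     * @param bufferedCount       number of entries currently buffered
     * @param bufferedBytes       total size of the buffered entries in bytes
     * @param oldestAgeMs         how long the oldest buffered entry has waited
     * @param maxBatchSize        corresponds to setMaxBatchSize(...)
     * @param maxBatchSizeInBytes corresponds to setMaxBatchSizeInBytes(...)
     * @param maxTimeInBufferMs   corresponds to setMaxTimeInBufferMS(...)
     */
    static boolean shouldFlush(
            int bufferedCount,
            long bufferedBytes,
            long oldestAgeMs,
            int maxBatchSize,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMs) {
        return bufferedCount >= maxBatchSize
                || bufferedBytes >= maxBatchSizeInBytes
                || oldestAgeMs >= maxTimeInBufferMs;
    }
}
```

With the values from the configuration above, 100 buffered records trigger a flush immediately, while a single small record is held for at most 5 seconds.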

### Advanced Configuration with Custom Rate Limiting

```java
// Create AIMD scaling strategy
AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)
    .setIncreaseRate(5)           // Increase by 5 per success
    .setDecreaseFactor(0.7)       // Decrease by 30% on failure
    .build();

// Create congestion control rate limiting
CongestionControlRateLimitingStrategy rateLimiting =
    CongestionControlRateLimitingStrategy.builder()
        .setMaxInFlightRequests(50)
        .setInitialMaxInFlightMessages(100)
        .setScalingStrategy(scalingStrategy)
        .build();

// Apply to configuration
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
    .setMaxBatchSize(200)
    .setMaxBatchSizeInBytes(2 * 1024 * 1024)  // 2MB
    .setMaxInFlightRequests(50)
    .setMaxBufferedRequests(2000)
    .setMaxTimeInBufferMS(3000)               // 3s
    .setMaxRecordSizeInBytes(512 * 1024)      // 512KB
    .setRateLimitingStrategy(rateLimiting)    // Custom rate limiting
    .build();
```
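
To make the AIMD (additive increase, multiplicative decrease) parameters concrete, here is a small stand-alone sketch of the arithmetic they imply. `AimdSketch` is a hypothetical class written for this illustration. It mirrors the three values used above (increase rate 5, decrease factor 0.7, threshold 1000) but is not the framework's `AIMDScalingStrategy`, whose rounding and bounds may differ.

```java
/** Illustrative AIMD arithmetic; a stand-in, not Flink's AIMDScalingStrategy. */
final class AimdSketch {
    private final int increaseRate;      // added after each successful request
    private final double decreaseFactor; // multiplied in after a failed request
    private final int rateThreshold;     // upper bound for the rate

    AimdSketch(int increaseRate, double decreaseFactor, int rateThreshold) {
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.rateThreshold = rateThreshold;
    }

    /** Additive increase, capped at the threshold. */
    int scaleUp(int currentRate) {
        return Math.min(currentRate + increaseRate, rateThreshold);
    }

    /** Multiplicative decrease, never dropping below 1. */
    int scaleDown(int currentRate) {
        return Math.max(1, (int) Math.round(currentRate * decreaseFactor));
    }
}
```

Starting from 100 in-flight messages, a success moves the limit to 105 and a failure moves it to 70. The slow climb and fast drop are what let the sink back off quickly when the destination is throttling.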

## State Management

The async sink framework provides automatic state management for fault tolerance:

### BufferedRequestState

```java { .api }
@PublicEvolving
public class BufferedRequestState<RequestEntryT extends Serializable> {
    public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries)
    public BufferedRequestState(RequestBuffer<RequestEntryT> requestBuffer)

    public List<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries()
    public long getStateSize()

    public static <T extends Serializable> BufferedRequestState<T> emptyState()
}
```

### State Serializer (Custom Implementation)

```java
public class HttpRequestEntryStateSerializer
        extends AsyncSinkWriterStateSerializer<HttpRequestEntry> {

    @Override
    protected void serializeRequestToStream(
            HttpRequestEntry request,
            DataOutputStream out) throws IOException {
        out.writeUTF(request.getEndpoint());
        // writeUTF is limited to 65535 encoded bytes, so the payload
        // (up to 256KB in the examples above) is written length-prefixed
        byte[] payload = request.getPayload().getBytes(StandardCharsets.UTF_8);
        out.writeInt(payload.length);
        out.write(payload);
        out.writeLong(request.getTimestamp());
        out.writeUTF(request.getRequestId());
    }

    @Override
    protected HttpRequestEntry deserializeRequestFromStream(
            long requestSize,
            DataInputStream in) throws IOException {
        // Fields are read in the same order they were written
        String endpoint = in.readUTF();
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        long timestamp = in.readLong();
        String requestId = in.readUTF();

        return new HttpRequestEntry(
            endpoint, new String(payload, StandardCharsets.UTF_8), timestamp, requestId);
    }

    @Override
    public int getVersion() {
        return 1; // Bump when the serialized layout changes
    }
}
```
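
A serializer like this is easy to check in isolation with a round trip through `java.io` streams. The sketch below assumes nothing from the framework: `LengthPrefixedCodecSketch` is a hypothetical helper that writes an endpoint, a payload, and a timestamp, then reads them back. It writes the payload as a length-prefixed byte array because `DataOutputStream.writeUTF` rejects strings longer than 65535 encoded bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Illustrative round trip for a request entry layout; not framework code. */
final class LengthPrefixedCodecSketch {

    private LengthPrefixedCodecSketch() {}

    static byte[] encode(String endpoint, String payload, long timestamp) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(endpoint); // short string, fits the writeUTF limit
            byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
            out.writeInt(payloadBytes.length); // explicit length prefix
            out.write(payloadBytes);
            out.writeLong(timestamp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns {endpoint, payload, timestamp as decimal string}. */
    static String[] decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String endpoint = in.readUTF();
            byte[] payloadBytes = new byte[in.readInt()];
            in.readFully(payloadBytes); // blocks until the whole payload is read
            long timestamp = in.readLong();
            return new String[] {
                endpoint, new String(payloadBytes, StandardCharsets.UTF_8), Long.toString(timestamp)
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Encoding and then decoding a 100,000-character payload returns the original values, which would not be possible with `writeUTF` alone.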

## Best Practices

### Performance Optimization
1. **Batch Size Tuning**: Larger batches raise throughput but add latency per record; tune `maxBatchSize` together with `maxTimeInBufferMS`
2. **Buffer Management**: Size buffers based on memory constraints and throughput requirements
3. **Rate Limiting**: Configure based on destination capacity and network conditions
4. **Request Sizing**: Implement efficient `getSizeInBytes()` calculations
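
As one way to keep `getSizeInBytes()` cheap, the encoded length of a string can be counted without allocating a byte array for every record. The helper below is a sketch under that assumption. `Utf8SizeSketch` and `utf8Length` are hypothetical names, not framework API.

```java
/** Illustrative allocation-free UTF-8 length calculation; not framework code. */
final class Utf8SizeSketch {

    private Utf8SizeSketch() {}

    /**
     * Counts the bytes the string would occupy in UTF-8.
     * Unpaired surrogates are counted as 3 bytes here, whereas
     * String.getBytes(UTF_8) would replace them with a 1-byte '?'.
     */
    static int utf8Length(CharSequence s) {
        int bytes = 0;
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c < 0x80) {
                bytes += 1; // ASCII
            } else if (c < 0x800) {
                bytes += 2; // two-byte sequence
            } else if (Character.isHighSurrogate(c)
                    && i + 1 < s.length()
                    && Character.isLowSurrogate(s.charAt(i + 1))) {
                bytes += 4; // surrogate pair encodes one supplementary code point
                i++;        // skip the low surrogate
            } else {
                bytes += 3; // rest of the Basic Multilingual Plane
            }
        }
        return bytes;
    }
}
```

A writer could call `Utf8SizeSketch.utf8Length(entry.getPayload())` in place of `getPayload().getBytes(StandardCharsets.UTF_8).length`. Whether the saving matters depends on record size and throughput, so it is worth measuring first.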

### Error Handling
1. **Fatal vs Retryable**: Classify exceptions carefully; a permanent error that is treated as retryable leads to infinite retries
2. **Partial Failures**: Handle individual entry failures in batch responses and retry only the failed entries
3. **Timeout Handling**: Configure `requestTimeoutMS` and `failOnTimeout` to match network conditions
4. **Exception Classification**: Use `FatalExceptionClassifier` for sophisticated error handling
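
One detail that affects the fatal-versus-retryable decision: a `CompletableFuture` callback often receives the failure wrapped in a `CompletionException`, so an `instanceof` check on the wrapper can miss the real cause. The sketch below unwraps such wrappers before classifying. `ErrorUnwrapSketch` and its retry policy (I/O errors are retryable, except a malformed URL) are assumptions made for illustration, not framework behaviour.

```java
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

/** Illustrative error classification helper; not framework code. */
final class ErrorUnwrapSketch {

    private ErrorUnwrapSketch() {}

    /** Strips CompletionException / ExecutionException wrappers to reach the cause. */
    static Throwable unwrap(Throwable error) {
        Throwable current = error;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Assumed policy: transient I/O problems are retryable, a malformed URL is not. */
    static boolean isRetryable(Throwable error) {
        Throwable root = unwrap(error);
        return root instanceof IOException && !(root instanceof MalformedURLException);
    }
}
```

Inside `submitRequestEntries`, the callback would call `resultHandler.retryForEntries(...)` when `isRetryable(error)` is true and `resultHandler.completeExceptionally(...)` otherwise.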

### Resource Management
1. **Connection Pooling**: Reuse HTTP connections in async clients
2. **Memory Management**: Monitor buffer sizes and implement backpressure
3. **Thread Management**: Release async clients and their threads when the writer is closed
4. **Metrics Integration**: Use built-in metrics for monitoring and alerting

The async sink framework provides a foundation for building production-ready sinks with automatic batching, rate limiting, fault tolerance, and state management.