
# Async Sink Framework

The Async Sink Framework is a base for building high-throughput, fault-tolerant sinks that write to destinations with asynchronous APIs. It handles batching, buffering, rate limiting, retries, and state management, so a connector mainly has to supply the conversion and request-submission logic.

## Core Components

### AsyncSinkBase

The foundation class for all async sink implementations. Its constructor takes the element converter plus the batching, buffering, and timeout limits, and exposes each one through a protected getter.

```java { .api }
@PublicEvolving
public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
        implements SupportsWriterState<InputT, BufferedRequestState<RequestEntryT>>, Sink<InputT> {

    protected AsyncSinkBase(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            long requestTimeoutMS,
            boolean failOnTimeout)

    protected ElementConverter<InputT, RequestEntryT> getElementConverter()
    protected int getMaxBatchSize()
    protected int getMaxInFlightRequests()
    protected int getMaxBufferedRequests()
    protected long getMaxBatchSizeInBytes()
    protected long getMaxTimeInBufferMS()
    protected long getMaxRecordSizeInBytes()
    protected long getRequestTimeoutMS()
    protected boolean getFailOnTimeout()
}
```
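
The numeric limits interact: a batch cannot usefully hold more entries than the buffer can queue, and a single record has to fit inside a batch. The sketch below spells out sanity checks one might run before passing values to the constructor. It is plain Java with no Flink dependency; `SinkLimits` and `validate` are hypothetical names, and the rules are assumptions about sensible settings rather than a statement of what the framework itself enforces.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical holder for the numeric limits passed to an AsyncSinkBase constructor. */
final class SinkLimits {
    final int maxBatchSize;
    final int maxInFlightRequests;
    final int maxBufferedRequests;
    final long maxBatchSizeInBytes;
    final long maxTimeInBufferMS;
    final long maxRecordSizeInBytes;

    SinkLimits(int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests,
               long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) {
        this.maxBatchSize = maxBatchSize;
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxBufferedRequests = maxBufferedRequests;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxTimeInBufferMS = maxTimeInBufferMS;
        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
    }

    /** Returns a list of problems; an empty list means the limits look consistent. */
    List<String> validate() {
        List<String> problems = new ArrayList<>();
        if (maxBatchSize <= 0) {
            problems.add("maxBatchSize must be positive");
        }
        if (maxInFlightRequests <= 0) {
            problems.add("maxInFlightRequests must be positive");
        }
        if (maxTimeInBufferMS <= 0) {
            problems.add("maxTimeInBufferMS must be positive");
        }
        // Assumed rule: the buffer should be able to hold more than one full batch
        if (maxBufferedRequests <= maxBatchSize) {
            problems.add("maxBufferedRequests should exceed maxBatchSize");
        }
        // Assumed rule: the largest allowed record must fit into a batch
        if (maxRecordSizeInBytes > maxBatchSizeInBytes) {
            problems.add("maxRecordSizeInBytes should not exceed maxBatchSizeInBytes");
        }
        return problems;
    }

    public static void main(String[] args) {
        SinkLimits ok = new SinkLimits(100, 10, 1000, 1024L * 1024, 5000, 256L * 1024);
        System.out.println(ok.validate()); // prints []

        SinkLimits bad = new SinkLimits(100, 10, 50, 1024, 5000, 4096);
        bad.validate().forEach(System.out::println); // two problems are reported
    }
}
```

Returning a list instead of throwing on the first problem makes it easier to report every misconfiguration at once.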

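### AsyncSinkWriter

The core writer that handles the async sink logic. Subclasses implement `submitRequestEntries` and `getSizeInBytes`; the base class takes care of buffering, batching, and snapshotting the buffered entries.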

```java { .api }
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {

    // Constructor
    public AsyncSinkWriter(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<RequestEntryT>> states,
            BatchCreator<RequestEntryT> batchCreator,
            RequestBuffer<RequestEntryT> bufferedRequestEntries)

    // Abstract methods to implement
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries,
            ResultHandler<RequestEntryT> resultHandler)

    protected abstract long getSizeInBytes(RequestEntryT requestEntry)

    // Public interface methods
    public void write(InputT element, Context context) throws IOException, InterruptedException
    public void flush(boolean flush) throws InterruptedException
    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId)
    public void close()

    // Protected helper methods
    protected Consumer<Exception> getFatalExceptionCons()
}
```

### ElementConverter

Transforms stream elements into request entries for the destination.

```java { .api }
@PublicEvolving
public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
    RequestEntryT apply(InputT element, SinkWriter.Context context)

    default void open(WriterInitContext context) {
        // No-op default implementation
    }
}
```

### ResultHandler

Reports the outcome of an async request back to the writer: success, a fatal error, or a list of entries to retry.

```java { .api }
@PublicEvolving
public interface ResultHandler<RequestEntryT> {
    void complete()
    void completeExceptionally(Exception e)
    void retryForEntries(List<RequestEntryT> requestEntriesToRetry)
}
```
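
A reasonable way to read this interface is that each submitted batch should end in exactly one of the three calls. The sketch below illustrates that mapping in plain Java. `Handler` is a local stand-in that mirrors the three methods (it is not the Flink type), and `RecordingHandler` and `settle` are hypothetical helpers introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class ResultHandlerDemo {
    /** Stand-in with the same three methods as ResultHandler; not the Flink interface. */
    interface Handler<T> {
        void complete();
        void completeExceptionally(Exception e);
        void retryForEntries(List<T> requestEntriesToRetry);
    }

    /** Records which call was made so the outcome can be inspected. */
    static final class RecordingHandler<T> implements Handler<T> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void complete() {
            calls.add("complete");
        }

        @Override
        public void completeExceptionally(Exception e) {
            calls.add("fatal:" + e.getMessage());
        }

        @Override
        public void retryForEntries(List<T> requestEntriesToRetry) {
            calls.add("retry:" + requestEntriesToRetry);
        }
    }

    /** Maps one batch outcome to exactly one handler call. */
    static <T> void settle(
            List<T> batch, List<Integer> failedIndices, Exception fatal, Handler<T> handler) {
        if (fatal != null) {
            handler.completeExceptionally(fatal);
        } else if (failedIndices.isEmpty()) {
            handler.complete();
        } else {
            List<T> retry = new ArrayList<>();
            for (int index : failedIndices) {
                retry.add(batch.get(index));
            }
            handler.retryForEntries(retry);
        }
    }

    public static void main(String[] args) {
        RecordingHandler<String> handler = new RecordingHandler<>();
        settle(List.of("a", "b", "c"), List.of(1), null, handler);
        System.out.println(handler.calls); // prints [retry:[b]]
    }
}
```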

### BatchCreator

Pluggable interface for controlling how request entries are batched.

```java { .api }
@PublicEvolving
public interface BatchCreator<RequestEntryT extends Serializable> {
    Batch<RequestEntryT> createNextBatch(
            RequestInfo requestInfo,
            RequestBuffer<RequestEntryT> bufferedRequestEntries)
}
```

### RequestBuffer

Flexible buffer interface for managing request entries.

```java { .api }
@PublicEvolving
public interface RequestBuffer<RequestEntryT extends Serializable> {
    void add(RequestEntryWrapper<RequestEntryT> entry, boolean insertAtHead)
    RequestEntryWrapper<RequestEntryT> poll()
    RequestEntryWrapper<RequestEntryT> peek()
    boolean isEmpty()
    int size()
    Collection<RequestEntryWrapper<RequestEntryT>> getBufferedState()
    long totalSizeInBytes()
}
```
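
To make the `insertAtHead` flag concrete, here is a minimal deque-backed sketch in plain Java. It assumes that head insertion is meant for entries being retried, so they go out before newer ones; that reading is an assumption, not something the interface states. `DequeBufferSketch` and `Wrapper` are hypothetical stand-ins and do not implement the Flink interface.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class DequeBufferSketch<T> {
    /** Stand-in for RequestEntryWrapper: an entry plus its size in bytes. */
    static final class Wrapper<T> {
        final T entry;
        final long size;

        Wrapper(T entry, long size) {
            this.entry = entry;
            this.size = size;
        }
    }

    private final Deque<Wrapper<T>> queue = new ArrayDeque<>();
    private long totalSizeInBytes;

    /** Adds at the head (assumed: retried entries) or at the tail (new entries). */
    void add(Wrapper<T> wrapper, boolean insertAtHead) {
        if (insertAtHead) {
            queue.addFirst(wrapper);
        } else {
            queue.addLast(wrapper);
        }
        totalSizeInBytes += wrapper.size;
    }

    /** Removes and returns the head entry, or null when the buffer is empty. */
    Wrapper<T> poll() {
        Wrapper<T> wrapper = queue.pollFirst();
        if (wrapper != null) {
            totalSizeInBytes -= wrapper.size;
        }
        return wrapper;
    }

    Wrapper<T> peek() {
        return queue.peekFirst();
    }

    boolean isEmpty() {
        return queue.isEmpty();
    }

    int size() {
        return queue.size();
    }

    long totalSizeInBytes() {
        return totalSizeInBytes;
    }

    public static void main(String[] args) {
        DequeBufferSketch<String> buffer = new DequeBufferSketch<>();
        buffer.add(new Wrapper<>("a", 10), false);
        buffer.add(new Wrapper<>("b", 20), false);

        Wrapper<String> sent = buffer.poll(); // "a" is taken for a request
        buffer.add(sent, true);               // the request failed, so "a" goes back to the head

        System.out.println(buffer.peek().entry);       // prints a
        System.out.println(buffer.totalSizeInBytes()); // prints 30
    }
}
```

Tracking the byte total incrementally keeps `totalSizeInBytes()` constant-time, which matters if it is consulted on every write.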

## Implementation Examples

### Complete Async Sink Implementation

```java
public class HttpAsyncSink extends AsyncSinkBase<JsonNode, HttpRequestEntry> {

    public HttpAsyncSink(String endpoint, int maxBatchSize) {
        super(
                new JsonElementConverter(endpoint), // Element converter
                maxBatchSize,                       // Max batch size
                10,                                 // Max in-flight requests
                maxBatchSize * 10,                  // Max buffered requests
                1024 * 1024,                        // Max batch size in bytes (1MB)
                5000,                               // Max time in buffer (5s)
                256 * 1024,                         // Max record size (256KB)
                60000,                              // Request timeout (60s)
                false                               // Don't fail on timeout
        );
    }

    @Override
    public SinkWriter<JsonNode> createWriter(WriterInitContext context) throws IOException {
        return new HttpAsyncSinkWriter(
                getElementConverter(),
                context,
                createWriterConfiguration(),
                Collections.emptyList()
        );
    }

    // SupportsWriterState also requires restoring a writer from checkpointed state
    @Override
    public StatefulSinkWriter<JsonNode, BufferedRequestState<HttpRequestEntry>> restoreWriter(
            WriterInitContext context,
            Collection<BufferedRequestState<HttpRequestEntry>> recoveredState) throws IOException {
        return new HttpAsyncSinkWriter(
                getElementConverter(), context, createWriterConfiguration(), recoveredState);
    }

    // Serializer for the buffered entries; see the State Serializer example
    @Override
    public SimpleVersionedSerializer<BufferedRequestState<HttpRequestEntry>> getWriterStateSerializer() {
        return new HttpRequestEntryStateSerializer();
    }

    private AsyncSinkWriterConfiguration createWriterConfiguration() {
        return AsyncSinkWriterConfiguration.builder()
                .setMaxBatchSize(getMaxBatchSize())
                .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
                .setMaxInFlightRequests(getMaxInFlightRequests())
                .setMaxBufferedRequests(getMaxBufferedRequests())
                .setMaxTimeInBufferMS(getMaxTimeInBufferMS())
                .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
                .setRequestTimeoutMS(getRequestTimeoutMS())
                .setFailOnTimeout(getFailOnTimeout())
                .build();
    }
}

// Element converter implementation
public class JsonElementConverter implements ElementConverter<JsonNode, HttpRequestEntry> {
    private final String endpoint;

    public JsonElementConverter(String endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public HttpRequestEntry apply(JsonNode element, SinkWriter.Context context) {
        // context.timestamp() returns a Long that is null when the record carries no
        // timestamp; falling back to wall-clock time is an illustrative choice
        Long timestamp = context.timestamp();
        return new HttpRequestEntry(
                endpoint,
                element.toString(),
                timestamp != null ? timestamp : System.currentTimeMillis(),
                generateRequestId()
        );
    }

    private String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}

// Request entry implementation
public class HttpRequestEntry implements Serializable {
    private final String endpoint;
    private final String payload;
    private final long timestamp;
    private final String requestId;

    public HttpRequestEntry(String endpoint, String payload, long timestamp, String requestId) {
        this.endpoint = endpoint;
        this.payload = payload;
        this.timestamp = timestamp;
        this.requestId = requestId;
    }

    // Getters
    public String getEndpoint() { return endpoint; }
    public String getPayload() { return payload; }
    public long getTimestamp() { return timestamp; }
    public String getRequestId() { return requestId; }
}

// Async sink writer implementation.
// HttpAsyncClient, BatchHttpRequest, and BatchHttpResponse stand in for the
// destination's own client library.
public class HttpAsyncSinkWriter extends AsyncSinkWriter<JsonNode, HttpRequestEntry> {
    private final HttpAsyncClient httpClient;
    private final ObjectMapper objectMapper;

    public HttpAsyncSinkWriter(
            ElementConverter<JsonNode, HttpRequestEntry> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<HttpRequestEntry>> states) {
        // Uses the four-argument constructor, which relies on the default
        // batch creator and request buffer
        super(elementConverter, context, configuration, states);
        this.httpClient = new HttpAsyncClient();
        this.objectMapper = new ObjectMapper();
    }

    @Override
    protected void submitRequestEntries(
            List<HttpRequestEntry> requestEntries,
            ResultHandler<HttpRequestEntry> resultHandler) {

        // Create batch request
        BatchHttpRequest batchRequest = createBatchRequest(requestEntries);

        // Submit asynchronously
        CompletableFuture<BatchHttpResponse> future = httpClient.submitBatch(batchRequest);

        // Handle response
        future.whenComplete((response, error) -> {
            if (error != null) {
                if (isFatalError(error)) {
                    resultHandler.completeExceptionally(new RuntimeException(
                            "Fatal error in HTTP request", error));
                } else {
                    // Retry all entries on network error
                    resultHandler.retryForEntries(requestEntries);
                }
            } else if (response.hasFailures()) {
                // Partial failure - retry only failed entries
                List<HttpRequestEntry> failedEntries = extractFailedEntries(requestEntries, response);
                resultHandler.retryForEntries(failedEntries);
            } else {
                // Complete success
                resultHandler.complete();
            }
        });
    }

    @Override
    protected long getSizeInBytes(HttpRequestEntry requestEntry) {
        // Estimate size including headers and metadata
        return requestEntry.getPayload().getBytes(StandardCharsets.UTF_8).length +
                requestEntry.getEndpoint().getBytes(StandardCharsets.UTF_8).length +
                100; // Approximate header overhead
    }

    private BatchHttpRequest createBatchRequest(List<HttpRequestEntry> entries) {
        List<String> payloads = entries.stream()
                .map(HttpRequestEntry::getPayload)
                .collect(Collectors.toList());

        // All entries of this sink share one endpoint, so the first entry's is used
        return new BatchHttpRequest(
                entries.get(0).getEndpoint(),
                payloads,
                generateBatchId(),
                System.currentTimeMillis()
        );
    }

    private boolean isFatalError(Throwable error) {
        // Consider authentication, authorization, and configuration errors as fatal
        return error instanceof AuthenticationException ||
                error instanceof AuthorizationException ||
                error instanceof MalformedURLException;
    }

    private List<HttpRequestEntry> extractFailedEntries(
            List<HttpRequestEntry> originalEntries,
            BatchHttpResponse response) {
        List<HttpRequestEntry> failed = new ArrayList<>();
        List<Integer> failedIndices = response.getFailedIndices();

        for (Integer index : failedIndices) {
            // Ignore indices that do not map to an entry of this batch
            if (index >= 0 && index < originalEntries.size()) {
                failed.add(originalEntries.get(index));
            }
        }

        return failed;
    }

    private String generateBatchId() {
        return UUID.randomUUID().toString();
    }
}
```

### Custom Batch Creator

```java
public class SizeLimitedBatchCreator implements BatchCreator<HttpRequestEntry> {
    private final long maxBatchSizeInBytes;

    public SizeLimitedBatchCreator(long maxBatchSizeInBytes) {
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
    }

    @Override
    public Batch<HttpRequestEntry> createNextBatch(
            RequestInfo requestInfo,
            RequestBuffer<HttpRequestEntry> bufferedRequestEntries) {

        List<HttpRequestEntry> batch = new ArrayList<>();
        long totalSize = 0;
        int maxBatchSize = requestInfo.getBatchSize();

        // Add entries until we hit size or count limits
        while (!bufferedRequestEntries.isEmpty() &&
                batch.size() < maxBatchSize) {

            RequestEntryWrapper<HttpRequestEntry> wrapper = bufferedRequestEntries.peek();
            if (wrapper == null) {
                break;
            }

            // Check if adding this entry would exceed size limit
            if (totalSize + wrapper.getSize() > maxBatchSizeInBytes && !batch.isEmpty()) {
                break;
            }

            // Remove from buffer and add to batch
            bufferedRequestEntries.poll();
            batch.add(wrapper.getRequestEntry());
            totalSize += wrapper.getSize();
        }

        return new Batch<>(batch, totalSize);
    }
}
```
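
The batching loop is easier to follow with concrete numbers. The sketch below reproduces the same peek, check, poll logic in plain Java, using bare entry sizes in place of request entries. `BatchingWalkthrough` and `nextBatch` are hypothetical names, and the sizes are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class BatchingWalkthrough {
    /** Takes entry sizes from the head of the queue until a count or byte limit is hit. */
    static List<Long> nextBatch(Deque<Long> sizes, int maxCount, long maxBytes) {
        List<Long> batch = new ArrayList<>();
        long total = 0;
        while (!sizes.isEmpty() && batch.size() < maxCount) {
            long next = sizes.peekFirst();
            // An oversized entry is still taken when the batch is empty,
            // so the loop always makes progress
            if (total + next > maxBytes && !batch.isEmpty()) {
                break;
            }
            batch.add(sizes.pollFirst());
            total += next;
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<Long> sizes = new ArrayDeque<>(List.of(400L, 500L, 300L));
        System.out.println(nextBatch(sizes, 10, 1000)); // prints [400, 500]
        System.out.println(nextBatch(sizes, 10, 1000)); // prints [300]
    }
}
```

With a 1000-byte limit, the third entry would bring the first batch to 1200 bytes, so it stays in the queue and opens the next batch.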

### Advanced Error Handling

```java
public class AdvancedHttpAsyncSinkWriter extends AsyncSinkWriter<JsonNode, HttpRequestEntry> {
    private final HttpAsyncClient httpClient;
    private final FatalExceptionClassifier fatalExceptionClassifier;

    public AdvancedHttpAsyncSinkWriter(
            ElementConverter<JsonNode, HttpRequestEntry> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<HttpRequestEntry>> states) {
        super(elementConverter, context, configuration, states);
        this.httpClient = new HttpAsyncClient();

        // Create chain of fatal exception classifiers
        this.fatalExceptionClassifier = FatalExceptionClassifier.createChain(
                FatalExceptionClassifier.withRootCauseOfType(
                        AuthenticationException.class,
                        cause -> new RuntimeException("Authentication failed", cause)
                ),
                FatalExceptionClassifier.withRootCauseOfType(
                        IllegalArgumentException.class,
                        cause -> new RuntimeException("Invalid request configuration", cause)
                )
        );
    }

    // getSizeInBytes(...) and createBatchRequest(...) are the same as in
    // HttpAsyncSinkWriter and are omitted here

    @Override
    protected void submitRequestEntries(
            List<HttpRequestEntry> requestEntries,
            ResultHandler<HttpRequestEntry> resultHandler) {

        CompletableFuture<BatchHttpResponse> future = httpClient.submitBatch(
                createBatchRequest(requestEntries));

        future.whenComplete((response, error) -> {
            if (error != null) {
                // Use classifier to determine if error is fatal
                if (fatalExceptionClassifier.isFatal(error, getFatalExceptionCons())) {
                    return; // Fatal exception consumer already called
                } else {
                    // Retryable error - retry all entries
                    resultHandler.retryForEntries(requestEntries);
                }
            } else {
                handleResponse(requestEntries, response, resultHandler);
            }
        });
    }

    private void handleResponse(
            List<HttpRequestEntry> requestEntries,
            BatchHttpResponse response,
            ResultHandler<HttpRequestEntry> resultHandler) {

        if (response.isSuccess()) {
            resultHandler.complete();
        } else if (response.hasPartialFailures()) {
            List<HttpRequestEntry> failedEntries = new ArrayList<>();

            for (int i = 0; i < requestEntries.size(); i++) {
                if (response.isFailed(i)) {
                    int statusCode = response.getStatusCode(i);

                    // Check if individual failure is retryable
                    if (isRetryableStatusCode(statusCode)) {
                        failedEntries.add(requestEntries.get(i));
                    }
                    // Non-retryable individual failures are dropped (logged elsewhere)
                }
            }

            if (!failedEntries.isEmpty()) {
                resultHandler.retryForEntries(failedEntries);
            } else {
                resultHandler.complete();
            }
        } else {
            // Complete failure - retry all if retryable
            if (isRetryableStatusCode(response.getOverallStatusCode())) {
                resultHandler.retryForEntries(requestEntries);
            } else {
                // Non-retryable failure - complete (entries lost, logged elsewhere)
                resultHandler.complete();
            }
        }
    }

    private boolean isRetryableStatusCode(int statusCode) {
        // 5xx server errors and some 4xx errors are retryable
        return statusCode >= 500 ||
                statusCode == 408 || // Request Timeout
                statusCode == 429;   // Too Many Requests
    }
}
```
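
Classifying by root cause matters because a failure seen in a `CompletableFuture` callback may arrive wrapped: dependent stages receive a `CompletionException` whose cause is the original error. The plain-Java sketch below shows the unwrapping alongside the status-code rule from the example above. `RootCauseSketch` and `rootCause` are hypothetical helpers, not part of any Flink API.

```java
import java.util.concurrent.CompletableFuture;

final class RootCauseSketch {
    /** Follows the cause chain to its end; stops if a throwable is its own cause. */
    static Throwable rootCause(Throwable error) {
        Throwable current = error;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    /** Same rule as the example above: 5xx, 408, and 429 are treated as retryable. */
    static boolean isRetryableStatusCode(int statusCode) {
        return statusCode >= 500 || statusCode == 408 || statusCode == 429;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("bad config"));

        // A dependent stage sees the failure wrapped in CompletionException
        failed.thenApply(String::length).whenComplete((value, error) -> {
            System.out.println(error.getClass().getSimpleName());            // prints CompletionException
            System.out.println(rootCause(error).getClass().getSimpleName()); // prints IllegalArgumentException
        });

        System.out.println(isRetryableStatusCode(503)); // prints true
        System.out.println(isRetryableStatusCode(404)); // prints false
    }
}
```

A plain `instanceof` check on the callback's `error` would miss the wrapped case, which is one reason to match on the root cause instead.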

## Configuration Patterns

### Basic Configuration Builder Pattern

```java
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
        .setMaxBatchSize(100)                   // Records per batch
        .setMaxBatchSizeInBytes(1024 * 1024)    // Bytes per batch (1MB)
        .setMaxInFlightRequests(10)             // Concurrent requests
        .setMaxBufferedRequests(1000)           // Queue capacity
        .setMaxTimeInBufferMS(5000)             // Max buffering delay (5s)
        .setMaxRecordSizeInBytes(256 * 1024)    // Max record size (256KB)
        .setRequestTimeoutMS(30000)             // Request timeout (30s)
        .setFailOnTimeout(false)                // Retry on timeout
        .build();
```
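
These limits also imply a rough memory ceiling. As a back-of-the-envelope estimate, the sketch below multiplies the values from the configuration above. It assumes every buffered record reaches the maximum record size and every in-flight batch reaches the maximum batch size, and it ignores object overhead, so the results are upper bounds and not expected usage. `BufferBudgetSketch` and its methods are hypothetical names.

```java
final class BufferBudgetSketch {
    /** Upper bound on buffered payload bytes if every queued record is maximally large. */
    static long worstCaseBufferedBytes(int maxBufferedRequests, long maxRecordSizeInBytes) {
        return Math.multiplyExact((long) maxBufferedRequests, maxRecordSizeInBytes);
    }

    /** Upper bound on payload bytes held by in-flight requests if every batch is full. */
    static long worstCaseInFlightBytes(int maxInFlightRequests, long maxBatchSizeInBytes) {
        return Math.multiplyExact((long) maxInFlightRequests, maxBatchSizeInBytes);
    }

    public static void main(String[] args) {
        long buffered = worstCaseBufferedBytes(1000, 256L * 1024);
        long inFlight = worstCaseInFlightBytes(10, 1024L * 1024);
        System.out.println(buffered); // prints 262144000 (250 MiB)
        System.out.println(inFlight); // prints 10485760 (10 MiB)
    }
}
```

The estimate is per writer instance, so it scales with the sink's parallelism.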

### Advanced Configuration with Custom Rate Limiting

```java
// Create AIMD scaling strategy
AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)
        .setIncreaseRate(5)       // Increase by 5 per success
        .setDecreaseFactor(0.7)   // Decrease by 30% on failure
        .build();

// Create congestion control rate limiting
CongestionControlRateLimitingStrategy rateLimiting =
        CongestionControlRateLimitingStrategy.builder()
                .setMaxInFlightRequests(50)
                .setInitialMaxInFlightMessages(100)
                .setScalingStrategy(scalingStrategy)
                .build();

// Apply to configuration
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
        .setMaxBatchSize(200)
        .setMaxBatchSizeInBytes(2 * 1024 * 1024)  // 2MB
        .setMaxInFlightRequests(50)
        .setMaxBufferedRequests(2000)
        .setMaxTimeInBufferMS(3000)               // 3s
        .setMaxRecordSizeInBytes(512 * 1024)      // 512KB
        .setRateLimitingStrategy(rateLimiting)    // Custom rate limiting
        .build();
```
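
AIMD stands for additive increase, multiplicative decrease. The sketch below simulates how a limit could evolve under the settings above (increase rate 5, decrease factor 0.7, threshold 1000), starting from 100. The rounding and the floor of 1 are assumptions made for this illustration; the sketch does not claim to match the library's exact arithmetic, and `AimdSketch` is a hypothetical name.

```java
final class AimdSketch {
    /** Additive increase, capped at the threshold. */
    static int scaleUp(int current, int increaseRate, int rateThreshold) {
        return Math.min(current + increaseRate, rateThreshold);
    }

    /** Multiplicative decrease; rounding and the minimum of 1 are assumptions. */
    static int scaleDown(int current, double decreaseFactor) {
        return Math.max(1, (int) Math.round(current * decreaseFactor));
    }

    public static void main(String[] args) {
        int limit = 100;
        boolean[] outcomes = {true, true, false, true}; // true = request succeeded
        for (boolean success : outcomes) {
            limit = success ? scaleUp(limit, 5, 1000) : scaleDown(limit, 0.7);
            System.out.println(limit); // prints 105, 110, 77, 82 on successive lines
        }
    }
}
```

One failure removes more capacity than several successes add back, which is what lets the limit back off quickly under pressure and recover gradually.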

## State Management

The async sink framework manages state automatically for fault tolerance: entries that are still buffered are captured in `snapshotState` and handed back to the writer when it is restored.

### BufferedRequestState

```java { .api }
@PublicEvolving
public class BufferedRequestState<RequestEntryT extends Serializable> {
    public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries)
    public BufferedRequestState(RequestBuffer<RequestEntryT> requestBuffer)

    public List<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries()
    public long getStateSize()

    public static <T extends Serializable> BufferedRequestState<T> emptyState()
}
```

### State Serializer (Custom Implementation)

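```java
public class HttpRequestEntryStateSerializer
        extends AsyncSinkWriterStateSerializer<HttpRequestEntry> {

    @Override
    protected void serializeRequestToStream(
            HttpRequestEntry request,
            DataOutputStream out) throws IOException {
        writeString(out, request.getEndpoint());
        writeString(out, request.getPayload());
        out.writeLong(request.getTimestamp());
        writeString(out, request.getRequestId());
    }

    @Override
    protected HttpRequestEntry deserializeRequestFromStream(
            long requestSize,
            DataInputStream in) throws IOException {
        // Fields are read back in the order they were written
        String endpoint = readString(in);
        String payload = readString(in);
        long timestamp = in.readLong();
        String requestId = readString(in);

        return new HttpRequestEntry(endpoint, payload, timestamp, requestId);
    }

    // Serializer version; bump it when the format above changes
    @Override
    public int getVersion() {
        return 1;
    }

    // Length-prefixed UTF-8 is used instead of writeUTF, which rejects strings
    // longer than 65535 encoded bytes (payloads here may be up to 256KB)
    private static void writeString(DataOutputStream out, String value) throws IOException {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    private static String readString(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}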
```
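
A serializer like this is worth checking with a round trip, especially for large payloads: `DataOutputStream.writeUTF` throws `UTFDataFormatException` once a string exceeds 65535 encoded bytes. The plain-Java sketch below shows a length-prefixed encoding that has no such limit and verifies it on a 100,000-character string. `LengthPrefixedStringSketch` and its methods are hypothetical helpers, independent of the Flink serializer classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class LengthPrefixedStringSketch {
    /** Writes a 4-byte length followed by the UTF-8 bytes of the string. */
    static void writeString(DataOutputStream out, String value) throws IOException {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    /** Reads a string written by writeString. */
    static String readString(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Serializes and deserializes a value in memory, returning what was read back. */
    static String roundTrip(String value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                writeString(out, value);
            }
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return readString(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String payload = "x".repeat(100_000); // longer than writeUTF would accept
        System.out.println(roundTrip(payload).equals(payload)); // prints true
    }
}
```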

## Best Practices

### Performance Optimization

1. **Batch Size Tuning**: Balance between latency and throughput
2. **Buffer Management**: Size buffers based on memory constraints and throughput requirements
3. **Rate Limiting**: Configure based on destination capacity and network conditions
4. **Request Sizing**: Implement efficient `getSizeInBytes()` calculations

### Error Handling

1. **Fatal vs Retryable**: Properly classify exceptions to avoid infinite retries
2. **Partial Failures**: Handle individual entry failures in batch responses
3. **Timeout Handling**: Configure appropriate timeouts for network conditions
4. **Exception Classification**: Use `FatalExceptionClassifier` for sophisticated error handling

### Resource Management

1. **Connection Pooling**: Reuse HTTP connections in async clients
2. **Memory Management**: Monitor buffer sizes and implement backpressure
3. **Thread Management**: Ensure proper cleanup of async resources
4. **Metrics Integration**: Use built-in metrics for monitoring and alerting

The async sink framework provides a robust foundation for building production-ready sinks with sophisticated features like automatic batching, rate limiting, fault tolerance, and state management.