or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-connector-base

Base classes and utilities for building Apache Flink connectors with async sink patterns, source readers, and advanced streaming capabilities

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-connector-base@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-base@2.1.0

# Apache Flink Connector Base

The Apache Flink Connector Base library provides foundational classes and utilities for building high-performance, production-ready Apache Flink connectors. It offers sophisticated async sink and source frameworks with built-in features like backpressure handling, rate limiting, checkpointing, and hybrid source switching.

## Package Information

**Maven Coordinates:**
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>2.1.0</version>
</dependency>
```

**Package:** `org.apache.flink.connector.base`

## Core Imports

### Essential Base Classes
```java
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
```

### Configuration and Strategy Classes
```java
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
```

### Core Interfaces
```java
import org.apache.flink.connector.base.sink.writer.BatchCreator;
import org.apache.flink.connector.base.sink.writer.RequestBuffer;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
```

## Basic Usage

### Creating an Async Sink

```java
// Define your request entry type
public class MyRequestEntry implements Serializable {
    private final String data;
    private final long timestamp;

    public MyRequestEntry(String data, long timestamp) {
        this.data = data;
        this.timestamp = timestamp;
    }

    public String getData() { return data; }
    public long getTimestamp() { return timestamp; }
}

// Implement ElementConverter
public class MyElementConverter implements ElementConverter<String, MyRequestEntry> {
    @Override
    public MyRequestEntry apply(String element, SinkWriter.Context context) {
        return new MyRequestEntry(element, context.timestamp());
    }
}

// Create AsyncSink implementation
public class MyAsyncSink extends AsyncSinkBase<String, MyRequestEntry> {
    public MyAsyncSink() {
        super(
            new MyElementConverter(), // Element converter
            100,                      // Max batch size
            10,                       // Max in-flight requests
            1000,                     // Max buffered requests
            1024 * 1024,              // Max batch size in bytes
            5000,                     // Max time in buffer (ms)
            256 * 1024,               // Max record size in bytes
            60000,                    // Request timeout (ms)
            false                     // Fail on timeout
        );
    }

    @Override
    public SinkWriter<String> createWriter(WriterInitContext context) throws IOException {
        return new MyAsyncSinkWriter(
            getElementConverter(),
            context,
            AsyncSinkWriterConfiguration.builder()
                .setMaxBatchSize(getMaxBatchSize())
                .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
                .setMaxInFlightRequests(getMaxInFlightRequests())
                .setMaxBufferedRequests(getMaxBufferedRequests())
                .setMaxTimeInBufferMS(getMaxTimeInBufferMS())
                .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
                .build(),
            Collections.emptyList()
        );
    }
}

// Implement AsyncSinkWriter
public class MyAsyncSinkWriter extends AsyncSinkWriter<String, MyRequestEntry> {
    private final AsyncClient client;

    public MyAsyncSinkWriter(
            ElementConverter<String, MyRequestEntry> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<MyRequestEntry>> states) {
        super(elementConverter, context, configuration, states);
        this.client = new AsyncClient();
    }

    @Override
    protected void submitRequestEntries(
            List<MyRequestEntry> requestEntries,
            ResultHandler<MyRequestEntry> resultHandler) {

        CompletableFuture<Response> future = client.sendBatch(requestEntries);
        future.whenComplete((response, error) -> {
            if (error != null && isFatalError(error)) {
                resultHandler.completeExceptionally(new RuntimeException(error));
            } else if (error != null || response.hasFailures()) {
                List<MyRequestEntry> failedEntries = getFailedEntries(requestEntries, response);
                resultHandler.retryForEntries(failedEntries);
            } else {
                resultHandler.complete();
            }
        });
    }

    @Override
    protected long getSizeInBytes(MyRequestEntry requestEntry) {
        // Approximate size: character count of data + 8-byte timestamp
        // (use the UTF-8 encoded byte length if exact sizing is required)
        return requestEntry.getData().length() + 8;
    }
}
```

### Creating a Hybrid Source

```java
// Create file and Kafka sources
FileSource<String> fileSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), path)
    .build();

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("events")
    .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
    .setStartingOffsets(OffsetsInitializer.earliest())
    .build();

// Create hybrid source that reads files first, then switches to Kafka
HybridSource<String> hybridSource = HybridSource.builder(fileSource)
    .addSource(kafkaSource)
    .build();

// Use in DataStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromSource(
    hybridSource,
    WatermarkStrategy.noWatermarks(),
    "hybrid-source"
);
```

## Architecture

The connector base library is organized into several key architectural components:

### Sink Architecture
- **AsyncSinkBase**: Abstract base class for destination-agnostic async sinks
- **AsyncSinkWriter**: Core writer that handles batching, buffering, and retry logic
- **ElementConverter**: Transforms stream elements into request entries
- **RateLimitingStrategy**: Controls throughput and backpressure
- **BatchCreator**: Pluggable batching logic for request grouping

### Source Architecture
- **SourceReaderBase**: Foundation for building custom source readers
- **SplitReader**: Interface for reading from individual splits
- **RecordEmitter**: Handles record processing and state updates
- **HybridSource**: Enables seamless switching between multiple sources

### State Management
- **BufferedRequestState**: Handles checkpointing for async sinks
- **Split State Management**: Automatic state tracking for source splits
- **Serializers**: Built-in serialization for state persistence

## Capabilities

### [Async Sink Framework](./async-sink.md) { .api }
Complete framework for building async sinks with batching, buffering, rate limiting, and fault tolerance.

**Key APIs:**
```java { .api }
public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
public interface ElementConverter<InputT, RequestEntryT>
public interface ResultHandler<RequestEntryT>
```

### [Source Reader Framework](./source-reader.md) { .api }
Sophisticated framework for building source readers with split management and coordination.

**Key APIs:**
```java { .api }
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT, SplitStateT>
public interface SplitReader<E, SplitT>
public interface RecordEmitter<E, T, SplitStateT>
```

### [Hybrid Source System](./hybrid-source.md) { .api }
Advanced source that can switch between multiple underlying sources with position transfer.

**Key APIs:**
```java { .api }
public class HybridSource<T>
public interface SourceFactory<T, SourceT, FromEnumT>
public interface SourceSwitchContext<EnumT>
```

### [Rate Limiting & Scaling](./rate-limiting.md) { .api }
Pluggable strategies for controlling throughput, handling backpressure, and dynamic scaling.

**Key APIs:**
```java { .api }
public interface RateLimitingStrategy
public interface ScalingStrategy<T>
public class CongestionControlRateLimitingStrategy
public class AIMDScalingStrategy
```

### [Table API Integration](./table-api.md) { .api }
Base classes for integrating async sinks with Flink's Table API and SQL.

**Key APIs:**
```java { .api }
public abstract class AsyncDynamicTableSinkFactory
public class AsyncDynamicTableSink
public interface ConfigurationValidator
```

## Type Definitions

### Core Types

**DeliveryGuarantee**
```java { .api }
public enum DeliveryGuarantee implements DescribedEnum {
    EXACTLY_ONCE,  // Records delivered exactly once, even under failover
    AT_LEAST_ONCE, // Records ensured delivery but may be duplicated
    NONE           // Best effort delivery, may lose or duplicate records
}
```

**RequestEntryWrapper**
```java { .api }
public class RequestEntryWrapper<RequestEntryT> {
    public RequestEntryWrapper(RequestEntryT requestEntry, long size)
    public RequestEntryT getRequestEntry()
    public long getSize()
}
```

**Batch**
```java { .api }
public class Batch<RequestEntryT extends Serializable> {
    public Batch(List<RequestEntryT> batchEntries, long sizeInBytes)
    public List<RequestEntryT> getBatchEntries()
    public long getSizeInBytes()
    public int getRecordCount()
}
```

**BufferedRequestState**
```java { .api }
public class BufferedRequestState<RequestEntryT extends Serializable> {
    public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries)
    public List<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries()
    public long getStateSize()
    public static <T extends Serializable> BufferedRequestState<T> emptyState()
}
```

### Strategy Types

**RequestInfo**
```java { .api }
public interface RequestInfo {
    int getBatchSize()
}
```

**ResultInfo**
```java { .api }
public interface ResultInfo {
    int getFailedMessages()
    int getBatchSize()
}
```

**BasicRequestInfo**
```java { .api }
public class BasicRequestInfo implements RequestInfo {
    public BasicRequestInfo(int batchSize)
    public int getBatchSize()
}
```

**BasicResultInfo**
```java { .api }
public class BasicResultInfo implements ResultInfo {
    public BasicResultInfo(int failedMessages, int batchSize)
    public int getFailedMessages()
    public int getBatchSize()
}
```

### Source Types

**RecordsWithSplitIds**
```java { .api }
public interface RecordsWithSplitIds<E> {
    String nextSplit()
    E nextRecordFromSplit()
    Set<String> finishedSplits()
    void recycle()
}
```

**RecordsBySplits**
```java { .api }
public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
    public static <E> RecordsBySplits<E> forRecords(Map<String, Collection<E>> recordsBySplit)
    public static <E> RecordsBySplits<E> forFinishedSplit(String splitId)
}
```

## Configuration Examples

### Basic Async Sink Configuration
```java
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
    .setMaxBatchSize(100)                // Max records per batch
    .setMaxBatchSizeInBytes(1024 * 1024) // Max 1MB per batch
    .setMaxInFlightRequests(10)          // Max concurrent requests
    .setMaxBufferedRequests(1000)        // Max queued requests
    .setMaxTimeInBufferMS(5000)          // Max 5s buffering
    .setMaxRecordSizeInBytes(256 * 1024) // Max 256KB per record
    .setRequestTimeoutMS(60000)          // 60s timeout
    .setFailOnTimeout(false)             // Retry on timeout
    .build();
```

### Advanced Rate Limiting Configuration
```java
// AIMD scaling strategy
AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)
    .setIncreaseRate(10)     // Linear increase
    .setDecreaseFactor(0.5)  // 50% decrease on failure
    .build();

// Congestion control rate limiting
CongestionControlRateLimitingStrategy rateLimiting =
    CongestionControlRateLimitingStrategy.builder()
        .setMaxInFlightRequests(50)
        .setInitialMaxInFlightMessages(100)
        .setScalingStrategy(scalingStrategy)
        .build();

AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
    .setMaxBatchSize(100)
    .setMaxBatchSizeInBytes(1024 * 1024)
    .setMaxInFlightRequests(50)
    .setMaxBufferedRequests(1000)
    .setMaxTimeInBufferMS(5000)
    .setMaxRecordSizeInBytes(256 * 1024)
    .setRateLimitingStrategy(rateLimiting) // Custom rate limiting
    .build();
```

### Hybrid Source with Dynamic Position Transfer
```java
HybridSource<String> hybridSource = HybridSource
    .<String, FileSourceEnumerator>builder(fileSource)
    .addSource(
        switchContext -> {
            // Get end position from previous source
            FileSourceEnumerator previousEnumerator = switchContext.getPreviousEnumerator();
            long endTimestamp = previousEnumerator.getEndTimestamp();

            // Configure next source with derived start position
            return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("events")
                .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                .setStartingOffsets(OffsetsInitializer.timestamp(endTimestamp))
                .build();
        },
        Boundedness.CONTINUOUS_UNBOUNDED
    )
    .build();
```

## Best Practices

### Sink Implementation
1. **Always implement proper error handling** in `submitRequestEntries`
2. **Use appropriate batch sizes** for your destination's characteristics
3. **Configure rate limiting** based on destination capacity
4. **Implement proper size calculation** in `getSizeInBytes`
5. **Handle fatal vs retryable exceptions** correctly

### Source Implementation
1. **Implement efficient split reading** in `SplitReader.fetch()`
2. **Handle split lifecycle properly** (add/remove/pause/resume)
3. **Use appropriate record emitters** for state management
4. **Configure proper queue sizes** for throughput requirements
5. **Implement proper cleanup** in close methods

### Performance Optimization
1. **Tune batch sizes** for optimal throughput vs latency
2. **Configure appropriate timeouts** for your network conditions
3. **Use efficient serialization** for request entries
4. **Monitor and tune rate limiting** strategies
5. **Optimize record size calculations** for performance

This comprehensive framework enables building production-ready Flink connectors with sophisticated features like rate limiting, backpressure handling, state management, and fault tolerance built-in.