0
# Apache Flink Connector Base
1
2
The Apache Flink Connector Base library provides foundational classes and utilities for building high-performance, production-ready Apache Flink connectors. It offers sophisticated async sink and source frameworks with built-in features like backpressure handling, rate limiting, checkpointing, and hybrid source switching.
3
4
## Package Information
5
6
**Maven Coordinates:**
7
```xml
8
<dependency>
9
<groupId>org.apache.flink</groupId>
10
<artifactId>flink-connector-base</artifactId>
11
<version>${flink.version}</version> <!-- requires Flink 1.18 or later -->
12
</dependency>
13
```
14
15
**Package:** `org.apache.flink.connector.base`
16
17
## Core Imports
18
19
### Essential Base Classes
20
```java
21
import org.apache.flink.connector.base.DeliveryGuarantee;
22
import org.apache.flink.connector.base.sink.AsyncSinkBase;
23
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
24
import org.apache.flink.connector.base.sink.writer.ElementConverter;
25
import org.apache.flink.connector.base.source.hybrid.HybridSource;
26
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
27
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
28
```
29
30
### Configuration and Strategy Classes
31
```java
32
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
33
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
34
import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
35
import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
36
```
37
38
### Core Interfaces
39
```java
40
import org.apache.flink.connector.base.sink.writer.BatchCreator;
41
import org.apache.flink.connector.base.sink.writer.RequestBuffer;
42
import org.apache.flink.connector.base.sink.writer.ResultHandler;
43
import org.apache.flink.connector.base.source.reader.RecordEmitter;
44
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
45
```
46
47
## Basic Usage
48
49
### Creating an Async Sink
50
51
```java
52
// Define your request entry type
53
public class MyRequestEntry implements Serializable {
54
private final String data;
55
private final long timestamp;
56
57
public MyRequestEntry(String data, long timestamp) {
58
this.data = data;
59
this.timestamp = timestamp;
60
}
61
62
public String getData() { return data; }
63
public long getTimestamp() { return timestamp; }
64
}
65
66
// Implement ElementConverter
67
public class MyElementConverter implements ElementConverter<String, MyRequestEntry> {
68
@Override
69
public MyRequestEntry apply(String element, SinkWriter.Context context) {
70
return new MyRequestEntry(element, context.timestamp());
71
}
72
}
73
74
// Create AsyncSink implementation
75
public class MyAsyncSink extends AsyncSinkBase<String, MyRequestEntry> {
76
public MyAsyncSink() {
77
super(
78
new MyElementConverter(), // Element converter
79
100, // Max batch size
80
10, // Max in-flight requests
81
1000, // Max buffered requests
82
1024 * 1024, // Max batch size in bytes
83
5000, // Max time in buffer (ms)
84
256 * 1024, // Max record size in bytes
85
60000, // Request timeout (ms)
86
false // Fail on timeout
87
);
88
}
89
90
@Override
91
public SinkWriter<String> createWriter(WriterInitContext context) throws IOException {
92
return new MyAsyncSinkWriter(
93
getElementConverter(),
94
context,
95
AsyncSinkWriterConfiguration.builder()
96
.setMaxBatchSize(getMaxBatchSize())
97
.setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
98
.setMaxInFlightRequests(getMaxInFlightRequests())
99
.setMaxBufferedRequests(getMaxBufferedRequests())
100
.setMaxTimeInBufferMS(getMaxTimeInBufferMS())
101
.setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
102
.build(),
103
Collections.emptyList()
104
);
105
}
106
}
107
108
// Implement AsyncSinkWriter
109
public class MyAsyncSinkWriter extends AsyncSinkWriter<String, MyRequestEntry> {
110
private final AsyncClient client;
111
112
public MyAsyncSinkWriter(
113
ElementConverter<String, MyRequestEntry> elementConverter,
114
WriterInitContext context,
115
AsyncSinkWriterConfiguration configuration,
116
Collection<BufferedRequestState<MyRequestEntry>> states) {
117
super(elementConverter, context, configuration, states);
118
this.client = new AsyncClient();
119
}
120
121
@Override
122
protected void submitRequestEntries(
123
List<MyRequestEntry> requestEntries,
124
ResultHandler<MyRequestEntry> resultHandler) {
125
126
CompletableFuture<Response> future = client.sendBatch(requestEntries);
127
future.whenComplete((response, error) -> {
128
if (error != null && isFatalError(error)) {
129
resultHandler.completeExceptionally(new RuntimeException(error));
130
} else if (error != null || response.hasFailures()) {
131
List<MyRequestEntry> failedEntries = getFailedEntries(requestEntries, response);
132
resultHandler.retryForEntries(failedEntries);
133
} else {
134
resultHandler.complete();
135
}
136
});
137
}
138
139
@Override
140
protected long getSizeInBytes(MyRequestEntry requestEntry) {
141
return requestEntry.getData().length() + 8; // Approximate: char count + 8-byte timestamp (use UTF-8 byte length for an exact size)
142
}
143
}
144
```
145
146
### Creating a Hybrid Source
147
148
```java
149
// Create file and Kafka sources
150
FileSource<String> fileSource = FileSource
151
.forRecordStreamFormat(new TextLineInputFormat(), path)
152
.build();
153
154
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
155
.setBootstrapServers("localhost:9092")
156
.setTopics("events")
157
.setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
158
.setStartingOffsets(OffsetsInitializer.earliest())
159
.build();
160
161
// Create hybrid source that reads files first, then switches to Kafka
162
HybridSource<String> hybridSource = HybridSource.builder(fileSource)
163
.addSource(kafkaSource)
164
.build();
165
166
// Use in DataStream
167
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
168
DataStream<String> stream = env.fromSource(
169
hybridSource,
170
WatermarkStrategy.noWatermarks(),
171
"hybrid-source"
172
);
173
```
174
175
## Architecture
176
177
The connector base library is organized into several key architectural components:
178
179
### Sink Architecture
180
- **AsyncSinkBase**: Abstract base class for destination-agnostic async sinks
181
- **AsyncSinkWriter**: Core writer that handles batching, buffering, and retry logic
182
- **ElementConverter**: Transforms stream elements into request entries
183
- **RateLimitingStrategy**: Controls throughput and backpressure
184
- **BatchCreator**: Pluggable batching logic for request grouping
185
186
### Source Architecture
187
- **SourceReaderBase**: Foundation for building custom source readers
188
- **SplitReader**: Interface for reading from individual splits
189
- **RecordEmitter**: Handles record processing and state updates
190
- **HybridSource**: Enables seamless switching between multiple sources
191
192
### State Management
193
- **BufferedRequestState**: Handles checkpointing for async sinks
194
- **Split State Management**: Automatic state tracking for source splits
195
- **Serializers**: Built-in serialization for state persistence
196
197
## Capabilities
198
199
### [Async Sink Framework](./async-sink.md) { .api }
200
Complete framework for building async sinks with batching, buffering, rate limiting, and fault tolerance.
201
202
**Key APIs:**
203
```java { .api }
204
public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
205
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
206
public interface ElementConverter<InputT, RequestEntryT>
207
public interface ResultHandler<RequestEntryT>
208
```
209
210
### [Source Reader Framework](./source-reader.md) { .api }
211
Sophisticated framework for building source readers with split management and coordination.
212
213
**Key APIs:**
214
```java { .api }
215
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
216
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT, SplitStateT>
217
public interface SplitReader<E, SplitT>
218
public interface RecordEmitter<E, T, SplitStateT>
219
```
220
221
### [Hybrid Source System](./hybrid-source.md) { .api }
222
Advanced source that can switch between multiple underlying sources with position transfer.
223
224
**Key APIs:**
225
```java { .api }
226
public class HybridSource<T>
227
public interface SourceFactory<T, SourceT, FromEnumT>
228
public interface SourceSwitchContext<EnumT>
229
```
230
231
### [Rate Limiting & Scaling](./rate-limiting.md) { .api }
232
Pluggable strategies for controlling throughput, handling backpressure, and dynamic scaling.
233
234
**Key APIs:**
235
```java { .api }
236
public interface RateLimitingStrategy
237
public interface ScalingStrategy<T>
238
public class CongestionControlRateLimitingStrategy
239
public class AIMDScalingStrategy
240
```
241
242
### [Table API Integration](./table-api.md) { .api }
243
Base classes for integrating async sinks with Flink's Table API and SQL.
244
245
**Key APIs:**
246
```java { .api }
247
public abstract class AsyncDynamicTableSinkFactory
248
public class AsyncDynamicTableSink
249
public interface ConfigurationValidator
250
```
251
252
## Type Definitions
253
254
### Core Types
255
256
**DeliveryGuarantee**
257
```java { .api }
258
public enum DeliveryGuarantee implements DescribedEnum {
259
EXACTLY_ONCE, // Records delivered exactly once, even under failover
260
AT_LEAST_ONCE, // Delivery is guaranteed, but records may be duplicated
261
NONE // Best effort delivery, may lose or duplicate records
262
}
263
```
264
265
**RequestEntryWrapper**
266
```java { .api }
267
public class RequestEntryWrapper<RequestEntryT> {
268
public RequestEntryWrapper(RequestEntryT requestEntry, long size)
269
public RequestEntryT getRequestEntry()
270
public long getSize()
271
}
272
```
273
274
**Batch**
275
```java { .api }
276
public class Batch<RequestEntryT extends Serializable> {
277
public Batch(List<RequestEntryT> batchEntries, long sizeInBytes)
278
public List<RequestEntryT> getBatchEntries()
279
public long getSizeInBytes()
280
public int getRecordCount()
281
}
282
```
283
284
**BufferedRequestState**
285
```java { .api }
286
public class BufferedRequestState<RequestEntryT extends Serializable> {
287
public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries)
288
public List<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries()
289
public long getStateSize()
290
public static <T extends Serializable> BufferedRequestState<T> emptyState()
291
}
292
```
293
294
### Strategy Types
295
296
**RequestInfo**
297
```java { .api }
298
public interface RequestInfo {
299
int getBatchSize()
300
}
301
```
302
303
**ResultInfo**
304
```java { .api }
305
public interface ResultInfo {
306
int getFailedMessages()
307
int getBatchSize()
308
}
309
```
310
311
**BasicRequestInfo**
312
```java { .api }
313
public class BasicRequestInfo implements RequestInfo {
314
public BasicRequestInfo(int batchSize)
315
public int getBatchSize()
316
}
317
```
318
319
**BasicResultInfo**
320
```java { .api }
321
public class BasicResultInfo implements ResultInfo {
322
public BasicResultInfo(int failedMessages, int batchSize)
323
public int getFailedMessages()
324
public int getBatchSize()
325
}
326
```
327
328
### Source Types
329
330
**RecordsWithSplitIds**
331
```java { .api }
332
public interface RecordsWithSplitIds<E> {
333
String nextSplit()
334
E nextRecordFromSplit()
335
Set<String> finishedSplits()
336
void recycle()
337
}
338
```
339
340
**RecordsBySplits**
341
```java { .api }
342
public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
343
public static <E> RecordsBySplits<E> forRecords(Map<String, Collection<E>> recordsBySplit)
344
public static <E> RecordsBySplits<E> forFinishedSplit(String splitId)
345
}
346
```
347
348
## Configuration Examples
349
350
### Basic Async Sink Configuration
351
```java
352
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
353
.setMaxBatchSize(100) // Max records per batch
354
.setMaxBatchSizeInBytes(1024 * 1024) // Max 1MB per batch
355
.setMaxInFlightRequests(10) // Max concurrent requests
356
.setMaxBufferedRequests(1000) // Max queued requests
357
.setMaxTimeInBufferMS(5000) // Max 5s buffering
358
.setMaxRecordSizeInBytes(256 * 1024) // Max 256KB per record
359
.setRequestTimeoutMS(60000) // 60s timeout
360
.setFailOnTimeout(false) // Retry on timeout
361
.build();
362
```
363
364
### Advanced Rate Limiting Configuration
365
```java
366
// AIMD scaling strategy
367
AIMDScalingStrategy scalingStrategy = AIMDScalingStrategy.builder(1000)
368
.setIncreaseRate(10) // Linear increase
369
.setDecreaseFactor(0.5) // 50% decrease on failure
370
.build();
371
372
// Congestion control rate limiting
373
CongestionControlRateLimitingStrategy rateLimiting =
374
CongestionControlRateLimitingStrategy.builder()
375
.setMaxInFlightRequests(50)
376
.setInitialMaxInFlightMessages(100)
377
.setScalingStrategy(scalingStrategy)
378
.build();
379
380
AsyncSinkWriterConfiguration config = AsyncSinkWriterConfiguration.builder()
381
.setMaxBatchSize(100)
382
.setMaxBatchSizeInBytes(1024 * 1024)
383
.setMaxInFlightRequests(50)
384
.setMaxBufferedRequests(1000)
385
.setMaxTimeInBufferMS(5000)
386
.setMaxRecordSizeInBytes(256 * 1024)
387
.setRateLimitingStrategy(rateLimiting) // Custom rate limiting
388
.build();
389
```
390
391
### Hybrid Source with Dynamic Position Transfer
392
```java
393
HybridSource<String> hybridSource = HybridSource
394
.<String, FileSourceEnumerator>builder(fileSource)
395
.addSource(
396
switchContext -> {
397
// Get end position from previous source
398
FileSourceEnumerator previousEnumerator = switchContext.getPreviousEnumerator();
399
long endTimestamp = previousEnumerator.getEndTimestamp();
400
401
// Configure next source with derived start position
402
return KafkaSource.<String>builder()
403
.setBootstrapServers("localhost:9092")
404
.setTopics("events")
405
.setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
406
.setStartingOffsets(OffsetsInitializer.timestamp(endTimestamp))
407
.build();
408
},
409
Boundedness.CONTINUOUS_UNBOUNDED
410
)
411
.build();
412
```
413
414
## Best Practices
415
416
### Sink Implementation
417
1. **Always implement proper error handling** in `submitRequestEntries`
418
2. **Use appropriate batch sizes** for your destination's characteristics
419
3. **Configure rate limiting** based on destination capacity
420
4. **Implement proper size calculation** in `getSizeInBytes`
421
5. **Handle fatal vs retryable exceptions** correctly
422
423
### Source Implementation
424
1. **Implement efficient split reading** in `SplitReader.fetch()`
425
2. **Handle split lifecycle properly** (add/remove/pause/resume)
426
3. **Use appropriate record emitters** for state management
427
4. **Configure proper queue sizes** for throughput requirements
428
5. **Implement proper cleanup** in close methods
429
430
### Performance Optimization
431
1. **Tune batch sizes** for optimal throughput vs latency
432
2. **Configure appropriate timeouts** for your network conditions
433
3. **Use efficient serialization** for request entries
434
4. **Monitor and tune rate limiting** strategies
435
5. **Optimize record size calculations** for performance
436
437
This comprehensive framework enables building production-ready Flink connectors with sophisticated built-in features such as rate limiting, backpressure handling, state management, and fault tolerance.