0
# Connectors
1
2
Apache Flink Core provides comprehensive connector APIs for building data sources and sinks that integrate with external systems. These APIs enable developers to create efficient, fault-tolerant connectors with features like checkpointing, parallelism, and exactly-once semantics.
3
4
## Source Connector Framework
5
6
### Basic Source Interface
7
8
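`Source` is the entry point of the unified source API: it creates the split enumerator, the source readers, and the serializers for splits and enumerator checkpoints.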
9
10
```java { .api }
11
import org.apache.flink.api.connector.source.*;
12
import org.apache.flink.core.io.SimpleVersionedSerializer;
13
14
// Basic source implementation
15
public class CustomSource implements Source<MyRecord, MySourceSplit, MyEnumeratorState> {
16
17
@Override
18
public Boundedness getBoundedness() {
19
return Boundedness.CONTINUOUS_UNBOUNDED; // or BOUNDED
20
}
21
22
@Override
23
public SourceReader<MyRecord, MySourceSplit> createReader(SourceReaderContext readerContext) {
24
return new MySourceReader(readerContext);
25
}
26
27
@Override
28
public SplitEnumerator<MySourceSplit, MyEnumeratorState> createEnumerator(
29
SplitEnumeratorContext<MySourceSplit> enumContext) {
30
return new MySplitEnumerator(enumContext);
31
}
32
33
@Override
34
public SplitEnumerator<MySourceSplit, MyEnumeratorState> restoreEnumerator(
35
SplitEnumeratorContext<MySourceSplit> enumContext,
36
MyEnumeratorState checkpoint) {
37
return new MySplitEnumerator(enumContext, checkpoint);
38
}
39
40
@Override
41
public SimpleVersionedSerializer<MySourceSplit> getSplitSerializer() {
42
return new MySourceSplitSerializer();
43
}
44
45
@Override
46
public SimpleVersionedSerializer<MyEnumeratorState> getEnumeratorCheckpointSerializer() {
47
return new MyEnumeratorStateSerializer();
48
}
49
}
50
```
51
52
### Source Split Definition
53
54
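A split describes one unit of work (here, a byte range of a file) that the enumerator assigns to a reader. Splits are checkpointed, so every split type also needs a serializer.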
55
56
```java { .api }
57
import org.apache.flink.api.connector.source.SourceSplit;
58
59
// Custom source split
60
public class MySourceSplit implements SourceSplit {
61
private final String splitId;
62
private final String filepath;
63
private final long startOffset;
64
private final long endOffset;
65
66
public MySourceSplit(String splitId, String filepath, long startOffset, long endOffset) {
67
this.splitId = splitId;
68
this.filepath = filepath;
69
this.startOffset = startOffset;
70
this.endOffset = endOffset;
71
}
72
73
@Override
74
public String splitId() {
75
return splitId;
76
}
77
78
// Getters
79
public String getFilepath() { return filepath; }
80
public long getStartOffset() { return startOffset; }
81
public long getEndOffset() { return endOffset; }
82
83
@Override
84
public String toString() {
85
return String.format("MySourceSplit{id='%s', file='%s', range=[%d, %d]}",
86
splitId, filepath, startOffset, endOffset);
87
}
88
}
89
90
// Split serializer for checkpointing
91
public class MySourceSplitSerializer implements SimpleVersionedSerializer<MySourceSplit> {
92
93
@Override
94
public int getVersion() {
95
return 1;
96
}
97
98
@Override
99
public byte[] serialize(MySourceSplit split) throws IOException {
100
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
101
DataOutputStream out = new DataOutputStream(baos)) {
102
103
out.writeUTF(split.splitId());
104
out.writeUTF(split.getFilepath());
105
out.writeLong(split.getStartOffset());
106
out.writeLong(split.getEndOffset());
107
108
return baos.toByteArray();
109
}
110
}
111
112
@Override
113
public MySourceSplit deserialize(int version, byte[] serialized) throws IOException {
114
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
115
DataInputStream in = new DataInputStream(bais)) {
116
117
String splitId = in.readUTF();
118
String filepath = in.readUTF();
119
long startOffset = in.readLong();
120
long endOffset = in.readLong();
121
122
return new MySourceSplit(splitId, filepath, startOffset, endOffset);
123
}
124
}
125
}
126
```
127
128
### Split Enumerator
129
130
Discovers and assigns splits to source readers.
131
132
```java { .api }
133
import org.apache.flink.api.connector.source.SplitEnumerator;
134
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
135
136
public class MySplitEnumerator implements SplitEnumerator<MySourceSplit, MyEnumeratorState> {
137
private final SplitEnumeratorContext<MySourceSplit> context;
138
private final Set<String> remainingFiles;
139
private final Map<Integer, Set<String>> readerAssignments;
140
141
public MySplitEnumerator(SplitEnumeratorContext<MySourceSplit> context) {
142
this.context = context;
143
this.remainingFiles = discoverFiles();
144
this.readerAssignments = new HashMap<>();
145
}
146
147
public MySplitEnumerator(SplitEnumeratorContext<MySourceSplit> context,
148
MyEnumeratorState restoredState) {
149
this.context = context;
150
this.remainingFiles = restoredState.getRemainingFiles();
151
this.readerAssignments = restoredState.getReaderAssignments();
152
}
153
154
@Override
155
public void start() {
156
// Initialize and assign initial splits
157
assignSplitsToReaders();
158
}
159
160
@Override
161
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
162
// Assign more splits when requested
163
if (!remainingFiles.isEmpty()) {
164
String nextFile = remainingFiles.iterator().next();
165
remainingFiles.remove(nextFile);
166
167
MySourceSplit split = createSplitFromFile(nextFile, subtaskId);
168
context.assignSplit(split, subtaskId);
169
170
// Track assignment
171
readerAssignments.computeIfAbsent(subtaskId, k -> new HashSet<>()).add(nextFile);
172
} else {
173
// No more splits available
174
context.signalNoMoreSplits(subtaskId);
175
}
176
}
177
178
@Override
179
public void addSplitsBack(List<MySourceSplit> splits, int subtaskId) {
180
// Handle split reassignment on failure
181
for (MySourceSplit split : splits) {
182
remainingFiles.add(split.getFilepath());
183
readerAssignments.get(subtaskId).remove(split.getFilepath());
184
}
185
}
186
187
@Override
188
public void addReader(int subtaskId) {
189
// New reader registered
190
readerAssignments.put(subtaskId, new HashSet<>());
191
assignSplitsToReader(subtaskId);
192
}
193
194
@Override
195
public MyEnumeratorState snapshotState(long checkpointId) throws Exception {
196
return new MyEnumeratorState(remainingFiles, readerAssignments);
197
}
198
199
@Override
200
public void close() throws IOException {
201
// Cleanup resources
202
}
203
204
private void assignSplitsToReaders() {
205
for (int readerId : context.registeredReaders().keySet()) {
206
assignSplitsToReader(readerId);
207
}
208
}
209
210
private void assignSplitsToReader(int readerId) {
211
// Assign initial splits to reader
212
if (!remainingFiles.isEmpty()) {
213
String file = remainingFiles.iterator().next();
214
remainingFiles.remove(file);
215
216
MySourceSplit split = createSplitFromFile(file, readerId);
217
context.assignSplit(split, readerId);
218
readerAssignments.get(readerId).add(file);
219
}
220
}
221
222
private MySourceSplit createSplitFromFile(String filepath, int readerId) {
223
String splitId = String.format("%s-%d", filepath, readerId);
224
// Calculate file offsets based on parallelism
225
return new MySourceSplit(splitId, filepath, 0, getFileSize(filepath));
226
}
227
228
private Set<String> discoverFiles() {
229
// Discover files to process
230
return new HashSet<>(Arrays.asList("file1.txt", "file2.txt", "file3.txt"));
231
}
232
233
private long getFileSize(String filepath) {
234
// Get file size for split calculation
235
return 1024 * 1024; // 1MB example
236
}
237
}
238
```
239
240
### Source Reader
241
242
Reads records from assigned splits.
243
244
```java { .api }
245
import org.apache.flink.api.connector.source.SourceReader;
246
import org.apache.flink.api.connector.source.SourceReaderContext;
247
248
public class MySourceReader implements SourceReader<MyRecord, MySourceSplit> {
249
private final SourceReaderContext context;
250
private final Queue<MySourceSplit> pendingSplits;
251
private final Map<String, MyFileReader> activeReaders;
252
253
public MySourceReader(SourceReaderContext context) {
254
this.context = context;
255
this.pendingSplits = new LinkedList<>();
256
this.activeReaders = new HashMap<>();
257
}
258
259
@Override
260
public void start() {
261
// Initialize reader
262
}
263
264
@Override
265
public InputStatus pollNext(ReaderOutput<MyRecord> output) throws Exception {
266
// Check for available data
267
if (activeReaders.isEmpty() && pendingSplits.isEmpty()) {
268
// Request more splits if needed
269
context.sendSplitRequest();
270
return InputStatus.NOTHING_AVAILABLE;
271
}
272
273
// Process pending splits
274
while (!pendingSplits.isEmpty()) {
275
MySourceSplit split = pendingSplits.poll();
276
MyFileReader fileReader = new MyFileReader(split);
277
activeReaders.put(split.splitId(), fileReader);
278
}
279
280
// Read records from active readers
281
boolean hasData = false;
282
Iterator<Map.Entry<String, MyFileReader>> iterator = activeReaders.entrySet().iterator();
283
284
while (iterator.hasNext()) {
285
Map.Entry<String, MyFileReader> entry = iterator.next();
286
MyFileReader reader = entry.getValue();
287
288
MyRecord record = reader.readNext();
289
if (record != null) {
290
output.collect(record);
291
hasData = true;
292
} else if (reader.isFinished()) {
293
// Split is exhausted
294
reader.close();
295
iterator.remove();
296
}
297
}
298
299
if (activeReaders.isEmpty()) {
300
return InputStatus.END_OF_INPUT;
301
}
302
303
return hasData ? InputStatus.MORE_AVAILABLE : InputStatus.NOTHING_AVAILABLE;
304
}
305
306
@Override
307
public List<MySourceSplit> snapshotState(long checkpointId) {
308
// Return unprocessed splits for checkpointing
309
List<MySourceSplit> splitsToSnapshot = new ArrayList<>(pendingSplits);
310
311
// Add partially processed splits
312
for (MyFileReader reader : activeReaders.values()) {
313
if (!reader.isFinished()) {
314
splitsToSnapshot.add(reader.getCurrentSplit());
315
}
316
}
317
318
return splitsToSnapshot;
319
}
320
321
@Override
322
public void addSplits(List<MySourceSplit> splits) {
323
pendingSplits.addAll(splits);
324
}
325
326
@Override
327
public void notifyNoMoreSplits() {
328
// No more splits will be assigned
329
}
330
331
@Override
332
public void close() throws Exception {
333
for (MyFileReader reader : activeReaders.values()) {
334
reader.close();
335
}
336
activeReaders.clear();
337
}
338
}
339
```
340
341
### Built-in Sources
342
343
Creating streams from the built-in `NumberSequenceSource` and from custom sources with `env.fromSource(...)`.
344
345
```java { .api }
346
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
347
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
348
349
public class BuiltInSourceExamples {
350
351
public static void numberSequenceExample() {
352
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
353
354
// Number sequence source
355
NumberSequenceSource source = new NumberSequenceSource(1, 1000000);
356
357
DataStream<Long> numbers = env.fromSource(
358
source,
359
WatermarkStrategy.noWatermarks(),
360
"number-sequence"
361
);
362
363
numbers.print();
364
}
365
366
// Custom bounded source
367
public static DataStream<String> createBoundedFileSource(StreamExecutionEnvironment env) {
368
CustomFileSource source = new CustomFileSource("/path/to/files");
369
370
return env.fromSource(
371
source,
372
WatermarkStrategy.noWatermarks(),
373
"file-source"
374
);
375
}
376
377
// Custom unbounded source with watermarks
378
public static DataStream<Event> createUnboundedEventSource(StreamExecutionEnvironment env) {
379
CustomEventSource source = new CustomEventSource("kafka-topic");
380
381
WatermarkStrategy<Event> watermarkStrategy =
382
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
383
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
384
385
return env.fromSource(
386
source,
387
watermarkStrategy,
388
"event-source"
389
);
390
}
391
}
392
```
393
394
## Sink Connector Framework
395
396
### Basic Sink Interface
397
398
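`Sink` (package `org.apache.flink.api.connector.sink2`) is the entry point of the sink API: it creates the `SinkWriter` that writes records to the external system.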
399
400
```java { .api }
401
import org.apache.flink.api.connector.sink2.*;
402
403
// Simple stateless sink
404
public class MyBasicSink implements Sink<MyRecord> {
405
406
@Override
407
public SinkWriter<MyRecord> createWriter(InitContext context) throws IOException {
408
return new MyBasicSinkWriter(context);
409
}
410
}
411
412
// Basic sink writer implementation
413
public class MyBasicSinkWriter implements SinkWriter<MyRecord> {
414
private final InitContext context;
415
private final DatabaseConnection connection;
416
417
public MyBasicSinkWriter(InitContext context) throws IOException {
418
this.context = context;
419
this.connection = new DatabaseConnection();
420
}
421
422
@Override
423
public void write(MyRecord element, Context context) throws IOException, InterruptedException {
424
// Write record to external system
425
connection.insert(element);
426
}
427
428
@Override
429
public void flush(boolean endOfInput) throws IOException, InterruptedException {
430
// Flush any buffered data
431
connection.flush();
432
}
433
434
@Override
435
public void close() throws Exception {
436
connection.close();
437
}
438
}
439
```
440
441
### Stateful Sink with Checkpointing
442
443
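Persist writer state across checkpoints so that buffered records survive failures. Writer state alone does not provide end-to-end exactly-once delivery; that additionally requires a committer, as shown in the two-phase commit section below.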
444
445
```java { .api }
446
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
447
import org.apache.flink.api.connector.sink2.SupportsWriterState;
448
449
// Sink supporting writer state
450
public class MyStatefulSink implements Sink<MyRecord>, SupportsWriterState<MyRecord, MyWriterState> {
451
452
@Override
453
public StatefulSinkWriter<MyRecord, MyWriterState> createWriter(InitContext context)
454
throws IOException {
455
return new MyStatefulSinkWriter(context);
456
}
457
458
@Override
459
public StatefulSinkWriter<MyRecord, MyWriterState> restoreWriter(
460
InitContext context,
461
Collection<MyWriterState> recoveredState) throws IOException {
462
return new MyStatefulSinkWriter(context, recoveredState);
463
}
464
465
@Override
466
public SimpleVersionedSerializer<MyWriterState> getWriterStateSerializer() {
467
return new MyWriterStateSerializer();
468
}
469
}
470
471
// Stateful sink writer
472
public class MyStatefulSinkWriter implements StatefulSinkWriter<MyRecord, MyWriterState> {
473
private final List<MyRecord> pendingRecords;
474
private final Map<String, Long> processedCounts;
475
476
public MyStatefulSinkWriter(InitContext context) {
477
this.pendingRecords = new ArrayList<>();
478
this.processedCounts = new HashMap<>();
479
}
480
481
public MyStatefulSinkWriter(InitContext context, Collection<MyWriterState> recoveredState) {
482
this.pendingRecords = new ArrayList<>();
483
this.processedCounts = new HashMap<>();
484
485
// Restore state
486
for (MyWriterState state : recoveredState) {
487
pendingRecords.addAll(state.getPendingRecords());
488
processedCounts.putAll(state.getProcessedCounts());
489
}
490
}
491
492
@Override
493
public void write(MyRecord element, Context context) throws IOException, InterruptedException {
494
pendingRecords.add(element);
495
496
String key = element.getKey();
497
processedCounts.merge(key, 1L, Long::sum);
498
499
// Batch write when buffer is full
500
if (pendingRecords.size() >= 1000) {
501
flushPendingRecords();
502
}
503
}
504
505
@Override
506
public List<MyWriterState> snapshotState(long checkpointId) throws IOException {
507
// Create state snapshot
508
MyWriterState state = new MyWriterState(
509
new ArrayList<>(pendingRecords),
510
new HashMap<>(processedCounts),
511
checkpointId
512
);
513
514
return Collections.singletonList(state);
515
}
516
517
@Override
518
public void flush(boolean endOfInput) throws IOException, InterruptedException {
519
flushPendingRecords();
520
}
521
522
@Override
523
public void close() throws Exception {
524
flushPendingRecords();
525
}
526
527
private void flushPendingRecords() throws IOException {
528
if (!pendingRecords.isEmpty()) {
529
// Write records to external system
530
for (MyRecord record : pendingRecords) {
531
writeToExternalSystem(record);
532
}
533
pendingRecords.clear();
534
}
535
}
536
537
private void writeToExternalSystem(MyRecord record) throws IOException {
538
// Implementation specific to external system
539
}
540
}
541
```
542
543
### Two-Phase Commit Sink
544
545
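Achieve exactly-once delivery with a two-phase commit. The writer pre-commits buffered data at each checkpoint (`prepareCommit`), and the committer makes it visible only after the checkpoint has completed.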
546
547
```java { .api }
548
import org.apache.flink.api.connector.sink2.SupportsCommitter;
549
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
550
import org.apache.flink.api.connector.sink2.Committer;
551
552
// Sink with two-phase commit
553
public class MyTransactionalSink implements
554
Sink<MyRecord>,
555
SupportsCommitter<MyCommittable> {
556
557
@Override
558
public CommittingSinkWriter<MyRecord, MyCommittable> createWriter(InitContext context)
559
throws IOException {
560
return new MyCommittingSinkWriter(context);
561
}
562
563
@Override
564
public Committer<MyCommittable> createCommitter() throws IOException {
565
return new MyCommitter();
566
}
567
568
@Override
569
public SimpleVersionedSerializer<MyCommittable> getCommittableSerializer() {
570
return new MyCommittableSerializer();
571
}
572
}
573
574
// Committing sink writer (first phase)
575
public class MyCommittingSinkWriter implements CommittingSinkWriter<MyRecord, MyCommittable> {
576
private final String transactionId;
577
private final List<MyRecord> currentBatch;
578
private final DatabaseTransaction transaction;
579
580
public MyCommittingSinkWriter(InitContext context) throws IOException {
581
this.transactionId = generateTransactionId(context);
582
this.currentBatch = new ArrayList<>();
583
this.transaction = new DatabaseTransaction(transactionId);
584
}
585
586
@Override
587
public void write(MyRecord element, Context context) throws IOException, InterruptedException {
588
currentBatch.add(element);
589
transaction.prepare(element);
590
}
591
592
@Override
593
public Collection<MyCommittable> prepareCommit() throws IOException, InterruptedException {
594
if (currentBatch.isEmpty()) {
595
return Collections.emptyList();
596
}
597
598
// Prepare transaction for commit
599
transaction.prepareForCommit();
600
601
MyCommittable committable = new MyCommittable(
602
transactionId,
603
new ArrayList<>(currentBatch),
604
System.currentTimeMillis()
605
);
606
607
currentBatch.clear();
608
return Collections.singletonList(committable);
609
}
610
611
@Override
612
public void flush(boolean endOfInput) throws IOException, InterruptedException {
613
// Ensure all data is prepared
614
prepareCommit();
615
}
616
617
@Override
618
public void close() throws Exception {
619
transaction.close();
620
}
621
622
private String generateTransactionId(InitContext context) {
623
return String.format("txn_%d_%d_%d",
624
context.getSubtaskId(),
625
context.getAttemptNumber(),
626
System.currentTimeMillis());
627
}
628
}
629
630
// Committer (second phase)
631
public class MyCommitter implements Committer<MyCommittable> {
632
633
@Override
634
public void commit(Collection<CommitRequest<MyCommittable>> requests)
635
throws IOException, InterruptedException {
636
637
for (CommitRequest<MyCommittable> request : requests) {
638
MyCommittable committable = request.getCommittable();
639
640
try {
641
// Commit the transaction
642
DatabaseTransaction transaction =
643
DatabaseTransaction.resume(committable.getTransactionId());
644
transaction.commit();
645
646
} catch (Exception e) {
647
// Handle commit failure
648
throw new IOException("Failed to commit transaction: " +
649
committable.getTransactionId(), e);
650
}
651
}
652
}
653
654
@Override
655
public void close() throws Exception {
656
// Cleanup resources
657
}
658
}
659
```
660
661
### Advanced Connector Features
662
663
#### Dynamic Parallelism Inference
664
665
```java { .api }
666
import org.apache.flink.api.connector.source.DynamicParallelismInference;
667
668
public class DynamicSource implements Source<MyRecord, MySourceSplit, MyEnumeratorState>,
669
DynamicParallelismInference {
670
671
@Override
672
public int inferParallelism(SourceReaderFactory readerFactory) {
673
// Infer optimal parallelism based on source characteristics
674
int availableFiles = discoverAvailableFiles();
675
int maxParallelism = getMaxRecommendedParallelism();
676
677
return Math.min(availableFiles, maxParallelism);
678
}
679
680
private int discoverAvailableFiles() {
681
// Count available data partitions/files
682
return 10; // Example
683
}
684
685
private int getMaxRecommendedParallelism() {
686
// Based on external system limits or performance characteristics
687
return 50;
688
}
689
}
690
```
691
692
#### Rate Limiting
693
694
```java { .api }
695
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
696
697
public class RateLimitedSourceReader implements SourceReader<MyRecord, MySourceSplit> {
698
private final SourceReaderContext context;
699
private final RateLimiterStrategy rateLimiter;
700
701
public RateLimitedSourceReader(SourceReaderContext context) {
702
this.context = context;
703
this.rateLimiter = RateLimiterStrategy.perSecond(1000); // 1000 records/sec
704
}
705
706
@Override
707
public InputStatus pollNext(ReaderOutput<MyRecord> output) throws Exception {
708
// Check rate limit before processing
709
if (!rateLimiter.tryAcquire(1)) {
710
return InputStatus.NOTHING_AVAILABLE;
711
}
712
713
// Regular record processing
714
MyRecord record = readNextRecord();
715
if (record != null) {
716
output.collect(record);
717
return InputStatus.MORE_AVAILABLE;
718
}
719
720
return InputStatus.NOTHING_AVAILABLE;
721
}
722
723
private MyRecord readNextRecord() {
724
// Read from external source
725
return null; // Implementation specific
726
}
727
}
728
```
729
730
## Connector Utilities and Best Practices
731
732
### Error Handling and Retry Logic
733
734
```java { .api }
735
public class RobustSinkWriter implements SinkWriter<MyRecord> {
736
private final RetryPolicy retryPolicy;
737
private final DeadLetterQueue<MyRecord> dlq;
738
739
public RobustSinkWriter(InitContext context) {
740
this.retryPolicy = RetryPolicy.builder()
741
.maxAttempts(3)
742
.backoff(Duration.ofSeconds(1), Duration.ofSeconds(30))
743
.build();
744
this.dlq = new DeadLetterQueue<>();
745
}
746
747
@Override
748
public void write(MyRecord element, Context context) throws IOException, InterruptedException {
749
retryPolicy.execute(() -> {
750
try {
751
writeToExternalSystem(element);
752
} catch (TransientException e) {
753
throw new RetryableException("Transient error, will retry", e);
754
} catch (PermanentException e) {
755
// Send to dead letter queue
756
dlq.send(element, e);
757
return; // Don't retry permanent failures
758
}
759
});
760
}
761
762
private void writeToExternalSystem(MyRecord record) throws Exception {
763
// Implementation specific
764
}
765
}
766
```
767
768
### Monitoring and Metrics
769
770
```java { .api }
771
public class InstrumentedSourceReader implements SourceReader<MyRecord, MySourceSplit> {
772
private final Counter recordsRead;
773
private final Counter errors;
774
private final Histogram readLatency;
775
private final Gauge<Integer> pendingSplits;
776
777
public InstrumentedSourceReader(SourceReaderContext context) {
778
MetricGroup metricGroup = context.metricGroup();
779
780
this.recordsRead = metricGroup.counter("records_read");
781
this.errors = metricGroup.counter("errors");
782
this.readLatency = metricGroup.histogram("read_latency");
783
this.pendingSplits = metricGroup.gauge("pending_splits",
784
() -> this.pendingSplitQueue.size());
785
}
786
787
@Override
788
public InputStatus pollNext(ReaderOutput<MyRecord> output) throws Exception {
789
long startTime = System.nanoTime();
790
791
try {
792
MyRecord record = readRecord();
793
if (record != null) {
794
output.collect(record);
795
recordsRead.inc();
796
readLatency.update(System.nanoTime() - startTime);
797
return InputStatus.MORE_AVAILABLE;
798
}
799
return InputStatus.NOTHING_AVAILABLE;
800
801
} catch (Exception e) {
802
errors.inc();
803
throw e;
804
}
805
}
806
}
807
```
808
809
### Connector Configuration
810
811
```java { .api }
812
public class ConfigurableSource implements Source<MyRecord, MySourceSplit, MyEnumeratorState> {
813
private final MySourceConfig config;
814
815
public ConfigurableSource(MySourceConfig config) {
816
this.config = config;
817
}
818
819
public static class MySourceConfig implements Serializable {
820
private final String connectionUrl;
821
private final int batchSize;
822
private final Duration pollInterval;
823
private final boolean enableMetrics;
824
825
private MySourceConfig(Builder builder) {
826
this.connectionUrl = builder.connectionUrl;
827
this.batchSize = builder.batchSize;
828
this.pollInterval = builder.pollInterval;
829
this.enableMetrics = builder.enableMetrics;
830
}
831
832
public static Builder builder() {
833
return new Builder();
834
}
835
836
public static class Builder {
837
private String connectionUrl;
838
private int batchSize = 1000;
839
private Duration pollInterval = Duration.ofSeconds(1);
840
private boolean enableMetrics = true;
841
842
public Builder connectionUrl(String url) {
843
this.connectionUrl = url;
844
return this;
845
}
846
847
public Builder batchSize(int size) {
848
this.batchSize = size;
849
return this;
850
}
851
852
public Builder pollInterval(Duration interval) {
853
this.pollInterval = interval;
854
return this;
855
}
856
857
public Builder enableMetrics(boolean enable) {
858
this.enableMetrics = enable;
859
return this;
860
}
861
862
public MySourceConfig build() {
863
Preconditions.checkNotNull(connectionUrl, "Connection URL is required");
864
return new MySourceConfig(this);
865
}
866
}
867
868
// Getters
869
public String getConnectionUrl() { return connectionUrl; }
870
public int getBatchSize() { return batchSize; }
871
public Duration getPollInterval() { return pollInterval; }
872
public boolean isMetricsEnabled() { return enableMetrics; }
873
}
874
}
875
876
// Usage
877
MySourceConfig config = MySourceConfig.builder()
878
.connectionUrl("jdbc:postgresql://localhost:5432/mydb")
879
.batchSize(500)
880
.pollInterval(Duration.ofSeconds(2))
881
.enableMetrics(true)
882
.build();
883
884
MySource source = new MySource(config);
885
```
886
887
Apache Flink's connector framework provides a powerful foundation for building efficient, fault-tolerant data sources and sinks. By understanding these APIs and following best practices, you can create connectors that integrate seamlessly with Flink's runtime and provide reliable data processing capabilities.