0
# Connector Framework
1
2
Unified connector framework for integrating with external systems, supporting both source and sink operations with exactly-once processing guarantees.
3
4
## Capabilities
5
6
### Source Framework
7
8
New unified source interface for reading data from external systems.
9
10
```java { .api }
11
/**
12
* Unified source interface
13
* @param <T> Output element type
14
* @param <SplitT> Split type
15
* @param <EnumChkT> Enumerator checkpoint type
16
*/
17
interface Source<T, SplitT extends SourceSplit, EnumChkT> {
18
/**
19
* Create source reader
20
* @param readerContext Reader context
21
* @return Source reader
22
*/
23
SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);
24
25
/**
26
* Create split enumerator
27
* @param enumContext Enumerator context
28
* @return Split enumerator
29
*/
30
SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext);
31
32
/**
33
* Restore split enumerator
34
* @param enumContext Enumerator context
35
* @param checkpoint Checkpoint
36
* @return Restored enumerator
37
*/
38
SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint);
39
40
/**
41
* Get split serializer
42
* @return Split serializer
43
*/
44
SimpleVersionedSerializer<SplitT> getSplitSerializer();
45
46
/**
47
* Get enumerator checkpoint serializer
48
* @return Checkpoint serializer
49
*/
50
SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
51
52
/**
53
* Get source boundedness
54
* @return Boundedness
55
*/
56
Boundedness getBoundedness();
57
}
58
59
/**
60
* Source reader interface
61
* @param <T> Element type
62
* @param <SplitT> Split type
63
*/
64
interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable {
65
/**
66
* Start the reader
67
*/
68
void start();
69
70
/**
71
* Poll for next batch of records
72
* @return Input status
73
* @throws Exception
74
*/
75
InputStatus pollNext(ReaderOutput<T> output) throws Exception;
76
77
/**
78
* Snapshot reader state
79
* @param checkpointId Checkpoint ID
* @return List of splits representing the reader state at this checkpoint
80
*/
81
List<SplitT> snapshotState(long checkpointId);
82
83
/**
84
* Add splits to reader
85
* @param splits Splits to add
86
*/
87
void addSplits(List<SplitT> splits);
88
89
/**
90
* Notify the reader that no more splits will be assigned
91
*/
92
void notifyNoMoreSplits();
93
94
@Override
95
void close() throws Exception;
96
}
97
98
/**
99
* Split enumerator interface
100
* @param <SplitT> Split type
101
* @param <CheckpointT> Checkpoint type
102
*/
103
interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable {
104
/**
105
* Start the enumerator
106
*/
107
void start();
108
109
/**
110
* Handle split request
111
* @param subtaskId Subtask requesting splits
112
* @param requesterHostname Requester hostname
113
*/
114
void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);
115
116
/**
117
* Add splits back to enumerator
118
* @param splits Splits to add back
119
* @param subtaskId Subtask ID
120
*/
121
void addSplitsBack(List<SplitT> splits, int subtaskId);
122
123
/**
124
* Add new reader
125
* @param subtaskId Subtask ID
126
*/
127
void addReader(int subtaskId);
128
129
/**
130
* Snapshot enumerator state
131
* @param checkpointId Checkpoint ID
132
* @return Checkpoint state
133
* @throws Exception
134
*/
135
CheckpointT snapshotState(long checkpointId) throws Exception;
136
137
@Override
138
void close() throws IOException;
139
}
140
```
141
142
### Sink Framework
143
144
New unified sink interface for writing data to external systems.
145
146
```java { .api }
147
/**
148
* Unified sink interface
149
* @param <InputT> Input element type
150
*/
151
interface Sink<InputT> {
152
/**
153
* Create sink writer
154
* @param context Writer initialization context
155
* @return Sink writer
156
*/
157
SinkWriter<InputT> createWriter(WriterInitContext context);
158
159
/**
160
* Restore sink writer
161
* @param context Writer initialization context
162
* @param recoveredState Recovered state
163
* @return Restored sink writer
164
* @throws IOException
165
*/
166
default SinkWriter<InputT> restoreWriter(WriterInitContext context, Collection<WriterState> recoveredState) throws IOException {
167
return createWriter(context);
168
}
169
170
/**
171
* Create committer
172
* @param <CommittableT> Committable type
173
* @return Optional committer
174
*/
175
default <CommittableT> Optional<Committer<CommittableT>> createCommitter() {
176
return Optional.empty();
177
}
178
179
/**
180
* Create global committer
181
* @param <CommittableT> Committable type
182
* @param <GlobalCommittableT> Global committable type
183
* @return Optional global committer
184
*/
185
default <CommittableT, GlobalCommittableT> Optional<GlobalCommitter<CommittableT, GlobalCommittableT>> createGlobalCommitter() {
186
return Optional.empty();
187
}
188
189
/**
190
* Get writer state serializer
191
* @return Optional state serializer
192
*/
193
default Optional<SimpleVersionedSerializer<WriterState>> getWriterStateSerializer() {
194
return Optional.empty();
195
}
196
}
197
198
/**
199
* Sink writer interface
200
* @param <InputT> Input element type
201
*/
202
interface SinkWriter<InputT> extends AutoCloseable {
203
/**
204
* Write element
205
* @param element Element to write
206
* @param context Write context
207
* @throws IOException
208
* @throws InterruptedException
209
*/
210
void write(InputT element, Context context) throws IOException, InterruptedException;
211
212
/**
213
* Flush buffered elements
214
* @param endOfInput Whether this is end of input
215
* @return List of committables
216
* @throws IOException
217
* @throws InterruptedException
218
*/
219
List<CommittableT> flush(boolean endOfInput) throws IOException, InterruptedException;
220
221
/**
222
* Snapshot writer state
223
* @param checkpointId Checkpoint ID
224
* @return Writer state
225
* @throws IOException
226
*/
227
default List<WriterState> snapshotState(long checkpointId) throws IOException {
228
return Collections.emptyList();
229
}
230
231
@Override
232
void close() throws Exception;
233
234
/**
235
* Write context
236
*/
237
interface Context {
238
/**
239
* Get current event time
240
* @return Event time
241
*/
242
long currentEventTime();
243
244
/**
245
* Get element timestamp
246
* @return Element timestamp
247
*/
248
Long timestamp();
249
}
250
}
251
```
252
253
### Base Connector Components
254
255
Base classes and interfaces for building connectors.
256
257
```java { .api }
258
/**
259
* Base split interface
260
*/
261
interface SourceSplit {
262
/**
263
* Get split ID
264
* @return Split identifier
265
*/
266
String splitId();
267
}
268
269
/**
270
* Source split with file information
271
*/
272
class FileSourceSplit implements SourceSplit {
273
/**
274
* Create file source split
275
* @param id Split ID
276
* @param path File path
277
* @param offset Start offset
278
* @param length Split length
279
*/
280
public FileSourceSplit(String id, Path path, long offset, long length);
281
282
/**
283
* Get file path
284
* @return File path
285
*/
286
public Path path();
287
288
/**
289
* Get start offset
290
* @return Start offset
291
*/
292
public long offset();
293
294
/**
295
* Get split length
296
* @return Split length
297
*/
298
public long length();
299
}
300
301
/**
302
* Input status enumeration
303
*/
304
enum InputStatus {
305
/** More data available */
306
MORE_AVAILABLE,
307
/** No data available right now; more may arrive later */
308
NOTHING_AVAILABLE,
309
/** End of input */
310
END_OF_INPUT
311
}
312
313
/**
314
* Boundedness enumeration
315
*/
316
enum Boundedness {
317
/** Bounded source */
318
BOUNDED,
319
/** Unbounded source */
320
CONTINUOUS_UNBOUNDED
321
}
322
323
/**
324
* Reader output interface
325
* @param <T> Element type
326
*/
327
interface ReaderOutput<T> {
328
/**
329
* Collect element
330
* @param element Element to collect
331
*/
332
void collect(T element);
333
334
/**
335
* Collect element with timestamp
336
* @param element Element to collect
337
* @param timestamp Element timestamp
338
*/
339
void collect(T element, long timestamp);
340
341
/**
342
* Emit watermark
343
* @param watermark Watermark to emit
344
*/
345
void emitWatermark(Watermark watermark);
346
347
/**
348
* Mark source idle
349
*/
350
void markIdle();
351
352
/**
353
* Mark source active
354
*/
355
void markActive();
356
}
357
```
358
359
### File Connectors
360
361
File-based source and sink connectors.
362
363
```java { .api }
364
/**
365
* File source for reading files
366
* @param <T> Element type
367
*/
368
class FileSource<T> implements Source<T, FileSourceSplit, PendingSplitsCheckpoint> {
369
/**
370
* Create file source for record stream format
* @param streamFormat Stream format
371
* @param <T> Element type
372
* @return File source builder
373
*/
374
public static <T> FileSourceBuilder<T> forRecordStreamFormat(StreamFormat<T> streamFormat);
375
376
/**
377
* Create file source for bulk format
378
* @param bulkFormat Bulk format
379
* @param <T> Element type
380
* @return File source builder
381
*/
382
public static <T> FileSourceBuilder<T> forBulkFileFormat(BulkFormat<T, ?> bulkFormat);
383
}
384
385
/**
386
* File source builder
387
* @param <T> Element type
388
*/
389
class FileSourceBuilder<T> {
390
/**
391
* Set file paths to read
392
* @param inputPaths Input paths
393
* @return Builder
394
*/
395
public FileSourceBuilder<T> setFilePaths(Path... inputPaths);
396
397
/**
398
* Set file paths to read
399
* @param inputPaths Input paths
400
* @return Builder
401
*/
402
public FileSourceBuilder<T> setFilePaths(Collection<Path> inputPaths);
403
404
/**
405
* Monitor path for new files
406
* @param monitoredPath Path to monitor
407
* @param discoveryInterval Interval between checks for new files
408
* @return Builder
409
*/
410
public FileSourceBuilder<T> monitorContinuously(Path monitoredPath, Duration discoveryInterval);
411
412
/**
413
* Set file path filter
414
* @param pathFilter Path filter
415
* @return Builder
416
*/
417
public FileSourceBuilder<T> setFilePathFilter(PathFilter pathFilter);
418
419
/**
420
* Build file source
421
* @return File source
422
*/
423
public FileSource<T> build();
424
}
425
426
/**
427
* Stream format interface
428
* @param <T> Element type
429
*/
430
interface StreamFormat<T> {
431
/**
432
* Create reader for input stream
433
* @param config Configuration
434
* @param inputStream Input stream
435
* @param fileLen File length
436
* @param splitEnd Split end position
437
* @return Stream format reader
438
* @throws IOException
439
*/
440
Reader<T> createReader(Configuration config, FSDataInputStream inputStream, long fileLen, long splitEnd) throws IOException;
441
442
/**
443
* Check if format is splittable
444
* @return true if splittable
445
*/
446
boolean isSplittable();
447
448
/**
449
* Stream format reader
450
* @param <T> Element type
451
*/
452
interface Reader<T> extends AutoCloseable {
453
/**
454
* Read next record
455
* @return Next record or null if end of split
456
* @throws IOException
457
*/
458
T read() throws IOException;
459
460
@Override
461
void close() throws IOException;
462
}
463
}
464
```
465
466
### Committable Types
467
468
Types used in the sink framework for two-phase commit scenarios.
469
470
```java { .api }
471
/**
472
* Base interface for committable data
473
*/
474
interface Committable {}
475
476
/**
477
* Marker interface for writer state
478
*/
479
interface WriterState {}
480
481
/**
482
* Committer interface for two-phase commit
483
* @param <CommittableT> Committable type
484
*/
485
interface Committer<CommittableT> extends AutoCloseable {
486
/**
487
* Commit the committables
488
* @param committables List of committables to commit
489
* @return List of committables that need to be retried
490
* @throws IOException
491
* @throws InterruptedException
492
*/
493
List<CommittableT> commit(List<CommittableT> committables) throws IOException, InterruptedException;
494
495
@Override
496
void close() throws Exception;
497
}
498
499
/**
500
* Global committer interface
501
* @param <CommittableT> Committable type
502
* @param <GlobalCommittableT> Global committable type
503
*/
504
interface GlobalCommitter<CommittableT, GlobalCommittableT> extends AutoCloseable {
505
/**
506
* Combine committables for global commit
507
* @param committables List of committables
508
* @return Combined global committable
509
* @throws IOException
510
*/
511
GlobalCommittableT combine(List<CommittableT> committables) throws IOException;
512
513
/**
514
* Commit the global committables
515
* @param globalCommittables List of global committables
516
* @return List of global committables that need to be retried
517
* @throws IOException
518
* @throws InterruptedException
519
*/
520
List<GlobalCommittableT> commit(List<GlobalCommittableT> globalCommittables) throws IOException, InterruptedException;
521
522
@Override
523
void close() throws Exception;
524
}
525
```