# Connector Framework

Provider interfaces for implementing custom table sources and sinks that integrate with the DataStream API. These interfaces enable developers to create connectors that seamlessly bridge external systems with Flink's table ecosystem.

## Capabilities

### Source Providers

Interfaces for implementing table sources that produce DataStream instances.

#### DataStream Scan Provider

Creates DataStream-based table sources with direct DataStream integration.

```java { .api }
/**
 * Provider that produces a DataStream as the runtime implementation of a ScanTableSource.
 * Note: intended for advanced connector developers; prefer SourceProvider or
 * SourceFunctionProvider where possible.
 */
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {

    /**
     * Creates a scan DataStream from the given StreamExecutionEnvironment.
     *
     * @param providerContext context for generating unique identifiers and accessing framework services
     * @param execEnv the StreamExecutionEnvironment for DataStream creation
     * @return DataStream of RowData for table consumption
     */
    DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);

    /**
     * @deprecated Use {@link #produceDataStream(ProviderContext, StreamExecutionEnvironment)} instead.
     */
    @Deprecated
    DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv);
}
```

#### Source Function Provider

Creates SourceFunction-based table sources for more traditional streaming patterns.

```java { .api }
/**
 * Provider that produces a SourceFunction as the runtime implementation of a ScanTableSource.
 */
public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {

    /**
     * Creates a SourceFunction instance for data production.
     *
     * @return SourceFunction that produces RowData records
     */
    SourceFunction<RowData> createSourceFunction();

    /**
     * Indicates whether the source is bounded (finite) or unbounded (infinite).
     *
     * @return true if the source is bounded, false for unbounded streams
     */
    boolean isBounded();
}
```

**Usage Examples:**

```java
import java.util.Optional;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;

// Custom source function provider
public class CustomSourceProvider implements SourceFunctionProvider {

    @Override
    public SourceFunction<RowData> createSourceFunction() {
        return new CustomSourceFunction();
    }

    @Override
    public boolean isBounded() {
        return false; // Unbounded stream
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(4); // Fixed parallelism
    }
}

// Custom DataStream scan provider
public class CustomDataStreamProvider implements DataStreamScanProvider {

    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext,
            StreamExecutionEnvironment execEnv) {

        return execEnv
            .addSource(new CustomSourceFunction())
            .uid(providerContext.generateUid("custom-source"))
            .name("Custom Table Source");
    }
}
```

### Sink Providers

Interfaces for implementing table sinks that consume DataStream instances.

#### DataStream Sink Provider

Creates DataStream-based table sinks with direct DataStream integration.

```java { .api }
/**
 * Provider that consumes a DataStream as the runtime implementation of a DynamicTableSink.
 * Note: intended for advanced connector developers; prefer SinkProvider or
 * SinkFunctionProvider where possible.
 */
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider {

    /**
     * Consumes a DataStream and returns the sink transformation.
     *
     * @param providerContext context for generating unique identifiers
     * @param dataStream the input DataStream of RowData to consume
     * @return DataStreamSink transformation for the sink operation
     */
    DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);

    /**
     * @deprecated Use {@link #consumeDataStream(ProviderContext, DataStream)} instead.
     */
    @Deprecated
    DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
}
```

#### Sink Function Provider

Creates SinkFunction-based table sinks for more traditional output patterns.

```java { .api }
/**
 * Provider that produces a SinkFunction as the runtime implementation of a DynamicTableSink.
 */
public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {

    /**
     * Creates a SinkFunction instance for data consumption.
     *
     * @return SinkFunction that consumes RowData records
     */
    SinkFunction<RowData> createSinkFunction();
}
```

**Usage Examples:**

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;

// Custom sink function provider
public class CustomSinkFunctionProvider implements SinkFunctionProvider {

    @Override
    public SinkFunction<RowData> createSinkFunction() {
        return new CustomSinkFunction();
    }
}

// Custom DataStream sink provider
public class CustomDataStreamSinkProvider implements DataStreamSinkProvider {

    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext,
            DataStream<RowData> dataStream) {

        return dataStream
            .addSink(new CustomSinkFunction())
            .uid(providerContext.generateUid("custom-sink"))
            .name("Custom Table Sink");
    }
}
```

### Provider Context

Context interface providing framework services to connector implementations.

```java { .api }
/**
 * Context for providing runtime instances in connectors.
 */
public interface ProviderContext {

    /**
     * Generates a unique identifier for transformations/operators in the DataStream.
     * Stable operator IDs are what make stateful Flink version upgrades possible.
     *
     * @param name base name for the identifier
     * @return unique identifier string for the transformation
     */
    String generateUid(String name);
}
```

**Usage Examples:**

```java
// Using ProviderContext for unique ID generation
@Override
public DataStream<RowData> produceDataStream(
        ProviderContext providerContext,
        StreamExecutionEnvironment execEnv) {

    return execEnv
        .addSource(sourceFunction)
        .uid(providerContext.generateUid("kafka-source")) // Unique source ID
        .name("Kafka Table Source")
        .map(transformFunction)
        .uid(providerContext.generateUid("kafka-transform")) // Unique transform ID
        .name("Kafka Data Transform");
}
```
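
The exact uid format is up to the planner, but the contract can be illustrated with a small stand-alone sketch. Everything below (`SimpleProviderContext`, the `<tableId>_<name>` scheme) is hypothetical and only mirrors the contract, not Flink's actual implementation:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for ProviderContext: derives stable, unique uids
// by prefixing each base name with a fixed per-table identifier.
class SimpleProviderContext {

    private final String tableId;
    private final Set<String> issued = new HashSet<>();

    SimpleProviderContext(String tableId) {
        this.tableId = tableId;
    }

    // The same (tableId, name) pair always yields the same uid, which is what
    // makes savepoint state addressable across job restarts. Requesting the
    // same name twice fails fast instead of silently producing a duplicate.
    String generateUid(String name) {
        String uid = tableId + "_" + name;
        if (!issued.add(uid)) {
            throw new IllegalStateException("uid already issued: " + uid);
        }
        return uid;
    }
}
```

Under this scheme, `generateUid("source")` on a context built for table `orders` yields `orders_source`, and a second request for `source` throws, surfacing duplicate-uid bugs at graph construction time rather than at savepoint restore time.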

### Factory Integration

Connector providers are typically created by DynamicTableSourceFactory and DynamicTableSinkFactory implementations.

```java { .api }
// Example factory implementation pattern
public class CustomTableSourceFactory implements DynamicTableSourceFactory {

    @Override
    public String factoryIdentifier() {
        return "custom";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        return new CustomDynamicTableSource();
    }
}

// Custom table source with provider
public class CustomDynamicTableSource implements ScanTableSource {

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return new CustomDataStreamProvider();
    }

    // ScanTableSource also requires getChangelogMode(), copy(), and
    // asSummaryString() implementations, omitted here for brevity.
}
```
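
Once the factory is on the classpath and registered via Java SPI (a `META-INF/services/org.apache.flink.table.factories.Factory` file), the `factoryIdentifier()` value is what users reference in DDL. A sketch, assuming the `"custom"` identifier above and hypothetical column names:

```sql
-- 'connector' = 'custom' resolves to CustomTableSourceFactory via SPI lookup
CREATE TABLE orders (
  order_id BIGINT,
  amount   DECIMAL(10, 2)
) WITH (
  'connector' = 'custom'
);
```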

## Type Definitions

### Runtime Provider Hierarchy

```java { .api }
// Source provider hierarchy
interface ScanTableSource.ScanRuntimeProvider { }
├── SourceProvider          // New source API (preferred)
├── SourceFunctionProvider  // Legacy SourceFunction API
└── DataStreamScanProvider  // Direct DataStream integration

// Sink provider hierarchy
interface DynamicTableSink.SinkRuntimeProvider { }
├── SinkProvider            // New sink API (preferred)
├── SinkFunctionProvider    // Legacy SinkFunction API
└── DataStreamSinkProvider  // Direct DataStream integration
```

### Parallelism Provider

```java { .api }
/**
 * Optional interface for specifying parallelism constraints.
 */
public interface ParallelismProvider {

    /**
     * Returns the parallelism for the operation.
     *
     * @return optional parallelism value; empty means the default parallelism is used
     */
    Optional<Integer> getParallelism();
}
```

### Context Interfaces

```java { .api }
// Scan context for source providers
interface ScanTableSource.ScanContext {
    // Framework-provided context for scan operations
}

// Sink context for sink providers
interface DynamicTableSink.SinkContext {
    // Framework-provided context for sink operations
}

// Factory context for table creation
interface DynamicTableFactory.Context {
    // Configuration and metadata access
    ReadableConfig getConfiguration();
    ObjectIdentifier getObjectIdentifier();
    ResolvedCatalogTable getCatalogTable();
}
```

### Data Types

```java { .api }
import org.apache.flink.table.data.RowData;

// RowData is the internal representation for table data
// Provides efficient serialization and field access
// Use RowData.createFieldGetter(LogicalType, int) for field extraction
// Use GenericRowData (e.g. GenericRowData.of(...)) for construction
```

## Best Practices

### Unique ID Generation

Always use ProviderContext.generateUid() for operator IDs to support stateful upgrades:

```java
// Good: Consistent operator IDs
.uid(providerContext.generateUid("source"))
.uid(providerContext.generateUid("transform"))

// Bad: Manual or missing IDs
.uid("my-source")  // Risk of conflicts
// Missing .uid()  // Risk of savepoint incompatibility
```

### Provider Selection

Choose the appropriate provider based on your needs:

- **SourceProvider/SinkProvider**: Use for new implementations (preferred)
- **SourceFunctionProvider/SinkFunctionProvider**: Use for legacy compatibility
- **DataStreamScanProvider/DataStreamSinkProvider**: Use for complex DataStream integration

### Error Handling

Implement proper error handling and resource cleanup in custom providers:

```java
@Override
public DataStream<RowData> produceDataStream(
        ProviderContext providerContext,
        StreamExecutionEnvironment execEnv) {

    try {
        // Create and configure the source
        CustomSourceFunction source = new CustomSourceFunction(config);

        return execEnv
            .addSource(source)
            .uid(providerContext.generateUid("custom-source"));

    } catch (Exception e) {
        throw new RuntimeException("Failed to create custom source", e);
    }
}
```