# DataStream Connectors

Provider interfaces for advanced connector development that integrate directly with the DataStream API. These providers enable custom connectors to produce and consume DataStreams while maintaining full integration with Flink's table ecosystem.

## Capabilities

### DataStream Scan Provider

Provider interface for creating table sources that produce DataStreams directly.

```java { .api }
/**
 * Provider that produces a Java DataStream as runtime implementation for ScanTableSource.
 * Note: This provider is only meant for advanced connector developers.
 */
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {

    /**
     * Creates a scan DataStream from a StreamExecutionEnvironment.
     * Note: Must set unique identifiers for transformations when using the CompiledPlan feature.
     * @param providerContext Context providing utilities like UID generation
     * @param execEnv StreamExecutionEnvironment for creating the DataStream
     * @return DataStream producing RowData for the table source
     */
    DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);
}
```

**Usage Examples:**

```java
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

public class CustomDataStreamScanProvider implements DataStreamScanProvider {

    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext,
            StreamExecutionEnvironment execEnv) {

        // Create custom data source
        DataStream<RowData> sourceStream = execEnv
            .addSource(new CustomSourceFunction())
            .uid(providerContext.generateUid("custom-source")); // Unique ID for savepoint compatibility

        // Apply transformations with unique IDs
        return sourceStream
            .map(new CustomRowDataMapper())
            .uid(providerContext.generateUid("custom-mapper"));
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(4); // Custom parallelism
    }
}
```
### DataStream Sink Provider

Provider interface for creating table sinks that consume DataStreams directly.

```java { .api }
/**
 * Provider that consumes a Java DataStream as runtime implementation for DynamicTableSink.
 * Note: This provider is only meant for advanced connector developers.
 */
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {

    /**
     * Consumes the given DataStream and returns the sink transformation.
     * Note: Must set unique identifiers for transformations when using the CompiledPlan feature.
     * @param providerContext Context providing utilities like UID generation
     * @param dataStream Input DataStream of RowData to consume
     * @return DataStreamSink representing the sink transformation
     */
    DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);

    /**
     * Custom parallelism for the sink operations.
     * Note: If multiple transformations are applied, set the same parallelism to avoid changelog issues.
     * @return Optional parallelism setting
     */
    @Override
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}
```
**Usage Examples:**

```java
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;

public class CustomDataStreamSinkProvider implements DataStreamSinkProvider {

    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext,
            DataStream<RowData> dataStream) {

        // Apply transformations to the input stream
        DataStream<String> transformedStream = dataStream
            .map(new RowDataToStringMapper())
            .uid(providerContext.generateUid("sink-mapper"));

        // Create sink with unique ID
        return transformedStream
            .addSink(new CustomSinkFunction())
            .uid(providerContext.generateUid("custom-sink"));
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(2); // Custom parallelism for all sink operations
    }
}
```
### Provider Context

Context interface providing utilities for connector providers.

```java { .api }
/**
 * Context providing utilities for runtime providers.
 */
public interface ProviderContext {

    /**
     * Generates a topology-wide unique identifier for transformations.
     * Essential for stateful upgrades and savepoint compatibility.
     * @param operatorName Base name for the operator
     * @return Unique identifier string
     */
    String generateUid(String operatorName);
}
```

### Parallelism Provider

Interface for specifying custom parallelism in connector providers.

```java { .api }
/**
 * Provider interface for specifying custom parallelism.
 */
public interface ParallelismProvider {

    /**
     * Returns custom parallelism for the connector operations.
     * @return Optional parallelism setting; empty means use the default
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}
```
## Advanced Implementation Patterns

### Custom Source Connector

Complete example of implementing a custom table source with DataStream integration.

```java
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;

public class CustomTableSource implements ScanTableSource {

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return new DataStreamScanProvider() {
            @Override
            public DataStream<RowData> produceDataStream(
                    ProviderContext providerContext,
                    StreamExecutionEnvironment execEnv) {

                // Create source with configuration
                CustomSourceFunction sourceFunction = new CustomSourceFunction(config);

                return execEnv
                    .addSource(sourceFunction)
                    .uid(providerContext.generateUid("custom-table-source"))
                    .map(new RecordToRowDataMapper())
                    .uid(providerContext.generateUid("record-mapper"));
            }

            @Override
            public Optional<Integer> getParallelism() {
                return Optional.of(config.getSourceParallelism());
            }
        };
    }

    @Override
    public DynamicTableSource copy() {
        return new CustomTableSource();
    }

    @Override
    public String asSummaryString() {
        return "CustomTableSource";
    }
}
```
### Custom Sink Connector

Complete example of implementing a custom table sink with DataStream integration.

```java
import org.apache.flink.table.connector.sink.DynamicTableSink;

public class CustomTableSink implements DynamicTableSink {

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new DataStreamSinkProvider() {
            @Override
            public DataStreamSink<?> consumeDataStream(
                    ProviderContext providerContext,
                    DataStream<RowData> dataStream) {

                // Apply pre-processing transformations
                DataStream<CustomRecord> processedStream = dataStream
                    .map(new RowDataToCustomRecordMapper())
                    .uid(providerContext.generateUid("sink-mapper"))
                    .filter(new CustomRecordFilter())
                    .uid(providerContext.generateUid("sink-filter"));

                // Create sink function
                CustomSinkFunction sinkFunction = new CustomSinkFunction(config);

                return processedStream
                    .addSink(sinkFunction)
                    .uid(providerContext.generateUid("custom-table-sink"));
            }

            @Override
            public Optional<Integer> getParallelism() {
                return Optional.of(config.getSinkParallelism());
            }
        };
    }

    @Override
    public DynamicTableSink copy() {
        return new CustomTableSink();
    }

    @Override
    public String asSummaryString() {
        return "CustomTableSink";
    }
}
```
### Legacy Sink Function Provider

Provider for legacy sink functions in table sinks.

```java { .api }
/**
 * Provider for sink functions in legacy table sinks.
 */
public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {

    /**
     * Creates a sink function for consuming table data.
     * @return SinkFunction instance
     */
    SinkFunction<RowData> createSinkFunction();

    /**
     * Optional parallelism for the sink function.
     * @return Optional parallelism setting
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}
```

### Legacy Source Function Provider

Provider for legacy source functions in table sources.

```java { .api }
/**
 * Provider for source functions in legacy table sources.
 */
public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider {

    /**
     * Creates a source function for producing table data.
     * @return SourceFunction instance
     */
    SourceFunction<RowData> createSourceFunction();

    /**
     * Optional parallelism for the source function.
     * @return Optional parallelism setting
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}
```
## Best Practices

### Unique ID Generation

Always use `ProviderContext.generateUid()` for transformation IDs to ensure savepoint compatibility:

```java
// Good - uses context for unique IDs
DataStream<RowData> stream = execEnv
    .addSource(sourceFunction)
    .uid(providerContext.generateUid("my-source"))
    .map(mapper)
    .uid(providerContext.generateUid("my-mapper"));

// Bad - hardcoded IDs may conflict
DataStream<RowData> stream = execEnv
    .addSource(sourceFunction)
    .uid("hardcoded-source") // May conflict with other connectors
    .map(mapper)
    .uid("hardcoded-mapper");
```

### Parallelism Consistency

When using custom parallelism in sinks, ensure all transformations use the same parallelism:

```java
@Override
public DataStreamSink<?> consumeDataStream(
        ProviderContext providerContext,
        DataStream<RowData> dataStream) {

    int customParallelism = 4;

    return dataStream
        .map(mapper)
        .setParallelism(customParallelism) // Same parallelism
        .uid(providerContext.generateUid("mapper"))
        .addSink(sinkFunction)
        .setParallelism(customParallelism) // Same parallelism
        .uid(providerContext.generateUid("sink"));
}
```
## Types

### Core Connector Types

```java { .api }
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.ParallelismProvider;
```

### DataStream Integration Types

```java { .api }
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
```

### Legacy Provider Types

```java { .api }
import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
```