# Connector Integration

Connector integration interfaces and utilities enable seamless integration between custom table sources/sinks and the Flink planner. These components provide the bridge between external data systems and Flink's internal execution model.

## Package Information

```java
import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
import org.apache.flink.table.planner.connectors.DynamicSinkUtils;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.data.RowData;
```

## Capabilities

### TransformationScanProvider

Provider interface for transformation-based table sources, enabling direct integration with Flink's transformation API.

```java { .api }
public interface TransformationScanProvider extends ScanTableSource.ScanRuntimeProvider {

    /**
     * Creates the transformation for this scan provider.
     *
     * @param context The context containing runtime information
     * @return Transformation that produces the scanned data
     */
    Transformation<RowData> createTransformation(Context context);

    /**
     * Context interface providing runtime information for transformation creation.
     */
    interface Context {
        String getTableName();
        Configuration getConfiguration();
        ClassLoader getClassLoader();
        int getParallelism();
    }
}
```

The `TransformationScanProvider` allows table sources to directly provide Flink transformations rather than going through the DataStream API. This provides more control over the execution graph and better integration with the planner's optimization process.

**Usage Example:**

```java
import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.transformations.SourceTransformation;

public class MyTableSource implements ScanTableSource {

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new TransformationScanProvider() {
            @Override
            public Transformation<RowData> createTransformation(Context providerContext) {
                // Create source operator
                SourceOperator<RowData> sourceOperator = new SourceOperator<>(
                    mySourceFunction,
                    WatermarkStrategy.noWatermarks(),
                    SimpleVersionedSerializerAdapter.create(mySerializer)
                );

                // Create and configure transformation
                SourceTransformation<RowData> transformation =
                    new SourceTransformation<>(
                        "MyTableSource",
                        sourceOperator,
                        TypeInformation.of(RowData.class),
                        providerContext.getParallelism()
                    );

                return transformation;
            }
        };
    }
}
```

### TransformationSinkProvider

Provider interface for transformation-based table sinks, enabling direct integration with Flink's transformation API for data output.

```java { .api }
public interface TransformationSinkProvider extends DynamicTableSink.SinkRuntimeProvider {

    /**
     * Creates the transformation for this sink provider.
     *
     * @param context The context containing runtime information
     * @return Transformation that consumes the sink data
     */
    Transformation<?> createTransformation(Context context);

    /**
     * Context interface providing runtime information for transformation creation.
     */
    interface Context {
        String getTableName();
        Configuration getConfiguration();
        ClassLoader getClassLoader();
        int getParallelism();
        Transformation<RowData> getInputTransformation();
    }
}
```

The `TransformationSinkProvider` enables table sinks to integrate directly with the transformation graph, providing precise control over how data flows into external systems.

**Usage Example:**

```java
import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.SinkTransformation;

public class MyTableSink implements DynamicTableSink {

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        return new TransformationSinkProvider() {
            @Override
            public Transformation<?> createTransformation(Context providerContext) {
                // Get input transformation
                Transformation<RowData> input = providerContext.getInputTransformation();

                // Create sink operator
                StreamSink<RowData> sinkOperator = new StreamSink<>(mySinkFunction);

                // Create sink transformation
                SinkTransformation<RowData> transformation =
                    new SinkTransformation<>(
                        input,
                        "MyTableSink",
                        sinkOperator,
                        providerContext.getParallelism()
                    );

                return transformation;
            }
        };
    }
}
```

### DynamicSourceUtils

Utility class for converting dynamic table sources to relational nodes in the optimization process.

```java { .api }
public final class DynamicSourceUtils {

    /**
     * Converts a DataStream to a RelNode for integration with Calcite optimization.
     */
    public static RelNode convertDataStreamToRel(
        StreamTableEnvironment tableEnv,
        DataStream<RowData> dataStream,
        List<String> fieldNames
    );

    /**
     * Converts a table source to a RelNode with statistics for optimization.
     */
    public static RelNode convertSourceToRel(
        FlinkOptimizeContext optimizeContext,
        RelOptTable relOptTable,
        DynamicTableSource tableSource,
        FlinkStatistic statistic
    );

    /**
     * Creates a scan rel node from a table source.
     */
    public static RelNode createScanRelNode(
        FlinkOptimizeContext optimizeContext,
        RelOptTable relOptTable,
        DynamicTableSource tableSource
    );
}
```

**Usage Example:**

```java
import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
import org.apache.calcite.rel.RelNode;

// Convert DataStream to RelNode for optimization
DataStream<RowData> sourceStream = ...; // your data stream
List<String> fieldNames = Arrays.asList("id", "name", "timestamp");

RelNode relNode = DynamicSourceUtils.convertDataStreamToRel(
    tableEnv,
    sourceStream,
    fieldNames
);

// Convert table source with statistics
FlinkStatistic statistics = FlinkStatistic.builder()
    .tableStats(new TableStats(1000000L)) // 1M rows estimated
    .build();

RelNode optimizedRel = DynamicSourceUtils.convertSourceToRel(
    optimizeContext,
    relOptTable,
    myTableSource,
    statistics
);
```

### DynamicSinkUtils

Utility class for converting dynamic table sinks to relational nodes and managing sink operations.

```java { .api }
public final class DynamicSinkUtils {

    /**
     * Converts a collect sink to a RelNode for query planning.
     */
    public static RelNode convertCollectToRel(
        FlinkOptimizeContext optimizeContext,
        RelNode input,
        DynamicTableSink tableSink,
        String sinkName
    );

    /**
     * Converts a table sink to a RelNode for integration with optimization.
     */
    public static RelNode convertSinkToRel(
        FlinkOptimizeContext optimizeContext,
        RelNode input,
        RelOptTable relOptTable,
        DynamicTableSink tableSink,
        String sinkName
    );

    /**
     * Validates sink compatibility with input schema.
     */
    public static void validateSchemaCompatibility(
        ResolvedSchema inputSchema,
        ResolvedSchema sinkSchema,
        String sinkName
    );
}
```

**Usage Example:**

```java
import org.apache.flink.table.planner.connectors.DynamicSinkUtils;

// Convert sink to RelNode for optimization
RelNode inputRel = ...; // input relation from query
RelNode sinkRel = DynamicSinkUtils.convertSinkToRel(
    optimizeContext,
    inputRel,
    relOptTable,
    myTableSink,
    "my_output_table"
);

// Validate schema compatibility
ResolvedSchema inputSchema = ...; // schema from query result
ResolvedSchema sinkSchema = ...; // schema declared by the sink table
// (note: DynamicTableSink#getConsumedDataType().getLogicalType() yields a
// LogicalType, not a ResolvedSchema — obtain the sink's ResolvedSchema from
// the catalog/ResolvedCatalogTable instead)

DynamicSinkUtils.validateSchemaCompatibility(
    inputSchema,
    sinkSchema,
    "my_output_table"
);
```

### ShortcutUtils

Utility methods for shortcut operations and performance optimizations in connector integration.

```java { .api }
public final class ShortcutUtils {

    /**
     * Determines if a shortcut can be applied for the given operation.
     */
    public static boolean canApplyShortcut(
        RelNode input,
        TableSink<?> tableSink
    );

    /**
     * Applies shortcut optimization to bypass unnecessary transformations.
     */
    public static Transformation<?> applyShortcut(
        RelNode input,
        TableSink<?> tableSink,
        String sinkName
    );

    /**
     * Checks if source supports pushed down predicates.
     */
    public static boolean supportsPredicatePushDown(
        DynamicTableSource tableSource,
        List<Expression> predicates
    );
}
```

**Usage Example:**

```java
import org.apache.flink.table.planner.utils.ShortcutUtils;

// Check if shortcut optimization can be applied
if (ShortcutUtils.canApplyShortcut(inputRel, tableSink)) {
    // Apply shortcut to bypass unnecessary operations
    Transformation<?> optimizedTransformation = ShortcutUtils.applyShortcut(
        inputRel,
        tableSink,
        "output_table"
    );
}

// Check predicate pushdown support
List<Expression> predicates = ...; // filter predicates from query
if (ShortcutUtils.supportsPredicatePushDown(tableSource, predicates)) {
    // Enable predicate pushdown optimization
    tableSource.applyFilters(predicates);
}
```

### DataViewUtils

Utilities for DataView operations in aggregations, essential for stateful stream processing with custom aggregates.

```java { .api }
public final class DataViewUtils {

    /**
     * Creates a state descriptor for DataView storage.
     */
    public static <T> ValueStateDescriptor<T> createDataViewStateDescriptor(
        String name,
        Class<T> dataViewClass,
        TypeInformation<T> typeInfo
    );

    /**
     * Binds DataView to state backend for persistence.
     */
    public static void bindDataViewToState(
        Object dataView,
        RuntimeContext runtimeContext,
        String stateName
    );

    /**
     * Cleans up DataView state when no longer needed.
     */
    public static void cleanupDataViewState(
        RuntimeContext runtimeContext,
        String stateName
    );

    /**
     * Checks if a class contains DataView fields.
     */
    public static boolean hasDataViewFields(Class<?> clazz);
}
```

**Usage Example:**

```java
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.api.common.state.ValueStateDescriptor;

// Create state descriptor for custom aggregate DataView
public class MyAggregateFunction extends TableAggregateFunction<Row, MyAccumulator> {

    public static class MyAccumulator {
        public MapView<String, Integer> dataView; // Custom DataView
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        // Check if accumulator has DataView fields
        if (DataViewUtils.hasDataViewFields(MyAccumulator.class)) {
            // Create state descriptor
            ValueStateDescriptor<MapView<String, Integer>> stateDesc =
                DataViewUtils.createDataViewStateDescriptor(
                    "myDataView",
                    MapView.class,
                    Types.MAP(Types.STRING, Types.INT)
                );

            // Bind DataView to state backend
            DataViewUtils.bindDataViewToState(
                accumulator.dataView,
                getRuntimeContext(),
                "myDataView"
            );
        }
    }
}
```

## Integration Patterns

### Source Integration Pattern

```java
// Complete source integration example
public class MyCustomSource implements ScanTableSource, SupportsFilterPushDown {

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new TransformationScanProvider() {
            @Override
            public Transformation<RowData> createTransformation(Context providerContext) {
                // Create optimized source transformation
                return createOptimizedSourceTransformation(providerContext);
            }
        };
    }

    @Override
    public Result applyFilters(List<Expression> filters) {
        // Implement predicate pushdown
        List<Expression> acceptedFilters = new ArrayList<>();
        List<Expression> remainingFilters = new ArrayList<>();

        for (Expression filter : filters) {
            if (canPushDownFilter(filter)) {
                acceptedFilters.add(filter);
            } else {
                remainingFilters.add(filter);
            }
        }

        return Result.of(acceptedFilters, remainingFilters);
    }
}
```

### Sink Integration Pattern

```java
// Complete sink integration example
public class MyCustomSink implements DynamicTableSink, SupportsPartitioning {

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new TransformationSinkProvider() {
            @Override
            public Transformation<?> createTransformation(Context providerContext) {
                // Create optimized sink transformation
                return createOptimizedSinkTransformation(providerContext);
            }
        };
    }

    @Override
    public boolean requiresPartitionGrouping(boolean supportsGrouping) {
        // Enable partition-aware processing
        return true;
    }
}
```

### Error Handling in Connectors

```java
// Robust error handling pattern
public class RobustTableSource implements ScanTableSource {

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new TransformationScanProvider() {
            @Override
            public Transformation<RowData> createTransformation(Context providerContext) {
                try {
                    return createSourceTransformation(providerContext);
                } catch (Exception e) {
                    throw new TableException(
                        "Failed to create source transformation for table: " +
                            providerContext.getTableName(), e
                    );
                }
            }
        };
    }
}
```