Bridge component for Apache Flink's Table/SQL API that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs.
—
The Modern Connector Framework in Flink Table API Java Bridge provides new connector interfaces following the FLIP-95 design. These interfaces offer better integration with the DataStream API and improved flexibility for connector development.
The modern connector framework introduces provider-based interfaces that integrate directly with Flink's DataStream API, replacing the legacy table source/sink interfaces. This approach provides better type safety, improved performance, and cleaner separation of concerns.
The DataStreamScanProvider interface allows connectors to directly produce DataStream instances:
// FLIP-95 scan provider: lets a table connector hand the planner a ready-made
// DataStream<RowData> instead of a raw SourceFunction.
@PublicEvolving
public interface DataStreamScanProvider extends ScanRuntimeProvider {
// Called by the planner at translation time to build the source pipeline.
DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv);
}Usage Example:
/**
 * Example scan provider: wires a custom SourceFunction into the table pipeline
 * and converts its records to RowData.
 */
public class MyDataStreamScanProvider implements DataStreamScanProvider {

    private final MySourceConfig config;

    public MyDataStreamScanProvider(MySourceConfig config) {
        this.config = config;
    }

    /** Builds the source stream; the mapper adapts source records to RowData. */
    @Override
    public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
        MySourceFunction sourceFunction = new MySourceFunction(config);
        return execEnv.addSource(sourceFunction).map(new MyRowDataMapper());
    }
}
// In your DynamicTableSource implementation
// NOTE(review): getScanRuntimeProvider is declared on ScanTableSource (a
// sub-interface of DynamicTableSource) — the enclosing class must implement
// ScanTableSource; sourceConfig is presumably a field of that class.
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
return new MyDataStreamScanProvider(sourceConfig);
}The SourceFunctionProvider interface provides a way to create connector sources using Flink's SourceFunction:
// Scan provider backed by a legacy SourceFunction; the planner wraps the
// function into the runtime source. Instances are created via of(...).
@PublicEvolving
public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider {
// Factory method: isBounded marks whether the produced source terminates.
static SourceFunctionProvider of(SourceFunction<RowData> sourceFunction, boolean isBounded);
// Returns the function the runtime will execute.
SourceFunction<RowData> createSourceFunction();
// Note: isBounded() method is inherited from ScanRuntimeProvider parent interface
}Usage Example:
// Create a bounded source function provider
SourceFunction<RowData> mySourceFunction = new MyBoundedSourceFunction(config);
// The boolean flag is isBounded: true => finite, batch-style input.
SourceFunctionProvider provider = SourceFunctionProvider.of(mySourceFunction, true);
// Create an unbounded source function provider
SourceFunction<RowData> streamingSource = new MyStreamingSourceFunction(config);
// false => unbounded streaming input.
SourceFunctionProvider streamingProvider = SourceFunctionProvider.of(streamingSource, false);
// In your DynamicTableSource implementation
// Chooses a bounded vs. unbounded source at planning time from the config;
// the boolean passed to of(...) must match the actual source behavior.
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
if (config.isBounded()) {
return SourceFunctionProvider.of(new MyBoundedSourceFunction(config), true);
} else {
return SourceFunctionProvider.of(new MyStreamingSourceFunction(config), false);
}
}The DataStreamSinkProvider interface allows connectors to directly consume DataStream instances:
// FLIP-95 sink provider: the connector consumes the upstream DataStream<RowData>
// itself and returns the terminal DataStreamSink.
@PublicEvolving
public interface DataStreamSinkProvider extends SinkRuntimeProvider {
// Attaches the sink pipeline to the planner-provided stream.
DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
// Optional fixed sink parallelism; empty means "use the planner's default".
default Optional<Integer> getParallelism() {
return Optional.empty();
}
}Usage Example:
/**
 * Example sink provider: converts RowData to the sink's record type and
 * terminates the pipeline with a custom SinkFunction.
 */
public class MyDataStreamSinkProvider implements DataStreamSinkProvider {

    private final MySinkConfig config;

    public MyDataStreamSinkProvider(MySinkConfig config) {
        this.config = config;
    }

    /** Builds the sink pipeline on the planner-provided stream. */
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        MyRowDataConverter converter = new MyRowDataConverter(config);
        MySinkFunction sinkFunction = new MySinkFunction(config);
        return dataStream.map(converter).addSink(sinkFunction).name("My Custom Sink");
    }

    /** Empty when the config carries no explicit parallelism. */
    @Override
    public Optional<Integer> getParallelism() {
        return Optional.ofNullable(config.getParallelism());
    }
}
// In your DynamicTableSink implementation
// sinkConfig is presumably a field of the enclosing DynamicTableSink class.
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return new MyDataStreamSinkProvider(sinkConfig);
}The SinkFunctionProvider interface provides a way to create connector sinks using Flink's SinkFunction:
// Sink provider backed by a legacy SinkFunction; instances are created via of(...).
@PublicEvolving
public interface SinkFunctionProvider extends SinkRuntimeProvider {
// Factory without a parallelism constraint.
static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction);
// Factory with an optional explicit parallelism (null => planner default).
static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, @Nullable Integer parallelism);
// Returns the function the runtime will execute.
SinkFunction<RowData> createSinkFunction();
// Optional fixed sink parallelism; empty means "use the planner's default".
default Optional<Integer> getParallelism() {
return Optional.empty();
}
}Usage Example:
// Create a sink function provider without parallelism constraint
SinkFunction<RowData> mySinkFunction = new MySinkFunction(config);
SinkFunctionProvider provider = SinkFunctionProvider.of(mySinkFunction);
// Create a sink function provider with specific parallelism
SinkFunction<RowData> parallelSink = new MyParallelSinkFunction(config);
// 4 is the fixed number of parallel sink subtasks requested from the planner.
SinkFunctionProvider parallelProvider = SinkFunctionProvider.of(parallelSink, 4);
// In your DynamicTableSink implementation
// Picks the of(...) overload based on whether an explicit parallelism is configured.
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
SinkFunction<RowData> sinkFunction = new MySinkFunction(sinkConfig);
if (sinkConfig.getParallelism() != null) {
return SinkFunctionProvider.of(sinkFunction, sinkConfig.getParallelism());
} else {
return SinkFunctionProvider.of(sinkFunction);
}
}public class MyCustomTableSource implements DynamicTableSource {
// NOTE(review): getScanRuntimeProvider is declared on ScanTableSource, so this
// class should implement ScanTableSource rather than plain DynamicTableSource
// — confirm against the Flink API for the targeted version.
private final MySourceConfig config;
private final ResolvedSchema resolvedSchema;
public MyCustomTableSource(MySourceConfig config, ResolvedSchema resolvedSchema) {
this.config = config;
this.resolvedSchema = resolvedSchema;
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
// Anonymous provider capturing config/resolvedSchema from the enclosing source.
return new DataStreamScanProvider() {
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
// returns(...) pins the produced type info from the planner's ScanContext.
return execEnv
.addSource(new MySourceFunction(config))
.returns(context.createTypeInformation())
.map(new MyToRowDataMapper(resolvedSchema));
}
};
}
// Planner may copy sources during optimization; fields are immutable, so a
// shallow copy suffices here.
@Override
public DynamicTableSource copy() {
return new MyCustomTableSource(config, resolvedSchema);
}
@Override
public String asSummaryString() {
return "MyCustomSource";
}
// Insert-only: this source never emits updates or deletes.
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
}public class MyCustomTableSink implements DynamicTableSink {
private final MySinkConfig config;
private final ResolvedSchema resolvedSchema;
public MyCustomTableSink(MySinkConfig config, ResolvedSchema resolvedSchema) {
this.config = config;
this.resolvedSchema = resolvedSchema;
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
// Anonymous provider capturing config/resolvedSchema from the enclosing sink.
return new DataStreamSinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
return dataStream
.map(new MyFromRowDataMapper(resolvedSchema))
.addSink(new MySinkFunction(config))
.name("MyCustomSink");
}
// Empty Optional when no explicit parallelism is configured.
@Override
public Optional<Integer> getParallelism() {
return Optional.ofNullable(config.getParallelism());
}
};
}
// Shallow copy is safe: both fields are immutable references.
@Override
public DynamicTableSink copy() {
return new MyCustomTableSink(config, resolvedSchema);
}
@Override
public String asSummaryString() {
return "MyCustomSink";
}
// Insert-only: this sink does not accept update/delete changelog entries.
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
}

/**
 * Example scan provider that attaches a WatermarkStrategy via the unified
 * Source API (env.fromSource) before converting records to RowData.
 */
public class WatermarkedDataStreamScanProvider implements DataStreamScanProvider {

    private final MySourceConfig config;
    private final WatermarkStrategy<MyRecord> watermarkStrategy;

    // Fix: the original example declared final fields but no constructor,
    // which does not compile; initialize both here.
    public WatermarkedDataStreamScanProvider(
            MySourceConfig config, WatermarkStrategy<MyRecord> watermarkStrategy) {
        this.config = config;
        this.watermarkStrategy = watermarkStrategy;
    }

    @Override
    public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
        // Typo fix: "MyFLinkSource" -> "MyFlinkSource" (conventional camel case).
        return execEnv
                .fromSource(new MyFlinkSource(config), watermarkStrategy, "MyWatermarkedSource")
                .map(new MyToRowDataMapper());
    }
}

/**
 * Example sink provider that performs keyed, stateful pre-processing before
 * handing records to the terminal sink.
 */
public class StatefulDataStreamSinkProvider implements DataStreamSinkProvider {

    private final MySinkConfig config;

    // Fix: final field previously had no constructor/initializer (would not compile).
    public StatefulDataStreamSinkProvider(MySinkConfig config) {
        this.config = config;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        // NOTE(review): process() on a keyed stream expects a KeyedProcessFunction;
        // confirm MyStatefulSinkFunction's actual supertype.
        return dataStream
                .keyBy(new MyKeySelector())
                .process(new MyStatefulSinkFunction(config))
                .addSink(new MyOutputSink(config));
    }
}

/**
 * Example sink provider that builds a schema-aware serializer and applies it
 * to every row before the terminal sink.
 */
public class SerializationAwareDataStreamSinkProvider implements DataStreamSinkProvider {

    private final MySinkConfig config;
    private final ResolvedSchema schema;

    // Fix: final fields previously had no constructor (would not compile).
    public SerializationAwareDataStreamSinkProvider(MySinkConfig config, ResolvedSchema schema) {
        this.config = config;
        this.schema = schema;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        // Create custom serializer based on schema.
        // NOTE(review): the lambda captures the serializer into the MapFunction
        // closure, so MyCustomSerializer must be java.io.Serializable — confirm.
        MyCustomSerializer serializer = new MyCustomSerializer(schema);
        return dataStream
                .map(serializer::serialize)
                .addSink(new MySinkFunction(config));
    }
}

/**
 * Example scan provider that drops malformed rows instead of failing the job,
 * recording each drop in a metric.
 */
public class ResilientDataStreamScanProvider implements DataStreamScanProvider {

    private final MySourceConfig config;

    // Fix: final field previously had no constructor (would not compile).
    public ResilientDataStreamScanProvider(MySourceConfig config) {
        this.config = config;
    }

    @Override
    public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv
                .addSource(new MySourceFunction(config))
                .map(new MyToRowDataMapper())
                .process(new ProcessFunction<RowData, RowData>() {
                    @Override
                    public void processElement(RowData value, Context ctx, Collector<RowData> out) {
                        try {
                            // validateRowData is assumed to be defined by surrounding
                            // connector code — not shown in this excerpt; confirm.
                            validateRowData(value);
                            out.collect(value);
                        } catch (Exception e) {
                            // Drop the record but count it so the loss is observable.
                            // NOTE(review): counter() is looked up for every bad record;
                            // consider caching the Counter in open().
                            getRuntimeContext().getMetricGroup()
                                    .counter("malformed_records")
                                    .inc();
                        }
                    }
                });
    }
}

/**
 * Example sink provider that routes records which fail in-flight to a side
 * output instead of failing the pipeline.
 */
public class ResilientDataStreamSinkProvider implements DataStreamSinkProvider {

    private final MySinkConfig config;

    // Fix: final field previously had no constructor (would not compile).
    public ResilientDataStreamSinkProvider(MySinkConfig config) {
        this.config = config;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        return dataStream
                .map(new MyRowDataConverter())
                .process(new ProcessFunction<MyRecord, MyRecord>() {
                    @Override
                    public void processElement(MyRecord value, Context ctx, Collector<MyRecord> out) {
                        try {
                            out.collect(value);
                        } catch (Exception e) {
                            // Handle serialization errors.
                            // NOTE(review): errorOutputTag must be an OutputTag<ErrorRecord>
                            // declared in scope — it is not shown in this excerpt; confirm.
                            ctx.output(errorOutputTag, new ErrorRecord(value, e));
                        }
                    }
                })
                .addSink(new MyResilientSinkFunction(config));
    }
}

/**
 * Example sink provider that groups records into fixed-size batches before
 * writing, reducing per-record sink overhead.
 */
public class BatchingDataStreamSinkProvider implements DataStreamSinkProvider {

    private final MySinkConfig config;

    // Fix: final field previously had no constructor (would not compile).
    public BatchingDataStreamSinkProvider(MySinkConfig config) {
        this.config = config;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        // Fix: countWindow() only exists on KeyedStream; on a plain DataStream the
        // non-keyed variant is countWindowAll(), and its apply() takes an
        // AllWindowFunction (no key parameter) — the original WindowFunction
        // signature would not compile.
        return dataStream
                .map(new MyRowDataConverter())
                .countWindowAll(config.getBatchSize())
                .apply(new AllWindowFunction<MyRecord, List<MyRecord>, GlobalWindow>() {
                    @Override
                    public void apply(
                            GlobalWindow window,
                            Iterable<MyRecord> values,
                            Collector<List<MyRecord>> out) {
                        List<MyRecord> batch = new ArrayList<>();
                        values.forEach(batch::add);
                        out.collect(batch);
                    }
                })
                .addSink(new MyBatchingSinkFunction(config));
    }
}

/**
 * Example sink provider that rebalances input and runs the sink with a fixed,
 * configured parallelism.
 */
public class ParallelDataStreamSinkProvider implements DataStreamSinkProvider {

    private final MySinkConfig config;

    // Fix: final field previously had no constructor (would not compile).
    public ParallelDataStreamSinkProvider(MySinkConfig config) {
        this.config = config;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        return dataStream
                .rebalance() // Distribute evenly across parallel instances
                .map(new MyRowDataConverter())
                .addSink(new MySinkFunction(config))
                .setParallelism(config.getParallelism());
    }

    // Fix: Optional.of(...) throws NPE if no parallelism is configured; use
    // ofNullable for consistency with the other provider examples in this doc.
    @Override
    public Optional<Integer> getParallelism() {
        return Optional.ofNullable(config.getParallelism());
    }
}When migrating from legacy StreamTableSource/StreamTableSink interfaces:
- Replace StreamTableSource with DynamicTableSource with DataStreamScanProvider
- Replace StreamTableSink with DynamicTableSink with DataStreamSinkProvider
- Register connectors via DynamicTableSourceFactory/DynamicTableSinkFactory
- Use ConfigOption instead of string-based properties

Before (Legacy):
// Legacy (pre-FLIP-95) source: the connector builds the DataStream directly.
// NOTE(review): StreamTableSource also requires schema/type methods
// (e.g. getTableSchema) that this excerpt omits for brevity — confirm.
public class LegacyTableSource implements StreamTableSource<Row> {
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
return execEnv.addSource(new MySourceFunction());
}
}After (Modern):
// Modern (FLIP-95) equivalent: the connector returns a provider and maps its
// records to the internal RowData representation.
// NOTE(review): DynamicTableSource also requires copy(), asSummaryString() and
// (for scan sources) getChangelogMode(); this excerpt omits them for brevity.
public class ModernTableSource implements DynamicTableSource {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
return new DataStreamScanProvider() {
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
return execEnv.addSource(new MySourceFunction())
.map(new MyRowDataMapper());
}
};
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11