Modern Connector Framework

The Modern Connector Framework in Flink Table API Java Bridge provides new connector interfaces following the FLIP-95 design. These interfaces offer better integration with the DataStream API and improved flexibility for connector development.

Overview

The modern connector framework introduces provider-based interfaces that integrate directly with Flink's DataStream API, replacing the legacy table source/sink interfaces. This approach provides better type safety, improved performance, and cleaner separation of concerns.
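To make this concrete, the sketch below shows how a connector built on these interfaces surfaces to users. The connector identifier my-connector and its hostname option are hypothetical placeholders for a factory you would register yourself; the StreamTableEnvironment calls are the standard bridge API.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// The planner resolves the 'connector' option to a factory, builds the
// DynamicTableSource, and calls getScanRuntimeProvider() to obtain the stream
tableEnv.executeSql(
    "CREATE TABLE my_table (id BIGINT, name STRING) WITH (" +
    "  'connector' = 'my-connector'," +
    "  'hostname' = 'localhost'" +
    ")");

Table result = tableEnv.sqlQuery("SELECT id, name FROM my_table");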

Source Providers

DataStreamScanProvider

The DataStreamScanProvider interface allows connectors to directly produce DataStream instances:

@PublicEvolving
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider {
    DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv);

    // Note: isBounded() is inherited from ScanRuntimeProvider and must be implemented
}

Usage Example:

public class MyDataStreamScanProvider implements DataStreamScanProvider {
    private final MySourceConfig config;

    public MyDataStreamScanProvider(MySourceConfig config) {
        this.config = config;
    }

    @Override
    public boolean isBounded() {
        // Inherited from ScanRuntimeProvider; report whether the source terminates
        return config.isBounded();
    }

    @Override
    public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new MySourceFunction(config))
                      .map(new MyRowDataMapper());
    }
}

// In your DynamicTableSource implementation
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    return new MyDataStreamScanProvider(sourceConfig);
}

SourceFunctionProvider

The SourceFunctionProvider interface provides a way to create connector sources using Flink's SourceFunction:

@PublicEvolving  
public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider {
    static SourceFunctionProvider of(SourceFunction<RowData> sourceFunction, boolean isBounded);
    
    SourceFunction<RowData> createSourceFunction();
    
    // Note: isBounded() method is inherited from ScanRuntimeProvider parent interface
}

Usage Example:

// Create a bounded source function provider
SourceFunction<RowData> mySourceFunction = new MyBoundedSourceFunction(config);
SourceFunctionProvider provider = SourceFunctionProvider.of(mySourceFunction, true);

// Create an unbounded source function provider  
SourceFunction<RowData> streamingSource = new MyStreamingSourceFunction(config);
SourceFunctionProvider streamingProvider = SourceFunctionProvider.of(streamingSource, false);

// In your DynamicTableSource implementation
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
    if (config.isBounded()) {
        return SourceFunctionProvider.of(new MyBoundedSourceFunction(config), true);
    } else {
        return SourceFunctionProvider.of(new MyStreamingSourceFunction(config), false);
    }
}

Sink Providers

DataStreamSinkProvider

The DataStreamSinkProvider interface allows connectors to directly consume DataStream instances:

@PublicEvolving
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider {
    DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);

    // Depending on the Flink version, a sink parallelism hint is exposed
    // via the ParallelismProvider mix-in with this default:
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

Usage Example:

public class MyDataStreamSinkProvider implements DataStreamSinkProvider {
    private final MySinkConfig config;
    
    public MyDataStreamSinkProvider(MySinkConfig config) {
        this.config = config;
    }
    
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        return dataStream
            .map(new MyRowDataConverter(config))
            .addSink(new MySinkFunction(config))
            .name("My Custom Sink");
    }
    
    @Override
    public Optional<Integer> getParallelism() {
        return Optional.ofNullable(config.getParallelism());
    }
}

// In your DynamicTableSink implementation
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    return new MyDataStreamSinkProvider(sinkConfig);
}

SinkFunctionProvider

The SinkFunctionProvider interface provides a way to create connector sinks using Flink's SinkFunction:

@PublicEvolving
public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
    static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction);
    static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, @Nullable Integer parallelism);

    SinkFunction<RowData> createSinkFunction();

    // Note: getParallelism() is inherited from ParallelismProvider (Flink 1.12+)
}

Usage Example:

// Create a sink function provider without parallelism constraint
SinkFunction<RowData> mySinkFunction = new MySinkFunction(config);
SinkFunctionProvider provider = SinkFunctionProvider.of(mySinkFunction);

// Create a sink function provider with specific parallelism
SinkFunction<RowData> parallelSink = new MyParallelSinkFunction(config);
SinkFunctionProvider parallelProvider = SinkFunctionProvider.of(parallelSink, 4);

// In your DynamicTableSink implementation
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    SinkFunction<RowData> sinkFunction = new MySinkFunction(sinkConfig);
    
    if (sinkConfig.getParallelism() != null) {
        return SinkFunctionProvider.of(sinkFunction, sinkConfig.getParallelism());
    } else {
        return SinkFunctionProvider.of(sinkFunction);
    }
}

Complete Connector Implementation Examples

Custom DataStream Source

public class MyCustomTableSource implements ScanTableSource {
    private final MySourceConfig config;
    private final ResolvedSchema resolvedSchema;

    public MyCustomTableSource(MySourceConfig config, ResolvedSchema resolvedSchema) {
        this.config = config;
        this.resolvedSchema = resolvedSchema;
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new DataStreamScanProvider() {
            @Override
            public boolean isBounded() {
                return config.isBounded();
            }

            @Override
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                return execEnv
                    .addSource(new MySourceFunction(config))
                    .map(new MyToRowDataMapper(resolvedSchema))
                    // createTypeInformation requires the produced data type
                    .returns(context.createTypeInformation(
                        resolvedSchema.toPhysicalRowDataType()));
            }
        };
    }
    
    @Override
    public DynamicTableSource copy() {
        return new MyCustomTableSource(config, resolvedSchema);
    }
    
    @Override
    public String asSummaryString() {
        return "MyCustomSource";
    }
    
    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }
}

Custom DataStream Sink

public class MyCustomTableSink implements DynamicTableSink {
    private final MySinkConfig config;
    private final ResolvedSchema resolvedSchema;
    
    public MyCustomTableSink(MySinkConfig config, ResolvedSchema resolvedSchema) {
        this.config = config;
        this.resolvedSchema = resolvedSchema;
    }
    
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new DataStreamSinkProvider() {
            @Override
            public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
                return dataStream
                    .map(new MyFromRowDataMapper(resolvedSchema))
                    .addSink(new MySinkFunction(config))
                    .name("MyCustomSink");
            }
            
            @Override
            public Optional<Integer> getParallelism() {
                return Optional.ofNullable(config.getParallelism());
            }
        };
    }
    
    @Override
    public DynamicTableSink copy() {
        return new MyCustomTableSink(config, resolvedSchema);
    }
    
    @Override
    public String asSummaryString() {
        return "MyCustomSink";
    }
    
    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // DynamicTableSink#getChangelogMode receives the mode requested by the planner
        return ChangelogMode.insertOnly();
    }
}

Advanced Integration Patterns

Source with Watermark Strategy

public class WatermarkedDataStreamScanProvider implements DataStreamScanProvider {
    private final MySourceConfig config;
    private final WatermarkStrategy<MyRecord> watermarkStrategy;

    public WatermarkedDataStreamScanProvider(
            MySourceConfig config, WatermarkStrategy<MyRecord> watermarkStrategy) {
        this.config = config;
        this.watermarkStrategy = watermarkStrategy;
    }

    @Override
    public boolean isBounded() {
        return false; // watermarked sources are typically unbounded
    }

    @Override
    public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
        // fromSource expects a FLIP-27 Source implementation, not a SourceFunction
        return execEnv
            .fromSource(
                new MyFlinkSource(config),
                watermarkStrategy,
                "MyWatermarkedSource")
            .map(new MyToRowDataMapper());
    }
}

Sink with State Backend Integration

public class StatefulDataStreamSinkProvider implements DataStreamSinkProvider {
    private final MySinkConfig config;

    public StatefulDataStreamSinkProvider(MySinkConfig config) {
        this.config = config;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        return dataStream
            .keyBy(new MyKeySelector())
            // A KeyedProcessFunction can maintain per-key state before writing
            .process(new MyStatefulProcessFunction(config))
            .addSink(new MyOutputSink(config));
    }
}

Connector with Custom Serialization

public class SerializationAwareDataStreamSinkProvider implements DataStreamSinkProvider {
    private final MySinkConfig config;
    private final ResolvedSchema schema;

    public SerializationAwareDataStreamSinkProvider(MySinkConfig config, ResolvedSchema schema) {
        this.config = config;
        this.schema = schema;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        // The serializer is captured by the map function, so it must be Serializable
        MyCustomSerializer serializer = new MyCustomSerializer(schema);

        return dataStream
            .map(serializer::serialize)
            .addSink(new MySinkFunction(config));
    }
}

Error Handling and Resilience

Source Error Handling

public class ResilientDataStreamScanProvider implements DataStreamScanProvider {
    private final MySourceConfig config;

    public ResilientDataStreamScanProvider(MySourceConfig config) {
        this.config = config;
    }

    @Override
    public boolean isBounded() {
        return config.isBounded();
    }

    @Override
    public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv
            .addSource(new MySourceFunction(config))
            .map(new MyToRowDataMapper())
            .process(new ProcessFunction<RowData, RowData>() {
                private transient Counter malformedRecords;

                @Override
                public void open(Configuration parameters) {
                    // Register the metric once, not per element
                    malformedRecords = getRuntimeContext()
                        .getMetricGroup()
                        .counter("malformed_records");
                }

                @Override
                public void processElement(RowData value, Context ctx, Collector<RowData> out) {
                    try {
                        // Validate and forward; drop malformed records
                        validateRowData(value);
                        out.collect(value);
                    } catch (Exception e) {
                        malformedRecords.inc();
                    }
                }
            });
    }
}

Sink Error Handling

public class ResilientDataStreamSinkProvider implements DataStreamSinkProvider {
    private final MySinkConfig config;

    // Side-output tag for records that fail conversion; the anonymous
    // subclass body {} is required to capture the generic type
    private static final OutputTag<ErrorRecord> errorOutputTag =
        new OutputTag<ErrorRecord>("errors") {};

    public ResilientDataStreamSinkProvider(MySinkConfig config) {
        this.config = config;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        SingleOutputStreamOperator<MyRecord> converted = dataStream
            .map(new MyRowDataConverter())
            .process(new ProcessFunction<MyRecord, MyRecord>() {
                @Override
                public void processElement(MyRecord value, Context ctx, Collector<MyRecord> out) {
                    try {
                        out.collect(value);
                    } catch (Exception e) {
                        // Route failures to the side output instead of failing the job
                        ctx.output(errorOutputTag, new ErrorRecord(value, e));
                    }
                }
            });

        // Route the error stream to a dead-letter sink (MyErrorSink is hypothetical)
        converted.getSideOutput(errorOutputTag).addSink(new MyErrorSink(config));

        return converted.addSink(new MyResilientSinkFunction(config));
    }
}

Performance Optimization

Batching in Sinks

public class BatchingDataStreamSinkProvider implements DataStreamSinkProvider {
    private final MySinkConfig config;

    public BatchingDataStreamSinkProvider(MySinkConfig config) {
        this.config = config;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        return dataStream
            .map(new MyRowDataConverter())
            // countWindow is only defined on KeyedStream; a non-keyed stream
            // uses countWindowAll, which runs with parallelism 1
            .countWindowAll(config.getBatchSize())
            .apply(new AllWindowFunction<MyRecord, List<MyRecord>, GlobalWindow>() {
                @Override
                public void apply(GlobalWindow window,
                                  Iterable<MyRecord> values,
                                  Collector<List<MyRecord>> out) {
                    List<MyRecord> batch = new ArrayList<>();
                    values.forEach(batch::add);
                    out.collect(batch);
                }
            })
            .addSink(new MyBatchingSinkFunction(config));
    }
}
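Because countWindowAll is a non-keyed window, the batching operator runs with a parallelism of 1. For high-throughput sinks it is usually better to buffer records inside the SinkFunction itself and flush on batch size or checkpoint.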

Parallel Processing

public class ParallelDataStreamSinkProvider implements DataStreamSinkProvider {
    private final MySinkConfig config;
    
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
        return dataStream
            .rebalance() // Distribute evenly across parallel instances
            .map(new MyRowDataConverter())
            .addSink(new MySinkFunction(config))
            .setParallelism(config.getParallelism());
    }
    
    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(config.getParallelism());
    }
}

Migration from Legacy Interfaces

When migrating from the legacy StreamTableSource/StreamTableSink interfaces:

  1. Replace TableSource: Implement ScanTableSource (a DynamicTableSource) with DataStreamScanProvider
  2. Replace TableSink: Implement DynamicTableSink with DataStreamSinkProvider
  3. Update Factory: Implement DynamicTableSourceFactory/DynamicTableSinkFactory (see the sketch after this list)
  4. Handle Configuration: Use ConfigOption instead of string-based properties
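To illustrate steps 3 and 4, here is a minimal factory sketch (assuming Flink 1.13+, where the factory context exposes a ResolvedSchema). MyTableSourceFactory, my-connector, and the hostname/port options are hypothetical; DynamicTableSourceFactory, ConfigOptions, and FactoryUtil are the real APIs involved.

public class MyTableSourceFactory implements DynamicTableSourceFactory {

    // Typed options replace the legacy string-based properties (step 4)
    public static final ConfigOption<String> HOSTNAME =
        ConfigOptions.key("hostname").stringType().noDefaultValue();
    public static final ConfigOption<Integer> PORT =
        ConfigOptions.key("port").intType().defaultValue(9999);

    @Override
    public String factoryIdentifier() {
        return "my-connector"; // matched against the 'connector' option in DDL
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTNAME);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PORT);
        return options;
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // Validates supplied options against the declared ConfigOptions
        FactoryUtil.TableFactoryHelper helper =
            FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        ReadableConfig options = helper.getOptions();
        MySourceConfig config =
            new MySourceConfig(options.get(HOSTNAME), options.get(PORT));

        return new MyCustomTableSource(
            config, context.getCatalogTable().getResolvedSchema());
    }
}

The factory is discovered via Java SPI: list its fully qualified class name in a META-INF/services/org.apache.flink.table.factories.Factory file in the connector jar.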

Before (Legacy):

public class LegacyTableSource implements StreamTableSource<Row> {
    // getTableSchema() and getReturnType() omitted for brevity
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new MySourceFunction());
    }
}

After (Modern):

public class ModernTableSource implements ScanTableSource {
    // getChangelogMode(), copy() and asSummaryString() omitted for brevity
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new DataStreamScanProvider() {
            @Override
            public boolean isBounded() {
                return false;
            }

            @Override
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                return execEnv.addSource(new MySourceFunction())
                              .map(new MyRowDataMapper());
            }
        };
    }
}
