Connector Integration

Connector integration interfaces and utilities bridge custom table sources and sinks with the Flink planner, providing the link between external data systems and Flink's internal execution model.

Package Information

import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
import org.apache.flink.table.planner.connectors.DynamicSinkUtils;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.data.RowData;

Capabilities

TransformationScanProvider

Provider interface for transformation-based table sources, enabling direct integration with Flink's transformation API.

public interface TransformationScanProvider extends ScanTableSource.ScanRuntimeProvider {
    
    /**
     * Creates the transformation for this scan provider.
     *
     * @param context The context containing runtime information
     * @return Transformation that produces the scanned data
     */
    Transformation<RowData> createTransformation(Context context);
    
    /**
     * Context interface providing runtime information for transformation creation.
     */
    interface Context {
        String getTableName();
        Configuration getConfiguration();
        ClassLoader getClassLoader();
        int getParallelism();
    }
}

The TransformationScanProvider allows table sources to directly provide Flink transformations rather than going through the DataStream API. This provides more control over the execution graph and better integration with the planner's optimization process.

Usage Example:

import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.transformations.SourceTransformation;

public class MyTableSource implements ScanTableSource {
    
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new TransformationScanProvider() {
            @Override
            public Transformation<RowData> createTransformation(Context providerContext) {
                // Create source operator
                SourceOperator<RowData> sourceOperator = new SourceOperator<>(
                    mySourceFunction,
                    WatermarkStrategy.noWatermarks(),
                    SimpleVersionedSerializerAdapter.create(mySerializer)
                );
                
                // Create and configure transformation
                SourceTransformation<RowData> transformation = 
                    new SourceTransformation<>(
                        "MyTableSource",
                        sourceOperator,
                        TypeInformation.of(RowData.class),
                        providerContext.getParallelism()
                    );
                
                return transformation;
            }
        };
    }
}
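
For this example to compile, MyTableSource must also implement the remaining methods of the ScanTableSource/DynamicTableSource contract (and the anonymous provider must report boundedness via ScanRuntimeProvider#isBounded()). A minimal, insert-only sketch with illustrative defaults:

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;

// Remaining ScanTableSource methods (illustrative defaults)
@Override
public ChangelogMode getChangelogMode() {
    // This source only emits INSERT rows
    return ChangelogMode.insertOnly();
}

@Override
public DynamicTableSource copy() {
    // Return a new instance so the planner can mutate copies independently
    return new MyTableSource();
}

@Override
public String asSummaryString() {
    // Short name used in plans and logs
    return "MyTableSource";
}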

TransformationSinkProvider

Provider interface for transformation-based table sinks, enabling direct integration with Flink's transformation API for data output.

public interface TransformationSinkProvider extends DynamicTableSink.SinkRuntimeProvider {
    
    /**
     * Creates the transformation for this sink provider.
     *
     * @param context The context containing runtime information  
     * @return Transformation that consumes the sink data
     */
    Transformation<?> createTransformation(Context context);
    
    /**
     * Context interface providing runtime information for transformation creation.
     */
    interface Context {
        String getTableName();
        Configuration getConfiguration();
        ClassLoader getClassLoader();
        int getParallelism();
        Transformation<RowData> getInputTransformation();
    }
}

The TransformationSinkProvider enables table sinks to integrate directly with the transformation graph, providing precise control over how data flows into external systems.

Usage Example:

import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.SinkTransformation;

public class MyTableSink implements DynamicTableSink {
    
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        return new TransformationSinkProvider() {
            @Override
            public Transformation<?> createTransformation(Context providerContext) {
                // Get input transformation
                Transformation<RowData> input = providerContext.getInputTransformation();
                
                // Create sink operator
                StreamSink<RowData> sinkOperator = new StreamSink<>(mySinkFunction);
                
                // Create sink transformation
                SinkTransformation<RowData> transformation = 
                    new SinkTransformation<>(
                        input,
                        "MyTableSink", 
                        sinkOperator,
                        providerContext.getParallelism()
                    );
                
                return transformation;
            }
        };
    }
}
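
Correspondingly, MyTableSink must also implement the remaining DynamicTableSink methods. A minimal sketch with illustrative defaults:

import org.apache.flink.table.connector.ChangelogMode;

// Remaining DynamicTableSink methods (illustrative defaults)
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // Accept whatever changelog mode the planner requests
    return requestedMode;
}

@Override
public DynamicTableSink copy() {
    return new MyTableSink();
}

@Override
public String asSummaryString() {
    return "MyTableSink";
}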

DynamicSourceUtils

Utility class for converting dynamic table sources to relational nodes in the optimization process.

public final class DynamicSourceUtils {
    
    /**
     * Converts a DataStream to a RelNode for integration with Calcite optimization.
     */
    public static RelNode convertDataStreamToRel(
        StreamTableEnvironment tableEnv,
        DataStream<RowData> dataStream, 
        List<String> fieldNames
    );
    
    /**
     * Converts a table source to a RelNode with statistics for optimization.
     */
    public static RelNode convertSourceToRel(
        FlinkOptimizeContext optimizeContext,
        RelOptTable relOptTable,
        DynamicTableSource tableSource,
        FlinkStatistic statistic
    );
    
    /**
     * Creates a scan rel node from a table source.
     */
    public static RelNode createScanRelNode(
        FlinkOptimizeContext optimizeContext,
        RelOptTable relOptTable,
        DynamicTableSource tableSource
    );
}

Usage Example:

import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
import org.apache.calcite.rel.RelNode;

// Convert DataStream to RelNode for optimization
DataStream<RowData> sourceStream = // your data stream
List<String> fieldNames = Arrays.asList("id", "name", "timestamp");

RelNode relNode = DynamicSourceUtils.convertDataStreamToRel(
    tableEnv,
    sourceStream,
    fieldNames
);

// Convert table source with statistics
FlinkStatistic statistics = FlinkStatistic.builder()
    .tableStats(new TableStats(1000000L)) // 1M rows estimated
    .build();

RelNode optimizedRel = DynamicSourceUtils.convertSourceToRel(
    optimizeContext,
    relOptTable,
    myTableSource,
    statistics
);

DynamicSinkUtils

Utility class for converting dynamic table sinks to relational nodes and managing sink operations.

public final class DynamicSinkUtils {
    
    /**
     * Converts a collect sink to a RelNode for query planning.
     */
    public static RelNode convertCollectToRel(
        FlinkOptimizeContext optimizeContext,
        RelNode input,
        DynamicTableSink tableSink,
        String sinkName
    );
    
    /**
     * Converts a table sink to a RelNode for integration with optimization.
     */
    public static RelNode convertSinkToRel(
        FlinkOptimizeContext optimizeContext,
        RelNode input,
        RelOptTable relOptTable,
        DynamicTableSink tableSink,
        String sinkName
    );
    
    /**
     * Validates sink compatibility with input schema.
     */
    public static void validateSchemaCompatibility(
        ResolvedSchema inputSchema,
        ResolvedSchema sinkSchema,
        String sinkName
    );
}

Usage Example:

import org.apache.flink.table.planner.connectors.DynamicSinkUtils;

// Convert sink to RelNode for optimization
RelNode inputRel = // input relation from query
RelNode sinkRel = DynamicSinkUtils.convertSinkToRel(
    optimizeContext,
    inputRel,
    relOptTable,
    myTableSink,
    "my_output_table"
);

// Validate schema compatibility
ResolvedSchema inputSchema = // schema from query result  
ResolvedSchema sinkSchema = // schema of the target sink table (e.g., resolved from the catalog)

DynamicSinkUtils.validateSchemaCompatibility(
    inputSchema,
    sinkSchema,
    "my_output_table"
);

ShortcutUtils

Utility methods for shortcut operations and performance optimizations in connector integration.

public final class ShortcutUtils {
    
    /**
     * Determines if a shortcut can be applied for the given operation.
     */
    public static boolean canApplyShortcut(
        RelNode input,
        TableSink<?> tableSink
    );
    
    /**
     * Applies shortcut optimization to bypass unnecessary transformations.
     */
    public static Transformation<?> applyShortcut(
        RelNode input,
        TableSink<?> tableSink,
        String sinkName
    );
    
    /**
     * Checks if source supports pushed down predicates.
     */
    public static boolean supportsPredicatePushDown(
        DynamicTableSource tableSource,
        List<Expression> predicates
    );
}

Usage Example:

import org.apache.flink.table.planner.utils.ShortcutUtils;

// Check if shortcut optimization can be applied
if (ShortcutUtils.canApplyShortcut(inputRel, tableSink)) {
    // Apply shortcut to bypass unnecessary operations
    Transformation<?> optimizedTransformation = ShortcutUtils.applyShortcut(
        inputRel,
        tableSink, 
        "output_table"
    );
}

// Check predicate pushdown support
List<Expression> predicates = // filter predicates from query
if (ShortcutUtils.supportsPredicatePushDown(tableSource, predicates)) {
    // Enable predicate pushdown optimization
    tableSource.applyFilters(predicates);
}

DataViewUtils

Utilities for DataView operations in aggregations, essential for stateful stream processing with custom aggregates.

public final class DataViewUtils {
    
    /**
     * Creates a state descriptor for DataView storage.
     */
    public static <T> ValueStateDescriptor<T> createDataViewStateDescriptor(
        String name,
        Class<T> dataViewClass,
        TypeInformation<T> typeInfo
    );
    
    /**
     * Binds DataView to state backend for persistence.
     */
    public static void bindDataViewToState(
        Object dataView,
        RuntimeContext runtimeContext,
        String stateName
    );
    
    /**
     * Cleans up DataView state when no longer needed.
     */
    public static void cleanupDataViewState(
        RuntimeContext runtimeContext,
        String stateName
    );
    
    /**
     * Checks if a class contains DataView fields.
     */
    public static boolean hasDataViewFields(Class<?> clazz);
}

Usage Example:

import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.api.common.state.ValueStateDescriptor;

// Create state descriptor for custom aggregate DataView
public class MyAggregateFunction extends TableAggregateFunction<Row, MyAccumulator> {
    
    public static class MyAccumulator {
        public MapView<String, Integer> dataView; // Custom DataView
    }
    
    @Override
    public void open(FunctionContext context) throws Exception {
        // Check if accumulator has DataView fields
        if (DataViewUtils.hasDataViewFields(MyAccumulator.class)) {
            // Create state descriptor
            ValueStateDescriptor<MapView<String, Integer>> stateDesc = 
                DataViewUtils.createDataViewStateDescriptor(
                    "myDataView",
                    MapView.class,
                    Types.MAP(Types.STRING, Types.INT)
                );
            
            // Bind the DataView to the state backend
            // ('accumulator' stands for the accumulator instance being initialized)
            DataViewUtils.bindDataViewToState(
                accumulator.dataView,
                getRuntimeContext(),
                "myDataView"
            );
        }
    }
}
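
Once the DataView is wired to state, the aggregate function is registered and invoked like any other table aggregate function. The following sketch assumes a TableEnvironment tEnv and an input table input_table with columns key and word (all names are illustrative):

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

// Register the function under a temporary system name
tEnv.createTemporarySystemFunction("myAgg", MyAggregateFunction.class);

// Use it in a flatAggregate; the emitted column names depend on the function's output type
tEnv.from("input_table")
    .groupBy($("key"))
    .flatAggregate(call("myAgg", $("word")))
    .select($("key"), $("f0"));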

Integration Patterns

Source Integration Pattern

// Complete source integration example
public class MyCustomSource implements ScanTableSource, SupportsFilterPushDown {
    
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new TransformationScanProvider() {
            @Override
            public Transformation<RowData> createTransformation(Context providerContext) {
                // Create optimized source transformation
                return createOptimizedSourceTransformation(providerContext);
            }
        };
    }
    
    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        // Implement predicate pushdown (SupportsFilterPushDown works on ResolvedExpression)
        List<ResolvedExpression> acceptedFilters = new ArrayList<>();
        List<ResolvedExpression> remainingFilters = new ArrayList<>();
        
        for (ResolvedExpression filter : filters) {
            if (canPushDownFilter(filter)) {
                acceptedFilters.add(filter);
            } else {
                remainingFilters.add(filter);
            }
        }
        
        return Result.of(acceptedFilters, remainingFilters);
    }
}
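
A source like this is usually made discoverable to SQL DDL through a DynamicTableSourceFactory. The sketch below is illustrative: the 'my-custom' identifier and the endpoint option are hypothetical, and the factory class must additionally be listed in META-INF/services/org.apache.flink.table.factories.Factory.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class MyCustomSourceFactory implements DynamicTableSourceFactory {

    // Hypothetical connector option
    public static final ConfigOption<String> ENDPOINT =
        ConfigOptions.key("endpoint").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        // Value used in: CREATE TABLE ... WITH ('connector' = 'my-custom', ...)
        return "my-custom";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(ENDPOINT);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // Validate the supplied options against requiredOptions()/optionalOptions()
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        return new MyCustomSource();
    }
}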

Sink Integration Pattern

// Complete sink integration example  
public class MyCustomSink implements DynamicTableSink, SupportsPartitioning {
    
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new TransformationSinkProvider() {
            @Override
            public Transformation<?> createTransformation(Context providerContext) {
                // Create optimized sink transformation
                return createOptimizedSinkTransformation(providerContext);
            }
        };
    }
    
    @Override
    public boolean requiresPartitionGrouping(boolean supportsGrouping) {
        // Group input data by partition before writing, when the runtime supports it
        return supportsGrouping;
    }
}
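
Since MyCustomSink implements SupportsPartitioning, it must also implement applyStaticPartition, which receives the static partition values of a partitioned INSERT statement. A minimal sketch (the field is illustrative):

import java.util.HashMap;
import java.util.Map;

// Called with the static partition spec of e.g. INSERT INTO t PARTITION (dt='2024-01-01') ...
private final Map<String, String> staticPartition = new HashMap<>();

@Override
public void applyStaticPartition(Map<String, String> partition) {
    // Remember the static partition values for use when writing
    staticPartition.putAll(partition);
}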

Error Handling in Connectors

// Robust error handling pattern
public class RobustTableSource implements ScanTableSource {
    
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new TransformationScanProvider() {
            @Override
            public Transformation<RowData> createTransformation(Context providerContext) {
                try {
                    return createSourceTransformation(providerContext);
                } catch (Exception e) {
                    throw new TableException(
                        "Failed to create source transformation for table: " + 
                        providerContext.getTableName(), e
                    );
                }
            }
        };
    }
}