Connector integration interfaces and utilities connect custom table sources and sinks with the Flink planner, bridging external data systems and Flink's internal execution model.
import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
import org.apache.flink.table.planner.connectors.DynamicSinkUtils;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.data.RowData;

Provider interface for transformation-based table sources, enabling direct integration with Flink's transformation API.
public interface TransformationScanProvider extends ScanTableSource.ScanRuntimeProvider {

    /**
     * Creates the transformation for this scan provider.
     *
     * @param context The context containing runtime information
     * @return Transformation that produces the scanned data
     */
    Transformation<RowData> createTransformation(Context context);

    /**
     * Context interface providing runtime information for transformation creation.
     */
    interface Context {
        String getTableName();
        Configuration getConfiguration();
        ClassLoader getClassLoader();
        int getParallelism();
    }
}

The TransformationScanProvider allows table sources to directly provide Flink transformations rather than going through the DataStream API. This provides more control over the execution graph and better integration with the planner's optimization process.
Usage Example:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.transformations.SourceTransformation;

public class MyTableSource implements ScanTableSource {

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new TransformationScanProvider() {
            @Override
            public Transformation<RowData> createTransformation(Context providerContext) {
                // Create the source operator (constructor arguments simplified for illustration)
                SourceOperator<RowData> sourceOperator = new SourceOperator<>(
                        mySourceFunction,
                        WatermarkStrategy.noWatermarks(),
                        SimpleVersionedSerializerAdapter.create(mySerializer)
                );
                // Create and configure the transformation
                SourceTransformation<RowData> transformation =
                        new SourceTransformation<>(
                                "MyTableSource",
                                sourceOperator,
                                TypeInformation.of(RowData.class),
                                providerContext.getParallelism()
                        );
                return transformation;
            }
        };
    }
}

Provider interface for transformation-based table sinks, enabling direct integration with Flink's transformation API for data output.
public interface TransformationSinkProvider extends DynamicTableSink.SinkRuntimeProvider {

    /**
     * Creates the transformation for this sink provider.
     *
     * @param context The context containing runtime information
     * @return Transformation that consumes the sink data
     */
    Transformation<?> createTransformation(Context context);

    /**
     * Context interface providing runtime information for transformation creation.
     */
    interface Context {
        String getTableName();
        Configuration getConfiguration();
        ClassLoader getClassLoader();
        int getParallelism();
        Transformation<RowData> getInputTransformation();
    }
}

The TransformationSinkProvider enables table sinks to integrate directly with the transformation graph, providing precise control over how data flows into external systems.
Usage Example:
import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.SinkTransformation;

public class MyTableSink implements DynamicTableSink {

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        return new TransformationSinkProvider() {
            @Override
            public Transformation<?> createTransformation(Context providerContext) {
                // Get the input transformation
                Transformation<RowData> input = providerContext.getInputTransformation();
                // Create the sink operator
                StreamSink<RowData> sinkOperator = new StreamSink<>(mySinkFunction);
                // Create the sink transformation (constructor arguments simplified for illustration)
                SinkTransformation<RowData> transformation =
                        new SinkTransformation<>(
                                input,
                                "MyTableSink",
                                sinkOperator,
                                providerContext.getParallelism()
                        );
                return transformation;
            }
        };
    }
}

Utility class for converting dynamic table sources to relational nodes in the optimization process.
public final class DynamicSourceUtils {

    /**
     * Converts a DataStream to a RelNode for integration with Calcite optimization.
     */
    public static RelNode convertDataStreamToRel(
            StreamTableEnvironment tableEnv,
            DataStream<RowData> dataStream,
            List<String> fieldNames
    );

    /**
     * Converts a table source to a RelNode with statistics for optimization.
     */
    public static RelNode convertSourceToRel(
            FlinkOptimizeContext optimizeContext,
            RelOptTable relOptTable,
            DynamicTableSource tableSource,
            FlinkStatistic statistic
    );

    /**
     * Creates a scan rel node from a table source.
     */
    public static RelNode createScanRelNode(
            FlinkOptimizeContext optimizeContext,
            RelOptTable relOptTable,
            DynamicTableSource tableSource
    );
}

Usage Example:
import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
import org.apache.calcite.rel.RelNode;
// Convert DataStream to RelNode for optimization
DataStream<RowData> sourceStream = // your data stream
List<String> fieldNames = Arrays.asList("id", "name", "timestamp");
RelNode relNode = DynamicSourceUtils.convertDataStreamToRel(
        tableEnv,
        sourceStream,
        fieldNames
);
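// Hedged sketch: createScanRelNode (documented above) builds a scan RelNode directly
// from a table source when no statistics are supplied; optimizeContext and relOptTable
// are assumed to be available, as in the convertSourceToRel example below.
RelNode scanRel = DynamicSourceUtils.createScanRelNode(
        optimizeContext,
        relOptTable,
        myTableSource
);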
// Convert table source with statistics
FlinkStatistic statistics = FlinkStatistic.builder()
        .tableStats(new TableStats(1000000L)) // 1M rows estimated
        .build();
RelNode optimizedRel = DynamicSourceUtils.convertSourceToRel(
        optimizeContext,
        relOptTable,
        myTableSource,
        statistics
);

Utility class for converting dynamic table sinks to relational nodes and managing sink operations.
public final class DynamicSinkUtils {

    /**
     * Converts a collect sink to a RelNode for query planning.
     */
    public static RelNode convertCollectToRel(
            FlinkOptimizeContext optimizeContext,
            RelNode input,
            DynamicTableSink tableSink,
            String sinkName
    );

    /**
     * Converts a table sink to a RelNode for integration with optimization.
     */
    public static RelNode convertSinkToRel(
            FlinkOptimizeContext optimizeContext,
            RelNode input,
            RelOptTable relOptTable,
            DynamicTableSink tableSink,
            String sinkName
    );

    /**
     * Validates sink compatibility with input schema.
     */
    public static void validateSchemaCompatibility(
            ResolvedSchema inputSchema,
            ResolvedSchema sinkSchema,
            String sinkName
    );
}

Usage Example:
import org.apache.flink.table.planner.connectors.DynamicSinkUtils;
// Convert sink to RelNode for optimization
RelNode inputRel = // input relation from query
RelNode sinkRel = DynamicSinkUtils.convertSinkToRel(
        optimizeContext,
        inputRel,
        relOptTable,
        myTableSink,
        "my_output_table"
);
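// Hedged sketch: convertCollectToRel (documented above) follows the same shape when
// results are collected back to the client rather than written to a catalog table;
// the sink name "collect_sink" is illustrative.
RelNode collectRel = DynamicSinkUtils.convertCollectToRel(
        optimizeContext,
        inputRel,
        myTableSink,
        "collect_sink"
);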
// Validate schema compatibility
ResolvedSchema inputSchema = // schema from query result
ResolvedSchema sinkSchema = // resolved schema of the sink table (e.g. from its catalog entry)
DynamicSinkUtils.validateSchemaCompatibility(
        inputSchema,
        sinkSchema,
        "my_output_table"
);

Utility methods for shortcut operations and performance optimizations in connector integration.
public final class ShortcutUtils {

    /**
     * Determines if a shortcut can be applied for the given operation.
     */
    public static boolean canApplyShortcut(
            RelNode input,
            TableSink<?> tableSink
    );

    /**
     * Applies shortcut optimization to bypass unnecessary transformations.
     */
    public static Transformation<?> applyShortcut(
            RelNode input,
            TableSink<?> tableSink,
            String sinkName
    );

    /**
     * Checks if source supports pushed down predicates.
     */
    public static boolean supportsPredicatePushDown(
            DynamicTableSource tableSource,
            List<Expression> predicates
    );
}

Usage Example:
import org.apache.flink.table.planner.utils.ShortcutUtils;
// Check if shortcut optimization can be applied
if (ShortcutUtils.canApplyShortcut(inputRel, tableSink)) {
    // Apply shortcut to bypass unnecessary operations
    Transformation<?> optimizedTransformation = ShortcutUtils.applyShortcut(
            inputRel,
            tableSink,
            "output_table"
    );
}

// Check predicate pushdown support
List<Expression> predicates = // filter predicates from query
if (ShortcutUtils.supportsPredicatePushDown(tableSource, predicates)) {
    // Enable predicate pushdown optimization
    tableSource.applyFilters(predicates);
}

Utilities for DataView operations in aggregations, essential for stateful stream processing with custom aggregates.
public final class DataViewUtils {

    /**
     * Creates a state descriptor for DataView storage.
     */
    public static <T> ValueStateDescriptor<T> createDataViewStateDescriptor(
            String name,
            Class<T> dataViewClass,
            TypeInformation<T> typeInfo
    );

    /**
     * Binds DataView to state backend for persistence.
     */
    public static void bindDataViewToState(
            Object dataView,
            RuntimeContext runtimeContext,
            String stateName
    );

    /**
     * Cleans up DataView state when no longer needed.
     */
    public static void cleanupDataViewState(
            RuntimeContext runtimeContext,
            String stateName
    );

    /**
     * Checks if a class contains DataView fields.
     */
    public static boolean hasDataViewFields(Class<?> clazz);
}

Usage Example:
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.api.common.state.ValueStateDescriptor;
// Create state descriptor for custom aggregate DataView
public class MyAggregateFunction extends TableAggregateFunction<Row, MyAccumulator> {

    public static class MyAccumulator {
        public MapView<String, Integer> dataView; // Custom DataView
    }

    // Accumulator instance used below; in practice accumulators are created via
    // createAccumulator() and handed to accumulate()/emitValue() by the framework.
    private transient MyAccumulator accumulator;

    @Override
    public void open(FunctionContext context) throws Exception {
        // Check if the accumulator class declares DataView fields
        if (DataViewUtils.hasDataViewFields(MyAccumulator.class)) {
            // Create a state descriptor for the DataView
            ValueStateDescriptor<MapView<String, Integer>> stateDesc =
                    DataViewUtils.createDataViewStateDescriptor(
                            "myDataView",
                            MapView.class,
                            Types.MAP(Types.STRING, Types.INT)
                    );
            // Bind the DataView to the state backend
            // (runtime context access shown here is simplified for illustration)
            DataViewUtils.bindDataViewToState(
                    accumulator.dataView,
                    getRuntimeContext(),
                    "myDataView"
            );
        }
    }
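
    // Hypothetical counterpart sketch using the cleanupDataViewState utility documented
    // above; mirrors the simplified runtime-context access used in open().
    @Override
    public void close() throws Exception {
        if (DataViewUtils.hasDataViewFields(MyAccumulator.class)) {
            // Release the DataView state when the function is torn down
            DataViewUtils.cleanupDataViewState(getRuntimeContext(), "myDataView");
        }
    }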
}

// Complete source integration example
public class MyCustomSource implements ScanTableSource, SupportsFilterPushDown {

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new TransformationScanProvider() {
            @Override
            public Transformation<RowData> createTransformation(Context providerContext) {
                // Create optimized source transformation
                return createOptimizedSourceTransformation(providerContext);
            }
        };
    }

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        // Implement predicate pushdown
        List<ResolvedExpression> acceptedFilters = new ArrayList<>();
        List<ResolvedExpression> remainingFilters = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            if (canPushDownFilter(filter)) {
                acceptedFilters.add(filter);
            } else {
                remainingFilters.add(filter);
            }
        }
        return Result.of(acceptedFilters, remainingFilters);
    }
}

// Complete sink integration example
public class MyCustomSink implements DynamicTableSink, SupportsPartitioning {

    private Map<String, String> staticPartition;

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new TransformationSinkProvider() {
            @Override
            public Transformation<?> createTransformation(Context providerContext) {
                // Create optimized sink transformation
                return createOptimizedSinkTransformation(providerContext);
            }
        };
    }

    @Override
    public void applyStaticPartition(Map<String, String> partition) {
        // Remember statically specified partition values for writing
        this.staticPartition = partition;
    }

    @Override
    public boolean requiresPartitionGrouping(boolean supportsGrouping) {
        // Request that incoming data is grouped by partition before writing
        return true;
    }
}

// Robust error handling pattern
public class RobustTableSource implements ScanTableSource {

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new TransformationScanProvider() {
            @Override
            public Transformation<RowData> createTransformation(Context providerContext) {
                try {
                    return createSourceTransformation(providerContext);
                } catch (Exception e) {
                    throw new TableException(
                            "Failed to create source transformation for table: " +
                            providerContext.getTableName(), e
                    );
                }
            }
        };
    }
}
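The same error-handling pattern applies on the sink side. A minimal sketch, assuming a hypothetical createSinkTransformation helper analogous to createSourceTransformation above:

public class RobustTableSink implements DynamicTableSink {

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new TransformationSinkProvider() {
            @Override
            public Transformation<?> createTransformation(Context providerContext) {
                try {
                    // createSinkTransformation is a hypothetical helper for this sketch
                    return createSinkTransformation(providerContext);
                } catch (Exception e) {
                    throw new TableException(
                            "Failed to create sink transformation for table: " +
                            providerContext.getTableName(), e
                    );
                }
            }
        };
    }
}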