Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.
—
Provider interfaces for advanced connector development that integrate directly with DataStream API. These providers enable custom connectors to produce and consume DataStreams while maintaining full integration with Flink's table ecosystem.
Provider interface for creating table sources that produce DataStreams directly.
/**
 * Provider that produces a Java {@link DataStream} as the runtime implementation of a
 * {@code ScanTableSource}.
 *
 * <p>Note: This provider is only meant for advanced connector developers.
 */
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {

    /**
     * Creates a scan {@link DataStream} from the given {@link StreamExecutionEnvironment}.
     *
     * <p>Note: When the CompiledPlan feature is used, implementations must assign unique
     * identifiers (UIDs) to every transformation they create.
     *
     * @param providerContext context offering utilities such as UID generation
     * @param execEnv the execution environment used to create the stream
     * @return a {@link DataStream} producing {@code RowData} for the table source
     */
    DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);
}

Usage Examples:
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
public class CustomDataStreamScanProvider implements DataStreamScanProvider {

    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext,
            StreamExecutionEnvironment execEnv) {
        // Wire up the custom source; the generated UID keeps the operator
        // addressable across savepoints.
        DataStream<RowData> rows = execEnv
                .addSource(new CustomSourceFunction())
                .uid(providerContext.generateUid("custom-source"));
        // Post-process the records, again with a topology-unique identifier.
        DataStream<RowData> mapped = rows
                .map(new CustomRowDataMapper())
                .uid(providerContext.generateUid("custom-mapper"));
        return mapped;
    }

    @Override
    public Optional<Integer> getParallelism() {
        // Run this source with a fixed parallelism of 4.
        return Optional.of(4);
    }
}

Provider interface for creating table sinks that consume DataStreams directly.
/**
 * Provider that consumes a Java {@link DataStream} as the runtime implementation of a
 * {@code DynamicTableSink}.
 *
 * <p>Note: This provider is only meant for advanced connector developers.
 */
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {

    /**
     * Consumes the given {@link DataStream} and returns the resulting sink transformation.
     *
     * <p>Note: When the CompiledPlan feature is used, implementations must assign unique
     * identifiers (UIDs) to every transformation they create.
     *
     * @param providerContext context offering utilities such as UID generation
     * @param dataStream the input stream of {@code RowData} to be written out
     * @return the {@link DataStreamSink} representing the sink transformation
     */
    DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);

    /**
     * Returns the custom parallelism for the sink operations, if any.
     *
     * <p>Note: If several transformations are applied inside the sink, give them all the
     * same parallelism to avoid reordering changelog records.
     *
     * @return the parallelism to use, or {@link Optional#empty()} for the default
     */
    @Override
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

Usage Examples:
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
public class CustomDataStreamSinkProvider implements DataStreamSinkProvider {

    /**
     * Parallelism shared by every transformation of this sink. Per the contract of
     * {@code DataStreamSinkProvider#getParallelism()}, all transformations applied inside
     * {@code consumeDataStream} must use the same parallelism — mixing parallelisms can
     * reorder changelog records.
     */
    private static final int SINK_PARALLELISM = 2;

    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext,
            DataStream<RowData> dataStream) {
        // Apply transformations to the input stream. The explicit setParallelism keeps
        // the mapper aligned with the value reported by getParallelism(); previously the
        // mapper ran with the default parallelism, contradicting that contract.
        DataStream<String> transformedStream = dataStream
            .map(new RowDataToStringMapper())
            .setParallelism(SINK_PARALLELISM)
            .uid(providerContext.generateUid("sink-mapper"));
        // Create sink with a unique ID for savepoint compatibility.
        return transformedStream
            .addSink(new CustomSinkFunction())
            .setParallelism(SINK_PARALLELISM)
            .uid(providerContext.generateUid("custom-sink"));
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(SINK_PARALLELISM); // Custom parallelism for all sink operations
    }
}

Context interface providing utilities for connector providers.
/**
 * Context offering utilities to runtime providers.
 */
public interface ProviderContext {

    /**
     * Generates a topology-wide unique identifier for a transformation.
     *
     * <p>Stable identifiers are essential for stateful upgrades and savepoint
     * compatibility.
     *
     * <p>NOTE(review): in recent upstream Flink releases this method returns
     * {@code Optional<String>} — confirm against the targeted Flink version.
     *
     * @param operatorName base name for the operator
     * @return the unique identifier string
     */
    String generateUid(String operatorName);
}

Interface for specifying custom parallelism in connector providers.
/**
 * Provider interface for declaring a custom parallelism.
 */
public interface ParallelismProvider {

    /**
     * Returns the custom parallelism for the connector operations.
     *
     * @return the desired parallelism, or {@link Optional#empty()} to use the default
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

Complete example of implementing a custom table source with DataStream integration.
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
public class CustomTableSource implements ScanTableSource {

    // NOTE(review): `config` is referenced below but never declared in this example —
    // presumably a member field initialized by a constructor; confirm before reuse.
    // NOTE(review): a complete ScanTableSource must also implement getChangelogMode();
    // it appears to be omitted here for brevity — confirm against the Flink API.

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return new DataStreamScanProvider() {

            @Override
            public DataStream<RowData> produceDataStream(
                    ProviderContext providerContext,
                    StreamExecutionEnvironment execEnv) {
                // Instantiate the configured source and wire it into the topology,
                // assigning a unique UID to every transformation.
                CustomSourceFunction sourceFunction = new CustomSourceFunction(config);
                DataStream<RowData> stream = execEnv
                        .addSource(sourceFunction)
                        .uid(providerContext.generateUid("custom-table-source"))
                        .map(new RecordToRowDataMapper())
                        .uid(providerContext.generateUid("record-mapper"));
                return stream;
            }

            @Override
            public Optional<Integer> getParallelism() {
                return Optional.of(config.getSourceParallelism());
            }
        };
    }

    @Override
    public DynamicTableSource copy() {
        return new CustomTableSource();
    }

    @Override
    public String asSummaryString() {
        return "CustomTableSource";
    }
}

Complete example of implementing a custom table sink with DataStream integration.
import org.apache.flink.table.connector.sink.DynamicTableSink;
public class CustomTableSink implements DynamicTableSink {

    // NOTE(review): `config` is referenced below but never declared in this example —
    // presumably a member field; confirm before reuse. A complete DynamicTableSink must
    // also implement getChangelogMode(ChangelogMode); it appears to be omitted here for
    // brevity — confirm against the Flink API.

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new DataStreamSinkProvider() {

            @Override
            public DataStreamSink<?> consumeDataStream(
                    ProviderContext providerContext,
                    DataStream<RowData> dataStream) {
                // Pre-process the rows before handing them to the sink function;
                // every transformation carries a topology-unique UID.
                DataStream<CustomRecord> prepared = dataStream
                        .map(new RowDataToCustomRecordMapper())
                        .uid(providerContext.generateUid("sink-mapper"))
                        .filter(new CustomRecordFilter())
                        .uid(providerContext.generateUid("sink-filter"));
                // Terminal sink transformation built from the configured function.
                return prepared
                        .addSink(new CustomSinkFunction(config))
                        .uid(providerContext.generateUid("custom-table-sink"));
            }

            @Override
            public Optional<Integer> getParallelism() {
                return Optional.of(config.getSinkParallelism());
            }
        };
    }

    @Override
    public DynamicTableSink copy() {
        return new CustomTableSink();
    }

    @Override
    public String asSummaryString() {
        return "CustomTableSink";
    }
}

Provider for legacy sink functions in table sinks.
/**
 * Provider of {@code SinkFunction} instances for legacy table sinks.
 *
 * <p>NOTE(review): in upstream Flink this interface also extends
 * {@code ParallelismProvider} — confirm against the targeted Flink version.
 */
public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {

    /**
     * Creates the sink function that consumes the table's rows.
     *
     * @return a new {@code SinkFunction} instance
     */
    SinkFunction<RowData> createSinkFunction();

    /**
     * Returns an optional custom parallelism for the sink function.
     *
     * @return the parallelism to use, or {@link Optional#empty()} for the default
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

Provider for legacy source functions in table sources.
/**
 * Provider of {@code SourceFunction} instances for legacy table sources.
 *
 * <p>NOTE(review): in upstream Flink this interface also extends
 * {@code ParallelismProvider} — confirm against the targeted Flink version.
 */
public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider {

    /**
     * Creates the source function that produces the table's rows.
     *
     * @return a new {@code SourceFunction} instance
     */
    SourceFunction<RowData> createSourceFunction();

    /**
     * Returns an optional custom parallelism for the source function.
     *
     * @return the parallelism to use, or {@link Optional#empty()} for the default
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

Always use ProviderContext.generateUid() for transformation IDs to ensure savepoint compatibility:
// Good - uses context for unique IDs
DataStream<RowData> stream = execEnv
.addSource(sourceFunction)
.uid(providerContext.generateUid("my-source"))
.map(mapper)
.uid(providerContext.generateUid("my-mapper"));
// Bad - hardcoded IDs may conflict
DataStream<RowData> stream = execEnv
.addSource(sourceFunction)
.uid("hardcoded-source") // May conflict with other connectors
.map(mapper)
.uid("hardcoded-mapper");

When using custom parallelism in sinks, ensure all transformations use the same parallelism:
@Override
public DataStreamSink<?> consumeDataStream(
        ProviderContext providerContext,
        DataStream<RowData> dataStream) {
    // A single parallelism value shared by every transformation of the sink;
    // mixing parallelisms here can reorder changelog records.
    final int sharedParallelism = 4;
    return dataStream
        .map(mapper)
        .setParallelism(sharedParallelism) // Same parallelism
        .uid(providerContext.generateUid("mapper"))
        .addSink(sinkFunction)
        .setParallelism(sharedParallelism) // Same parallelism
        .uid(providerContext.generateUid("sink"));
}

import org.apache.flink.table.connector.source.DataStreamScanProvider;
import java.util.Optional;

import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge