CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java-bridge

Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.

Pending
Overview
Eval results
Files

docs/datastream-connectors.md

DataStream Connectors

Provider interfaces for advanced connector development that integrate directly with DataStream API. These providers enable custom connectors to produce and consume DataStreams while maintaining full integration with Flink's table ecosystem.

Capabilities

DataStream Scan Provider

Provider interface for creating table sources that produce DataStreams directly.

/**
 * Runtime provider that produces a Java {@link DataStream} as the runtime implementation
 * of a {@code ScanTableSource}.
 *
 * <p>Note: this provider is only meant for advanced connector developers.
 */
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
    
    /**
     * Creates a scan {@link DataStream} from the given {@link StreamExecutionEnvironment}.
     *
     * <p>Implementations must set unique identifiers (UIDs) on every transformation they add
     * when the CompiledPlan feature is used; otherwise stateful restores may not map operator
     * state correctly.
     *
     * @param providerContext context offering utilities such as topology-wide UID generation
     * @param execEnv the execution environment in which to build the stream
     * @return a DataStream producing {@code RowData} records for the table source
     */
    DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);
}

Usage Examples:

import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

import java.util.Optional; // required: getParallelism() returns Optional<Integer>

/**
 * Example provider that builds the source topology directly on the DataStream API.
 *
 * <p>Every transformation gets a UID generated through the {@link ProviderContext} so the
 * operators remain addressable across savepoint/restore cycles.
 */
public class CustomDataStreamScanProvider implements DataStreamScanProvider {
    
    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext, 
            StreamExecutionEnvironment execEnv) {
        
        // Create the custom data source. NOTE(review): addSource/SourceFunction is the
        // legacy source API — confirm against the target Flink version before reuse.
        DataStream<RowData> sourceStream = execEnv
            .addSource(new CustomSourceFunction())
            .uid(providerContext.generateUid("custom-source")); // Unique ID for savepoint compatibility
        
        // Apply transformations, each with its own unique ID.
        return sourceStream
            .map(new CustomRowDataMapper())
            .uid(providerContext.generateUid("custom-mapper"));
    }
    
    /**
     * Requests a fixed parallelism of 4 for this source; an empty Optional would
     * fall back to the environment default.
     */
    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(4); // Custom parallelism
    }
}

DataStream Sink Provider

Provider interface for creating table sinks that consume DataStreams directly.

/**
 * Runtime provider that consumes a Java {@link DataStream} as the runtime implementation
 * of a {@code DynamicTableSink}.
 *
 * <p>Note: this provider is only meant for advanced connector developers.
 */
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
    
    /**
     * Consumes the given {@link DataStream} and returns the sink transformation.
     *
     * <p>Implementations must set unique identifiers (UIDs) on every transformation they add
     * when the CompiledPlan feature is used; otherwise stateful restores may not map operator
     * state correctly.
     *
     * @param providerContext context offering utilities such as topology-wide UID generation
     * @param dataStream input stream of {@code RowData} records to consume
     * @return the {@code DataStreamSink} representing the sink transformation
     */
    DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);
    
    /**
     * Custom parallelism for the sink operations; empty means use the default.
     *
     * <p>If multiple transformations are applied, give them all the same parallelism to
     * avoid reshuffles that could reorder changelog records.
     *
     * @return optional parallelism setting
     */
    @Override
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

Usage Examples:

import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;

public class CustomDataStreamSinkProvider implements DataStreamSinkProvider {
    
    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext,
            DataStream<RowData> dataStream) {
        
        // Apply transformations to the input stream
        DataStream<String> transformedStream = dataStream
            .map(new RowDataToStringMapper())
            .uid(providerContext.generateUid("sink-mapper"));
        
        // Create sink with unique ID
        return transformedStream
            .addSink(new CustomSinkFunction())
            .uid(providerContext.generateUid("custom-sink"));
    }
    
    @Override
    public Optional<Integer> getParallelism() {
        return Optional.of(2); // Custom parallelism for all sink operations
    }
}

Provider Context

Context interface providing utilities for connector providers.

/**
 * Context handed to runtime providers, offering utilities needed while assembling
 * the DataStream topology.
 */
public interface ProviderContext {
    
    /**
     * Generates a topology-wide unique identifier for a transformation.
     *
     * <p>Stable, unique UIDs are essential for stateful upgrades and savepoint
     * compatibility: they let Flink re-associate operator state after a topology change.
     *
     * @param operatorName base name for the operator; the context makes it unique
     * @return the unique identifier string to pass to {@code uid(...)}
     */
    String generateUid(String operatorName);
}

Parallelism Provider

Interface for specifying custom parallelism in connector providers.

/**
 * Mixin interface for providers that want to declare a custom parallelism for the
 * connector's runtime operations.
 */
public interface ParallelismProvider {
    
    /**
     * Returns the custom parallelism for the connector operations.
     *
     * @return optional parallelism setting; an empty Optional means "use the
     *     environment/default parallelism"
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

Advanced Implementation Patterns

Custom Source Connector

Complete example of implementing a custom table source with DataStream integration.

import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;

public class CustomTableSource implements ScanTableSource {
    
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return new DataStreamScanProvider() {
            @Override
            public DataStream<RowData> produceDataStream(
                    ProviderContext providerContext,
                    StreamExecutionEnvironment execEnv) {
                
                // Create source with configuration
                CustomSourceFunction sourceFunction = new CustomSourceFunction(config);
                
                return execEnv
                    .addSource(sourceFunction)
                    .uid(providerContext.generateUid("custom-table-source"))
                    .map(new RecordToRowDataMapper())
                    .uid(providerContext.generateUid("record-mapper"));
            }
            
            @Override
            public Optional<Integer> getParallelism() {
                return Optional.of(config.getSourceParallelism());
            }
        };
    }
    
    @Override
    public DynamicTableSource copy() {
        return new CustomTableSource();
    }
    
    @Override
    public String asSummaryString() {
        return "CustomTableSource";
    }
}

Custom Sink Connector

Complete example of implementing a custom table sink with DataStream integration.

import org.apache.flink.table.connector.sink.DynamicTableSink;

public class CustomTableSink implements DynamicTableSink {
    
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new DataStreamSinkProvider() {
            @Override
            public DataStreamSink<?> consumeDataStream(
                    ProviderContext providerContext,
                    DataStream<RowData> dataStream) {
                
                // Apply pre-processing transformations
                DataStream<CustomRecord> processedStream = dataStream
                    .map(new RowDataToCustomRecordMapper())
                    .uid(providerContext.generateUid("sink-mapper"))
                    .filter(new CustomRecordFilter())
                    .uid(providerContext.generateUid("sink-filter"));
                
                // Create sink function
                CustomSinkFunction sinkFunction = new CustomSinkFunction(config);
                
                return processedStream
                    .addSink(sinkFunction)
                    .uid(providerContext.generateUid("custom-table-sink"));
            }
            
            @Override
            public Optional<Integer> getParallelism() {
                return Optional.of(config.getSinkParallelism());
            }
        };
    }
    
    @Override
    public DynamicTableSink copy() {
        return new CustomTableSink();
    }
    
    @Override
    public String asSummaryString() {
        return "CustomTableSink";
    }
}

Legacy Sink Function Provider

Provider for legacy sink functions in table sinks.

/**
 * Provider for {@code SinkFunction}-based runtime implementations in legacy table sinks.
 *
 * <p>NOTE(review): SinkFunction is the legacy sink API — prefer
 * {@code DataStreamSinkProvider} or the unified Sink API for new connectors.
 */
public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
    
    /**
     * Creates the sink function that consumes the table's {@code RowData} records.
     *
     * @return the {@code SinkFunction} instance
     */
    SinkFunction<RowData> createSinkFunction();
    
    /**
     * Optional custom parallelism for the sink function; empty means use the default.
     *
     * @return optional parallelism setting
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

Legacy Source Function Provider

Provider for legacy source functions in table sources.

/**
 * Provider for {@code SourceFunction}-based runtime implementations in legacy table sources.
 *
 * <p>NOTE(review): SourceFunction is the legacy source API — prefer
 * {@code DataStreamScanProvider} or the unified Source API for new connectors.
 */
public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider {
    
    /**
     * Creates the source function that produces the table's {@code RowData} records.
     *
     * @return the {@code SourceFunction} instance
     */
    SourceFunction<RowData> createSourceFunction();
    
    /**
     * Optional custom parallelism for the source function; empty means use the default.
     *
     * @return optional parallelism setting
     */
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

Best Practices

Unique ID Generation

Always use ProviderContext.generateUid() for transformation IDs to ensure savepoint compatibility:

// Good - asks the ProviderContext for IDs, which are guaranteed unique across the
// whole topology, so savepoints keep mapping state to the right operators.
DataStream<RowData> stream = execEnv
    .addSource(sourceFunction)
    .uid(providerContext.generateUid("my-source"))
    .map(mapper)
    .uid(providerContext.generateUid("my-mapper"));

// Bad - hardcoded IDs may conflict when several connector instances (or other
// operators) end up in the same job graph.
DataStream<RowData> stream = execEnv
    .addSource(sourceFunction)
    .uid("hardcoded-source") // May conflict with other connectors
    .map(mapper)
    .uid("hardcoded-mapper");

Parallelism Consistency

When using custom parallelism in sinks, ensure all transformations use the same parallelism:

@Override
public DataStreamSink<?> consumeDataStream(
        ProviderContext providerContext,
        DataStream<RowData> dataStream) {
    
    int customParallelism = 4;
    
    // Every stage in the sink chain gets the SAME parallelism: a parallelism change
    // between stages forces a redistribution that can reorder changelog records.
    return dataStream
        .map(mapper)
        .setParallelism(customParallelism) // Same parallelism
        .uid(providerContext.generateUid("mapper"))
        .addSink(sinkFunction)
        .setParallelism(customParallelism) // Same parallelism
        .uid(providerContext.generateUid("sink"));
}

Types

Core Connector Types

import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.ParallelismProvider;

DataStream Integration Types

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

Legacy Provider Types

import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge

docs

builtin-connectors.md

changelog-processing.md

datastream-connectors.md

index.md

procedures.md

statement-sets.md

stream-table-environment.md

watermark-strategies.md

tile.json