tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11

Bridge component for Apache Flink's Table/SQL API that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs.

Legacy Connector Support

The Flink Table API Java Bridge maintains backward compatibility with legacy connector interfaces that were used before the introduction of the modern connector framework (FLIP-95). These interfaces are deprecated but still supported for existing connector implementations.

Legacy Factory Interfaces

StreamTableSourceFactory

Factory interface for creating legacy stream table sources:

@Deprecated
@PublicEvolving
public interface StreamTableSourceFactory<T> extends TableSourceFactory<T> {
    StreamTableSource<T> createStreamTableSource(Map<String, String> properties);
    
    // Inherited from TableSourceFactory
    TableSource<T> createTableSource(Map<String, String> properties);
}

Migration Note: New implementations should use DynamicTableSourceFactory instead.
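Both legacy factory interfaces are discovered through Java's `ServiceLoader` mechanism: the implementation class is listed in a service file on the classpath, named after the legacy `TableFactory` SPI (the modern framework uses `org.apache.flink.table.factories.Factory` instead). A sketch, reusing the hypothetical `MyConnectorFactory` name from the migration section below:

```
# src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
com.example.MyConnectorFactory
```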

StreamTableSinkFactory

Factory interface for creating legacy stream table sinks:

@Deprecated
@PublicEvolving
public interface StreamTableSinkFactory<T> extends TableSinkFactory<T> {
    StreamTableSink<T> createStreamTableSink(Map<String, String> properties);
    
    // Inherited from TableSinkFactory  
    TableSink<T> createTableSink(Map<String, String> properties);
}

Migration Note: New implementations should use DynamicTableSinkFactory instead.

Legacy Source Interfaces

StreamTableSource

Base interface for legacy streaming table sources:

@Deprecated
public interface StreamTableSource<T> extends TableSource<T> {
    default boolean isBounded() {
        return false;
    }
    
    DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}

Usage Example (Deprecated):

@Deprecated
public class MyLegacyTableSource implements StreamTableSource<Row> {
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;
    private final MySourceConfig config;
    
    public MyLegacyTableSource(String[] fieldNames, TypeInformation<?>[] fieldTypes, MySourceConfig config) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.config = config;
    }
    
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv
            .addSource(new MySourceFunction(config))
            .map(new MyRowMapper());
    }
    
    @Override
    public boolean isBounded() {
        return config.isBounded();
    }
    
    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
            .fields(fieldNames, fieldTypes)
            .build();
    }
    
    @Override
    public String explainSource() {
        return "MyLegacyTableSource";
    }
}

InputFormatTableSource

Abstract class for bounded table sources based on InputFormat:

@Deprecated
@Experimental
public abstract class InputFormatTableSource<T> implements StreamTableSource<T> {
    public abstract InputFormat<T, ?> getInputFormat();
    
    @Override
    public final boolean isBounded() {
        return true;
    }
    
    @Override
    public final DataStream<T> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.createInput(getInputFormat(), getReturnType());
    }
}

Usage Example (Deprecated):

@Deprecated
public class MyInputFormatTableSource extends InputFormatTableSource<Row> {
    private final MyInputFormat inputFormat;
    private final RowTypeInfo returnType;
    
    public MyInputFormatTableSource(MyInputFormat inputFormat, RowTypeInfo returnType) {
        this.inputFormat = inputFormat;
        this.returnType = returnType;
    }
    
    @Override
    public InputFormat<Row, ?> getInputFormat() {
        return inputFormat;
    }
    
    @Override
    public TypeInformation<Row> getReturnType() {
        return returnType;
    }
    
    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(returnType);
    }
}

Legacy Sink Interfaces

StreamTableSink

Base interface for legacy streaming table sinks:

@Deprecated
public interface StreamTableSink<T> extends TableSink<T> {
    DataStreamSink<?> consumeDataStream(DataStream<T> dataStream);
}

Usage Example (Deprecated):

@Deprecated
public class MyLegacyTableSink implements StreamTableSink<Row> {
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;
    private final MySinkConfig config;
    
    public MyLegacyTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes, MySinkConfig config) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        this.config = config;
    }
    
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream
            .map(new MyRowConverter(config))
            .addSink(new MySinkFunction(config))
            .name("MyLegacySink");
    }
    
    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return new MyLegacyTableSink(fieldNames, fieldTypes, config);
    }
    
    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
            .fields(fieldNames, fieldTypes)  
            .build();
    }
}

AppendStreamTableSink

Interface for append-only stream table sinks:

@Deprecated
@PublicEvolving
public interface AppendStreamTableSink<T> extends StreamTableSink<T> {
    // Inherits all methods from StreamTableSink
    // Semantically indicates append-only capability
}

RetractStreamTableSink

Interface for retractable stream table sinks that can handle updates:

@Deprecated
@PublicEvolving
public interface RetractStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {
    TypeInformation<T> getRecordType();
    
    default TypeInformation<Tuple2<Boolean, T>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
    }
}

Usage Example (Deprecated):

@Deprecated
public class MyRetractStreamTableSink implements RetractStreamTableSink<Row> {
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;
    
    public MyRetractStreamTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }
    
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        return dataStream
            .process(new ProcessFunction<Tuple2<Boolean, Row>, MyRecord>() {
                @Override
                public void processElement(Tuple2<Boolean, Row> value, Context ctx, Collector<MyRecord> out) {
                    Boolean isInsert = value.f0;
                    Row row = value.f1;
                    
                    if (isInsert) {
                        out.collect(MyRecord.fromRow(row, ChangeType.INSERT));
                    } else {
                        out.collect(MyRecord.fromRow(row, ChangeType.DELETE));
                    }
                }
            })
            .addSink(new MyChangelogSinkFunction());
    }
    
    @Override
    public TypeInformation<Row> getRecordType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }
}
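The `Boolean` flag in `Tuple2<Boolean, T>` is the whole retract protocol: `true` adds a row, `false` retracts one previously emitted copy, and an update arrives as a retraction followed by an insertion. The following plain-Java sketch (no Flink dependency; `Change` is a hypothetical stand-in for `Tuple2<Boolean, Row>`) folds such a changelog into the final rows:

```java
import java.util.ArrayList;
import java.util.List;

public class RetractFoldDemo {
    // Hypothetical stand-in for Tuple2<Boolean, Row>:
    // insert == true adds the row, insert == false retracts one prior copy.
    record Change(boolean insert, String row) {}

    // Fold a retract changelog into the final materialized rows.
    static List<String> materialize(List<Change> changes) {
        List<String> state = new ArrayList<>();
        for (Change c : changes) {
            if (c.insert()) {
                state.add(c.row());
            } else {
                state.remove(c.row()); // removes the first matching copy
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Change> log = List.of(
            new Change(true, "alice,1"),
            new Change(false, "alice,1"), // update = retract old ...
            new Change(true, "alice,2")); // ... then insert new
        System.out.println(materialize(log)); // [alice,2]
    }
}
```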

UpsertStreamTableSink

Interface for upsert stream table sinks that can handle insert/update/delete operations:

@Deprecated
@PublicEvolving
public interface UpsertStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {
    void setKeyFields(String[] keys);
    void setIsAppendOnly(Boolean isAppendOnly);
    TypeInformation<T> getRecordType();
    
    default TypeInformation<Tuple2<Boolean, T>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
    }
}

Usage Example (Deprecated):

@Deprecated
public class MyUpsertStreamTableSink implements UpsertStreamTableSink<Row> {
    private final String[] fieldNames;
    private final TypeInformation<?>[] fieldTypes;
    private String[] keyFields;
    private Boolean isAppendOnly;
    
    public MyUpsertStreamTableSink(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }
    
    @Override
    public void setKeyFields(String[] keys) {
        this.keyFields = keys;
    }
    
    @Override
    public void setIsAppendOnly(Boolean isAppendOnly) {
        this.isAppendOnly = isAppendOnly;
    }
    
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        return dataStream
            .keyBy(new KeySelector<Tuple2<Boolean, Row>, String>() {
                @Override
                public String getKey(Tuple2<Boolean, Row> value) throws Exception {
                    Row row = value.f1;
                    // Build key from key fields
                    StringBuilder keyBuilder = new StringBuilder();
                    for (String keyField : keyFields) {
                    int index = getFieldIndex(keyField); // helper (not shown): position of keyField in fieldNames
                        keyBuilder.append(row.getField(index)).append("|");
                    }
                    return keyBuilder.toString();
                }
            })
            .addSink(new MyUpsertSinkFunction(keyFields, isAppendOnly));
    }
    
    @Override
    public TypeInformation<Row> getRecordType() {
        return Types.ROW_NAMED(fieldNames, fieldTypes);
    }
}
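Upsert semantics differ from retract semantics in that changes are keyed: a `true` record inserts or overwrites the row for its key, and a `false` record deletes that key. A plain-Java sketch (no Flink dependency; `Change` is a hypothetical stand-in for the keyed `Tuple2<Boolean, Row>`):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpsertFoldDemo {
    // Hypothetical stand-in for Tuple2<Boolean, Row>:
    // upsert == true inserts/overwrites by key, upsert == false deletes the key.
    record Change(boolean upsert, String key, String value) {}

    // Fold an upsert changelog into the final key -> value state.
    static Map<String, String> materialize(List<Change> changes) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.upsert()) {
                state.put(c.key(), c.value()); // insert or update in place
            } else {
                state.remove(c.key());         // delete by key
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Change> log = List.of(
            new Change(true, "alice", "1"),
            new Change(true, "alice", "2"),  // update overwrites, no retraction needed
            new Change(true, "bob", "7"),
            new Change(false, "bob", null)); // delete
        System.out.println(materialize(log)); // {alice=2}
    }
}
```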

OutputFormatTableSink

Abstract class for table sinks based on OutputFormat:

@Deprecated
public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> {
    public abstract OutputFormat<T> getOutputFormat();
    
    @Override
    public DataStreamSink<T> consumeDataStream(DataStream<T> dataStream) {
        return dataStream.writeUsingOutputFormat(getOutputFormat());
    }
}

Legacy CSV Connector (Testing Only)

The CSV connector implementations are maintained only for testing the legacy connector stack:

CsvTableSource

@Deprecated
public class CsvTableSource extends InputFormatTableSource<Row> {
    // Constructor options
    public CsvTableSource(String path, String[] fieldNames, TypeInformation<?>[] fieldTypes);
    public CsvTableSource(
        String path, 
        String[] fieldNames, 
        TypeInformation<?>[] fieldTypes,
        String fieldDelim, 
        String rowDelim, 
        Character quoteCharacter,
        boolean ignoreFirstLine, 
        String ignoreComments, 
        boolean lenient
    );
    
    // Builder pattern support
    public static CsvTableSource.Builder builder();
}
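The parsing options roughly behave as follows. This plain-Java sketch is illustrative only (no quoting, no type conversion), with parameter names borrowed from the constructor above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class CsvOptionsSketch {
    // Simplified illustration of the CsvTableSource options; the real source
    // also honors quoteCharacter, rowDelim, and lenient type conversion.
    static List<String[]> parse(List<String> lines, String fieldDelim,
                                boolean ignoreFirstLine, String ignoreComments) {
        List<String[]> rows = new ArrayList<>();
        boolean first = true;
        for (String line : lines) {
            if (first && ignoreFirstLine) { first = false; continue; }
            first = false;
            // Skip lines starting with the comment prefix, if one is set.
            if (ignoreComments != null && line.startsWith(ignoreComments)) continue;
            // Split on the literal delimiter; -1 keeps trailing empty fields.
            rows.add(line.split(Pattern.quote(fieldDelim), -1));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("name|score", "# comment", "alice|3", "bob|");
        List<String[]> rows = parse(lines, "|", true, "#");
        System.out.println(rows.size()); // 2 (header and comment skipped)
    }
}
```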

CsvTableSink

@Deprecated
public class CsvTableSink extends OutputFormatTableSink<Row> {
    public CsvTableSink(String path, String fieldDelim, int numFiles, WriteMode writeMode);
    
    @Override
    public OutputFormat<Row> getOutputFormat();
    // returns an output format that writes rows as delimited text to path,
    // honoring fieldDelim, numFiles, and writeMode
}

Migration Guidelines

From Legacy to Modern Connectors

Step 1: Replace Factory Interface

// Old (Deprecated)
public class MyConnectorFactory implements StreamTableSourceFactory<Row> {
    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        return new MyLegacyTableSource(properties);
    }
}

// New (Recommended)  
public class MyConnectorFactory implements DynamicTableSourceFactory {
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        return new MyModernTableSource(context);
    }
}

Step 2: Replace Source Interface

// Old (Deprecated)
public class MyLegacyTableSource implements StreamTableSource<Row> {
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new MySourceFunction());
    }
}

// New (Recommended)
public class MyModernTableSource implements ScanTableSource {
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        return new DataStreamScanProvider() {
            @Override
            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
                return execEnv.addSource(new MySourceFunction())
                             .map(new MyRowDataMapper());
            }
        };
    }
}

Step 3: Replace Sink Interface

// Old (Deprecated)
public class MyLegacyTableSink implements StreamTableSink<Row> {
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(new MySinkFunction());
    }
}

// New (Recommended)
public class MyModernTableSink implements DynamicTableSink {
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        return new DataStreamSinkProvider() {
            @Override
            public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
                return dataStream
                    .map(new MyRowDataConverter())
                    .addSink(new MySinkFunction());
            }
        };
    }
}

Configuration Migration

Legacy String-based Properties:

// Old approach
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "my-connector");
properties.put("connector.host", "localhost");
properties.put("connector.port", "8080");

Modern ConfigOption-based Configuration:

// New approach
public class MyConnectorOptions {
    public static final ConfigOption<String> HOST = 
        ConfigOptions.key("host")
            .stringType()
            .defaultValue("localhost");
            
    public static final ConfigOption<Integer> PORT =
        ConfigOptions.key("port")
            .intType()
            .defaultValue(8080);
}
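The practical difference is typed access with defaults instead of hand-rolled string lookups. A minimal plain-Java sketch of that pattern (`Option` here is a hypothetical stand-in for Flink's `ConfigOption`; the real API also supports `noDefaultValue()`, descriptions, and fallback keys):

```java
import java.util.Map;

public class OptionLookupDemo {
    // Hypothetical stand-in for ConfigOption<T>: a key plus a typed default.
    record Option<T>(String key, T defaultValue) {}

    // Typed lookup: parse the raw string if present, else fall back to the default.
    static int getInt(Map<String, String> props, Option<Integer> opt) {
        String raw = props.get(opt.key());
        return raw == null ? opt.defaultValue() : Integer.parseInt(raw);
    }

    static String getString(Map<String, String> props, Option<String> opt) {
        return props.getOrDefault(opt.key(), opt.defaultValue());
    }

    public static void main(String[] args) {
        Option<String> host = new Option<>("host", "localhost");
        Option<Integer> port = new Option<>("port", 8080);

        Map<String, String> props = Map.of("host", "db.example.com");
        System.out.println(getString(props, host)); // db.example.com
        System.out.println(getInt(props, port));    // 8080 (default applies)
    }
}
```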

Compatibility Considerations

Runtime Compatibility

  1. Legacy connectors continue to work with current Flink versions
  2. Mixed usage of legacy and modern connectors is supported
  3. Gradual migration can be performed incrementally

Feature Limitations

Legacy connectors have limitations compared to modern ones:

  1. No support for complex data types introduced in newer versions
  2. Limited metadata access compared to modern metadata handling
  3. No support for advanced features like watermark pushdown
  4. String-based configuration instead of type-safe ConfigOptions

Performance Implications

  1. Legacy connectors may have slight performance overhead
  2. Type conversions between Row and RowData may be needed
  3. Modern connectors are optimized for current Flink runtime

Best Practices for Legacy Support

When to Use Legacy Interfaces

  1. Maintaining existing connectors that haven't been migrated yet
  2. Quick prototyping when familiar with legacy APIs
  3. Compatibility requirements with older Flink versions

Migration Strategy

  1. Plan migration during major version upgrades
  2. Test thoroughly with both legacy and modern implementations
  3. Migrate incrementally by component rather than all at once
  4. Document migration progress and remaining legacy components

Code Organization

// Organize legacy code clearly
@Deprecated
@SuppressWarnings("deprecation")
public class LegacyConnectorSupport {
    
    // Legacy implementation
    public static class LegacySource implements StreamTableSource<Row> {
        // Implementation
    }
    
    // Migration helper
    public static DynamicTableSource migrateToModern(LegacySource legacy) {
        return new ModernSourceAdapter(legacy);
    }
}
