Bridge component for Apache Flink's Table/SQL API that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11@1.14.0

Apache Flink Table API Java Bridge is a critical component that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs. This bridge provides the integration layer between Flink's high-level Table API and the core Java DataStream API, offering a unified programming model for both stream and batch table operations.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.14.6</version>
</dependency>

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.table.api.Schema;
import static org.apache.flink.table.api.Expressions.$;
// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Convert DataStream to Table with automatic schema derivation
DataStream<MyPojo> dataStream = env.fromElements(new MyPojo("John", 25), new MyPojo("Jane", 30));
Table table = tableEnv.fromDataStream(dataStream);
// Or with explicit schema
Schema schema = Schema.newBuilder()
.column("name", "STRING")
.column("age", "INT")
.build();
Table tableWithSchema = tableEnv.fromDataStream(dataStream, schema);
// Perform table operations
Table result = table.select($("name"), $("age").plus(1).as("age_plus_one"));
// Convert back to DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(result);
// Execute
env.execute("Table Bridge Example");

The Flink Table API Java Bridge consists of several key components:
Table Environments (StreamTableEnvironment, StreamStatementSet): the main entry point for creating and managing table environments that integrate with the DataStream API.
public interface StreamTableEnvironment extends TableEnvironment {
// Factory methods
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
@Deprecated
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig);
// Function registration (deprecated)
@Deprecated
<T> void registerFunction(String name, TableFunction<T> tableFunction);
@Deprecated
<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);
@Deprecated
<T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction);
// DataStream to Table conversions
<T> Table fromDataStream(DataStream<T> dataStream);
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
@Deprecated
<T> Table fromDataStream(DataStream<T> dataStream, String fields);
@Deprecated
<T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);
Table fromChangelogStream(DataStream<Row> dataStream);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
// Temporary view creation
<T> void createTemporaryView(String path, DataStream<T> dataStream);
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
@Deprecated
<T> void createTemporaryView(String path, DataStream<T> dataStream, String fields);
@Deprecated
<T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields);
@Deprecated
<T> void registerDataStream(String name, DataStream<T> dataStream);
@Deprecated
<T> void registerDataStream(String name, DataStream<T> dataStream, String fields);
// Table to DataStream conversions
DataStream<Row> toDataStream(Table table);
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
@Deprecated
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);
@Deprecated
<T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);
DataStream<Row> toChangelogStream(Table table);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
@Deprecated
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
@Deprecated
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);
// Statement set creation
StreamStatementSet createStatementSet();
// Job execution (deprecated)
@Deprecated
JobExecutionResult execute(String jobName) throws Exception;
}

Convert between DataStream and Table representations for seamless integration.
// DataStream to Table conversions
<T> Table fromDataStream(DataStream<T> dataStream);
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream);
// Table to DataStream conversions
DataStream<Row> toDataStream(Table table);
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
DataStream<Row> toChangelogStream(Table table);

New connector interfaces following the FLIP-95 design for better integration with the DataStream API.
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider {
DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv);
}
public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);
@Override
default Optional<Integer> getParallelism() {
return Optional.empty();
}
}
public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider {
static SourceFunctionProvider of(SourceFunction<RowData> sourceFunction, boolean isBounded);
SourceFunction<RowData> createSourceFunction();
boolean isBounded();
}
public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction);
static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, @Nullable Integer parallelism);
SinkFunction<RowData> createSinkFunction();
Optional<Integer> getParallelism();
}

Ready-to-use connectors for development and testing scenarios.
// DataGen Connector for generating test data
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
// Factory methods and configuration
}
// Print Connector for outputting results
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
// Factory methods and configuration
}
// BlackHole Connector for performance testing
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
// Factory methods and configuration
}

Time-based watermarking strategies for event-time processing in streaming applications. These are legacy watermark strategy classes for table sources.
public abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
public abstract void nextTimestamp(long timestamp);
public abstract Watermark getWatermark();
public abstract Map<String, String> toProperties();
}
public abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
public abstract Watermark getWatermark(Row row, long timestamp);
public abstract Map<String, String> toProperties();
}
public final class AscendingTimestamps extends PeriodicWatermarkAssigner {
@Override
public void nextTimestamp(long timestamp);
@Override
public Watermark getWatermark();
@Override
public Map<String, String> toProperties();
}
public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
public BoundedOutOfOrderTimestamps(long maxOutOfOrderness);
@Override
public void nextTimestamp(long timestamp);
@Override
public Watermark getWatermark();
@Override
public Map<String, String> toProperties();
}

Deprecated but maintained interfaces for backward compatibility with existing connector implementations.
@Deprecated
public interface StreamTableSource<T> extends TableSource<T> {
default boolean isBounded() { return false; }
DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}
@Deprecated
public interface StreamTableSink<T> extends TableSink<T> {
DataStreamSink<?> consumeDataStream(DataStream<T> dataStream);
}

public interface StreamStatementSet extends StatementSet {
@Override
StreamStatementSet addInsertSql(String statement);
@Override
StreamStatementSet addInsert(String targetPath, Table table);
@Override
StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);
@Override
StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);
@Override
StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);
void attachAsDataStream();
}
public class Schema {
public static Schema.Builder newBuilder();
// Schema definition for DataStream to Table conversions
}
public final class ChangelogMode {
// Note: in the actual Flink API this is a final class, not an enum;
// instances are obtained via the static factory methods below.
public static ChangelogMode insertOnly();
public static ChangelogMode upsert();
public static ChangelogMode all();
}
public abstract class WatermarkStrategy {
public abstract Map<String, String> toProperties();
}
public interface ParallelismProvider {
Optional<Integer> getParallelism();
}