Java bridge API for Apache Flink's Table/SQL functionality, enabling seamless integration between table operations and DataStream APIs
```bash
npx @tessl/cli install tessl/maven-flink-table-api-java-bridge@1.20.0
```

Apache Flink's Table API Java Bridge provides seamless integration between Flink's Table/SQL API and the DataStream API. It enables developers to convert between DataStream and Table representations, register custom connectors, and execute table operations within streaming applications.
Or add the Maven dependency:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.20.0</version>
</dependency>
```

Core imports:

```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Schema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.table.types.AbstractDataType;
```

The following example converts a DataStream into a Table, runs a SQL query on it, and converts the result back into a DataStream:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Convert DataStream to Table
DataStream<Row> dataStream = env.fromElements( // example input; replace with your real stream
        Row.of("order-1", 150),
        Row.of("order-2", 42))
    .returns(Types.ROW_NAMED(
        new String[] {"id", "amount"}, Types.STRING, Types.INT));
Table table = tableEnv.fromDataStream(dataStream);
// Execute SQL queries
Table result = tableEnv.sqlQuery("SELECT * FROM " + table + " WHERE amount > 100");
// Convert back to DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(result);

// Print the results and run the pipeline
resultStream.print();
env.execute();
```

The Flink Table API Java Bridge is organized around several key components:
Core functionality for creating and managing table environments that bridge the DataStream and Table APIs.

```java
public interface StreamTableEnvironment extends TableEnvironment {
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
}
```
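For example, an environment with explicit settings can be created as follows (a minimal sketch, reusing `env` from the quick-start example above):

```java
// Create a StreamTableEnvironment with explicit environment settings.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode() // bounded pipelines may use inBatchMode() instead
    .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
```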
Bi-directional conversion between DataStream and Table, with support for custom schemas and changelog processing.

```java
// DataStream to Table conversion
<T> Table fromDataStream(DataStream<T> dataStream);
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
// Table to DataStream conversion
DataStream<Row> toDataStream(Table table);
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
DataStream<Row> toChangelogStream(Table table);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
// Temporary view creation
<T> void createTemporaryView(String path, DataStream<T> dataStream);
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
// Statement set creation
StreamStatementSet createStatementSet();
// Deprecated methods (maintained for backward compatibility)
@Deprecated <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);
@Deprecated <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields);
@Deprecated <T> DataStream<T> toAppendStream(Table table, Class<T> clazz);
@Deprecated <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);
@Deprecated <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
@Deprecated <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);
```
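A query that produces updates, such as an aggregation, cannot use the insert-only `toDataStream` path and must leave the Table API as a changelog stream. A minimal sketch, assuming an `orders` view with a `name` column has been registered:

```java
// Grouped counts update over time, so each emitted Row carries a RowKind.
Table counts = tableEnv.sqlQuery(
    "SELECT name, COUNT(*) AS cnt FROM orders GROUP BY name");
DataStream<Row> changelog = tableEnv.toChangelogStream(counts);
changelog.print(); // prints +I, -U, +U records as the counts evolve
```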
Provider interfaces for implementing custom table sources and sinks that integrate with the DataStream API.

```java
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
    DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);
}

public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
    DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);
    Optional<Integer> getParallelism();
}

public interface ProviderContext {
    String generateUid(String name);
}

public interface ParallelismProvider {
    Optional<Integer> getParallelism();
}
```
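For instance, a custom sink can hand the planner's RowData stream to any DataStream sink. A minimal sketch, where printing records stands in for a real sink implementation:

```java
// Returned from DynamicTableSink#getSinkRuntimeProvider: the planner supplies
// the RowData stream, and the provider attaches a DataStream sink to it.
DataStreamSinkProvider sinkProvider = new DataStreamSinkProvider() {
    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext, DataStream<RowData> dataStream) {
        return dataStream.print(); // a real connector would attach its own sink
    }
};
```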
Production-ready connectors for common use cases, including test data generation, development output, and performance testing.

```java
// DataGen connector for test data generation
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
    public String factoryIdentifier(); // returns "datagen"
}

// Print connector for development output
public class PrintTableSinkFactory implements DynamicTableSinkFactory {
    public String factoryIdentifier(); // returns "print"
}

// BlackHole connector for performance testing
public class BlackHoleTableSinkFactory implements DynamicTableSinkFactory {
    public String factoryIdentifier(); // returns "blackhole"
}
```
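These connectors are used through SQL DDL. A sketch that generates rows with `datagen` and writes them to stdout with `print` (table names are illustrative):

```java
tableEnv.executeSql(
    "CREATE TABLE source_t (id BIGINT, amount INT) WITH ("
        + "'connector' = 'datagen', 'rows-per-second' = '5')");
tableEnv.executeSql(
    "CREATE TABLE sink_t (id BIGINT, amount INT) WITH ('connector' = 'print')");
tableEnv.executeSql("INSERT INTO sink_t SELECT * FROM source_t");
```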
Batched execution of multiple table operations in a single, jointly optimized Flink job.

```java
public interface StreamStatementSet extends StatementSet {
    StreamStatementSet add(TablePipeline tablePipeline);
    StreamStatementSet addInsertSql(String statement);
    StreamStatementSet addInsert(String targetPath, Table table);
    StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);
    void attachAsDataStream();
    StreamStatementSet printExplain(ExplainDetail... extraDetails);
}
```
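A sketch of batching two inserts and attaching them to the surrounding DataStream program (the source and sink tables are assumed to exist):

```java
// Several INSERTs become one optimized job that is attached to the
// StreamExecutionEnvironment instead of executing immediately.
StreamStatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO sink_t SELECT * FROM source_t");
stmtSet.addInsertSql("INSERT INTO audit_t SELECT * FROM source_t");
stmtSet.attachAsDataStream(); // becomes part of env's job graph
env.execute();
```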
Framework for stored procedure execution with access to the StreamExecutionEnvironment.

```java
public interface ProcedureContext {
    StreamExecutionEnvironment getExecutionEnvironment();
}

public class DefaultProcedureContext implements ProcedureContext {
    public DefaultProcedureContext(StreamExecutionEnvironment executionEnvironment);
}
```
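A sketch of a stored procedure that uses the context (the class name and return value are illustrative; Flink discovers `call` methods on `Procedure` implementations by convention):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.table.procedures.Procedure;

// The framework passes a ProcedureContext, from which the procedure
// can reach the StreamExecutionEnvironment.
public class ShowParallelismProcedure implements Procedure {
    public String[] call(ProcedureContext context) {
        StreamExecutionEnvironment env = context.getExecutionEnvironment();
        return new String[] {"parallelism=" + env.getParallelism()};
    }
}
```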
The supporting types used throughout this document come from the following imports:

```java
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TablePipeline;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.Optional;
```

```java
// Schema builder for custom table schemas
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .columnByExpression("computed", "id * 2")
    .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
    .watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
    .build();
```
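Such a schema can then be supplied when bridging, e.g. (assuming `dataStream` provides matching `id` and `name` fields):

```java
// Apply the declared schema during conversion; computed and metadata
// columns are derived as part of the conversion.
Table typedTable = tableEnv.fromDataStream(dataStream, schema);
```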
```java
// Changelog mode options
ChangelogMode.all()        // INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
ChangelogMode.insertOnly() // INSERT only
ChangelogMode.upsert()     // INSERT, UPDATE_AFTER, DELETE
```
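For example, a keyed stream of updates can be interpreted as an upsert changelog (a sketch following the pattern in the Flink documentation; values are illustrative):

```java
// Rows with the same key update the previous result instead of appending.
DataStream<Row> upserts = env.fromElements(
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 12),
    Row.ofKind(RowKind.UPDATE_AFTER, "Bob", 5),
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
Table upsertTable = tableEnv.fromChangelogStream(
    upserts,
    Schema.newBuilder().primaryKey("f0").build(),
    ChangelogMode.upsert());
```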