Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.
```bash
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-java-bridge@2.1.0
```

The Apache Flink Table API Java Bridge provides seamless integration between Flink's Table/SQL API and the DataStream API for Java applications. This module enables developers to convert between DataStreams and Tables, create StreamTableEnvironments for unified batch and stream processing, and run SQL queries on streaming data with comprehensive connector support.
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>2.1.0</version>
</dependency>
```

```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
```

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
// Create execution environment and table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Convert DataStream to Table
DataStream<Row> dataStream = env.fromElements(
    Row.of("Alice", 25),
    Row.of("Bob", 30)
);
Table table = tableEnv.fromDataStream(dataStream);

// Execute SQL on the table; Table.toString() registers the table
// under a unique name so it can be embedded in the query string
Table result = tableEnv.sqlQuery("SELECT * FROM " + table + " WHERE f1 > 25");

// Convert the Table back to a DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(result);

// Print the results and execute the pipeline
resultStream.print();
env.execute("Table API Bridge Example");
```

The Flink Table API Java Bridge is built around several key components:
StreamTableEnvironment is the core environment for creating and managing tables that integrate with the DataStream API. It provides a unified context for both batch and streaming operations.
```java
public interface StreamTableEnvironment extends TableEnvironment {
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

    <T> Table fromDataStream(DataStream<T> dataStream);
    <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
    <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields); // @Deprecated

    <T> void createTemporaryView(String path, DataStream<T> dataStream);
    <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
    <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields); // @Deprecated

    DataStream<Row> toDataStream(Table table);
    <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
    <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

    <T> DataStream<T> toAppendStream(Table table, Class<T> clazz); // @Deprecated
    <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo); // @Deprecated
    <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz); // @Deprecated
    <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo); // @Deprecated

    StreamStatementSet createStatementSet();
}
```
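For example, a stream can be registered as a named view with an explicit schema and then queried by name. A minimal sketch reusing `env` and `tableEnv` from the quick-start above; the view name and column types are illustrative:

```java
import org.apache.flink.table.api.Schema;

// Register the stream as a view with an explicit schema
DataStream<Row> stream = env.fromElements(Row.of("Alice", 25), Row.of("Bob", 30));
tableEnv.createTemporaryView(
    "users",
    stream,
    Schema.newBuilder()
        .column("f0", "STRING")
        .column("f1", "INT")
        .build());

// Query the view by name
Table adults = tableEnv.sqlQuery("SELECT f0 AS name FROM users WHERE f1 >= 18");
```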
Advanced stream processing with support for changelog semantics, including inserts, updates, and deletes.

```java
public interface StreamTableEnvironment extends TableEnvironment {
    Table fromChangelogStream(DataStream<Row> dataStream);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);

    DataStream<Row> toChangelogStream(Table table);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
}
```
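A typical round trip, following the pattern from the Flink documentation: interpret a stream of RowKind-annotated rows as a changelog, aggregate it with SQL, and emit the updating result as a changelog stream again (table and column names are illustrative):

```java
import org.apache.flink.types.RowKind;

// Interpret a DataStream of RowKind-annotated rows as a changelog
DataStream<Row> changelog = env.fromElements(
    Row.ofKind(RowKind.INSERT, "Alice", 12),
    Row.ofKind(RowKind.INSERT, "Bob", 5),
    Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

Table table = tableEnv.fromChangelogStream(changelog);
tableEnv.createTemporaryView("InputTable", table);

// The aggregation produces an updating result...
Table result = tableEnv.sqlQuery(
    "SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0");

// ...which is emitted as a changelog stream of inserts, updates, and deletes
tableEnv.toChangelogStream(result).print();
env.execute();
```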
Provider interfaces for advanced connector development that integrate directly with the DataStream API.

```java
public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
    DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv);
}

public interface DataStreamSinkProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
    DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream);
    Optional<Integer> getParallelism();
}
```
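A minimal sketch of a DataStreamScanProvider, e.g. as returned from a custom ScanTableSource's getScanRuntimeProvider; the single-element stream is a stand-in for a real source:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;

DataStreamScanProvider provider = new DataStreamScanProvider() {
    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
        // Stand-in data; a real connector would attach its actual source here
        return execEnv.fromElements(
            (RowData) GenericRowData.of(StringData.fromString("demo")));
    }

    @Override
    public boolean isBounded() {
        return true; // this sketch produces a bounded stream
    }
};
```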
Ready-to-use connectors for development, testing, and debugging table applications.

```sql
-- BlackHole connector: discards all data, useful for performance testing
CREATE TABLE sink_table (...) WITH ('connector' = 'blackhole');

-- DataGen connector: generates test data
CREATE TABLE source_table (...) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'fields.user_id.kind' = 'sequence',
    'fields.name.kind' = 'random'
);

-- Print connector: outputs data to the console for debugging
CREATE TABLE debug_table (...) WITH ('connector' = 'print');
```
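These connectors need no external systems, so a complete smoke-test pipeline can be defined entirely in SQL from the `tableEnv` created above; the table and field names below are illustrative:

```java
// Generate a bounded sequence of test rows (sequence fields require start/end)
tableEnv.executeSql(
    "CREATE TABLE source_table (user_id BIGINT, name STRING) WITH (" +
    "  'connector' = 'datagen'," +
    "  'rows-per-second' = '100'," +
    "  'fields.user_id.kind' = 'sequence'," +
    "  'fields.user_id.start' = '1'," +
    "  'fields.user_id.end' = '1000'," +
    "  'fields.name.kind' = 'random')");

// Print every produced row to the console
tableEnv.executeSql(
    "CREATE TABLE debug_table (user_id BIGINT, name STRING) WITH ('connector' = 'print')");

tableEnv.executeSql("INSERT INTO debug_table SELECT * FROM source_table");
```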
Time-based event processing with configurable watermark assignment strategies for handling out-of-order events.

```java
public abstract class PeriodicWatermarkAssigner {
    public abstract void nextTimestamp(long timestamp);
    public abstract Watermark getWatermark();
    public abstract Map<String, String> toProperties();
}

public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
    public BoundedOutOfOrderTimestamps(long delay);
}

public final class AscendingTimestamps extends PeriodicWatermarkAssigner {
    public AscendingTimestamps();
}
```
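As a rough illustration of the contract, a bounded out-of-orderness assigner tracks the maximum timestamp seen so far and emits a watermark that lags behind it by the configured delay. A sketch, assuming these classes live in org.apache.flink.table.sources.wmstrategies as in earlier Flink releases:

```java
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;

// Tolerate events that arrive up to 5 seconds late
PeriodicWatermarkAssigner assigner = new BoundedOutOfOrderTimestamps(5_000L);

assigner.nextTimestamp(1_000_000L);     // observe an event timestamp
Watermark wm = assigner.getWatermark(); // lags the max timestamp by the delay
```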
Batch execution optimization for multiple table operations with shared planning and execution.

```java
public interface StreamStatementSet extends StatementSet {
    StreamStatementSet add(TablePipeline tablePipeline);
    StreamStatementSet addInsertSql(String statement);
    StreamStatementSet addInsert(String targetPath, Table table);
    StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);
    StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);
    StreamStatementSet printExplain(ExplainDetail... extraDetails);
    void attachAsDataStream();

    // Inherited from StatementSet:
    // TableResult execute();
    // CompiledPlan compilePlan(); // @Experimental
    // String explain(ExplainDetail... extraDetails);
    // String explain(ExplainFormat format, ExplainDetail... extraDetails);
}
```
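For instance, several INSERT statements can be grouped so the planner optimizes them into a single job; the sink and source tables here are assumed to already exist in the catalog:

```java
StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsertSql("INSERT INTO sink_a SELECT * FROM source_table");
statementSet.addInsertSql("INSERT INTO sink_b SELECT COUNT(*) FROM source_table");

// Either submit the optimized job directly...
statementSet.execute();

// ...or attach it to the DataStream program so it runs with env.execute():
// statementSet.attachAsDataStream();
// env.execute();
```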
Context for stored procedure execution with access to the StreamExecutionEnvironment.

```java
public interface ProcedureContext {
    StreamExecutionEnvironment getExecutionEnvironment();
}

public class DefaultProcedureContext implements ProcedureContext {
    // Default implementation
}
```
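A sketch of how a stored procedure might use this context, assuming Flink's org.apache.flink.table.procedures.Procedure interface from flink-table-common; the procedure name and return shape are illustrative:

```java
import org.apache.flink.table.procedures.Procedure;

public class ShowParallelismProcedure implements Procedure {
    // Procedures expose public methods named "call" whose first
    // parameter is the ProcedureContext
    public String[] call(ProcedureContext context) {
        int parallelism = context.getExecutionEnvironment().getParallelism();
        return new String[] {"parallelism: " + parallelism};
    }
}
```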
```java
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.ExplainFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
```

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.expressions.Expression;
```

```java
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.data.RowData;
import java.util.Optional;
```