CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java-bridge

Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.

Pending
Overview
Eval results
Files

docs/stream-table-environment.md

Stream Table Environment

The StreamTableEnvironment is the entry point and central context for creating Table and SQL API programs that integrate with the Java DataStream API. It provides unified processing for both bounded and unbounded data.

Capabilities

Environment Creation

Factory methods for creating StreamTableEnvironment instances with optional configuration.

/**
 * Creates a table environment with default settings
 * @param executionEnvironment The StreamExecutionEnvironment for the TableEnvironment
 * @return StreamTableEnvironment instance
 */
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);

/**
 * Creates a table environment with custom settings
 * @param executionEnvironment The StreamExecutionEnvironment for the TableEnvironment  
 * @param settings The EnvironmentSettings for configuration
 * @return StreamTableEnvironment instance
 */
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

Usage Examples:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;

// Basic environment creation
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Environment with custom settings
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .build();
StreamTableEnvironment customTableEnv = StreamTableEnvironment.create(env, settings);

DataStream to Table Conversion

Convert DataStreams into Tables with optional schema customization.

/**
 * Converts DataStream to Table with automatic schema derivation
 * @param dataStream The DataStream to convert
 * @return Table with derived schema
 */
<T> Table fromDataStream(DataStream<T> dataStream);

/**
 * Converts DataStream to Table with custom schema
 * @param dataStream The DataStream to convert
 * @param schema Custom schema for the resulting table
 * @return Table with specified schema
 */
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

Usage Examples:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Schema;
import org.apache.flink.types.Row;

// Automatic schema derivation
DataStream<Row> dataStream = env.fromElements(
    Row.of("Alice", 25),
    Row.of("Bob", 30)
);
Table table = tableEnv.fromDataStream(dataStream);

// Custom schema with explicit types — physical column names must match the
// stream's field names (f0/f1 for Row); rename afterwards with Table.as()
Schema schema = Schema.newBuilder()
    .column("f0", "STRING")
    .column("f1", "INT")
    .build();
Table customTable = tableEnv.fromDataStream(dataStream, schema).as("name", "age");

// Schema with computed columns and watermarks
Schema advancedSchema = Schema.newBuilder()
    .column("f0", "STRING")
    .column("f1", "INT")
    .columnByExpression("upper_name", "UPPER(f0)")
    .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
    .watermark("rowtime", "SOURCE_WATERMARK()")
    .build();
Table advancedTable = tableEnv.fromDataStream(dataStream, advancedSchema);

Table to DataStream Conversion

Convert Tables back to DataStreams with type safety and optional type specification.

/**
 * Converts Table to DataStream<Row> for insert-only tables
 * @param table The Table to convert (must be insert-only)
 * @return DataStream of Row objects
 */
DataStream<Row> toDataStream(Table table);

/**
 * Converts Table to typed DataStream for insert-only tables
 * @param table The Table to convert (must be insert-only)
 * @param targetClass Target class for type conversion
 * @return Typed DataStream
 */
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

/**
 * Converts Table to DataStream with specific data type
 * @param table The Table to convert (must be insert-only)
 * @param targetDataType Target data type specification
 * @return Typed DataStream with specified data type
 */
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

Usage Examples:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;

// Convert to Row DataStream
Table sourceTable = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 21");
DataStream<Row> resultStream = tableEnv.toDataStream(sourceTable);

// Convert to POJO DataStream
public static class User {
    public String name;
    public Integer age;
    // constructors, getters, setters...
}

DataStream<User> userStream = tableEnv.toDataStream(sourceTable, User.class);

// Convert with specific data type
DataStream<User> typedUserStream = tableEnv.toDataStream(
    sourceTable, 
    DataTypes.of(User.class)
);

Temporary View Creation

Create temporary views from DataStreams for SQL querying.

/**
 * Creates temporary view from DataStream with automatic schema
 * @param path View path (catalog.database.view or database.view or view)
 * @param dataStream The DataStream to create view from
 */
<T> void createTemporaryView(String path, DataStream<T> dataStream);

/**
 * Creates temporary view from DataStream with custom schema
 * @param path View path (catalog.database.view or database.view or view)
 * @param dataStream The DataStream to create view from
 * @param schema Custom schema for the view
 */
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

Usage Examples:

// Create view with automatic schema
DataStream<Row> orderStream = env.fromElements(
    Row.of("order1", "user1", 100.0),
    Row.of("order2", "user2", 250.0)
);
tableEnv.createTemporaryView("orders", orderStream);

// Query the view with SQL
Table result = tableEnv.sqlQuery("SELECT * FROM orders WHERE f2 > 150");

// Create view with explicit schema — physical column names must match the
// stream's field names (f0/f1/f2 for Row); rename via Table.as() before
// registering the view
Schema orderSchema = Schema.newBuilder()
    .column("f0", "STRING")
    .column("f1", "STRING")
    .column("f2", "DOUBLE")
    .build();
tableEnv.createTemporaryView(
    "orders_named",
    tableEnv.fromDataStream(orderStream, orderSchema).as("order_id", "user_id", "amount"));

// Query with named columns
Table namedResult = tableEnv.sqlQuery("SELECT user_id, amount FROM orders_named WHERE amount > 150");

Statement Set Creation

Create a StreamStatementSet that groups multiple INSERT statements so they are jointly optimized and executed as a single Flink job.

/**
 * Creates StreamStatementSet for batch execution of multiple statements
 * @return StreamStatementSet for adding multiple operations
 */
StreamStatementSet createStatementSet();

Usage Examples:

import org.apache.flink.table.api.bridge.java.StreamStatementSet;

// Create statement set for batch execution
StreamStatementSet statementSet = tableEnv.createStatementSet();

// Add multiple operations
statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE type = 'A'");
statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE type = 'B'");
statementSet.addInsert("sink3", tableEnv.sqlQuery("SELECT COUNT(*) FROM source"));

// Execute all statements together
statementSet.execute();

Types

Schema Configuration

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.expressions.Expression;

DataStream Integration

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.types.Row;

Deprecated Methods

The following methods are deprecated and should be replaced with newer Schema-based alternatives:

// Deprecated - use fromDataStream(DataStream<T>, Schema) instead
@Deprecated
<T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);

// Deprecated - use createTemporaryView(String, DataStream<T>, Schema) instead  
@Deprecated
<T> void createTemporaryView(String path, DataStream<T> dataStream, Expression... fields);

// Deprecated - use toDataStream(Table, Class<T>) instead
@Deprecated
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);

// Deprecated - use toDataStream(Table, Class<T>) instead
@Deprecated
<T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);

// Deprecated - use toChangelogStream(Table, Schema) instead
@Deprecated  
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);

// Deprecated - use toChangelogStream(Table, Schema) instead
@Deprecated
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge

docs

builtin-connectors.md

changelog-processing.md

datastream-connectors.md

index.md

procedures.md

statement-sets.md

stream-table-environment.md

watermark-strategies.md

tile.json