tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11

Bridge component for Apache Flink's Table/SQL API that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs.

Stream Table Environment

The StreamTableEnvironment is the main entry point for table programs that integrate with Flink's DataStream API. It provides factory methods for creating a table environment and conversion methods for moving between DataStream and Table representations in both directions.

Core Interface

@PublicEvolving
public interface StreamTableEnvironment extends TableEnvironment {
    // Factory methods
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
    
    // DataStream to Table conversions
    <T> Table fromDataStream(DataStream<T> dataStream);
    <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
    
    // Table to DataStream conversions  
    DataStream<Row> toDataStream(Table table);
    <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
    <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
    DataStream<Row> toChangelogStream(Table table);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
    
    // Temporary view creation
    <T> void createTemporaryView(String path, DataStream<T> dataStream);
    <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
    
    // Statement set creation
    StreamStatementSet createStatementSet();
}

Factory Methods

Creating Basic Environment

Create a StreamTableEnvironment from an existing StreamExecutionEnvironment:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Creating with Custom Settings

Create a StreamTableEnvironment with specific configuration settings:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

DataStream to Table Conversion

Basic Conversion

Convert a DataStream to a Table using automatic schema inference:

DataStream<MyPojo> dataStream = env.fromElements(
    new MyPojo("Alice", 25),
    new MyPojo("Bob", 30)
);
Table table = tableEnv.fromDataStream(dataStream);
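The MyPojo class used throughout these examples is not defined in this document. For fromDataStream to infer a schema, the class must follow Flink's POJO rules: public class, public no-argument constructor, and public fields (or getter/setter pairs). A minimal sketch of such a class (the name and fields are assumptions matching the examples):

```java
// Hypothetical POJO assumed by the examples in this document.
// Flink's schema inference requires: a public class, a public
// no-arg constructor, and public fields or getter/setter pairs.
public class MyPojo {
    public String name;
    public int age;

    // No-arg constructor required by Flink's POJO rules
    public MyPojo() {}

    public MyPojo(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
```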

Conversion with Custom Schema

Convert a DataStream to a Table with a custom schema definition:

DataStream<Row> dataStream = env.fromElements(
    Row.of("Alice", 25),
    Row.of("Bob", 30)
);

Schema schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build();

Table table = tableEnv.fromDataStream(dataStream, schema);

Changelog Stream Conversion

Convert a changelog DataStream to a Table:

DataStream<Row> changelogStream = ...; // source of changelog data, e.g. CDC records
Table table = tableEnv.fromChangelogStream(changelogStream);

// With custom schema
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("value", DataTypes.DOUBLE())
    .build();

Table tableWithSchema = tableEnv.fromChangelogStream(changelogStream, schema);

// With changelog mode specification
ChangelogMode changelogMode = ChangelogMode.insertOnly();
Table tableWithMode = tableEnv.fromChangelogStream(changelogStream, schema, changelogMode);
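The changelogStream source above is elided. A hedged sketch of hand-building a small changelog DataStream with Row.ofKind, useful for testing (the schema of BIGINT/STRING/DOUBLE fields matches the example above; a real pipeline would typically consume CDC records from a connector instead):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Sketch under assumptions: a changelog stream built from literal rows.
// Row.ofKind tags each row with its change type (INSERT, UPDATE_BEFORE,
// UPDATE_AFTER, DELETE). The .returns(...) hint is needed because Row
// field types cannot be inferred from fromElements alone.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> changelogStream = env
    .fromElements(
        Row.ofKind(RowKind.INSERT, 1L, "Alice", 100.0),
        Row.ofKind(RowKind.UPDATE_BEFORE, 1L, "Alice", 100.0),
        Row.ofKind(RowKind.UPDATE_AFTER, 1L, "Alice", 150.0))
    .returns(Types.ROW(Types.LONG, Types.STRING, Types.DOUBLE));
```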

Table to DataStream Conversion

Basic Conversion to Row

Convert a Table to a DataStream<Row>:

Table table = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 21");
DataStream<Row> resultStream = tableEnv.toDataStream(table);

Conversion to Specific Type

Convert a Table to a DataStream of a specific class:

Table table = tableEnv.sqlQuery("SELECT name, age FROM users");
DataStream<MyPojo> pojoStream = tableEnv.toDataStream(table, MyPojo.class);

Conversion with Data Type

Convert a Table to a DataStream using a specific data type:

Table table = tableEnv.sqlQuery("SELECT name, age FROM users");
AbstractDataType<?> dataType = DataTypes.ROW(
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("age", DataTypes.INT())
);
DataStream<Row> typedStream = tableEnv.toDataStream(table, dataType);

Conversion to Changelog Stream

Convert a Table to a changelog DataStream:

Table table = tableEnv.sqlQuery("SELECT id, name, COUNT(*) as cnt FROM events GROUP BY id, name");
DataStream<Row> changelogStream = tableEnv.toChangelogStream(table);

// With target schema
Schema targetSchema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("cnt", DataTypes.BIGINT())
    .build();

DataStream<Row> changelogWithSchema = tableEnv.toChangelogStream(table, targetSchema);

// With changelog mode (upsert requires a unique key in the query result)
ChangelogMode mode = ChangelogMode.upsert();
DataStream<Row> upsertChangelog = tableEnv.toChangelogStream(table, targetSchema, mode);

Temporary View Creation

Basic View Creation

Create a temporary view from a DataStream:

DataStream<MyPojo> dataStream = env.fromElements(
    new MyPojo("Alice", 25),
    new MyPojo("Bob", 30)
);

tableEnv.createTemporaryView("users", dataStream);

// Now you can query the view
Table result = tableEnv.sqlQuery("SELECT * FROM users WHERE age > 25");

View Creation with Schema

Create a temporary view with a custom schema:

DataStream<Row> dataStream = env.fromElements(
    Row.of("Alice", 25),
    Row.of("Bob", 30)
);

Schema schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .build();

tableEnv.createTemporaryView("users_with_schema", dataStream, schema);
Table result = tableEnv.sqlQuery("SELECT name FROM users_with_schema WHERE age > 25");

Statement Set Creation

Create a StreamStatementSet to group multiple INSERT statements into a single pipeline that executes together with the surrounding DataStream program:

StreamStatementSet statementSet = tableEnv.createStatementSet();

// Add multiple insert operations
statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE condition1");
statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE condition2");

// Attach all statements to the DataStream pipeline; they run when
// StreamExecutionEnvironment.execute() is called
statementSet.attachAsDataStream();
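A minimal end-to-end sketch showing where execution actually happens. The tables source, sink1, and sink2 are assumed to already exist in the catalog; the class and job names are illustrative:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StatementSetJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        StreamStatementSet set = tableEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink1 SELECT * FROM source");
        set.addInsertSql("INSERT INTO sink2 SELECT * FROM source");

        // attachAsDataStream() only wires the statements into env's pipeline;
        // the single env.execute() call below submits everything as one job
        set.attachAsDataStream();
        env.execute("combined-job");
    }
}
```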

Deprecated Methods

The following methods are deprecated and should be avoided in new code:

  • registerFunction() - Use createFunction() instead
  • fromDataStream(DataStream, String) - Use fromDataStream(DataStream, Schema) instead
  • registerDataStream() - Use createTemporaryView() instead
  • toAppendStream() - Use toDataStream() instead
  • toRetractStream() - Use toChangelogStream() instead
  • execute() - Use StreamExecutionEnvironment.execute() instead

Error Handling

Common exceptions that may be thrown:

  • ValidationException - When schema validation fails during conversion
  • TableException - When table operations fail
  • IllegalArgumentException - When invalid parameters are provided

Always ensure proper error handling when working with conversions:

try {
    Table table = tableEnv.fromDataStream(dataStream, schema);
    DataStream<Row> result = tableEnv.toDataStream(table);
} catch (ValidationException e) {
    // Schema validation failed during conversion
    log.error("Schema validation failed: {}", e.getMessage(), e);
} catch (TableException e) {
    // Table operation failed
    log.error("Table operation failed: {}", e.getMessage(), e);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11
