This document covers the essential table environment setup, table creation, and basic table operations in Apache Flink Table Uber Blink.
The TableEnvironment is the main entry point for all table operations.
static TableEnvironment create(EnvironmentSettings settings);
static TableEnvironment create(Configuration configuration);Usage:
// Create with settings builder
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// Create with configuration
Configuration config = new Configuration();
config.setString("table.sql-dialect", "default");
TableEnvironment tEnv = TableEnvironment.create(config);class EnvironmentSettings {
static EnvironmentSettings.Builder newInstance();
interface Builder {
Builder useBlinkPlanner();
Builder inStreamingMode();
Builder inBatchMode();
Builder withBuiltInCatalogName(String catalogName);
Builder withBuiltInDatabaseName(String databaseName);
Builder withConfiguration(Configuration configuration);
EnvironmentSettings build();
}
}TableResult executeSql(String statement);
Table sqlQuery(String query);Usage:
// Execute DDL/DML statements
TableResult result = tEnv.executeSql(
"CREATE TABLE users (id INT, name STRING, age INT) WITH ('connector' = 'filesystem', 'path' = '/data/users', 'format' = 'json')"
);
// Execute queries returning tables
Table userTable = tEnv.sqlQuery("SELECT * FROM users WHERE age > 18");void createTable(String path, TableDescriptor descriptor);
void createTemporaryTable(String path, TableDescriptor descriptor);
void createTemporaryView(String path, Table view);
void dropTable(String path);
void dropTemporaryTable(String path);
void dropTemporaryView(String path);
boolean tableExists(String path);Table from(String path);
Table scan(String... tablePath);
Table fromValues(Object... values);
Table fromValues(AbstractDataType<?> rowType, Object... values);
Table fromValues(DataType rowType, Object... values);Usage:
// Create table with descriptor
TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")
.schema(Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build())
.option("path", "/data/users")
.option("format", "json")
.build();
tEnv.createTable("users", descriptor);
// Access existing table
Table userTable = tEnv.from("users");
// Create table from values
Table valuesTable = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("name", DataTypes.STRING())
),
Row.of(1, "Alice"),
Row.of(2, "Bob")
);
// Create temporary view
tEnv.createTemporaryView("user_view", userTable.where($"age".isGreater(18)));The Table interface provides fluent API for data transformations.
interface Table {
Table select(Expression... fields);
Table addColumns(Expression... fields);
Table addOrReplaceColumns(Expression... fields);
Table renameColumns(Expression... fields);
Table dropColumns(Expression... fields);
}interface Table {
Table where(Expression predicate);
Table filter(Expression predicate);
Table distinct();
Table orderBy(Expression... fields);
Table offset(int offset);
Table fetch(int fetch);
Table limit(int fetch);
}interface Table {
GroupedTable groupBy(Expression... fields);
WindowGroupedTable window(GroupWindow window);
}
interface GroupedTable {
Table select(Expression... fields);
AggregatedTable aggregate(Expression aggregateFunction);
FlatAggregateTable flatAggregate(Expression tableAggregateFunction);
}Usage:
Table users = tEnv.from("users");
// Select specific columns
Table result = users.select($("name"), $("age"));
// Filter data
Table adults = users.where($("age").isGreaterOrEqual(18));
// Group and aggregate
Table ageGroups = users
.groupBy($("age"))
.select($("age"), $("age").count().as("count"));interface Table {
Table join(Table right);
Table join(Table right, Expression joinPredicate);
Table leftOuterJoin(Table right);
Table leftOuterJoin(Table right, Expression joinPredicate);
Table rightOuterJoin(Table right);
Table rightOuterJoin(Table right, Expression joinPredicate);
Table fullOuterJoin(Table right);
Table fullOuterJoin(Table right, Expression joinPredicate);
}Usage:
Table users = tEnv.from("users");
Table orders = tEnv.from("orders");
// Inner join
Table userOrders = users.join(orders, $("users.id").isEqual($("orders.user_id")));
// Left outer join
Table usersWithOrders = users.leftOuterJoin(orders, $("users.id").isEqual($("orders.user_id")));interface Table {
Table unionAll(Table right);
Table union(Table right);
Table intersect(Table right);
Table intersectAll(Table right);
Table minus(Table right);
Table minusAll(Table right);
}interface Table {
TableResult execute();
void executeInsert(String tablePath);
TableResult executeInsert(String tablePath, boolean overwrite);
CloseableIterator<Row> execute().collect();
}For executing multiple DML statements as a single job:
interface TableEnvironment {
StatementSet createStatementSet();
}
interface StatementSet {
StatementSet addInsertSql(String statement);
StatementSet addInsert(String targetPath, Table table);
StatementSet addInsert(String targetPath, Table table, boolean overwrite);
String explain(ExplainDetail... extraDetails);
TableResult execute();
}Usage:
// Create statement set for multi-sink execution
StatementSet statementSet = tEnv.createStatementSet();
// Add multiple insert operations
statementSet.addInsert("sink_table_1", processedData);
statementSet.addInsertSql("INSERT INTO sink_table_2 SELECT * FROM source WHERE condition = true");
// Execute all operations as single job
TableResult result = statementSet.execute();Table result = users.select($("name"), $("age")).where($("age").isGreater(21));
// Execute and print results
result.execute().print();
// Collect results as iterator
try (CloseableIterator<Row> iterator = result.execute().collect()) {
while (iterator.hasNext()) {
Row row = iterator.next();
System.out.println(row);
}
}
// Insert into another table
result.executeInsert("young_adults");interface Table {
ResolvedSchema getResolvedSchema();
TableSchema getSchema(); // Deprecated
String explain();
String explain(ExplainDetail... extraDetails);
}interface TableEnvironment {
TableConfig getConfig();
Configuration getConfiguration();
}
class TableConfig {
Configuration getConfiguration();
ZoneId getLocalTimeZone();
void setLocalTimeZone(ZoneId zoneId);
}Common exceptions when working with table operations:
class TableException extends RuntimeException;
class SqlParserException extends TableException;
class ValidationException extends TableException;
class CodeGenException extends TableException;interface Table extends Serializable;
class ResolvedSchema {
List<Column> getColumns();
List<String> getColumnNames();
List<DataType> getColumnDataTypes();
Optional<UniqueConstraint> getPrimaryKey();
List<UniqueConstraint> getUniqueConstraints();
}
enum ExplainDetail {
CHANGELOG_MODE,
ESTIMATED_COST,
JSON_EXECUTION_PLAN
}
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable;