Java API for Apache Flink's Table ecosystem, enabling type-safe table operations and SQL query execution.
—
Flink Table API provides seamless integration with SQL, allowing you to mix SQL statements with Table API operations. The system supports full SQL DDL, DML, and query capabilities alongside programmatic table operations.
Execute SQL SELECT queries and retrieve results as Table objects.
/**
* Executes a SQL query and returns the result as a Table
* @param query SQL SELECT statement
* @return Table containing query results
*/
public Table sqlQuery(String query);Usage Examples:
// Basic SQL query
Table result = tableEnv.sqlQuery(
"SELECT customer_id, SUM(amount) as total_amount " +
"FROM orders " +
"WHERE order_date >= '2023-01-01' " +
"GROUP BY customer_id " +
"HAVING SUM(amount) > 1000"
);
// Complex joins and window functions
Table analyticsResult = tableEnv.sqlQuery(
"SELECT " +
" c.customer_name, " +
" o.order_date, " +
" o.amount, " +
" SUM(o.amount) OVER (PARTITION BY c.customer_id ORDER BY o.order_date) as running_total, " +
" ROW_NUMBER() OVER (PARTITION BY c.customer_id ORDER BY o.amount DESC) as amount_rank " +
"FROM customers c " +
"JOIN orders o ON c.customer_id = o.customer_id " +
"WHERE o.order_date >= CURRENT_DATE - INTERVAL '30' DAY"
);
// Subqueries and CTEs
Table complexQuery = tableEnv.sqlQuery(
"WITH monthly_sales AS ( " +
" SELECT " +
" EXTRACT(YEAR FROM order_date) as year, " +
" EXTRACT(MONTH FROM order_date) as month, " +
" SUM(amount) as monthly_total " +
" FROM orders " +
" GROUP BY EXTRACT(YEAR FROM order_date), EXTRACT(MONTH FROM order_date) " +
"), " +
"avg_sales AS ( " +
" SELECT AVG(monthly_total) as avg_monthly " +
" FROM monthly_sales " +
") " +
"SELECT ms.year, ms.month, ms.monthly_total, " +
" ms.monthly_total - a.avg_monthly as deviation " +
"FROM monthly_sales ms CROSS JOIN avg_sales a " +
"ORDER BY ms.year, ms.month"
);Execute SQL DDL and DML statements that modify schema or data.
/**
* Executes a SQL statement and returns execution results
* @param statement SQL DDL, DML, or query statement
* @return TableResult containing execution status and results
*/
public TableResult executeSql(String statement);Usage Examples:
// Create table with SQL DDL
tableEnv.executeSql(
"CREATE TABLE user_behavior ( " +
" user_id BIGINT, " +
" item_id BIGINT, " +
" category_id BIGINT, " +
" behavior STRING, " +
" ts TIMESTAMP(3), " +
" WATERMARK FOR ts AS ts - INTERVAL '5' SECOND " +
") WITH ( " +
" 'connector' = 'kafka', " +
" 'topic' = 'user_behavior', " +
" 'properties.bootstrap.servers' = 'localhost:9092', " +
" 'format' = 'json' " +
")"
);
// Create sink table
tableEnv.executeSql(
"CREATE TABLE result_table ( " +
" user_id BIGINT, " +
" behavior_count BIGINT, " +
" PRIMARY KEY (user_id) NOT ENFORCED " +
") WITH ( " +
" 'connector' = 'jdbc', " +
" 'url' = 'jdbc:mysql://localhost:3306/test', " +
" 'table-name' = 'user_behavior_count' " +
")"
);
// Insert data with SQL
TableResult insertResult = tableEnv.executeSql(
"INSERT INTO result_table " +
"SELECT user_id, COUNT(*) as behavior_count " +
"FROM user_behavior " +
"WHERE behavior = 'purchase' " +
"GROUP BY user_id"
);
// Get job client for monitoring
Optional<JobClient> jobClient = insertResult.getJobClient();
if (jobClient.isPresent()) {
System.out.println("Job ID: " + jobClient.get().getJobID());
}Group multiple DML statements for efficient batch execution.
/**
* Creates a StatementSet for batching multiple statements
* @return StatementSet for adding multiple statements
*/
public StatementSet createStatementSet();
public interface StatementSet {
/**
* Adds an INSERT SQL statement to the set
* @param statement INSERT SQL statement
* @return StatementSet for method chaining
*/
StatementSet addInsertSql(String statement);
/**
* Adds an INSERT operation from a Table to the set
* @param targetPath Target table path
* @param table Table to insert
* @return StatementSet for method chaining
*/
StatementSet addInsert(String targetPath, Table table);
/**
* Executes all statements in the set
* @return TableResult containing execution status
*/
TableResult execute();
/**
* Explains the execution plan for all statements in the set
* @return String representation of the execution plan
*/
String explain();
}Usage Examples:
// Create statement set for multiple inserts
StatementSet stmtSet = tableEnv.createStatementSet();
// Add multiple SQL insert statements
stmtSet.addInsertSql(
"INSERT INTO daily_summary " +
"SELECT DATE(order_date) as day, COUNT(*) as order_count, SUM(amount) as total_amount " +
"FROM orders " +
"WHERE order_date >= CURRENT_DATE - INTERVAL '1' DAY " +
"GROUP BY DATE(order_date)"
);
stmtSet.addInsertSql(
"INSERT INTO customer_summary " +
"SELECT customer_id, COUNT(*) as order_count, SUM(amount) as total_spent " +
"FROM orders " +
"WHERE order_date >= CURRENT_DATE - INTERVAL '1' DAY " +
"GROUP BY customer_id"
);
// Add Table API insert
Table hourlyStats = sourceTable
.window(Tumble.over(lit(1).hour()).on($("event_time")).as("hourly"))
.groupBy($("hourly"))
.select($("hourly").start().as("hour"), count($("*")).as("event_count"));
stmtSet.addInsert("hourly_stats", hourlyStats);
// Execute all statements together
TableResult result = stmtSet.execute();
// Explain execution plan
String plan = stmtSet.explain();
System.out.println("Execution plan:\n" + plan);Seamlessly mix SQL queries with Table API operations.
Usage Examples:
// Start with SQL, continue with Table API
Table sqlTable = tableEnv.sqlQuery(
"SELECT customer_id, order_date, amount " +
"FROM orders " +
"WHERE amount > 100"
);
Table processed = sqlTable
.filter($("amount").isGreater(500))
.groupBy($("customer_id"))
.select($("customer_id"), $("amount").avg().as("avg_amount"));
// Start with Table API, continue with SQL
Table apiTable = sourceTable
.select($("id"), $("name"), $("salary"))
.filter($("salary").isGreater(50000));
// Register as temporary view for SQL access
tableEnv.createTemporaryView("high_earners", apiTable);
Table sqlResult = tableEnv.sqlQuery(
"SELECT name, salary, " +
" CASE WHEN salary > 100000 THEN 'Senior' " +
" WHEN salary > 75000 THEN 'Mid' " +
" ELSE 'Junior' END as level " +
"FROM high_earners " +
"ORDER BY salary DESC"
);
// Chain back to Table API
Table finalResult = sqlResult
.groupBy($("level"))
.select($("level"), count($("*")).as("count"), $("salary").avg().as("avg_salary"));Support for window functions, complex expressions, and advanced SQL constructs.
Usage Examples:
// Window functions and analytical queries
Table windowAnalysis = tableEnv.sqlQuery(
"SELECT " +
" product_id, " +
" sale_date, " +
" daily_sales, " +
" -- Moving average over 7 days " +
" AVG(daily_sales) OVER ( " +
" PARTITION BY product_id " +
" ORDER BY sale_date " +
" ROWS BETWEEN 6 PRECEDING AND CURRENT ROW " +
" ) as moving_avg_7d, " +
" -- Cumulative sales " +
" SUM(daily_sales) OVER ( " +
" PARTITION BY product_id " +
" ORDER BY sale_date " +
" ROWS UNBOUNDED PRECEDING " +
" ) as cumulative_sales, " +
" -- Rank by sales within each month " +
" RANK() OVER ( " +
" PARTITION BY product_id, EXTRACT(YEAR_MONTH FROM sale_date) " +
" ORDER BY daily_sales DESC " +
" ) as monthly_rank " +
"FROM daily_product_sales"
);
// Complex case expressions and functions
Table caseAnalysis = tableEnv.sqlQuery(
"SELECT " +
" customer_id, " +
" order_count, " +
" total_amount, " +
" CASE " +
" WHEN total_amount > 10000 AND order_count > 20 THEN 'VIP' " +
" WHEN total_amount > 5000 OR order_count > 10 THEN 'Premium' " +
" WHEN total_amount > 1000 THEN 'Regular' " +
" ELSE 'New' " +
" END as customer_tier, " +
" -- Calculate percentile rank " +
" PERCENT_RANK() OVER (ORDER BY total_amount) as amount_percentile, " +
" -- String manipulation " +
" CONCAT('CUSTOMER_', LPAD(CAST(customer_id AS STRING), 8, '0')) as customer_code, " +
" -- Date functions " +
" EXTRACT(DAYOFWEEK FROM first_order_date) as first_order_dow, " +
" DATEDIFF(last_order_date, first_order_date) as customer_lifetime_days " +
"FROM customer_summary"
);
// Array and map operations (if supported by connectors)
Table arrayOperations = tableEnv.sqlQuery(
"SELECT " +
" user_id, " +
" tags, " +
" CARDINALITY(tags) as tag_count, " +
" tags[1] as primary_tag, " +
" -- Check if array contains specific value " +
" 'premium' = ANY(tags) as is_premium_user, " +
" -- String aggregation " +
" ARRAY_JOIN(tags, ', ') as tags_string " +
"FROM user_profiles " +
"WHERE CARDINALITY(tags) > 0"
);Time-based operations and temporal table functions in SQL.
Usage Examples:
// Temporal joins (for lookups with time-based versioning)
Table temporalJoin = tableEnv.sqlQuery(
"SELECT " +
" o.order_id, " +
" o.customer_id, " +
" o.product_id, " +
" o.order_time, " +
" o.amount, " +
" p.product_name, " +
" p.category, " +
" -- Get product info as of order time " +
" p.price as price_at_order_time " +
"FROM orders o " +
"JOIN product_changelog FOR SYSTEM_TIME AS OF o.order_time AS p " +
" ON o.product_id = p.product_id"
);
// Interval operations
Table intervalQuery = tableEnv.sqlQuery(
"SELECT " +
" user_id, " +
" login_time, " +
" logout_time, " +
" logout_time - login_time as session_duration, " +
" -- Check if session was longer than 30 minutes " +
" logout_time - login_time > INTERVAL '30' MINUTE as long_session, " +
" -- Add time intervals " +
" login_time + INTERVAL '1' HOUR as expected_timeout " +
"FROM user_sessions " +
"WHERE logout_time - login_time > INTERVAL '1' MINUTE"
);
// Watermark and event time operations
Table eventTimeQuery = tableEnv.sqlQuery(
"SELECT " +
" sensor_id, " +
" TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start, " +
" TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end, " +
" COUNT(*) as reading_count, " +
" AVG(temperature) as avg_temperature, " +
" MIN(temperature) as min_temperature, " +
" MAX(temperature) as max_temperature " +
"FROM sensor_readings " +
"GROUP BY sensor_id, TUMBLE(event_time, INTERVAL '1' HOUR)"
);Configure SQL parsing and execution behavior.
/**
* SQL dialect configuration
*/
public enum SqlDialect {
/** Default Flink SQL dialect */
DEFAULT,
/** Hive-compatible SQL dialect for migration scenarios */
HIVE
}Usage Examples:
// Set SQL dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
// Hive dialect for compatibility
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// Use Hive-specific syntax when in Hive dialect
if (tableEnv.getConfig().getSqlDialect() == SqlDialect.HIVE) {
tableEnv.executeSql(
"CREATE TABLE hive_table ( " +
" id BIGINT, " +
" name STRING, " +
" part_date STRING " +
") PARTITIONED BY (part_date) " +
"STORED AS PARQUET " +
"LOCATION '/warehouse/hive_table'"
);
}
// Configuration through table config
Configuration config = tableEnv.getConfig().getConfiguration();
config.setString("table.sql-dialect", "default");
config.setString("table.local-time-zone", "UTC");Handle SQL parsing and execution errors.
/**
* Exception thrown for SQL parsing errors
*/
public class SqlParserException extends RuntimeException {
public SqlParserException(String message);
public SqlParserException(String message, Throwable cause);
}
/**
* Exception for SQL parsing errors at end of input
*/
public class SqlParserEOFException extends SqlParserException {
public SqlParserEOFException(String message);
}Usage Examples:
// Handle SQL parsing errors
try {
Table result = tableEnv.sqlQuery(
"SELECT customer_id, SUM(amount " + // Missing closing parenthesis
"FROM orders " +
"GROUP BY customer_id"
);
} catch (SqlParserException e) {
System.err.println("SQL parsing failed: " + e.getMessage());
// Handle parsing error, perhaps show user-friendly error message
}
// Handle execution errors
try {
TableResult result = tableEnv.executeSql(
"INSERT INTO non_existent_table " +
"SELECT * FROM orders"
);
} catch (Exception e) {
System.err.println("SQL execution failed: " + e.getMessage());
// Handle execution error
}
// Validate SQL before execution
try {
String sql = "SELECT * FROM orders WHERE amount > ?";
// In practice, you might want to validate parameter binding
Table validated = tableEnv.sqlQuery(sql.replace("?", "100"));
System.out.println("SQL is valid");
} catch (SqlParserException e) {
System.err.println("Invalid SQL: " + e.getMessage());
}Create and manage database objects using SQL DDL.
Usage Examples:
// Create catalog
tableEnv.executeSql(
"CREATE CATALOG my_catalog WITH ( " +
" 'type' = 'hive', " +
" 'hive-conf-dir' = '/opt/hive/conf' " +
")"
);
// Create database
tableEnv.executeSql(
"CREATE DATABASE IF NOT EXISTS analytics " +
"COMMENT 'Analytics database for business intelligence'"
);
// Create function
tableEnv.executeSql(
"CREATE TEMPORARY FUNCTION hash_func AS 'com.example.HashFunction'"
);
// Create view
tableEnv.executeSql(
"CREATE VIEW high_value_customers AS " +
"SELECT customer_id, SUM(amount) as total_spent " +
"FROM orders " +
"GROUP BY customer_id " +
"HAVING SUM(amount) > 5000"
);
// Alter table
tableEnv.executeSql(
"ALTER TABLE orders ADD COLUMN discount_amount DECIMAL(10,2)"
);
// Drop objects
tableEnv.executeSql("DROP VIEW IF EXISTS high_value_customers");
tableEnv.executeSql("DROP TEMPORARY FUNCTION hash_func");
tableEnv.executeSql("DROP TABLE temp_results");Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java