Apache Flink Table API for SQL-like operations on streaming and batch data
—
The Flink Table API provides comprehensive SQL support through Apache Calcite integration, enabling both DDL (Data Definition Language) and DML (Data Manipulation Language) operations alongside standard SQL queries.
Execute SQL queries directly and convert results to Table objects.
/**
 * Executes a SQL query and returns the result as a [[Table]].
 * Intended for SELECT-style queries; DDL/DML statements go through `sqlUpdate`.
 * @param query The SQL query string
 * @return Table containing query results
 */
def sqlQuery(query: String): Table
/**
* Executes a SQL statement (DDL/DML operations)
* @param stmt The SQL statement string
*/
def sqlUpdate(stmt: String): Unit

Usage Examples:
// Basic SELECT queries
val basicQuery = tEnv.sqlQuery("SELECT name, age FROM Users WHERE age > 21")
val aggregateQuery = tEnv.sqlQuery("""
SELECT department,
COUNT(*) as employee_count,
AVG(salary) as avg_salary,
MAX(salary) as max_salary
FROM Employees
GROUP BY department
""")
// Complex joins and subqueries
val complexQuery = tEnv.sqlQuery("""
SELECT e.name, e.salary, d.department_name, d.budget
FROM Employees e
JOIN Departments d ON e.dept_id = d.id
WHERE e.salary > (
SELECT AVG(salary) * 1.2
FROM Employees
WHERE dept_id = e.dept_id
)
""")
// Window queries
val windowQuery = tEnv.sqlQuery("""
SELECT
user_id,
TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end,
SUM(amount) as total_amount
FROM Transactions
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR)
""")

Create and manage table structures, views, and database objects.
// Table creation and management
tEnv.sqlUpdate("""
CREATE TABLE Users (
id BIGINT,
name STRING,
email STRING,
age INT,
registration_time TIMESTAMP(3),
WATERMARK FOR registration_time AS registration_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'users',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
// Temporary table creation
tEnv.sqlUpdate("""
CREATE TEMPORARY TABLE TempResults (
category STRING,
total_sales DOUBLE,
avg_price DOUBLE
) WITH (
'connector' = 'print'
)
""")
// View creation
tEnv.sqlUpdate("""
CREATE VIEW ActiveUsers AS
SELECT id, name, email
FROM Users
WHERE last_login > CURRENT_TIMESTAMP - INTERVAL '30' DAY
""")
// Temporary view creation
tEnv.sqlUpdate("""
CREATE TEMPORARY VIEW RecentOrders AS
SELECT o.*, u.name as user_name
FROM Orders o
JOIN Users u ON o.user_id = u.id
WHERE o.order_time > CURRENT_TIMESTAMP - INTERVAL '7' DAY
""")

Insert, update, and manipulate data using SQL statements.
// Insert operations
tEnv.sqlUpdate("""
INSERT INTO UserStatistics
SELECT
user_id,
COUNT(*) as order_count,
SUM(amount) as total_spent,
AVG(amount) as avg_order_value
FROM Orders
GROUP BY user_id
""")
// Insert with specific columns
tEnv.sqlUpdate("""
INSERT INTO AuditLog (event_type, user_id, timestamp)
SELECT 'ORDER_CREATED', user_id, order_time
FROM Orders
WHERE order_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR
""")
// Insert from multiple sources
tEnv.sqlUpdate("""
INSERT INTO AllTransactions
SELECT transaction_id, amount, 'CREDIT' as type, timestamp
FROM CreditTransactions
UNION ALL
SELECT transaction_id, amount, 'DEBIT' as type, timestamp
FROM DebitTransactions
""")

Comprehensive set of built-in functions for data processing and transformation.
// String functions
val stringFunctions = tEnv.sqlQuery("""
SELECT
UPPER(name) as upper_name,
LOWER(email) as lower_email,
SUBSTRING(phone, 1, 3) as area_code,
CONCAT(first_name, ' ', last_name) as full_name,
LENGTH(description) as desc_length,
TRIM(BOTH ' ' FROM padded_text) as trimmed
FROM Users
""")
// Numeric functions
val numericFunctions = tEnv.sqlQuery("""
SELECT
ABS(balance) as abs_balance,
ROUND(salary, 2) as rounded_salary,
CEILING(score) as ceil_score,
FLOOR(rating) as floor_rating,
MOD(id, 10) as id_mod,
POWER(base_value, 2) as squared_value
FROM Employees
""")
// Date and time functions
val timeFunctions = tEnv.sqlQuery("""
SELECT
CURRENT_TIMESTAMP as current_ts,
CURRENT_DATE as current_date,
CURRENT_TIME as current_time,
EXTRACT(YEAR FROM birth_date) as birth_year,
EXTRACT(MONTH FROM hire_date) as hire_month,
DATEDIFF(CURRENT_DATE, hire_date) as days_employed,
DATE_FORMAT(event_time, 'yyyy-MM-dd HH:mm') as formatted_time
FROM Employees
""")
// Conditional functions
val conditionalFunctions = tEnv.sqlQuery("""
SELECT
name,
CASE
WHEN age < 18 THEN 'Minor'
WHEN age >= 18 AND age < 65 THEN 'Adult'
ELSE 'Senior'
END as age_category,
COALESCE(middle_name, 'N/A') as middle_name_safe,
NULLIF(status, 'UNKNOWN') as clean_status,
IF(salary > 50000, 'High', 'Standard') as salary_tier
FROM Employees
""")

SQL aggregate functions for data summarization and analysis.
// Basic aggregations
val basicAggregates = tEnv.sqlQuery("""
SELECT
department,
COUNT(*) as employee_count,
COUNT(DISTINCT position) as unique_positions,
SUM(salary) as total_salary,
AVG(salary) as avg_salary,
MIN(hire_date) as earliest_hire,
MAX(salary) as highest_salary,
STDDEV(salary) as salary_stddev,
VAR_SAMP(salary) as salary_variance
FROM Employees
GROUP BY department
""")
// Advanced aggregations with HAVING
val advancedAggregates = tEnv.sqlQuery("""
SELECT
department,
position,
COUNT(*) as count,
AVG(salary) as avg_salary,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY salary) as median_salary,
COLLECT(name) as employee_names
FROM Employees
GROUP BY department, position
HAVING COUNT(*) >= 3 AND AVG(salary) > 40000
""")
// Grouping sets and rollup
val groupingSets = tEnv.sqlQuery("""
SELECT
department,
position,
gender,
COUNT(*) as employee_count,
AVG(salary) as avg_salary
FROM Employees
GROUP BY GROUPING SETS (
(department, position, gender),
(department, position),
(department),
()
)
""")

Analytical window functions for advanced data analysis.
// Time-based windows
val timeWindows = tEnv.sqlQuery("""
SELECT
user_id,
transaction_time,
amount,
TUMBLE_START(transaction_time, INTERVAL '1' HOUR) as window_start,
TUMBLE_END(transaction_time, INTERVAL '1' HOUR) as window_end,
SUM(amount) OVER (
PARTITION BY user_id, TUMBLE(transaction_time, INTERVAL '1' HOUR)
) as hourly_total
FROM Transactions
""")
// Ranking and analytical functions
val analyticalFunctions = tEnv.sqlQuery("""
SELECT
employee_id,
department,
salary,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank_ties,
DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_dense_rank,
PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_percent_rank,
NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) as salary_quartile
FROM Employees
""")
// Frame-based window functions
val frameFunctions = tEnv.sqlQuery("""
SELECT
employee_id,
salary,
hire_date,
SUM(salary) OVER (
ORDER BY hire_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as running_salary_sum,
AVG(salary) OVER (
ORDER BY hire_date
ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING
) as moving_avg_salary,
LAG(salary, 1) OVER (ORDER BY hire_date) as prev_salary,
LEAD(salary, 1) OVER (ORDER BY hire_date) as next_salary,
FIRST_VALUE(salary) OVER (
PARTITION BY department
ORDER BY hire_date
ROWS UNBOUNDED PRECEDING
) as first_dept_salary
FROM Employees
""")

Functions that return tables for complex data transformations.
// JSON parsing function (example of table-valued function usage)
val jsonParsing = tEnv.sqlQuery("""
SELECT
user_id,
json_data,
parsed.name,
parsed.age,
parsed.preferences
FROM Users u,
LATERAL TABLE(JSON_PARSE(u.json_data)) as parsed(name, age, preferences)
""")
// String splitting function
val stringSplitting = tEnv.sqlQuery("""
SELECT
user_id,
tag
FROM Users u,
LATERAL TABLE(SPLIT(u.tags, ',')) as tag_table(tag)
WHERE tag IS NOT NULL AND LENGTH(tag) > 0
""")
// Range generation function
val rangeGeneration = tEnv.sqlQuery("""
SELECT
generate_series.value as day_offset,
DATE_ADD(CURRENT_DATE, generate_series.value) as date
FROM LATERAL TABLE(GENERATE_SERIES(0, 30)) as generate_series(value)
""")

Hierarchical and recursive queries using WITH clauses.
// Basic CTE usage
val cteQuery = tEnv.sqlQuery("""
WITH department_stats AS (
SELECT
department,
COUNT(*) as emp_count,
AVG(salary) as avg_salary,
MAX(salary) as max_salary
FROM Employees
GROUP BY department
),
high_performing_depts AS (
SELECT department, avg_salary
FROM department_stats
WHERE emp_count >= 5 AND avg_salary > 50000
)
SELECT
e.name,
e.salary,
e.department,
hpd.avg_salary as dept_avg_salary,
(e.salary - hpd.avg_salary) as salary_diff
FROM Employees e
JOIN high_performing_depts hpd ON e.department = hpd.department
WHERE e.salary > hpd.avg_salary * 1.1
""")
// Recursive CTE for hierarchical data
val recursiveCTE = tEnv.sqlQuery("""
WITH RECURSIVE employee_hierarchy AS (
-- Anchor: top-level managers
SELECT employee_id, name, manager_id, 1 as level, name as path
FROM Employees
WHERE manager_id IS NULL
UNION ALL
-- Recursive: employees with managers
SELECT
e.employee_id,
e.name,
e.manager_id,
eh.level + 1,
eh.path || ' -> ' || e.name
FROM Employees e
JOIN employee_hierarchy eh ON e.manager_id = eh.employee_id
WHERE eh.level < 10 -- Prevent infinite recursion
)
SELECT employee_id, name, level, path
FROM employee_hierarchy
ORDER BY level, name
""")

Complex SQL constructs for sophisticated data processing.
// Pivot operations (using CASE expressions)
val pivotQuery = tEnv.sqlQuery("""
SELECT
department,
SUM(CASE WHEN quarter = 'Q1' THEN sales END) as Q1_sales,
SUM(CASE WHEN quarter = 'Q2' THEN sales END) as Q2_sales,
SUM(CASE WHEN quarter = 'Q3' THEN sales END) as Q3_sales,
SUM(CASE WHEN quarter = 'Q4' THEN sales END) as Q4_sales
FROM QuarterlySales
GROUP BY department
""")
// Set operations
val setOperations = tEnv.sqlQuery("""
SELECT employee_id, name FROM CurrentEmployees
UNION
SELECT employee_id, name FROM FormerEmployees
INTERSECT
SELECT employee_id, name FROM EligibleForRehire
EXCEPT
SELECT employee_id, name FROM BlacklistedEmployees
""")
// Correlated subqueries
val correlatedSubqueries = tEnv.sqlQuery("""
SELECT
e1.name,
e1.salary,
e1.department,
(SELECT COUNT(*)
FROM Employees e2
WHERE e2.department = e1.department AND e2.salary > e1.salary
) as employees_with_higher_salary,
(SELECT AVG(salary)
FROM Employees e3
WHERE e3.department = e1.department
) as dept_avg_salary
FROM Employees e1
WHERE e1.salary > (
SELECT AVG(salary) * 1.2
FROM Employees e4
WHERE e4.department = e1.department
)
""")

SQL-specific configuration and query optimization hints.
// Query hints for optimization
val optimizedQuery = tEnv.sqlQuery("""
SELECT /*+ USE_HASH_JOIN(e, d) */
e.name,
d.department_name
FROM Employees e
JOIN /*+ BROADCAST(d) */ Departments d ON e.dept_id = d.id
""")
// Configuration for SQL execution
val tableConfig = tEnv.getConfig
tableConfig.setSqlDialect(SqlDialect.HIVE) // Set SQL dialect
tableConfig.getConfiguration.setString("table.optimizer.join-reorder-enabled", "true")
// Enable different SQL features
tEnv.getConfig.getConfiguration.setString("table.sql-dialect", "default")
tEnv.getConfig.getConfiguration.setBoolean("table.optimizer.agg-phase-strategy", true)

SQL-related error handling and query debugging techniques.
// Explain query plans
val queryPlan = tEnv.explainSql("""
SELECT department, COUNT(*), AVG(salary)
FROM Employees
WHERE hire_date > '2020-01-01'
GROUP BY department
HAVING COUNT(*) > 5
""")
println(queryPlan)
// Try-catch for SQL execution
try {
val result = tEnv.sqlQuery("SELECT * FROM NonExistentTable")
} catch {
case ex: ValidationException =>
println(s"SQL validation error: ${ex.getMessage}")
case ex: SqlParserException =>
println(s"SQL parsing error: ${ex.getMessage}")
case ex: TableException =>
println(s"Table API error: ${ex.getMessage}")
}
// Debug SQL execution
tEnv.getConfig.getConfiguration.setString("table.exec.resource.default-parallelism", "1")
tEnv.getConfig.getConfiguration.setBoolean("table.optimizer.distinct-agg.split.enabled", false)

Understanding SQL feature support and limitations in Flink Table API.
// Supported SQL features:
// - Standard SQL:2011 features
// - Window functions (TUMBLE, HOP, SESSION)
// - User-defined functions (scalar, table, aggregate)
// - Complex data types (ROW, ARRAY, MAP)
// - JSON functions and operators
// - Regular expressions and pattern matching
// Limitations and considerations:
// - Some advanced SQL features may not be supported
// - Performance characteristics differ from traditional databases
// - Streaming vs batch semantics affect result consistency
// - Late data handling in streaming mode
// - Watermark and time attribute requirements for event time operations

// SQL-related exception types
// Thrown when a SQL statement cannot be parsed into a valid syntax tree.
class SqlParserException(message: String) extends RuntimeException(message)
// Thrown when a parsed statement fails semantic validation (e.g. unknown table or column).
// NOTE(review): extends TableException, declared elsewhere — confirm its constructor signature.
class ValidationException(message: String) extends TableException(message)
// SQL dialect configuration.
// Fix: Scala 3 `enum` bodies must introduce their cases with the `case`
// keyword; the original `enum SqlDialect { DEFAULT, HIVE }` does not compile.
// The cases remain accessible as SqlDialect.DEFAULT / SqlDialect.HIVE.
enum SqlDialect {
  case DEFAULT, HIVE
}
// Query execution result — minimal handle over a query's result set.
trait QueryResult {
  // Prints the result rows to stdout (side-effecting, hence the parentheses).
  def print(): Unit
  // Materializes all result rows into a Java list.
  // NOTE(review): Row is declared elsewhere — presumably Flink's row type; confirm.
  def collect(): java.util.List[Row]
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-2-11