CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-2-11

Apache Flink Table API for SQL-like operations on streaming and batch data

Pending
Overview
Eval results
Files

sql-integration.mddocs/

SQL Integration

The Flink Table API provides comprehensive SQL support through Apache Calcite integration, enabling both DDL (Data Definition Language) and DML (Data Manipulation Language) operations alongside standard SQL queries.

Capabilities

SQL Query Execution

Execute SQL queries directly and convert results to Table objects.

/**
 * Executes a SQL query and returns the result as a Table
 * @param query The SQL query string
 * @returns Table containing query results
 */
def sqlQuery(query: String): Table

/**
 * Executes a SQL statement (DDL/DML operations)
 * @param stmt The SQL statement string
 */
def sqlUpdate(stmt: String): Unit

Usage Examples:

// Basic SELECT queries
val basicQuery = tEnv.sqlQuery("SELECT name, age FROM Users WHERE age > 21")

val aggregateQuery = tEnv.sqlQuery("""
  SELECT department, 
         COUNT(*) as employee_count,
         AVG(salary) as avg_salary,
         MAX(salary) as max_salary
  FROM Employees 
  GROUP BY department
""")

// Complex joins and subqueries
val complexQuery = tEnv.sqlQuery("""
  SELECT e.name, e.salary, d.department_name, d.budget
  FROM Employees e
  JOIN Departments d ON e.dept_id = d.id
  WHERE e.salary > (
    SELECT AVG(salary) * 1.2 
    FROM Employees 
    WHERE dept_id = e.dept_id
  )
""")

// Window queries
val windowQuery = tEnv.sqlQuery("""
  SELECT 
    user_id,
    TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start,
    TUMBLE_END(event_time, INTERVAL '1' HOUR) as window_end,
    SUM(amount) as total_amount
  FROM Transactions
  GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR)
""")

Data Definition Language (DDL)

Create and manage table structures, views, and database objects.

// Table creation and management
tEnv.sqlUpdate("""
  CREATE TABLE Users (
    id BIGINT,
    name STRING,
    email STRING,
    age INT,
    registration_time TIMESTAMP(3),
    WATERMARK FOR registration_time AS registration_time - INTERVAL '5' SECOND
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'users',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
  )
""")

// Temporary table creation
tEnv.sqlUpdate("""
  CREATE TEMPORARY TABLE TempResults (
    category STRING,
    total_sales DOUBLE,
    avg_price DOUBLE
  ) WITH (
    'connector' = 'print'
  )
""")

// View creation
tEnv.sqlUpdate("""
  CREATE VIEW ActiveUsers AS
  SELECT id, name, email
  FROM Users
  WHERE last_login > CURRENT_TIMESTAMP - INTERVAL '30' DAY
""")

// Temporary view creation
tEnv.sqlUpdate("""
  CREATE TEMPORARY VIEW RecentOrders AS
  SELECT o.*, u.name as user_name
  FROM Orders o
  JOIN Users u ON o.user_id = u.id
  WHERE o.order_time > CURRENT_TIMESTAMP - INTERVAL '7' DAY
""")

Data Manipulation Language (DML)

Insert, update, and manipulate data using SQL statements.

// Insert operations
tEnv.sqlUpdate("""
  INSERT INTO UserStatistics
  SELECT 
    user_id,
    COUNT(*) as order_count,
    SUM(amount) as total_spent,
    AVG(amount) as avg_order_value
  FROM Orders
  GROUP BY user_id
""")

// Insert with specific columns
tEnv.sqlUpdate("""
  INSERT INTO AuditLog (event_type, user_id, timestamp)
  SELECT 'ORDER_CREATED', user_id, order_time
  FROM Orders
  WHERE order_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR
""")

// Insert from multiple sources
tEnv.sqlUpdate("""
  INSERT INTO AllTransactions
  SELECT transaction_id, amount, 'CREDIT' as type, timestamp
  FROM CreditTransactions
  UNION ALL
  SELECT transaction_id, amount, 'DEBIT' as type, timestamp  
  FROM DebitTransactions
""")

Built-in SQL Functions

Comprehensive set of built-in functions for data processing and transformation.

// String functions
val stringFunctions = tEnv.sqlQuery("""
  SELECT 
    UPPER(name) as upper_name,
    LOWER(email) as lower_email,
    SUBSTRING(phone, 1, 3) as area_code,
    CONCAT(first_name, ' ', last_name) as full_name,
    LENGTH(description) as desc_length,
    TRIM(BOTH ' ' FROM padded_text) as trimmed
  FROM Users
""")

// Numeric functions  
val numericFunctions = tEnv.sqlQuery("""
  SELECT
    ABS(balance) as abs_balance,
    ROUND(salary, 2) as rounded_salary,
    CEILING(score) as ceil_score,
    FLOOR(rating) as floor_rating,
    MOD(id, 10) as id_mod,
    POWER(base_value, 2) as squared_value
  FROM Employees
""")

// Date and time functions
val timeFunctions = tEnv.sqlQuery("""
  SELECT
    CURRENT_TIMESTAMP as current_ts,
    CURRENT_DATE as current_date,
    CURRENT_TIME as current_time,
    EXTRACT(YEAR FROM birth_date) as birth_year,
    EXTRACT(MONTH FROM hire_date) as hire_month,
    DATEDIFF(CURRENT_DATE, hire_date) as days_employed,
    DATE_FORMAT(event_time, 'yyyy-MM-dd HH:mm') as formatted_time
  FROM Employees
""")

// Conditional functions
val conditionalFunctions = tEnv.sqlQuery("""
  SELECT
    name,
    CASE 
      WHEN age < 18 THEN 'Minor'
      WHEN age >= 18 AND age < 65 THEN 'Adult'
      ELSE 'Senior'
    END as age_category,
    COALESCE(middle_name, 'N/A') as middle_name_safe,
    NULLIF(status, 'UNKNOWN') as clean_status,
    IF(salary > 50000, 'High', 'Standard') as salary_tier
  FROM Employees
""")

Aggregate Functions

SQL aggregate functions for data summarization and analysis.

// Basic aggregations
val basicAggregates = tEnv.sqlQuery("""
  SELECT
    department,
    COUNT(*) as employee_count,
    COUNT(DISTINCT position) as unique_positions,
    SUM(salary) as total_salary,
    AVG(salary) as avg_salary,
    MIN(hire_date) as earliest_hire,
    MAX(salary) as highest_salary,
    STDDEV(salary) as salary_stddev,
    VAR_SAMP(salary) as salary_variance
  FROM Employees
  GROUP BY department
""")

// Advanced aggregations with HAVING
val advancedAggregates = tEnv.sqlQuery("""
  SELECT
    department,
    position,
    COUNT(*) as count,
    AVG(salary) as avg_salary,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY salary) as median_salary,
    COLLECT(name) as employee_names
  FROM Employees
  GROUP BY department, position
  HAVING COUNT(*) >= 3 AND AVG(salary) > 40000
""")

// Grouping sets and rollup
val groupingSets = tEnv.sqlQuery("""
  SELECT
    department,
    position,
    gender,
    COUNT(*) as employee_count,
    AVG(salary) as avg_salary
  FROM Employees
  GROUP BY GROUPING SETS (
    (department, position, gender),
    (department, position),
    (department),
    ()
  )
""")

Window Functions in SQL

Analytical window functions for advanced data analysis.

// Time-based windows
val timeWindows = tEnv.sqlQuery("""
  SELECT
    user_id,
    transaction_time,
    amount,
    TUMBLE_START(transaction_time, INTERVAL '1' HOUR) as window_start,
    TUMBLE_END(transaction_time, INTERVAL '1' HOUR) as window_end,
    SUM(amount) OVER (
      PARTITION BY user_id, TUMBLE(transaction_time, INTERVAL '1' HOUR)
    ) as hourly_total
  FROM Transactions
""")

// Ranking and analytical functions
val analyticalFunctions = tEnv.sqlQuery("""
  SELECT
    employee_id,
    department,
    salary,
    ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank,
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank_ties,
    DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_dense_rank,
    PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_percent_rank,
    NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) as salary_quartile
  FROM Employees
""")

// Frame-based window functions
val frameFunctions = tEnv.sqlQuery("""
  SELECT
    employee_id,
    salary,
    hire_date,
    SUM(salary) OVER (
      ORDER BY hire_date 
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as running_salary_sum,
    AVG(salary) OVER (
      ORDER BY hire_date
      ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING  
    ) as moving_avg_salary,
    LAG(salary, 1) OVER (ORDER BY hire_date) as prev_salary,
    LEAD(salary, 1) OVER (ORDER BY hire_date) as next_salary,
    FIRST_VALUE(salary) OVER (
      PARTITION BY department 
      ORDER BY hire_date
      ROWS UNBOUNDED PRECEDING
    ) as first_dept_salary
  FROM Employees
""")

Table-Valued Functions

Functions that return tables for complex data transformations.

// JSON parsing function (example of table-valued function usage)
val jsonParsing = tEnv.sqlQuery("""
  SELECT 
    user_id,
    json_data,
    parsed.name,
    parsed.age,
    parsed.preferences
  FROM Users u,
  LATERAL TABLE(JSON_PARSE(u.json_data)) as parsed(name, age, preferences)
""")

// String splitting function
val stringSplitting = tEnv.sqlQuery("""
  SELECT 
    user_id,
    tag
  FROM Users u,
  LATERAL TABLE(SPLIT(u.tags, ',')) as tag_table(tag)
  WHERE tag IS NOT NULL AND LENGTH(tag) > 0
""")

// Range generation function  
val rangeGeneration = tEnv.sqlQuery("""
  SELECT 
    generate_series.value as day_offset,
    DATE_ADD(CURRENT_DATE, generate_series.value) as date
  FROM LATERAL TABLE(GENERATE_SERIES(0, 30)) as generate_series(value)
""")

Common Table Expressions (CTEs)

Hierarchical and recursive queries using WITH clauses.

// Basic CTE usage
val cteQuery = tEnv.sqlQuery("""
  WITH department_stats AS (
    SELECT 
      department,
      COUNT(*) as emp_count,
      AVG(salary) as avg_salary,
      MAX(salary) as max_salary
    FROM Employees
    GROUP BY department
  ),
  high_performing_depts AS (
    SELECT department, avg_salary
    FROM department_stats
    WHERE emp_count >= 5 AND avg_salary > 50000
  )
  SELECT 
    e.name,
    e.salary,
    e.department,
    hpd.avg_salary as dept_avg_salary,
    (e.salary - hpd.avg_salary) as salary_diff
  FROM Employees e
  JOIN high_performing_depts hpd ON e.department = hpd.department
  WHERE e.salary > hpd.avg_salary * 1.1
""")

// Recursive CTE for hierarchical data
val recursiveCTE = tEnv.sqlQuery("""
  WITH RECURSIVE employee_hierarchy AS (
    -- Anchor: top-level managers
    SELECT employee_id, name, manager_id, 1 as level, name as path
    FROM Employees
    WHERE manager_id IS NULL
    
    UNION ALL
    
    -- Recursive: employees with managers
    SELECT 
      e.employee_id, 
      e.name, 
      e.manager_id, 
      eh.level + 1,
      eh.path || ' -> ' || e.name
    FROM Employees e
    JOIN employee_hierarchy eh ON e.manager_id = eh.employee_id
    WHERE eh.level < 10  -- Prevent infinite recursion
  )
  SELECT employee_id, name, level, path
  FROM employee_hierarchy
  ORDER BY level, name
""")

Advanced SQL Features

Complex SQL constructs for sophisticated data processing.

// Pivot operations (using CASE expressions)
val pivotQuery = tEnv.sqlQuery("""
  SELECT 
    department,
    SUM(CASE WHEN quarter = 'Q1' THEN sales END) as Q1_sales,
    SUM(CASE WHEN quarter = 'Q2' THEN sales END) as Q2_sales,
    SUM(CASE WHEN quarter = 'Q3' THEN sales END) as Q3_sales,
    SUM(CASE WHEN quarter = 'Q4' THEN sales END) as Q4_sales
  FROM QuarterlySales
  GROUP BY department
""")

// Set operations
val setOperations = tEnv.sqlQuery("""
  SELECT employee_id, name FROM CurrentEmployees
  UNION
  SELECT employee_id, name FROM FormerEmployees
  
  INTERSECT
  
  SELECT employee_id, name FROM EligibleForRehire
  
  EXCEPT
  
  SELECT employee_id, name FROM BlacklistedEmployees
""")

// Correlated subqueries
val correlatedSubqueries = tEnv.sqlQuery("""
  SELECT 
    e1.name,
    e1.salary,
    e1.department,
    (SELECT COUNT(*) 
     FROM Employees e2 
     WHERE e2.department = e1.department AND e2.salary > e1.salary
    ) as employees_with_higher_salary,
    (SELECT AVG(salary)
     FROM Employees e3
     WHERE e3.department = e1.department
    ) as dept_avg_salary
  FROM Employees e1
  WHERE e1.salary > (
    SELECT AVG(salary) * 1.2
    FROM Employees e4
    WHERE e4.department = e1.department
  )
""")

SQL Configuration and Optimization

SQL-specific configuration and query optimization hints.

// Query hints for optimization
val optimizedQuery = tEnv.sqlQuery("""
  SELECT /*+ USE_HASH_JOIN(e, d) */ 
    e.name,
    d.department_name
  FROM Employees e
  JOIN /*+ BROADCAST(d) */ Departments d ON e.dept_id = d.id
""")

// Configuration for SQL execution
val tableConfig = tEnv.getConfig
tableConfig.setSqlDialect(SqlDialect.HIVE)  // Set SQL dialect
tableConfig.getConfiguration.setString("table.optimizer.join-reorder-enabled", "true")

// Enable different SQL features
tEnv.getConfig.getConfiguration.setString("table.sql-dialect", "default")
tEnv.getConfig.getConfiguration.setBoolean("table.optimizer.agg-phase-strategy", true)

Error Handling and Debugging

SQL-related error handling and query debugging techniques.

// Explain query plans
val queryPlan = tEnv.explainSql("""
  SELECT department, COUNT(*), AVG(salary)
  FROM Employees
  WHERE hire_date > '2020-01-01'
  GROUP BY department
  HAVING COUNT(*) > 5
""")

println(queryPlan)

// Try-catch for SQL execution
try {
  val result = tEnv.sqlQuery("SELECT * FROM NonExistentTable")
} catch {
  case ex: ValidationException => 
    println(s"SQL validation error: ${ex.getMessage}")
  case ex: SqlParserException =>
    println(s"SQL parsing error: ${ex.getMessage}")
  case ex: TableException =>
    println(s"Table API error: ${ex.getMessage}")
}

// Debug SQL execution
tEnv.getConfig.getConfiguration.setString("table.exec.resource.default-parallelism", "1")
tEnv.getConfig.getConfiguration.setBoolean("table.optimizer.distinct-agg.split.enabled", false)

SQL Compatibility and Limitations

Understanding SQL feature support and limitations in Flink Table API.

// Supported SQL features:
// - Standard SQL:2011 features
// - Window functions (TUMBLE, HOP, SESSION)  
// - User-defined functions (scalar, table, aggregate)
// - Complex data types (ROW, ARRAY, MAP)
// - JSON functions and operators
// - Regular expressions and pattern matching

// Limitations and considerations:
// - Some advanced SQL features may not be supported
// - Performance characteristics differ from traditional databases
// - Streaming vs batch semantics affect result consistency
// - Late data handling in streaming mode
// - Watermark and time attribute requirements for event time operations

Types

// SQL-related exception types
class SqlParserException(message: String) extends RuntimeException(message)
class ValidationException(message: String) extends TableException(message)

// SQL dialect configuration
enum SqlDialect {
  DEFAULT, HIVE
}

// Query execution result
trait QueryResult {
  def print(): Unit
  def collect(): java.util.List[Row]
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-2-11

docs

index.md

sources-sinks.md

sql-integration.md

table-environment.md

table-operations.md

type-system.md

user-defined-functions.md

window-operations.md

tile.json