CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-2-11

Apache Flink Table API for SQL-like operations on streaming and batch data

Pending
Overview
Eval results
Files

docs/table-environment.md

Table Environment

The TableEnvironment is the main entry point of the Table API, providing methods for registering tables, executing SQL queries, and configuring the table program.

Capabilities

Environment Creation

Factory methods for creating batch and streaming table environments.

// Factory entry point. `getTableEnvironment` is overloaded; the overload is chosen by the
// static type of the execution environment passed in.
object TableEnvironment {
  /**
   * Creates a batch table environment from an ExecutionEnvironment.
   *
   * @param executionEnvironment The batch execution environment
   * @return BatchTableEnvironment for batch processing
   */
  def getTableEnvironment(executionEnvironment: ExecutionEnvironment): BatchTableEnvironment

  /**
   * Creates a streaming table environment from a StreamExecutionEnvironment.
   *
   * @param executionEnvironment The stream execution environment
   * @return StreamTableEnvironment for stream processing
   */
  def getTableEnvironment(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
}

Usage Examples:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
// The TableEnvironment factory object lives in org.apache.flink.table.api and is not
// brought into scope by the table.api.scala._ wildcard, so it is imported explicitly.
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

// Batch environment: obtain the ExecutionEnvironment, then derive the table environment from it
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = TableEnvironment.getTableEnvironment(batchEnv)

// Streaming environment: same pattern, starting from a StreamExecutionEnvironment
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = TableEnvironment.getTableEnvironment(streamEnv)

Table Registration and Management

Register and manage tables within the table environment.

/**
 * Registers a table in the catalog for later reference.
 *
 * @param name Name to register the table under
 * @param table Table instance to register
 */
def registerTable(name: String, table: Table): Unit

/**
 * Registers a table source in the catalog.
 *
 * @param name Name to register the source under
 * @param tableSource The table source to register
 */
def registerTableSource(name: String, tableSource: TableSource[_]): Unit

/**
 * Registers a table sink in the catalog.
 *
 * @param name Name to register the sink under
 * @param fieldNames Field names of the sink
 * @param fieldTypes Field types of the sink
 * @param tableSink The table sink to register
 */
def registerTableSink(
  name: String, 
  fieldNames: Array[String], 
  fieldTypes: Array[TypeInformation[_]], 
  tableSink: TableSink[_]
): Unit

/**
 * Scans a registered table by name.
 *
 * @param tablePath Table name or path components (passed as varargs)
 * @return Table instance for the registered table
 */
def scan(tablePath: String*): Table

/**
 * Creates a table from a table source.
 *
 * @param source The table source to create the table from
 * @return Table instance wrapping the source
 */
def fromTableSource(source: TableSource[_]): Table

/**
 * Lists all registered tables in the environment.
 *
 * @return Array of table names
 */
def listTables(): Array[String]

Usage Examples:

// NOTE(review): `env` and `tEnv` are not defined in this snippet — presumably a batch
// ExecutionEnvironment and the table environment created from it; confirm.

// Register a table
val dataSet = env.fromElements((1, "Alice"), (2, "Bob"))
val table = dataSet.toTable(tEnv, 'id, 'name)
tEnv.registerTable("Users", table)

// Scan registered table (looked up by the name it was registered under)
val users = tEnv.scan("Users")

// Register and use table source
val csvSource = new CsvTableSource("/path/to/file.csv", Array("id", "name"), Array(Types.INT, Types.STRING))
tEnv.registerTableSource("CsvUsers", csvSource)
val csvTable = tEnv.scan("CsvUsers")

SQL Execution

Execute SQL queries and statements directly on the table environment.

/**
 * Executes a SQL query and returns the result as a Table.
 *
 * @param query The SQL query string
 * @return Table containing query results
 */
def sqlQuery(query: String): Table

/**
 * Executes a SQL statement (DDL/DML). Returns no result.
 *
 * NOTE(review): which statement kinds are accepted is version-dependent — the legacy
 * flink-table 1.x line reportedly accepts only INSERT statements here; confirm.
 *
 * @param stmt The SQL statement string
 */
def sqlUpdate(stmt: String): Unit

Usage Examples:

// SQL queries (assume a table named "Users" has been registered in tEnv)
val result = tEnv.sqlQuery("SELECT id, name FROM Users WHERE id > 1")
val aggregated = tEnv.sqlQuery("SELECT COUNT(*) as user_count FROM Users")

// SQL DDL/DML
// NOTE(review): verify that CREATE TABLE is accepted by sqlUpdate in the targeted
// flink-table version — older 1.x releases reportedly support only INSERT INTO here.
tEnv.sqlUpdate("CREATE TABLE OutputTable (id INT, name STRING)")
tEnv.sqlUpdate("INSERT INTO OutputTable SELECT * FROM Users")

Function Registration

Register user-defined functions for use in queries.

/**
 * Registers a scalar function.
 *
 * @param name Function name for SQL usage
 * @param function ScalarFunction implementation
 */
def registerFunction(name: String, function: ScalarFunction): Unit

/**
 * Lists all registered user-defined functions.
 *
 * @return Array of function names
 */
def listUserDefinedFunctions(): Array[String]

Usage Examples:

// Register custom function
// AddOne is a ScalarFunction whose eval returns its Int argument plus one.
class AddOne extends ScalarFunction {
  def eval(x: Int): Int = x + 1
}

// Make the function available under the name "addOne"
tEnv.registerFunction("addOne", new AddOne())

// Use in SQL (assumes a table named "Users" with an `id` column is registered)
val result = tEnv.sqlQuery("SELECT addOne(id) FROM Users")

External Catalog Management

Register external catalogs for accessing external metadata stores.

/**
 * Registers an external catalog.
 *
 * @param name Catalog name
 * @param externalCatalog External catalog implementation
 */
def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit

/**
 * Gets the default catalog.
 *
 * NOTE(review): confirm this accessor exists in the targeted flink-table version.
 *
 * @return The default external catalog
 */
def getDefaultCatalog: ExternalCatalog

Configuration and Utilities

Access configuration and utility methods.

/**
 * Gets the table configuration.
 *
 * @return TableConfig instance
 */
def getConfig: TableConfig

/**
 * Explains the execution plan for a table.
 *
 * @param table Table to explain
 * @return String representation of the execution plan
 */
def explain(table: Table): String

Usage Examples:

// Configure timezone
// NOTE(review): TimeZone is presumably java.util.TimeZone; its import is not shown in this snippet.
val config = tEnv.getConfig
config.setTimeZone(TimeZone.getTimeZone("UTC"))

// Explain query plan: build a Table, then print the plan string returned by explain
val table = tEnv.scan("Users").select('name)
println(tEnv.explain(table))

Specialized Environments

BatchTableEnvironment

Batch-specific table environment with DataSet conversion capabilities.

abstract class BatchTableEnvironment extends TableEnvironment {
  /**
   * Converts a Table to a DataSet.
   *
   * @tparam T Element type of the resulting DataSet
   * @param table Table to convert
   * @return DataSet with elements of type T (for example Row)
   */
  def toDataSet[T](table: Table): DataSet[T]
  
  /**
   * Converts a Table to a DataSet with a specific type.
   *
   * @tparam T Element type of the resulting DataSet
   * @param table Table to convert
   * @param typeInfo Type information for conversion
   * @return Typed DataSet
   */
  def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T]
}

StreamTableEnvironment

Stream-specific table environment with DataStream conversion capabilities.

abstract class StreamTableEnvironment extends TableEnvironment {
  /**
   * Converts a Table to an append-only DataStream.
   *
   * @tparam T Element type of the resulting DataStream
   * @param table Table to convert
   * @return DataStream with elements of type T (for example Row)
   */
  def toAppendStream[T](table: Table): DataStream[T]
  
  /**
   * Converts a Table to a retract DataStream.
   *
   * @tparam T Type of the record carried in each tuple
   * @param table Table to convert
   * @return DataStream of (Boolean, T) where Boolean indicates add/retract
   */
  def toRetractStream[T](table: Table): DataStream[(Boolean, T)]
}

Usage Examples:

// NOTE(review): `table`, `batchTableEnv` and `streamTableEnv` are not defined in this
// snippet — presumably values created in earlier examples; confirm.

// Batch conversion
val batchResult: DataSet[Row] = batchTableEnv.toDataSet(table)

// Stream conversions
val appendStream: DataStream[Row] = streamTableEnv.toAppendStream(table)  
// NOTE(review): `aggregatedTable` is not defined in any example shown here — presumably
// the Table produced by an aggregating query; confirm or define it.
val retractStream: DataStream[(Boolean, Row)] = streamTableEnv.toRetractStream(aggregatedTable)

Types

// Type hierarchy summary: TableEnvironment is the common base type; the batch and
// streaming environments both extend it.
abstract class TableEnvironment
abstract class BatchTableEnvironment extends TableEnvironment  
abstract class StreamTableEnvironment extends TableEnvironment

trait ExternalCatalog {
  /**
   * Gets a table from the external catalog.
   *
   * @param tablePath Table path components (passed as varargs)
   * @return Table from the external catalog
   */
  def getTable(tablePath: String*): Table
  
  /**
   * Lists all tables in the external catalog.
   *
   * @return java.util.List of table names
   */
  def listTables(): java.util.List[String]
  
  /**
   * Gets a database from the external catalog.
   *
   * @param databaseName Name of the database
   * @return External catalog database
   */
  def getDatabase(databaseName: String): ExternalCatalogDatabase
}

trait ExternalCatalogDatabase {
  /**
   * Gets a table from the database.
   *
   * @param tableName Name of the table
   * @return Table from the database
   */
  def getTable(tableName: String): Table
  
  /**
   * Lists all tables in the database.
   *
   * @return java.util.List of table names in the database
   */
  def listTables(): java.util.List[String]
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-2-11

docs

index.md

sources-sinks.md

sql-integration.md

table-environment.md

table-operations.md

type-system.md

user-defined-functions.md

window-operations.md

tile.json