Comprehensive uber JAR that consolidates all Java APIs for Apache Flink's Table/SQL ecosystem, enabling developers to write table programs and integrate with other Flink APIs through a single dependency.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-java-uber@2.1.0

Apache Flink Table API Java Uber JAR provides a comprehensive set of Java APIs for working with tables and SQL in the Apache Flink ecosystem. This uber JAR consolidates all table-related modules into a single dependency, enabling developers to build stream and batch processing applications using both the programmatic Table API and declarative SQL.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-uber</artifactId>
<version>2.1.0</version>
</dependency>

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.*;
import static org.apache.flink.table.api.Expressions.*;
// Create a TableEnvironment for batch processing
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inBatchMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// Define a table using SQL DDL.
// 'number-of-rows' makes the datagen source bounded; without it the source is
// unbounded and cannot be queried in batch mode.
tEnv.executeSql("CREATE TABLE Orders (" +
        "id BIGINT," +
        "product STRING," +
        "amount DECIMAL(10,2)" +
        ") WITH ('connector' = 'datagen', 'number-of-rows' = '1000')");
// Query using Table API: keep orders above 100 and total them per product
Table orders = tEnv.from("Orders")
        .select($("product"), $("amount"))
        .filter($("amount").isGreater(lit(100)))
        .groupBy($("product"))
        .select($("product"), $("amount").sum().as("total_amount"));
// Execute and print results
orders.execute().print();
// Or execute SQL directly
tEnv.executeSql("SELECT product, SUM(amount) as total_amount " +
"FROM Orders WHERE amount > 100 " +
"GROUP BY product").print();

The Apache Flink Table API is built around several key architectural components:
Primary table manipulation and query capabilities including creation, transformation, aggregation, and joining operations.
// TableEnvironment - Primary entry point
public static TableEnvironment create(EnvironmentSettings settings);
public Table from(String path);
public TableResult executeSql(String statement);
public Table sqlQuery(String query);
// Table - Core table operations interface
public Table select(Expression... fields);
public Table filter(Expression predicate);
// groupBy yields a GroupedTable; call select(...) on it to get a Table back
public GroupedTable groupBy(Expression... fields);
// aggregate takes a single aggregate-function call expression
public AggregatedTable aggregate(Expression aggregateFunction);
public Table join(Table right);
public TableResult execute();

Comprehensive type system supporting primitive types, temporal types, and complex nested structures for defining table schemas.
// DataTypes - Factory for all Table API data types
// Every factory method listed here returns a DataType. ARRAY and ROW are
// composite: they take the element type or the row fields as arguments.
public static DataType BOOLEAN();
public static DataType INT();
public static DataType STRING();
public static DataType TIMESTAMP();
public static DataType ARRAY(DataType elementType);
public static DataType ROW(Field... fields);
// Schema - Table schema definition
// newBuilder() hands back a Schema.Builder; column(...) returns the Builder
// again, so column declarations can be chained.
public static Schema.Builder newBuilder();
public Builder column(String name, DataType type);
public Builder watermark(String columnName, Expression watermarkExpr);

Rich expression language for data manipulation, filtering, and computation with support for arithmetic, logical, string, and temporal operations.
// Expressions - Factory for SQL expressions
// Both factories return ApiExpression, the type that carries the fluent
// operations below (the plain Expression interface does not declare them).
public static ApiExpression $(String name);
public static ApiExpression lit(Object value);
// Expression operations (available on ApiExpression); arguments may be
// other expressions or plain Java literals
public ApiExpression plus(Object other);
public ApiExpression isEqual(Object other);
public ApiExpression and(Object other);
public ApiExpression upperCase();
public ApiExpression substring(Object beginIndex, Object length);

Framework for extending Flink with custom scalar functions, table functions, and aggregate functions for specialized processing requirements.
// Base classes for user-defined functions.
// Evaluation methods (eval / accumulate) are NOT declared on the base classes:
// implement them as public methods with concrete parameter types. Flink
// resolves them by reflection, so they cannot carry @Override.
public abstract class ScalarFunction extends UserDefinedFunction {
    // Implement: public <ReturnType> eval(<args>)
}
public abstract class TableFunction<T> extends UserDefinedFunction {
    // Implement: public void eval(<args>), emitting rows through collect(...)
    protected final void collect(T row);
}
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
    public abstract ACC createAccumulator();
    // Implement: public void accumulate(ACC accumulator, <args>)
    public abstract T getValue(ACC accumulator);
}

Bridge between Table API and DataStream API enabling conversion between tables and data streams for hybrid stream/batch processing.
// StreamTableEnvironment - Bridge to DataStream API
// create(...) builds the table environment from an existing StreamExecutionEnvironment.
public static StreamTableEnvironment create(StreamExecutionEnvironment streamEnv);
// DataStream -> Table, optionally with an explicit Schema
public Table fromDataStream(DataStream<?> dataStream);
public Table fromDataStream(DataStream<?> dataStream, Schema schema);
// Table -> DataStream whose element type is the given target class
public <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
public DataStream<Row> toChangelogStream(Table table);

Remote SQL execution capabilities for building client-server architectures and multi-tenant SQL services.
// SqlGatewayService - Core SQL Gateway service
// openSession returns the SessionHandle that the other calls take as their first argument.
public SessionHandle openSession(SessionEnvironment environment);
// executeStatement returns an OperationHandle identifying the submitted statement;
// the timeout is expressed in milliseconds.
public OperationHandle executeStatement(SessionHandle sessionHandle,
String statement,
long executionTimeoutMs,
Configuration executionConfig);
public ResultSet fetchResults(SessionHandle sessionHandle,
OperationHandle operationHandle,
FetchOrientation orientation,
int maxRows);

Extensible connector architecture with built-in connectors for testing and development, plus a framework for custom connector development.
// Base interfaces for connectors.
// These are standalone interfaces; they do not extend the legacy
// TableSource / TableSink types.
public interface DynamicTableSource {
    // Source connector interface
}
public interface DynamicTableSink {
    // Sink connector interface
}
// Built-in connector factories
public class DataGenTableSourceFactory implements DynamicTableSourceFactory;
public class PrintTableSinkFactory implements DynamicTableSinkFactory;

// Set up streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Register the click source; timestamp_ltz carries the watermark declaration
String clicksDdl = "CREATE TABLE clicks (" +
        "user_id BIGINT," +
        "page STRING," +
        "timestamp_ltz TIMESTAMP_LTZ(3)," +
        "WATERMARK FOR timestamp_ltz AS timestamp_ltz - INTERVAL '5' SECOND" +
        ") WITH ('connector' = 'kafka', ...)";
tEnv.executeSql(clicksDdl);
// Count clicks per page inside one-hour tumbling windows
Table clicks = tEnv.from("clicks");
Table result = clicks
        .window(Tumble.over(lit(1).hours()).on($("timestamp_ltz")).as("window"))
        .groupBy($("window"), $("page"))
        .select($("page"), $("user_id").count().as("page_views"));
// Output results
result.executeInsert("output_table");

// Batch environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inBatchMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// Project the order columns and keep orders of at least 100
Table qualifyingOrders = tEnv.from("source_table")
        .select($("order_id"), $("customer_id"), $("amount"), $("order_date"))
        .filter($("amount").isGreaterOrEqual(lit(100)));
// Summarise per customer: total amount and number of orders
Table orders = qualifyingOrders
        .groupBy($("customer_id"))
        .select(
                $("customer_id"),
                $("amount").sum().as("total_spent"),
                $("order_id").count().as("order_count"));
// Write results
orders.executeInsert("customer_summary");

Common exceptions and error patterns:
// Core exception types.
// The exceptions below are unchecked and extend RuntimeException directly;
// they are siblings, so catching TableException does not catch ValidationException.
public class TableException extends RuntimeException;
public class ValidationException extends RuntimeException;
public class SqlParserException extends RuntimeException;
public class CatalogNotExistException extends RuntimeException;
public class TableNotExistException extends CatalogException;

Handle common errors:
try {
    TableResult result = tEnv.executeSql("SELECT * FROM non_existent_table");
} catch (SqlParserException e) {
    // Handle SQL syntax errors
} catch (ValidationException e) {
    // Handle semantic errors: a table or column that cannot be resolved
    // (the case for this statement) as well as type/schema validation errors
} catch (TableException e) {
    // Handle other planner or runtime failures
}