# Apache Flink Table API Java Uber JAR

Apache Flink Table API Java Uber JAR provides a comprehensive set of Java APIs for working with tables and SQL in the Apache Flink ecosystem. The uber JAR consolidates all table-related modules into a single dependency, so developers can build stream and batch processing applications using both the programmatic Table API and declarative SQL.

## Package Information

- **Package Name**: flink-table-api-java-uber
- **Package Type**: Maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Version**: 2.1.0
- **Installation**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-uber</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
```

## Basic Usage

```java
import org.apache.flink.table.api.*;
import static org.apache.flink.table.api.Expressions.*;

// Create a TableEnvironment for batch processing
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inBatchMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Define a table using SQL DDL
tEnv.executeSql(
        "CREATE TABLE Orders ("
                + "  id BIGINT,"
                + "  product STRING,"
                + "  amount DECIMAL(10,2)"
                + ") WITH ('connector' = 'datagen')");

// Query using the Table API
Table orders = tEnv.from("Orders")
        .select($("product"), $("amount"))
        .filter($("amount").isGreater(lit(100)))
        .groupBy($("product"))
        .select($("product"), $("amount").sum().as("total_amount"));

// Execute and print results
orders.execute().print();

// Or execute SQL directly
tEnv.executeSql(
        "SELECT product, SUM(amount) AS total_amount "
                + "FROM Orders WHERE amount > 100 "
                + "GROUP BY product").print();
```

## Architecture

The Apache Flink Table API is built around several key architectural components:

- **TableEnvironment**: Central coordination point that manages catalogs, configuration, and execution
- **Table API**: Programmatic interface for table transformations with type safety and IDE support
- **SQL Integration**: Standards-based SQL support with DDL, DML, and query capabilities
- **Type System**: Rich data type system supporting primitives, complex types, and temporal types
- **Expression System**: Comprehensive set of functions and operators for data manipulation
- **Connector Framework**: Extensible architecture for integrating with external systems
- **DataStream Bridge**: Seamless integration between the Table API and the DataStream API
- **SQL Gateway**: Remote SQL execution capabilities for client-server deployments

## Capabilities

### Core Table Operations

Primary table manipulation and query capabilities, including creation, transformation, aggregation, and join operations.

```java { .api }
// TableEnvironment - primary entry point
public static TableEnvironment create(EnvironmentSettings settings);
public Table from(String path);
public TableResult executeSql(String statement);
public Table sqlQuery(String query);

// Table - core table operations interface
public Table select(Expression... fields);
public Table filter(Expression predicate);
public GroupedTable groupBy(Expression... fields);
public AggregatedTable aggregate(Expression aggregateFunction);
public Table join(Table right);
public TableResult execute();
```

[Table Operations](./table-operations.md)

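A minimal sketch of combining these operations in a join, using `fromValues` for inline sample data (the table contents and column names here are illustrative, not part of the package):

```java
import org.apache.flink.table.api.*;
import static org.apache.flink.table.api.Expressions.*;

// Batch environment with two small inline tables
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

Table orders = tEnv.fromValues(row(1, 10), row(2, 20))
        .as("order_id", "product_id");
Table products = tEnv.fromValues(row(10, "apple"), row(20, "banana"))
        .as("id", "name");

// Inner join on the product key, then project the wanted columns
Table enriched = orders
        .join(products, $("product_id").isEqual($("id")))
        .select($("order_id"), $("name"));

enriched.execute().print();
```
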
### Data Types System

Comprehensive type system supporting primitive types, temporal types, and complex nested structures for defining table schemas.

```java { .api }
// DataTypes - factory for all Table API data types
public static DataType BOOLEAN();
public static DataType INT();
public static DataType STRING();
public static DataType TIMESTAMP();
public static DataType ARRAY(DataType elementType);
public static DataType ROW(Field... fields);

// Schema - table schema definition
public static Schema.Builder newBuilder();
public Builder column(String name, DataType type);
public Builder watermark(String columnName, Expression watermarkExpr);
```

[Data Types](./data-types.md)

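A short sketch of declaring a schema with these factories; the column names are illustrative:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

// Schema with primitive, complex, and temporal columns plus a watermark
Schema schema = Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("tags", DataTypes.ARRAY(DataTypes.STRING()))
        .column("address", DataTypes.ROW(
                DataTypes.FIELD("city", DataTypes.STRING()),
                DataTypes.FIELD("zip", DataTypes.STRING())))
        .column("ts", DataTypes.TIMESTAMP(3))
        .watermark("ts", "ts - INTERVAL '5' SECOND")
        .build();
```
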
### Expression System

Rich expression language for data manipulation, filtering, and computation, with support for arithmetic, logical, string, and temporal operations.

```java { .api }
// Expressions - factory for SQL expressions
public static Expression $(String name);
public static Expression lit(Object value);

// Expression operations
public Expression plus(Object other);
public Expression isEqual(Object other);
public Expression and(Object other);
public Expression upperCase();
public Expression substring(int start, int length);
```

[Expressions](./expressions.md)

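Expressions compose: comparisons feed logical operators, and the results feed projections. A small sketch over inline data (the sample values are illustrative):

```java
import org.apache.flink.table.api.*;
import static org.apache.flink.table.api.Expressions.*;

// Combine comparison, logical, arithmetic, and string expressions
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

Table t = tEnv.fromValues(row("alice", 42), row("bob", 7)).as("name", "score");

Table result = t
        .filter($("score").isGreater(lit(10)).and($("name").isNotNull()))
        .select($("name").upperCase().as("name"),
                $("score").plus(lit(1)).as("score_plus_one"));

result.execute().print();
```
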
### User-Defined Functions

Framework for extending Flink with custom scalar functions, table functions, and aggregate functions for specialized processing requirements.

```java { .api }
// Base classes for user-defined functions.
// eval/accumulate methods are resolved by reflection on the subclass,
// not declared abstract on the base class.
public abstract class ScalarFunction extends UserDefinedFunction {
    // implement one or more: public T eval(...)
}

public abstract class TableFunction<T> extends UserDefinedFunction {
    // implement one or more: public void eval(...)
    protected final void collect(T row);
}

public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
    public abstract ACC createAccumulator();
    // implement: public void accumulate(ACC accumulator, ...)
    public abstract T getValue(ACC accumulator);
}
```

[Functions](./functions.md)

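A minimal scalar-function sketch. Because `eval` is found by reflection, it is declared on the subclass rather than overridden; the function and table names below are illustrative:

```java
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;

// UDF classes must be publicly accessible; eval is resolved by reflection
public static class SubstringFunction extends ScalarFunction {
    public String eval(String s, Integer begin, Integer end) {
        return s.substring(begin, end);
    }
}

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);

// Call the registered function from the Table API
Table t = tEnv.fromValues(row("flink")).as("word");
Table result = t.select(call("SubstringFunction", $("word"), lit(0), lit(3)).as("prefix"));
```

The same registered name is also callable from SQL, e.g. `SELECT SubstringFunction(word, 0, 3) FROM ...`.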
### DataStream Integration

Bridge between the Table API and the DataStream API, enabling conversion between tables and data streams for hybrid stream/batch processing.

```java { .api }
// StreamTableEnvironment - bridge to the DataStream API
public static StreamTableEnvironment create(StreamExecutionEnvironment streamEnv);
public Table fromDataStream(DataStream<?> dataStream);
public Table fromDataStream(DataStream<?> dataStream, Schema schema);
public <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
public DataStream<Row> toChangelogStream(Table table);
```

[DataStream Bridge](./datastream-bridge.md)

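A round-trip sketch: stream in, query as a table, stream back out. This assumes `StreamExecutionEnvironment.fromData` (the non-deprecated replacement for `fromElements` in recent Flink releases); an atomic-type stream surfaces as a single column named `f0`:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Round-trip: DataStream -> Table -> DataStream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<String> words = env.fromData("flink", "table", "api");
Table table = tEnv.fromDataStream(words);  // one column, named "f0"
Table upper = tEnv.sqlQuery("SELECT UPPER(f0) AS word FROM " + table);

DataStream<Row> rows = tEnv.toDataStream(upper);
rows.print();
env.execute("bridge-example");
```
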
### SQL Gateway API

Remote SQL execution capabilities for building client-server architectures and multi-tenant SQL services.

```java { .api }
// SqlGatewayService - core SQL Gateway service
public SessionHandle openSession(SessionEnvironment environment);
public OperationHandle executeStatement(SessionHandle sessionHandle,
                                        String statement,
                                        long executionTimeoutMs,
                                        Configuration executionConfig);
public ResultSet fetchResults(SessionHandle sessionHandle,
                              OperationHandle operationHandle,
                              FetchOrientation orientation,
                              int maxRows);
```

[SQL Gateway](./sql-gateway.md)

### Connector Framework

Extensible connector architecture with built-in connectors for testing and development, plus a framework for custom connector development.

```java { .api }
// Base interfaces for connectors
public interface DynamicTableSource {
    // source connector interface
}

public interface DynamicTableSink {
    // sink connector interface
}

// Built-in connector factories
public class DataGenTableSourceFactory implements DynamicTableSourceFactory;
public class PrintTableSinkFactory implements DynamicTableSinkFactory;
```

[Connectors](./connectors.md)

## Common Usage Patterns

### Streaming Table Program

```java
// Set up streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Create source table
tEnv.executeSql(
        "CREATE TABLE clicks ("
                + "  user_id BIGINT,"
                + "  page STRING,"
                + "  timestamp_ltz TIMESTAMP_LTZ(3),"
                + "  WATERMARK FOR timestamp_ltz AS timestamp_ltz - INTERVAL '5' SECOND"
                + ") WITH ('connector' = 'kafka', ...)");

// Windowed aggregation
Table result = tEnv.from("clicks")
        .window(Tumble.over(lit(1).hours()).on($("timestamp_ltz")).as("window"))
        .groupBy($("window"), $("page"))
        .select($("page"), $("user_id").count().as("page_views"));

// Output results
result.executeInsert("output_table");
```

### Batch ETL Pipeline

```java
// Batch environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Load and transform data
Table orders = tEnv.from("source_table")
        .select($("order_id"), $("customer_id"), $("amount"), $("order_date"))
        .filter($("amount").isGreaterOrEqual(lit(100)))
        .groupBy($("customer_id"))
        .select($("customer_id"),
                $("amount").sum().as("total_spent"),
                $("order_id").count().as("order_count"));

// Write results
orders.executeInsert("customer_summary");
```

## Error Handling

Common exceptions and error patterns:

```java { .api }
// Core exception types (unchecked)
public class TableException extends RuntimeException;
public class ValidationException extends RuntimeException;
public class SqlParserException extends RuntimeException;
public class CatalogNotExistException extends CatalogException;
public class TableNotExistException extends CatalogException;
```

Handle common errors. Note that a missing table referenced in a SQL statement surfaces as a `ValidationException` from `executeSql`, not as the checked catalog-level exception:

```java
try {
    TableResult result = tEnv.executeSql("SELECT * FROM non_existent_table");
} catch (SqlParserException e) {
    // Handle SQL syntax errors
} catch (ValidationException e) {
    // Handle missing tables and type/schema validation errors
} catch (TableException e) {
    // Handle other planning or execution errors
}
```