# Table Operations

Core table manipulation and query capabilities for creating, transforming, aggregating, and joining tables in Apache Flink's Table API.

## Capabilities

### TableEnvironment

Primary entry point for all Table API programs, providing table registration, SQL execution, and environment configuration.

```java { .api }
11
/**
12
* Creates a new TableEnvironment with specified settings
13
* @param settings Configuration for the table environment
14
* @return New TableEnvironment instance
15
*/
16
public static TableEnvironment create(EnvironmentSettings settings);
17
18
/**
19
* Create a table from a registered path in the catalog
20
* @param path Catalog path to the table (e.g., "database.table" or "table")
21
* @return Table instance for further operations
22
*/
23
public Table from(String path);
24
25
/**
26
* Execute a SQL statement (DDL, DML, or query)
27
* @param statement SQL statement to execute
28
* @return TableResult containing execution results or metadata
29
*/
30
public TableResult executeSql(String statement);
31
32
/**
33
* Create a table from a SQL query without executing it
34
* @param query SQL SELECT query
35
* @return Table instance representing the query results
36
*/
37
public Table sqlQuery(String query);
38
39
/**
40
* Register a table as a temporary view
41
* @param path Name/path for the temporary view
42
* @param view Table to register
43
*/
44
public void createTemporaryView(String path, Table view);
45
46
/**
47
* Switch to a different catalog
48
* @param catalogName Name of the catalog to use
49
*/
50
public void useCatalog(String catalogName);
51
52
/**
53
* Switch to a different database within the current catalog
54
* @param databaseName Name of the database to use
55
*/
56
public void useDatabase(String databaseName);
57
```

**Usage Example:**

```java
62
// Create environment
63
EnvironmentSettings settings = EnvironmentSettings.newInstance()
64
.inStreamingMode()
65
.build();
66
TableEnvironment tEnv = TableEnvironment.create(settings);
67
68
// Register source table
69
tEnv.executeSql("CREATE TABLE orders (" +
70
"id BIGINT, product STRING, amount DECIMAL(10,2)" +
71
") WITH ('connector' = 'kafka', ...)");
72
73
// Create table reference
74
Table orders = tEnv.from("orders");
75
```

### EnvironmentSettings

Configuration builder for TableEnvironment initialization, specifying the execution mode (streaming or batch) and the names of the built-in catalog and database.

```java { .api }
82
/**
83
* Create a new environment settings builder
84
* @return Builder instance for configuration
85
*/
86
public static Builder newInstance();
87
88
public static class Builder {
89
/**
90
* Configure for streaming execution mode
91
* @return Builder instance for method chaining
92
*/
93
public Builder inStreamingMode();
94
95
/**
96
* Configure for batch execution mode
97
* @return Builder instance for method chaining
98
*/
99
public Builder inBatchMode();
100
101
/**
102
* Set the name of the built-in catalog
103
* @param catalogName Name for the default catalog
104
* @return Builder instance for method chaining
105
*/
106
public Builder withBuiltInCatalogName(String catalogName);
107
108
/**
109
* Set the name of the built-in database
110
* @param databaseName Name for the default database
111
* @return Builder instance for method chaining
112
*/
113
public Builder withBuiltInDatabaseName(String databaseName);
114
115
/**
116
* Build the environment settings
117
* @return Configured EnvironmentSettings instance
118
*/
119
public EnvironmentSettings build();
120
}
121
```

### Table Interface

Core interface representing a table. It provides the relational transformation operations (projection, filtering, grouping, windowing, joins, set operations, sorting) and methods to execute them.

```java { .api }
128
/**
129
* Project specific columns from the table
130
* @param fields Expressions representing the columns to select
131
* @return New Table with selected columns
132
*/
133
public Table select(Expression... fields);
134
135
/**
136
* Filter rows based on a predicate
137
* @param predicate Boolean expression for filtering
138
* @return New Table with filtered rows
139
*/
140
public Table filter(Expression predicate);
141
142
/**
143
* Group rows by specified fields for aggregation
144
* @param fields Expressions representing grouping columns
145
* @return GroupedTable instance for aggregation operations
146
*/
147
public GroupedTable groupBy(Expression... fields);
148
149
/**
150
* Apply window function for time-based operations
151
* @param window Window specification (tumbling, sliding, or session)
152
* @return WindowedTable instance for windowed operations
153
*/
154
public WindowedTable window(GroupWindow window);
155
156
/**
157
* Perform inner join with another table
158
* @param right Table to join with
159
* @return New Table containing joined results
160
*/
161
public Table join(Table right);
162
163
/**
164
* Perform inner join with join condition
165
* @param right Table to join with
166
* @param joinPredicate Join condition expression
167
* @return New Table containing joined results
168
*/
169
public Table join(Table right, Expression joinPredicate);
170
171
/**
172
* Perform left outer join with another table
173
* @param right Table to join with
174
* @param joinPredicate Join condition expression
175
* @return New Table containing left outer join results
176
*/
177
public Table leftOuterJoin(Table right, Expression joinPredicate);
178
179
/**
180
* Perform right outer join with another table
181
* @param right Table to join with
182
* @param joinPredicate Join condition expression
183
* @return New Table containing right outer join results
184
*/
185
public Table rightOuterJoin(Table right, Expression joinPredicate);
186
187
/**
188
* Perform full outer join with another table
189
* @param right Table to join with
190
* @param joinPredicate Join condition expression
191
* @return New Table containing full outer join results
192
*/
193
public Table fullOuterJoin(Table right, Expression joinPredicate);
194
195
/**
196
* Execute the table operation and collect results
197
* @return TableResult containing the execution results
198
*/
199
public TableResult execute();
200
201
/**
202
* Insert table contents into a registered sink table
203
* @param tablePath Path to the target table
204
* @return TableResult for the insert operation
205
*/
206
public TableResult executeInsert(String tablePath);
207
208
/**
209
* Get the resolved schema of this table
210
* @return ResolvedSchema containing column information and constraints
211
*/
212
public ResolvedSchema getResolvedSchema();
213
214
/**
215
* Add or replace columns in the table
216
* @param fields Column expressions with aliases
217
* @return New Table with added/replaced columns
218
*/
219
public Table addColumns(Expression... fields);
220
221
/**
222
* Drop columns from the table
223
* @param fieldNames Names of columns to drop
224
* @return New Table without the specified columns
225
*/
226
public Table dropColumns(String... fieldNames);
227
228
/**
229
* Rename columns in the table
230
* @param fields Rename expressions (oldName as newName)
231
* @return New Table with renamed columns
232
*/
233
public Table renameColumns(Expression... fields);
234
235
/**
236
* Union with another table (duplicate elimination)
237
* @param right Table to union with
238
* @return New Table containing union results
239
*/
240
public Table union(Table right);
241
242
/**
243
* Union all with another table (no duplicate elimination)
244
* @param right Table to union with
245
* @return New Table containing union all results
246
*/
247
public Table unionAll(Table right);
248
249
/**
250
* Remove duplicate rows from the table
251
* @return New Table with duplicates removed
252
*/
253
public Table distinct();
254
255
/**
256
* Sort the table by specified columns
257
* @param fields Expressions for sorting (use .asc() or .desc())
258
* @return New Table with sorted rows
259
*/
260
public Table orderBy(Expression... fields);
261
262
/**
263
* Limit the number of rows returned
264
* @param fetch Maximum number of rows to return
265
* @return New Table with limited rows
266
*/
267
public Table limit(int fetch);
268
269
/**
270
* Limit with offset support
271
* @param offset Number of rows to skip
272
* @param fetch Maximum number of rows to return
273
* @return New Table with offset and limit applied
274
*/
275
public Table limit(int offset, int fetch);
276
```

**Usage Examples:**

```java
281
// Basic transformations
282
Table result = orders
283
.select($("product"), $("amount"), $("order_date"))
284
.filter($("amount").isGreater(lit(100)))
285
.orderBy($("amount").desc())
286
.limit(10);
287
288
// Joins
289
Table orderDetails = orders
290
.join(products, $("orders.product_id").isEqual($("products.id")))
291
.select($("orders.id"), $("products.name"), $("orders.amount"));
292
293
// Aggregations
294
Table summary = orders
295
.groupBy($("product"))
296
.select($("product"),
297
$("amount").sum().as("total_amount"),
298
$("id").count().as("order_count"));
299
```

### GroupedTable

Table returned by `Table.groupBy(...)`, representing grouped data that must be aggregated.

```java { .api }
306
/**
307
* Apply aggregation functions to grouped data
308
* @param aggregateExpressions Aggregation expressions
309
* @return AggregatedTable for further operations
310
*/
311
public AggregatedTable aggregate(Expression... aggregateExpressions);
312
313
/**
314
* Select columns and aggregations from grouped data
315
* @param fields Column and aggregation expressions
316
* @return New Table with aggregated results
317
*/
318
public Table select(Expression... fields);
319
```

### AggregatedTable

Table returned by `GroupedTable.aggregate(...)`; it must be finished with `select(...)`.

```java { .api }
326
/**
327
* Select final columns from aggregated data
328
* @param fields Column expressions for final selection
329
* @return New Table with selected aggregated results
330
*/
331
public Table select(Expression... fields);
332
```

### GroupWindowedTable

Table returned by `Table.window(GroupWindow)`. It groups rows into time-based (group) windows, and works on both streaming and batch tables.

```java { .api }
339
/**
340
* Group windowed data by specified fields
341
* @param fields Expressions representing grouping columns
342
* @return GroupedTable for windowed aggregation
343
*/
344
public GroupedTable groupBy(Expression... fields);
345
```

### TableResult

Result of executing a statement or query, providing access to the result rows and metadata.

```java { .api }
352
/**
353
* Print the results to stdout (for development/debugging)
354
*/
355
public void print();
356
357
/**
358
* Get the job client for the submitted job (async execution)
359
* @return Optional JobClient if the operation was submitted as a job
360
*/
361
public Optional<JobClient> getJobClient();
362
363
/**
364
* Get the result schema
365
* @return ResolvedSchema of the result
366
*/
367
public ResolvedSchema getResolvedSchema();
368
369
/**
370
* Get the result kind (success, success with content, etc.)
371
* @return ResultKind indicating the type of result
372
*/
373
public ResultKind getResultKind();
374
375
/**
376
* Collect all results as a list (for bounded results only)
377
* @return CloseableIterator for accessing result rows
378
*/
379
public CloseableIterator<Row> collect();
380
```

### Window Specifications

Factory classes for defining group windows to pass to `Table.window(...)`.

```java { .api }
387
// Tumble - Non-overlapping windows
388
public class Tumble {
389
/**
390
* Create a tumbling window with specified size
391
* @param size Window size expression (e.g., lit(1).hours())
392
* @return TumbleWithSize for further configuration
393
*/
394
public static TumbleWithSize over(Expression size);
395
}
396
397
public class TumbleWithSize {
398
/**
399
* Specify the time field for the window
400
* @param timeField Time attribute field
401
* @return TumbleWithSizeOnTime for alias assignment
402
*/
403
public TumbleWithSizeOnTime on(Expression timeField);
404
}
405
406
public class TumbleWithSizeOnTime {
407
/**
408
* Assign an alias to the window
409
* @param alias Window alias for referencing in aggregations
410
* @return GroupWindow specification
411
*/
412
public GroupWindow as(String alias);
413
}
414
415
// Slide - Overlapping windows
416
public class Slide {
417
/**
418
* Create a sliding window with specified size
419
* @param size Window size expression
420
* @return SlideWithSize for further configuration
421
*/
422
public static SlideWithSize over(Expression size);
423
}
424
425
public class SlideWithSize {
426
/**
427
* Specify the slide interval
428
* @param slide Slide interval expression (must be less than size)
429
* @return SlideWithSizeAndSlide for time field specification
430
*/
431
public SlideWithSizeAndSlide every(Expression slide);
432
}
433
434
public class SlideWithSizeAndSlide {
435
/**
436
* Specify the time field for the window
437
* @param timeField Time attribute field
438
* @return SlideWithSizeAndSlideOnTime for alias assignment
439
*/
440
public SlideWithSizeAndSlideOnTime on(Expression timeField);
441
}
442
443
// Session - Event-driven windows
444
public class Session {
445
/**
446
* Create a session window with specified gap
447
* @param gap Session timeout gap expression
448
* @return SessionWithGap for further configuration
449
*/
450
public static SessionWithGap withGap(Expression gap);
451
}
452
```

**Window Usage Example:**

```java
457
// Tumbling window - 1 hour non-overlapping windows
458
Table hourlyStats = events
459
.window(Tumble.over(lit(1).hours()).on($("timestamp")).as("w"))
460
.groupBy($("w"), $("category"))
461
.select($("category"),
462
$("w").start().as("window_start"),
463
$("w").end().as("window_end"),
464
$("value").sum().as("total_value"));
465
466
// Sliding window - 1 hour windows every 15 minutes
467
Table slidingStats = events
468
.window(Slide.over(lit(1).hours()).every(lit(15).minutes()).on($("timestamp")).as("w"))
469
.groupBy($("w"), $("user_id"))
470
.select($("user_id"), $("value").avg().as("avg_value"));
471
472
// Session window - Group by 30 minute inactivity gaps
473
Table sessionStats = events
474
.window(Session.withGap(lit(30).minutes()).on($("timestamp")).as("w"))
475
.groupBy($("w"), $("user_id"))
476
.select($("user_id"), $("w").start().as("session_start"), $("event").count().as("event_count"));
477
```