0
# Table Operations
1
2
Table is the core abstraction of the Table API, representing a pipeline of data transformations. It provides fluent API methods for selection, filtering, aggregation, joins, and window operations on both bounded and unbounded data streams.
3
4
## Capabilities
5
6
### Basic Transformations
7
8
Core operations for selecting fields, filtering data, and basic data manipulation.
9
10
```java { .api }
11
/**
12
* Selects the given fields from the table
13
* @param fields Expressions defining the selected fields
14
* @return New Table with selected fields
15
*/
16
Table select(Expression... fields);
17
18
/**
19
* Filters rows based on the given predicate
20
* @param predicate Boolean expression for row filtering
21
* @return New Table with filtered rows
22
*/
23
Table filter(Expression predicate);
24
25
/**
26
* Alias for filter() - filters rows based on the given predicate
27
* @param predicate Boolean expression for row filtering
28
* @return New Table with filtered rows
29
*/
30
Table where(Expression predicate);
31
32
/**
33
* Renames fields of the table
34
* @param fields Expressions defining new field names
35
* @return New Table with renamed fields
36
*/
37
Table as(Expression... fields);
38
39
/**
40
* Adds additional columns to the table
41
* @param fields Expressions defining new columns
42
* @return New Table with additional columns
43
*/
44
Table addColumns(Expression... fields);
45
46
/**
47
* Adds columns or replaces existing ones
48
* @param fields Expressions defining columns to add or replace
49
* @return New Table with added/replaced columns
50
*/
51
Table addOrReplaceColumns(Expression... fields);
52
53
/**
54
* Drops columns from the table
55
* @param fields Expressions defining columns to drop
56
* @return New Table without the specified columns
57
*/
58
Table dropColumns(Expression... fields);
59
60
/**
61
* Returns distinct rows from the table
62
* @return New Table with distinct rows
63
*/
64
Table distinct();
65
```
66
67
**Usage Examples:**
68
69
```java
70
import static org.apache.flink.table.api.Expressions.*;
71
72
// Basic filtering and projection (filter before select: select() drops age/active)
73
Table customers = tableEnv.from("customers");
74
Table result = customers
75
.filter($("age").isGreater(18))
76
.where($("active").isEqual(true))
77
.select($("customer_id"), $("name"), $("email"));
78
79
// Column manipulation
80
Table enhanced = customers
81
.addColumns($("name").upperCase().as("name_upper"))
82
.dropColumns($("internal_notes"))
83
.as("customer_id", "full_name", "email_address", "age", "is_active", "name_upper");
84
85
// Distinct records
86
Table uniqueCategories = products
87
.select($("category"))
88
.distinct();
89
```
90
91
### Grouping and Aggregation
92
93
Operations for grouping data and computing aggregations.
94
95
```java { .api }
96
/**
97
* Groups the table by the given expressions
98
* @param fields Expressions defining grouping keys
99
* @return GroupedTable for applying aggregations
100
*/
101
GroupedTable groupBy(Expression... fields);
102
103
/**
104
* Applies aggregation functions to grouped or ungrouped table
105
* @param aggregateExpression Aggregate expression (sum, count, avg, etc.)
106
* @param moreAggregateExpressions Additional aggregate expressions
107
* @return AggregatedTable with aggregation results
108
*/
109
AggregatedTable aggregate(Expression aggregateExpression, Expression... moreAggregateExpressions);
110
111
/**
112
* Applies flat aggregation using a table aggregate function
113
* @param tableAggFunction Table aggregate function
114
* @return FlatAggregateTable with flattened results
115
*/
116
FlatAggregateTable flatAggregate(Expression tableAggFunction);
117
```
118
119
**Usage Examples:**
120
121
```java
122
// Basic grouping and aggregation
123
Table sales = tableEnv.from("sales");
124
Table salesByRegion = sales
125
.groupBy($("region"))
126
.select($("region"), $("amount").sum().as("total_sales"));
127
128
// Multiple aggregations
129
Table summary = sales
130
.groupBy($("region"), $("product_category"))
131
.select(
132
$("region"),
133
$("product_category"),
134
$("amount").sum().as("total_sales"),
135
$("amount").avg().as("avg_sale"),
136
$("order_id").count().as("num_orders")
137
);
138
139
// Aggregate without grouping
140
Table overallStats = sales
141
.select(
142
$("amount").sum().as("total"),
143
$("amount").avg().as("average"),
144
$("order_id").count().as("count")
145
);
146
```
147
148
### Join Operations
149
150
Various types of joins for combining data from multiple tables.
151
152
```java { .api }
153
/**
154
* Inner join with another table
155
* @param right Right table to join with
156
* @return New Table with joined results
157
*/
158
Table join(Table right);
159
160
/**
161
* Inner join with explicit join condition
162
* @param right Right table to join with
163
* @param joinPredicate Boolean expression defining join condition
164
* @return New Table with joined results
165
*/
166
Table join(Table right, Expression joinPredicate);
167
168
/**
169
* Left outer join with another table
170
* @param right Right table to join with
171
* @return New Table with left outer join results
172
*/
173
Table leftOuterJoin(Table right);
174
175
/**
176
* Left outer join with explicit join condition
177
* @param right Right table to join with
178
* @param joinPredicate Boolean expression defining join condition
179
* @return New Table with left outer join results
180
*/
181
Table leftOuterJoin(Table right, Expression joinPredicate);
182
183
/**
184
* Right outer join with another table
185
* @param right Right table to join with
186
* @return New Table with right outer join results
187
*/
188
Table rightOuterJoin(Table right);
189
190
/**
191
* Right outer join with explicit join condition
192
* @param right Right table to join with
193
* @param joinPredicate Boolean expression defining join condition
194
* @return New Table with right outer join results
195
*/
196
Table rightOuterJoin(Table right, Expression joinPredicate);
197
198
/**
199
* Full outer join with another table
200
* @param right Right table to join with
201
* @return New Table with full outer join results
202
*/
203
Table fullOuterJoin(Table right);
204
205
/**
206
* Full outer join with explicit join condition
207
* @param right Right table to join with
208
* @param joinPredicate Boolean expression defining join condition
209
* @return New Table with full outer join results
210
*/
211
Table fullOuterJoin(Table right, Expression joinPredicate);
212
```
213
214
**Usage Examples:**
215
216
```java
217
Table customers = tableEnv.from("customers");
218
Table orders = tableEnv.from("orders");
219
220
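// Inner join without a predicate: field names of both tables must not overlap,
// and the join condition is then supplied via where()/filter()
221
Table customerOrders = customers.join(orders).where($("customer_id").isEqual($("cust_id")));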
222
223
// Inner join with explicit condition
224
Table explicitJoin = customers
225
.join(orders, $("customer_id").isEqual($("cust_id")))
226
.select($("name"), $("order_id"), $("amount"));
227
228
// Left outer join to include all customers
229
Table allCustomers = customers
230
.leftOuterJoin(orders, $("customer_id").isEqual($("cust_id")))
231
.select($("name"), $("order_id").isNull().as("no_orders"));
232
233
// Complex join with multiple conditions
234
Table complexJoin = customers
235
.join(orders,
236
$("customer_id").isEqual($("cust_id"))
237
.and($("status").isEqual("active")))
238
.select($("name"), $("order_date"), $("total"));
239
```
240
241
### Window Operations
242
243
Time-based window operations for streaming data processing.
244
245
```java { .api }
246
/**
247
* Groups records into group windows (tumbling, sliding, session) defined on a time attribute
248
* @param groupWindow Window specification; it must be given an alias via as(...)
249
* @return GroupWindowedTable; call groupBy(...) including the window alias to get a WindowGroupedTable
250
*/
251
GroupWindowedTable window(GroupWindow groupWindow);
252
253
/**
254
* Defines over windows for over-window aggregations
255
* @param overWindows Over window specifications, each given an alias via as(...)
256
* @return OverWindowedTable; its select(...) computes aggregates with expr.over($("alias"))
257
*/
258
OverWindowedTable window(OverWindow... overWindows);
259
```
260
261
**Usage Examples:**
262
263
```java
264
// Tumbling window aggregation
265
Table events = tableEnv.from("events");
266
Table windowedStats = events
267
.window(Tumble.over(lit(5).minutes()).on($("event_time")).as("w"))
268
.groupBy($("user_id"), $("w"))
269
.select(
270
$("user_id"),
271
$("w").start().as("window_start"),
272
$("w").end().as("window_end"),
273
$("event_count").sum().as("total_events")
274
);
275
276
// Over window for running totals (sale_time must be a time attribute)
277
Table runningTotals = sales
278
.window(
279
Over.partitionBy($("product_id"))
280
.orderBy($("sale_time"))
281
.preceding(UNBOUNDED_ROW)
282
.following(CURRENT_ROW)
283
.as("w"))
284
.select(
285
$("product_id"),
286
$("sale_time"),
287
$("amount"),
$("amount").sum().over($("w")).as("running_total")
);
288
```
289
290
### Ordering and Limiting
291
292
Operations for sorting and limiting result sets.
293
294
```java { .api }
295
/**
296
* Orders the table by the given expressions
297
* @param fields Expressions defining sort order
298
* @return New Table with ordered rows
299
*/
300
Table orderBy(Expression... fields);
301
302
/**
303
* Limits the number of returned rows
304
* @param fetch Maximum number of rows to return
305
* @return New Table with limited rows
306
*/
307
Table limit(int fetch);
308
309
/**
310
* Limits with offset and fetch
311
* @param offset Number of rows to skip
312
* @param fetch Maximum number of rows to return
313
* @return New Table with limited rows
314
*/
315
Table limit(int offset, int fetch);
316
```
317
318
**Usage Examples:**
319
320
```java
321
// Order by multiple fields
322
Table sortedCustomers = customers
323
.orderBy($("registration_date").desc(), $("name").asc());
324
325
// Top N results
326
Table topSellers = sales
327
.groupBy($("seller_id"))
328
.select($("seller_id"), $("amount").sum().as("total_sales"))
329
.orderBy($("total_sales").desc())
330
.limit(10);
331
332
// Pagination
333
Table page2 = products
334
.orderBy($("product_id"))
335
.limit(20, 10); // Skip 20, take 10
336
```
337
338
### Schema Access and Metadata
339
340
Methods for accessing table schema and metadata information.
341
342
```java { .api }
343
/**
344
* Gets the resolved schema of this table
345
* @return ResolvedSchema containing column information
346
*/
347
ResolvedSchema getResolvedSchema();
348
349
/**
350
* Gets the query operation that defines this table
351
* @return QueryOperation representing the table pipeline
352
*/
353
QueryOperation getQueryOperation();
354
```
355
356
**Usage Examples:**
357
358
```java
359
Table myTable = tableEnv.from("products");
360
361
// Access schema information
362
ResolvedSchema schema = myTable.getResolvedSchema();
363
List<Column> columns = schema.getColumns();
364
for (Column column : columns) {
365
System.out.println(column.getName() + ": " + column.getDataType());
366
}
367
368
// Check column existence
369
boolean hasCategory = schema.getColumn("category").isPresent();
370
```
371
372
### Execution and Explanation
373
374
Methods for executing table operations and examining execution plans.
375
376
```java { .api }
377
/**
378
* Executes the table and returns results
379
* @return TableResult containing execution information and data
380
*/
381
TableResult execute();
382
383
/**
384
* Returns the execution plan as a string
385
* @return String representation of the execution plan
386
*/
387
String explain();
388
389
/**
390
* Returns detailed execution plan with specified format
391
* @param format Format for the explanation (TEXT or JSON)
392
* @param details Additional details to include in the plan
393
* @return String representation of the detailed execution plan
394
*/
395
String explain(ExplainFormat format, ExplainDetail... details);
396
397
/**
398
* Declares an insert of the table contents into a target table; nothing runs until execute() is called
399
* @param tablePath Target table path
400
* @return TablePipeline; call execute() on it to submit the insert job
401
*/
402
TablePipeline insertInto(String tablePath);
403
```
404
405
**Usage Examples:**
406
407
```java
408
Table result = customers
409
.filter($("age").isGreater(25))
410
.select($("name"), $("email"));
411
412
// Examine execution plan
413
System.out.println(result.explain());
414
415
// Detailed plan with JSON format
416
String detailedPlan = result.explain(
417
ExplainFormat.JSON,
418
ExplainDetail.CHANGELOG_MODE,
419
ExplainDetail.ESTIMATED_COST
420
);
421
422
// Execute and process results
423
TableResult tableResult = result.execute();
424
try (CloseableIterator<Row> iterator = tableResult.collect()) {
425
while (iterator.hasNext()) {
426
Row row = iterator.next();
427
System.out.println(row);
428
}
429
}
430
431
// Insert into target table
432
result.insertInto("customer_summary").execute();
433
```
434
435
### Set Operations
436
437
Operations for combining tables using set theory operations.
438
439
```java { .api }
440
/**
441
* Returns the union of this table and the given table
442
* @param right The table to union with
443
* @return New Table containing union of both tables (with duplicates removed)
444
*/
445
Table union(Table right);
446
447
/**
448
* Returns the union of this table and the given table including duplicates
449
* @param right The table to union with
450
* @return New Table containing union of both tables (with duplicates)
451
*/
452
Table unionAll(Table right);
453
454
/**
455
* Returns the intersection of this table and the given table
456
* @param right The table to intersect with
457
* @return New Table containing only rows present in both tables
458
*/
459
Table intersect(Table right);
460
461
/**
462
* Returns the intersection of this table and the given table including duplicates
463
* @param right The table to intersect with
464
* @return New Table containing intersection with duplicates
465
*/
466
Table intersectAll(Table right);
467
468
/**
469
* Returns the minus operation (difference) of this table and the given table
470
* @param right The table to subtract from this table
471
* @return New Table containing rows in this table but not in the right table
472
*/
473
Table minus(Table right);
474
475
/**
476
* Returns the minus operation including duplicates
477
* @param right The table to subtract from this table
478
* @return New Table containing difference with duplicates
479
*/
480
Table minusAll(Table right);
481
```
482
483
**Usage Examples:**
484
485
```java
486
Table europeanCustomers = tableEnv.from("customers_europe");
487
Table americanCustomers = tableEnv.from("customers_america");
488
489
// Union all customers
490
Table allCustomers = europeanCustomers.unionAll(americanCustomers);
491
492
// Find common customer IDs between regions (for validation)
493
Table commonIds = europeanCustomers
494
.select($("customer_id"))
495
.intersect(americanCustomers.select($("customer_id")));
496
497
// Find customers only in Europe
498
Table europeOnly = europeanCustomers
499
.select($("customer_id"))
500
.minus(americanCustomers.select($("customer_id")));
501
```
502
503
### Lateral Join Operations
504
505
Join operations with table-valued functions for dynamic table expansion.
506
507
```java { .api }
508
/**
509
* Performs a lateral join with a table function
510
* @param tableFunctionCall Expression calling a table function
511
* @return New Table with lateral join results
512
*/
513
Table joinLateral(Expression tableFunctionCall);
514
515
/**
516
* Performs a lateral join with a table function and join condition
517
* @param tableFunctionCall Expression calling a table function
518
* @param joinPredicate Join condition expression
519
* @return New Table with lateral join results
520
*/
521
Table joinLateral(Expression tableFunctionCall, Expression joinPredicate);
522
523
/**
524
* Performs a left outer lateral join with a table function
525
* @param tableFunctionCall Expression calling a table function
526
* @return New Table with left outer lateral join results
527
*/
528
Table leftOuterJoinLateral(Expression tableFunctionCall);
529
530
/**
531
* Performs a left outer lateral join with a table function and join condition
532
* @param tableFunctionCall Expression calling a table function
533
* @param joinPredicate Join condition expression
534
* @return New Table with left outer lateral join results
535
*/
536
Table leftOuterJoinLateral(Expression tableFunctionCall, Expression joinPredicate);
537
```
538
539
**Usage Examples:**
540
541
```java
542
// Lateral join with a table function to split comma-separated values
543
Table orders = tableEnv.from("orders");
544
Table expandedOrders = orders
545
.joinLateral(call("split_string", $("item_list"), ",").as("item_id"))
546
.select($("order_id"), $("customer_id"), $("item_id"));
547
548
// Left outer lateral join to handle orders with no items
549
Table allOrders = orders
550
.leftOuterJoinLateral(call("split_string", $("item_list"), ",").as("item_id"))
551
.select($("order_id"), $("customer_id"), $("item_id"));
552
```
553
554
### Function Operations
555
556
Operations using scalar and table functions for data transformation.
557
558
```java { .api }
559
/**
560
* Applies a scalar function to each row
561
* @param mapFunction Scalar function expression
562
* @return New Table with function results
563
*/
564
Table map(Expression mapFunction);
565
566
/**
567
* Applies a table function that can produce multiple rows per input row
568
* @param tableFunction Table function expression
569
* @return New Table with flattened results
570
*/
571
Table flatMap(Expression tableFunction);
572
```
573
574
**Usage Examples:**
575
576
```java
577
Table events = tableEnv.from("events");
578
579
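// Map operation: the output contains ONLY the function's result columns,
// so any value needed downstream must be passed into (and returned by) the function
579
Table transformedEvents = events
580
.map(call("parse_json", $("json_data")).as("parsed_data"))
581
.select($("parsed_data"));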
583
584
// FlatMap operation to explode arrays: as with map(), only the function's output
// columns remain; use joinLateral() to keep event_id/event_time alongside each tag
584
Table expandedEvents = events
585
.flatMap(call("explode_array", $("tag_array")).as("tag"))
586
.select($("tag"));
588
```
589
590
### Temporal Operations
591
592
Operations for creating temporal table functions from tables.
593
594
```java { .api }
595
/**
596
* Creates a temporal table function from this table
597
* @param timeAttribute Expression identifying the time attribute
598
* @param primaryKey Expression identifying the primary key
599
* @return TemporalTableFunction for temporal joins
600
*/
601
TemporalTableFunction createTemporalTableFunction(Expression timeAttribute, Expression primaryKey);
602
```
603
604
**Usage Examples:**
605
606
```java
607
Table exchangeRates = tableEnv.from("exchange_rates");
608
609
// Create temporal table function for exchange rates
610
TemporalTableFunction ratesFunction = exchangeRates
611
.createTemporalTableFunction($("rate_time"), $("currency"));
612
613
// Register for use in temporal joins
614
tableEnv.createTemporarySystemFunction("rates", ratesFunction);
615
616
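// Use in temporal join: the predicate matches each order to the rate of its own currency
// (assumes orders has a `currency` column and exchange_rates has columns rate_time, currency, exchange_rate)
617
Table orders = tableEnv.from("orders");
618
Table ordersWithRates = orders
619
.joinLateral(
call("rates", $("order_time")).as("rate_time", "rate_currency", "exchange_rate"),
$("currency").isEqual($("rate_currency")))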
620
.select($("order_id"), $("amount"), $("exchange_rate"),
621
$("amount").times($("exchange_rate")).as("amount_usd"));
622
```
623
624
### Column Manipulation
625
626
Additional operations for renaming columns and for offset/fetch-based row limiting.
627
628
```java { .api }
629
/**
630
* Renames columns of the table
631
* @param fields Expressions defining new column names
632
* @return New Table with renamed columns
633
*/
634
Table renameColumns(Expression... fields);
635
636
/**
637
* Skips the first n rows
638
* @param offset Number of rows to skip
639
* @return New Table with offset applied
640
*/
641
Table offset(int offset);
642
643
/**
644
* Takes the first n rows after any offset
645
* @param fetch Number of rows to take
646
* @return New Table with fetch applied
647
*/
648
Table fetch(int fetch);
649
650
/**
651
* Renames the fields of the table positionally (e.g. to disambiguate field names before a join)
652
* @param field New name for the first field
653
* @param fields New names for the following fields, in order
654
* @return New Table with renamed fields
655
*/
656
Table as(String field, String... fields);
657
```
658
659
**Usage Examples:**
660
661
```java
662
Table products = tableEnv.from("products");
663
664
// Rename columns
665
Table renamedProducts = products
666
.renameColumns($("prod_id").as("product_id"), $("prod_name").as("product_name"));
667
668
// Pagination using offset and fetch
669
Table page3Products = products
670
.orderBy($("product_id"))
671
.offset(20)
672
.fetch(10);
673
674
// Rename columns positionally (this renames fields; it does not alias the table itself)
675
Table aliasedProducts = products.as("id", "name", "price", "category");
676
```
677
678
### Insert Operations
679
680
Operations for inserting table data into target tables.
681
682
```java { .api }
683
/**
684
* Creates a pipeline to insert table data into the specified table
685
* @param tablePath Target table path
686
* @return TablePipeline for further configuration
687
*/
688
TablePipeline insertInto(String tablePath);
689
690
/**
691
* Creates a pipeline to insert table data with overwrite option
692
* @param tablePath Target table path
693
* @param overwrite Whether to overwrite existing data
694
* @return TablePipeline for further configuration
695
*/
696
TablePipeline insertInto(String tablePath, boolean overwrite);
697
698
/**
699
* Creates a pipeline to insert table data using table descriptor
700
* @param descriptor Table descriptor defining the target
701
* @return TablePipeline for further configuration
702
*/
703
TablePipeline insertInto(TableDescriptor descriptor);
704
705
/**
706
* Creates a pipeline to insert table data using table descriptor with overwrite
707
* @param descriptor Table descriptor defining the target
708
* @param overwrite Whether to overwrite existing data
709
* @return TablePipeline for further configuration
710
*/
711
TablePipeline insertInto(TableDescriptor descriptor, boolean overwrite);
712
713
/**
714
* Directly executes insert into the specified table
715
* @param tablePath Target table path
716
* @return TableResult with execution information
717
*/
718
TableResult executeInsert(String tablePath);
719
720
/**
721
* Directly executes insert with overwrite option
722
* @param tablePath Target table path
723
* @param overwrite Whether to overwrite existing data
724
* @return TableResult with execution information
725
*/
726
TableResult executeInsert(String tablePath, boolean overwrite);
727
728
/**
729
* Directly executes insert using table descriptor
730
* @param descriptor Table descriptor defining the target
731
* @return TableResult with execution information
732
*/
733
TableResult executeInsert(TableDescriptor descriptor);
734
735
/**
736
* Directly executes insert using table descriptor with overwrite
737
* @param descriptor Table descriptor defining the target
738
* @param overwrite Whether to overwrite existing data
739
* @return TableResult with execution information
740
*/
741
TableResult executeInsert(TableDescriptor descriptor, boolean overwrite);
742
```
743
744
**Usage Examples:**
745
746
```java
747
Table processedOrders = orders
748
.filter($("status").isEqual("processed"))
749
.select($("order_id"), $("customer_id"), $("total_amount"));
750
751
// Direct insert execution
752
TableResult result = processedOrders.executeInsert("processed_orders");
753
754
// Pipeline-based insert for more control
755
TablePipeline pipeline = processedOrders.insertInto("processed_orders", true);
756
TableResult pipelineResult = pipeline.execute();
757
758
// Insert using table descriptor
759
TableDescriptor targetDescriptor = TableDescriptor.forConnector("kafka")
760
.schema(Schema.newBuilder()
761
.column("order_id", DataTypes.BIGINT())
762
.column("customer_id", DataTypes.BIGINT())
763
.column("total_amount", DataTypes.DECIMAL(10, 2))
764
.build())
765
.option("topic", "processed-orders")
766
.build();
767
768
TableResult descriptorResult = processedOrders.executeInsert(targetDescriptor);
769
```
770
771
## Types
772
773
### Grouped Tables
774
775
```java { .api }
776
interface GroupedTable {
777
Table select(Expression... fields);
778
AggregatedTable aggregate(Expression aggregateExpression, Expression... moreAggregateExpressions);
779
FlatAggregateTable flatAggregate(Expression tableAggFunction);
780
}
781
782
interface WindowGroupedTable extends GroupedTable {
783
// Inherits all GroupedTable methods
784
}
785
```
786
787
### Aggregated Tables
788
789
```java { .api }
790
interface AggregatedTable {
791
Table select(Expression... fields);
792
AggregatedTable aggregate(Expression aggregateExpression, Expression... moreAggregateExpressions);
793
}
794
795
interface FlatAggregateTable {
796
Table select(Expression... fields);
797
FlatAggregateTable flatAggregate(Expression tableAggFunction);
798
}
799
```
800
801
### Schema Information
802
803
```java { .api }
804
class ResolvedSchema {
805
List<Column> getColumns();
806
Optional<Column> getColumn(String name);
807
Optional<Column> getColumn(int index);
808
List<String> getColumnNames();
809
List<DataType> getColumnDataTypes();
810
Optional<UniqueConstraint> getPrimaryKey();
811
List<WatermarkSpec> getWatermarkSpecs();
812
}
813
814
class Column {
815
String getName();
816
DataType getDataType();
817
Optional<String> getComment();
818
boolean isPhysical();
819
boolean isComputed();
820
boolean isMetadata();
821
}
822
```