# Table Operations

The Table class provides SQL-like operations for data transformation, filtering, aggregation, and joining. It represents a relational table with a schema and supports fluent method chaining.

## Capabilities

### Projection Operations

Select specific fields or computed expressions from the table.

```scala { .api }
/**
 * Projects fields using expressions
 * @param fields Expression-based field selections
 * @returns New table with projected fields
 */
def select(fields: Expression*): Table

/**
 * Projects fields using string expressions
 * @param fields String-based field selections
 * @returns New table with projected fields
 */
def select(fields: String): Table
```

**Usage Examples:**

```scala
// Expression-based selection
val projected = table.select('name, 'age + 1, 'salary * 0.1)

// String-based selection
val projected2 = table.select("name, age + 1 as nextAge, salary * 0.1 as bonus")

// Select all fields
val allFields = table.select('*)
```

### Filtering Operations

Filter rows based on predicates and conditions.

```scala { .api }
/**
 * Filters rows using expression predicate
 * @param predicate Boolean expression for filtering
 * @returns Filtered table
 */
def filter(predicate: Expression): Table

/**
 * Filters rows using string predicate
 * @param predicate String-based boolean expression
 * @returns Filtered table
 */
def filter(predicate: String): Table

/**
 * Alias for filter operation
 * @param predicate Boolean expression for filtering
 * @returns Filtered table
 */
def where(predicate: Expression): Table
```

**Usage Examples:**

```scala
// Expression-based filtering
val adults = table.filter('age >= 18)
val activeUsers = table.where('status === "active")

// String-based filtering
val highEarners = table.filter("salary > 50000")

// Complex conditions
val filtered = table.filter('age >= 21 && 'department === "Engineering")
```

### Grouping and Aggregation

Group rows and perform aggregate operations.

```scala { .api }
/**
 * Groups table by specified fields
 * @param fields Grouping field expressions
 * @returns GroupedTable for aggregation operations
 */
def groupBy(fields: Expression*): GroupedTable

class GroupedTable {
  /**
   * Selects fields and aggregates for grouped table
   * @param fields Fields and aggregate expressions
   * @returns Aggregated table
   */
  def select(fields: Expression*): Table

  /**
   * Selects fields using string expressions
   * @param fields String-based field and aggregate selections
   * @returns Aggregated table
   */
  def select(fields: String): Table
}
```

**Usage Examples:**

```scala
// Basic grouping and aggregation
val grouped = table
  .groupBy('department)
  .select('department, 'salary.avg, 'age.max, 'name.count)

// Multiple grouping fields
val multiGrouped = table
  .groupBy('department, 'level)
  .select('department, 'level, 'salary.sum as 'totalSalary)

// String-based aggregation
val stringAgg = table
  .groupBy('department)
  .select("department, AVG(salary) as avgSalary, COUNT(*) as employeeCount")
```
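
To make the grouping semantics concrete, the sketch below models what a `groupBy('department)` followed by an average and a count produces, using plain Scala collections instead of the Table API. The `Employee` case class and the sample rows are assumptions made up for illustration.

```scala
// Plain-Scala model of groupBy + aggregation; this is not Flink code.
// Employee and the sample rows are hypothetical.
object GroupingSketch {
  final case class Employee(name: String, department: String, salary: Double)

  val rows: Seq[Employee] = Seq(
    Employee("Ann", "Engineering", 100.0),
    Employee("Bob", "Engineering", 80.0),
    Employee("Cid", "Sales", 60.0)
  )

  // One result entry per distinct department: (average salary, row count).
  val aggregated: Map[String, (Double, Int)] =
    rows.groupBy(_.department).map { case (dept, members) =>
      val avg = members.map(_.salary).sum / members.size
      (dept, (avg, members.size))
    }
}
```

Each grouping key yields exactly one output row, which is why non-aggregated fields in the grouped `select` are expected to be grouping fields.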

### Sorting Operations

Order table rows by specified criteria.

```scala { .api }
/**
 * Orders table by specified fields
 * @param fields Ordering field expressions (use .asc/.desc for direction)
 * @returns Ordered table
 */
def orderBy(fields: Expression*): Table
```

**Usage Examples:**

```scala
// Ascending order (default)
val sortedAsc = table.orderBy('name)

// Descending order
val sortedDesc = table.orderBy('salary.desc)

// Multiple fields
val multiSorted = table.orderBy('department.asc, 'salary.desc, 'name.asc)
```
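
As a rough illustration of how a multi-field ordering resolves ties, the following sketch sorts plain Scala values by department ascending, salary descending, then name ascending. It models the semantics only; `Employee` and the data are hypothetical and no Table API call is involved.

```scala
// Plain-Scala model of orderBy('department.asc, 'salary.desc, 'name.asc).
// Employee and the sample rows are hypothetical.
object SortingSketch {
  final case class Employee(name: String, department: String, salary: Int)

  val rows: Seq[Employee] = Seq(
    Employee("Bob", "Sales", 50),
    Employee("Ann", "Engineering", 70),
    Employee("Cid", "Engineering", 90),
    Employee("Dee", "Engineering", 70)
  )

  // Later keys only matter when all earlier keys compare equal.
  // Negating the salary turns the ascending sort into a descending one.
  val sortedNames: Seq[String] =
    rows.sortBy(e => (e.department, -e.salary, e.name)).map(_.name)
}
```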

### Distinct Operations

Remove duplicate rows from the table.

```scala { .api }
/**
 * Removes duplicate rows from the table
 * @returns Table with unique rows
 */
def distinct(): Table
```

**Usage Examples:**

```scala
// Remove all duplicates
val unique = table.distinct()

// Distinct after projection
val uniqueNames = table.select('name).distinct()
```
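
The difference between calling `distinct()` on the full table and calling it after a projection can be sketched with plain Scala tuples. The rows below are invented for illustration, and the code only mirrors the semantics.

```scala
// Plain-Scala model of distinct before and after a projection; not Flink code.
object DistinctSketch {
  // Hypothetical (name, age) rows.
  val rows: Seq[(String, Int)] = Seq(("Ann", 30), ("Ann", 41), ("Ann", 30))

  // Whole rows are compared, so only the exact duplicate is dropped.
  val distinctRows: Seq[(String, Int)] = rows.distinct

  // After projecting to the name alone, all three rows collapse into one.
  val distinctNames: Seq[String] = rows.map(_._1).distinct
}
```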

### Join Operations

Combine tables using various join strategies.

```scala { .api }
/**
 * Inner join with another table (Cartesian product)
 * @param right Right table to join
 * @returns Joined table
 */
def join(right: Table): Table

/**
 * Inner join with join condition
 * @param right Right table to join
 * @param joinPredicate Join condition expression
 * @returns Joined table
 */
def join(right: Table, joinPredicate: Expression): Table

/**
 * Left outer join
 * @param right Right table to join
 * @param joinPredicate Join condition expression
 * @returns Left outer joined table
 */
def leftOuterJoin(right: Table, joinPredicate: Expression): Table

/**
 * Right outer join
 * @param right Right table to join
 * @param joinPredicate Join condition expression
 * @returns Right outer joined table
 */
def rightOuterJoin(right: Table, joinPredicate: Expression): Table

/**
 * Full outer join
 * @param right Right table to join
 * @param joinPredicate Join condition expression
 * @returns Full outer joined table
 */
def fullOuterJoin(right: Table, joinPredicate: Expression): Table
```

**Usage Examples:**

```scala
val employees = tEnv.scan("Employees")
val departments = tEnv.scan("Departments")

// Inner join
val innerJoined = employees.join(departments, 'emp_dept_id === 'dept_id)

// Left outer join
val leftJoined = employees.leftOuterJoin(departments, 'emp_dept_id === 'dept_id)

// Multiple join conditions
val complexJoin = employees.join(
  departments,
  'emp_dept_id === 'dept_id && 'emp_status === "active"
)
```
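
The practical difference between an inner join and a left outer join is easiest to see on a small data set. The sketch below models both with plain Scala collections; `Emp`, `Dept`, and the rows are hypothetical, and `Option` stands in for the null padding that an outer join produces for unmatched rows.

```scala
// Plain-Scala model of inner vs. left outer join semantics; not Flink code.
// Emp, Dept, and the sample rows are hypothetical.
object JoinSketch {
  final case class Emp(name: String, empDeptId: Int)
  final case class Dept(deptId: Int, deptName: String)

  val employees: Seq[Emp] = Seq(Emp("Ann", 1), Emp("Bob", 2), Emp("Cid", 9))
  val departments: Seq[Dept] = Seq(Dept(1, "Engineering"), Dept(2, "Sales"))

  // Inner join: keep only the pairs that satisfy the join predicate.
  val inner: Seq[(String, String)] =
    for {
      e <- employees
      d <- departments
      if e.empDeptId == d.deptId
    } yield (e.name, d.deptName)

  // Left outer join: every left row appears at least once;
  // a left row without a match is padded with None.
  val leftOuter: Seq[(String, Option[String])] =
    employees.flatMap { e =>
      val matches = departments.filter(_.deptId == e.empDeptId)
      if (matches.isEmpty) Seq((e.name, Option.empty[String]))
      else matches.map(d => (e.name, Option(d.deptName)))
    }
}
```

Here `Cid` has no matching department, so the row is dropped by the inner join and kept, with an empty right side, by the left outer join.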

### Set Operations

Combine tables using set-based operations.

```scala { .api }
/**
 * Union with another table (removes duplicates)
 * @param right Right table for union
 * @returns Union of both tables without duplicates
 */
def union(right: Table): Table

/**
 * Union all with another table (keeps duplicates)
 * @param right Right table for union
 * @returns Union of both tables with duplicates
 */
def unionAll(right: Table): Table

/**
 * Set difference (removes duplicates)
 * @param right Right table for difference
 * @returns Rows in left table but not in right table
 */
def minus(right: Table): Table

/**
 * Set difference all (keeps duplicates)
 * @param right Right table for difference
 * @returns All rows in left table minus right table rows
 */
def minusAll(right: Table): Table

/**
 * Intersection (removes duplicates)
 * @param right Right table for intersection
 * @returns Common rows between both tables
 */
def intersect(right: Table): Table

/**
 * Intersection all (keeps duplicates)
 * @param right Right table for intersection
 * @returns All common rows between both tables
 */
def intersectAll(right: Table): Table
```

**Usage Examples:**

```scala
val currentEmployees = tEnv.scan("CurrentEmployees")
val formerEmployees = tEnv.scan("FormerEmployees")

// Union operations
val allEmployees = currentEmployees.union(formerEmployees)
val allEmployeesWithDupes = currentEmployees.unionAll(formerEmployees)

// Set difference
val onlyCurrent = currentEmployees.minus(formerEmployees)

// Intersection
val rehiredEmployees = currentEmployees.intersect(formerEmployees)
```
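
The duplicate-removing and duplicate-keeping variants differ only when a row occurs more than once. The following sketch models all six operations on plain Scala sequences, relying on the multiset behaviour of `Seq.diff` and `Seq.intersect` in the standard library. The sample values are invented, and the mapping to the Table API methods is an analogy rather than the actual implementation.

```scala
// Plain-Scala model of set vs. multiset (ALL) semantics; not Flink code.
object SetOpsSketch {
  // Hypothetical single-column tables with repeated rows.
  val left: Seq[String] = Seq("Ann", "Ann", "Ann", "Bob")
  val right: Seq[String] = Seq("Ann", "Ann", "Cid")

  val unionAll: Seq[String] = left ++ right                  // keeps every occurrence
  val union: Seq[String] = (left ++ right).distinct          // one copy of each row

  val minusAll: Seq[String] = left.diff(right)               // n - m occurrences remain
  val minus: Seq[String] = left.distinct.diff(right)         // rows absent from right

  val intersectAll: Seq[String] = left.intersect(right)      // min(n, m) occurrences
  val intersect: Seq[String] = left.distinct.intersect(right)
}
```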

### Field Aliasing and Renaming

Rename fields and provide aliases for table references.

```scala { .api }
/**
 * Renames fields of the table
 * @param fields New field name expressions
 * @returns Table with renamed fields
 */
def as(fields: Expression*): Table
```

**Usage Examples:**

```scala
// Rename all fields
val renamed = table.as('employee_name, 'employee_age, 'employee_salary)

// Rename selected fields after projection
val projected = table
  .select('name, 'age, 'salary * 12)
  .as('fullName, 'currentAge, 'annualSalary)
```

### Schema Operations

Access and inspect table schema information.

```scala { .api }
/**
 * Gets the schema of the table
 * @returns TableSchema containing field information
 */
def getSchema: TableSchema

/**
 * Prints the schema to console
 */
def printSchema(): Unit
```

**Usage Examples:**

```scala
// Get schema information
val schema = table.getSchema
val fieldNames = schema.getFieldNames
val fieldTypes = schema.getFieldTypes

// Print schema for debugging
table.printSchema()
```

### Output Operations

Write table results to registered sinks or external systems.

```scala { .api }
/**
 * Writes table to a table sink
 * @param sink Table sink for output
 */
def writeToSink[T](sink: TableSink[T]): Unit

/**
 * Inserts table data into a registered sink
 * @param tableName Name of registered table sink
 */
def insertInto(tableName: String): Unit
```

**Usage Examples:**

```scala
// Write to custom sink
val csvSink = new CsvTableSink("/path/to/output.csv")
table.writeToSink(csvSink)

// Insert into registered sink
// (fieldNames and fieldTypes as obtained from table.getSchema)
tEnv.registerTableSink("OutputTable", fieldNames, fieldTypes, csvSink)
table.insertInto("OutputTable")
```

### Result Limiting Operations

Control the number of rows returned from sorted results.

```scala { .api }
/**
 * Limits a sorted result from an offset position
 * @param offset Number of records to skip
 * @returns Table with offset applied
 */
def offset(offset: Int): Table

/**
 * Limits a sorted result to the first n rows
 * @param fetch Number of records to return (must be >= 0)
 * @returns Table limited to first n rows
 */
def fetch(fetch: Int): Table

/**
 * Limits a sorted result (deprecated - use offset/fetch instead)
 * @param offset Number of records to skip
 * @returns Table with limit applied
 */
@deprecated("Use offset() and fetch() instead")
def limit(offset: Int): Table

/**
 * Limits a sorted result with offset and fetch (deprecated)
 * @param offset Number of records to skip
 * @param fetch Number of records to return
 * @returns Table with limit applied
 */
@deprecated("Use offset() and fetch() instead")
def limit(offset: Int, fetch: Int): Table
```

**Usage Examples:**

```scala
// Skip first 5 rows
val offsetResult = table.orderBy('name).offset(5)

// Return first 10 rows
val fetchResult = table.orderBy('salary.desc).fetch(10)

// Skip 5 rows and return next 10
val combined = table.orderBy('name).offset(5).fetch(10)

// Deprecated limit usage (still available)
val limited = table.orderBy('name).limit(5, 10)
```
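
On an already sorted result, `offset` and `fetch` behave much like `drop` and `take` on a Scala sequence. The sketch below shows that correspondence on invented data; it is an analogy for the semantics, not Table API code.

```scala
// Plain-Scala model of offset/fetch on a sorted result; not Flink code.
object LimitSketch {
  // Hypothetical values, sorted first because limiting applies to sorted results.
  val sortedNames: Seq[String] = Seq("Eve", "Bob", "Ann", "Dee", "Cid").sorted

  val offsetOnly: Seq[String] = sortedNames.drop(2)       // like offset(2)
  val fetchOnly: Seq[String] = sortedNames.take(3)        // like fetch(3)
  val page: Seq[String] = sortedNames.drop(2).take(2)     // like offset(2).fetch(2)
}
```

The order of the two calls matters in this model: rows are skipped first and the fetch count then applies to what remains.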

### Internal API Access

Access internal table representations for advanced use cases.

```scala { .api }
/**
 * Returns the RelNode representation of this table
 * @returns RelNode for advanced query operations
 */
def getRelNode: RelNode

/**
 * Access to the relation builder for advanced operations
 * @returns FlinkRelBuilder instance
 */
def relBuilder: FlinkRelBuilder
```

**Usage Examples:**

```scala
// Access internal RelNode (advanced usage)
val relNode = table.getRelNode

// Access relation builder for complex operations
val builder = table.relBuilder
```

## Window Operations

Apply windowing operations for time-based aggregations.

```scala { .api }
/**
 * Applies time or count-based windows to the table
 * @param window Window specification (Tumble, Slide, or Session)
 * @returns WindowedTable for window-based operations
 */
def window(window: Window): WindowedTable

/**
 * Applies over-windows for row-based calculations
 * @param overWindows Over-window specifications
 * @returns OverWindowedTable for over-window operations
 */
def window(overWindows: OverWindow*): OverWindowedTable
```

**Usage Examples:**

```scala
// Brings Tumble, Over, and the expression DSL into scope
import org.apache.flink.table.api.scala._

// Tumbling window
val windowedTable = table
  .window(Tumble over 10.minutes on 'rowtime as 'w)
  .groupBy('w, 'department)
  .select('department, 'w.start, 'w.end, 'salary.avg)

// Over window (ordered by a time attribute such as 'rowtime)
val overResult = table
  .window(Over partitionBy 'department orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
  .select('name, 'salary, 'salary.sum over 'w)
```
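
A tumbling window assigns every row to exactly one fixed-size, non-overlapping time bucket. The sketch below models that assignment with plain Scala, assuming windows aligned to the epoch with no offset and non-negative millisecond timestamps. `Event` and the data are hypothetical, and the code illustrates the idea rather than Flink's implementation.

```scala
// Plain-Scala model of tumbling-window assignment; not Flink code.
// Assumes epoch-aligned windows, no offset, and non-negative timestamps.
object TumbleSketch {
  final case class Event(timestampMs: Long, salary: Int)

  val windowSizeMs: Long = 10 * 60 * 1000L // 10 minutes

  // Start of the window that contains the given timestamp.
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  val events: Seq[Event] =
    Seq(Event(0L, 10), Event(599999L, 30), Event(600000L, 50))

  // Window start -> sum of salaries of the rows falling into that window.
  val sums: Map[Long, Int] =
    events.groupBy(e => windowStart(e.timestampMs)).map {
      case (start, es) => (start, es.map(_.salary).sum)
    }
}
```

The row at 599999 ms still belongs to the first window, while the row at exactly 600000 ms opens the next one, since each window covers its start inclusively and its end exclusively.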

## Types

```scala { .api }
class Table {
  def tableEnv: TableEnvironment
}

class GroupedTable
class WindowedTable
class WindowGroupedTable
class OverWindowedTable

class TableSchema {
  def getFieldNames: Array[String]
  def getFieldTypes: Array[TypeInformation[_]]
  def getFieldCount: Int
}
```