
# Table Operations

The Table class provides SQL-like operations for data transformation, filtering, aggregation, and joining. It represents a relational table with a schema and supports fluent method chaining.

## Capabilities

### Projection Operations

Select specific fields or computed expressions from the table.

```scala { .api }
/**
 * Projects fields using expressions
 * @param fields Expression-based field selections
 * @returns New table with projected fields
 */
def select(fields: Expression*): Table

/**
 * Projects fields using string expressions
 * @param fields String-based field selections
 * @returns New table with projected fields
 */
def select(fields: String): Table
```

**Usage Examples:**

```scala
// Expression-based selection
val projected = table.select('name, 'age + 1, 'salary * 0.1)

// String-based selection
val projected2 = table.select("name, age + 1 as nextAge, salary * 0.1 as bonus")

// Select all fields
val allFields = table.select('*)
```
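To make the semantics of projection concrete without a running Flink environment, here is a minimal plain-Scala analogue built on standard collections. It is not Flink API: the `Employee` case class, the sample rows, and the `ProjectionSketch` object are hypothetical names introduced only for illustration.

```scala
// Plain-Scala analogue of table.select('name, 'age + 1, 'salary * 0.1).
// Employee and the sample data are hypothetical; this is not Flink code.
final case class Employee(name: String, age: Int, salary: Double)

object ProjectionSketch {
  val rows: Seq[Employee] = Seq(
    Employee("Ada", 36, 90000.0),
    Employee("Linus", 28, 70000.0)
  )

  // Each output row keeps only the selected or computed columns.
  val projected: Seq[(String, Int, Double)] =
    rows.map(e => (e.name, e.age + 1, e.salary * 0.1))

  def main(args: Array[String]): Unit = projected.foreach(println)
}
```

The row count is unchanged by a projection; only the set of columns differs.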

### Filtering Operations

Filter rows based on predicates and conditions.

```scala { .api }
/**
 * Filters rows using expression predicate
 * @param predicate Boolean expression for filtering
 * @returns Filtered table
 */
def filter(predicate: Expression): Table

/**
 * Filters rows using string predicate
 * @param predicate String-based boolean expression
 * @returns Filtered table
 */
def filter(predicate: String): Table

/**
 * Alias for filter operation
 * @param predicate Boolean expression for filtering
 * @returns Filtered table
 */
def where(predicate: Expression): Table
```

**Usage Examples:**

```scala
// Expression-based filtering
val adults = table.filter('age >= 18)
val activeUsers = table.where('status === "active")

// String-based filtering
val highEarners = table.filter("salary > 50000")

// Complex conditions
val filtered = table.filter('age >= 21 && 'department === "Engineering")
```
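The filtering behaviour above can be sketched with plain Scala collections. This is only an analogue under stated assumptions, not Flink API: `Employee`, the sample rows, and `FilterSketch` are hypothetical. It also shows that chaining two filters is equivalent to combining the predicates with `&&`.

```scala
// Plain-Scala analogue of filter/where; Employee and the data are hypothetical.
final case class Employee(name: String, age: Int, department: String, status: String)

object FilterSketch {
  val rows: Seq[Employee] = Seq(
    Employee("Ada", 36, "Engineering", "active"),
    Employee("Linus", 19, "Engineering", "inactive"),
    Employee("Grace", 45, "Research", "active")
  )

  // Analogue of table.filter('age >= 21 && 'department === "Engineering")
  val filtered: Seq[Employee] =
    rows.filter(e => e.age >= 21 && e.department == "Engineering")

  // Two chained filters keep exactly the rows that satisfy both predicates.
  val chained: Seq[Employee] =
    rows.filter(_.age >= 21).filter(_.department == "Engineering")

  def main(args: Array[String]): Unit = filtered.foreach(println)
}
```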

### Grouping and Aggregation

Group rows and perform aggregate operations.

```scala { .api }
/**
 * Groups table by specified fields
 * @param fields Grouping field expressions
 * @returns GroupedTable for aggregation operations
 */
def groupBy(fields: Expression*): GroupedTable

class GroupedTable {
  /**
   * Selects fields and aggregates for grouped table
   * @param fields Fields and aggregate expressions
   * @returns Aggregated table
   */
  def select(fields: Expression*): Table

  /**
   * Selects fields using string expressions
   * @param fields String-based field and aggregate selections
   * @returns Aggregated table
   */
  def select(fields: String): Table
}
```

**Usage Examples:**

```scala
// Basic grouping and aggregation
val grouped = table
  .groupBy('department)
  .select('department, 'salary.avg, 'age.max, 'name.count)

// Multiple grouping fields
val multiGrouped = table
  .groupBy('department, 'level)
  .select('department, 'level, 'salary.sum as 'totalSalary)

// String-based aggregation
val stringAgg = table
  .groupBy('department)
  .select("department, AVG(salary) as avgSalary, COUNT(*) as employeeCount")
```
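As a rough illustration of what a grouped `select` computes, the following plain-Scala sketch produces one output row per group with an average, a maximum, and a count. It is an analogue on standard collections, not Flink API; `Employee`, `DeptStats`, and the sample data are hypothetical.

```scala
// Plain-Scala analogue of groupBy('department).select('department, 'salary.avg, 'age.max, 'name.count).
// All names and data here are hypothetical.
final case class Employee(name: String, age: Int, department: String, salary: Long)
final case class DeptStats(department: String, avgSalary: Double, maxAge: Int, headcount: Int)

object GroupBySketch {
  val rows: Seq[Employee] = Seq(
    Employee("Ada", 36, "Engineering", 90000L),
    Employee("Linus", 28, "Engineering", 70000L),
    Employee("Grace", 45, "Research", 80000L)
  )

  // One DeptStats row per distinct department value.
  val stats: Map[String, DeptStats] =
    rows.groupBy(_.department).map { case (dept, members) =>
      val avg = members.map(_.salary).sum.toDouble / members.size
      dept -> DeptStats(dept, avg, members.map(_.age).max, members.size)
    }

  def main(args: Array[String]): Unit = stats.values.foreach(println)
}
```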

### Sorting Operations

Order table rows by specified criteria.

```scala { .api }
/**
 * Orders table by specified fields
 * @param fields Ordering field expressions (use .asc/.desc for direction)
 * @returns Ordered table
 */
def orderBy(fields: Expression*): Table
```

**Usage Examples:**

```scala
// Ascending order (default)
val sortedAsc = table.orderBy('name)

// Descending order
val sortedDesc = table.orderBy('salary.desc)

// Multiple fields
val multiSorted = table.orderBy('department.asc, 'salary.desc, 'name.asc)
```
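A multi-field ordering with mixed directions can be sketched in plain Scala as a sort on a tuple key, where later fields break ties in earlier ones. This is an analogue only, not Flink API; `Employee` and the sample rows are hypothetical, and negating the salary is simply one way to express descending order for a numeric key.

```scala
// Plain-Scala analogue of orderBy('department.asc, 'salary.desc, 'name.asc).
// Employee and the data are hypothetical.
final case class Employee(name: String, department: String, salary: Long)

object OrderBySketch {
  val rows: Seq[Employee] = Seq(
    Employee("Linus", "Engineering", 70000L),
    Employee("Grace", "Research", 80000L),
    Employee("Alan", "Engineering", 90000L),
    Employee("Ada", "Engineering", 90000L)
  )

  // Tuple keys compare field by field; -salary yields descending salary.
  val sorted: Seq[Employee] =
    rows.sortBy(e => (e.department, -e.salary, e.name))

  def main(args: Array[String]): Unit = sorted.foreach(println)
}
```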

### Distinct Operations

Remove duplicate rows from the table.

```scala { .api }
/**
 * Removes duplicate rows from the table
 * @returns Table with unique rows
 */
def distinct(): Table
```

**Usage Examples:**

```scala
// Remove all duplicates
val unique = table.distinct()

// Distinct after projection
val uniqueNames = table.select('name).distinct()
```
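Because duplicates are judged on whole rows, applying `distinct` before or after a projection can give different results. The plain-Scala sketch below illustrates this on standard collections; it is not Flink API, and `Employee` and the sample rows are hypothetical.

```scala
// Plain-Scala analogue of distinct() with and without a preceding projection.
// Employee and the data are hypothetical.
final case class Employee(name: String, department: String)

object DistinctSketch {
  val rows: Seq[Employee] = Seq(
    Employee("Ada", "Engineering"),
    Employee("Linus", "Engineering"),
    Employee("Ada", "Research")
  )

  // No two full rows are identical, so nothing is removed.
  val uniqueRows: Seq[Employee] = rows.distinct

  // After projecting to the name column, the two "Ada" rows collapse into one.
  val uniqueNames: Seq[String] = rows.map(_.name).distinct

  def main(args: Array[String]): Unit = uniqueNames.foreach(println)
}
```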

### Join Operations

Combine tables using various join strategies.

```scala { .api }
/**
 * Inner join with another table (Cartesian product)
 * @param right Right table to join
 * @returns Joined table
 */
def join(right: Table): Table

/**
 * Inner join with join condition
 * @param right Right table to join
 * @param joinPredicate Join condition expression
 * @returns Joined table
 */
def join(right: Table, joinPredicate: Expression): Table

/**
 * Left outer join
 * @param right Right table to join
 * @param joinPredicate Join condition expression
 * @returns Left outer joined table
 */
def leftOuterJoin(right: Table, joinPredicate: Expression): Table

/**
 * Right outer join
 * @param right Right table to join
 * @param joinPredicate Join condition expression
 * @returns Right outer joined table
 */
def rightOuterJoin(right: Table, joinPredicate: Expression): Table

/**
 * Full outer join
 * @param right Right table to join
 * @param joinPredicate Join condition expression
 * @returns Full outer joined table
 */
def fullOuterJoin(right: Table, joinPredicate: Expression): Table
```

**Usage Examples:**

```scala
val employees = tEnv.scan("Employees")
val departments = tEnv.scan("Departments")

// Inner join
val innerJoined = employees.join(departments, 'emp_dept_id === 'dept_id)

// Left outer join
val leftJoined = employees.leftOuterJoin(departments, 'emp_dept_id === 'dept_id)

// Multiple join conditions
val complexJoin = employees.join(
  departments,
  'emp_dept_id === 'dept_id && 'emp_status === "active"
)
```
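The difference between an inner join and a left outer join can be sketched in plain Scala. This is an analogue on standard collections, not Flink API: `Employee`, `Department`, and the sample rows are hypothetical, and `Option` stands in for the missing right-hand side of an unmatched row.

```scala
// Plain-Scala analogue of join and leftOuterJoin on 'emp_dept_id === 'dept_id.
// All names and data here are hypothetical.
final case class Employee(name: String, empDeptId: Int)
final case class Department(deptId: Int, deptName: String)

object JoinSketch {
  val employees: Seq[Employee] =
    Seq(Employee("Ada", 1), Employee("Linus", 2), Employee("Grace", 9))
  val departments: Seq[Department] =
    Seq(Department(1, "Engineering"), Department(2, "Research"))

  // Inner join: only pairs that satisfy the predicate survive.
  val inner: Seq[(Employee, Department)] =
    for {
      e <- employees
      d <- departments
      if e.empDeptId == d.deptId
    } yield (e, d)

  // Left outer join: every left row is kept; unmatched rows get an empty right side.
  val leftOuter: Seq[(Employee, Option[Department])] =
    employees.flatMap { e =>
      val matches: Seq[Option[Department]] =
        departments.filter(_.deptId == e.empDeptId).map(d => Option(d))
      (if (matches.isEmpty) Seq(Option.empty[Department]) else matches).map(m => (e, m))
    }

  def main(args: Array[String]): Unit = leftOuter.foreach(println)
}
```

Here "Grace" has no matching department, so she is absent from `inner` but present in `leftOuter` with an empty right side.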

### Set Operations

Combine tables using set-based operations.

```scala { .api }
/**
 * Union with another table (removes duplicates)
 * @param right Right table for union
 * @returns Union of both tables without duplicates
 */
def union(right: Table): Table

/**
 * Union all with another table (keeps duplicates)
 * @param right Right table for union
 * @returns Union of both tables with duplicates
 */
def unionAll(right: Table): Table

/**
 * Set difference (removes duplicates)
 * @param right Right table for difference
 * @returns Rows in left table but not in right table
 */
def minus(right: Table): Table

/**
 * Set difference all (keeps duplicates)
 * @param right Right table for difference
 * @returns All rows in left table minus right table rows
 */
def minusAll(right: Table): Table

/**
 * Intersection (removes duplicates)
 * @param right Right table for intersection
 * @returns Common rows between both tables
 */
def intersect(right: Table): Table

/**
 * Intersection all (keeps duplicates)
 * @param right Right table for intersection
 * @returns All common rows between both tables
 */
def intersectAll(right: Table): Table
```

**Usage Examples:**

```scala
val currentEmployees = tEnv.scan("CurrentEmployees")
val formerEmployees = tEnv.scan("FormerEmployees")

// Union operations
val allEmployees = currentEmployees.union(formerEmployees)
val allEmployeesWithDupes = currentEmployees.unionAll(formerEmployees)

// Set difference
val onlyCurrent = currentEmployees.minus(formerEmployees)

// Intersection
val rehiredEmployees = currentEmployees.intersect(formerEmployees)
```
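The distinction between the duplicate-removing operations and their `All` variants is easiest to see on a small example. The sketch below models rows as strings in plain Scala collections, assuming the usual SQL multiset semantics (a row present n times on the left and m times on the right appears n - m times in `minusAll` and min(n, m) times in `intersectAll`). It is not Flink API, and `SetOpsSketch` and its data are hypothetical.

```scala
// Plain-Scala analogue of the six set operations, assuming SQL multiset semantics.
// Rows are modelled as strings; all names and data are hypothetical.
object SetOpsSketch {
  val left: Seq[String] = Seq("ann", "ann", "ann", "bob", "cy")
  val right: Seq[String] = Seq("ann", "ann", "cy", "dee")

  val unionAll: Seq[String] = left ++ right            // keeps every occurrence
  val union: Seq[String] = unionAll.distinct           // one copy of each row

  val minusAll: Seq[String] = left.diff(right)         // removes one left copy per right copy
  val minus: Seq[String] =
    left.distinct.filterNot(r => right.contains(r))    // distinct left rows absent from right

  val intersectAll: Seq[String] = left.intersect(right) // min(n, m) copies of each row
  val intersect: Seq[String] =
    left.distinct.filter(r => right.contains(r))       // distinct rows present in both

  def main(args: Array[String]): Unit = {
    println(s"minus=$minus minusAll=$minusAll")
    println(s"intersect=$intersect intersectAll=$intersectAll")
  }
}
```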

### Field Aliasing and Renaming

Rename fields and provide aliases for table references.

```scala { .api }
/**
 * Renames fields of the table
 * @param fields New field name expressions
 * @returns Table with renamed fields
 */
def as(fields: Expression*): Table
```

**Usage Examples:**

```scala
// Rename all fields
val renamed = table.as('employee_name, 'employee_age, 'employee_salary)

// Rename selected fields after projection
val projected = table
  .select('name, 'age, 'salary * 12)
  .as('fullName, 'currentAge, 'annualSalary)
```

### Schema Operations

Access and inspect table schema information.

```scala { .api }
/**
 * Gets the schema of the table
 * @returns TableSchema containing field information
 */
def getSchema: TableSchema

/**
 * Prints the schema to console
 */
def printSchema(): Unit
```

**Usage Examples:**

```scala
// Get schema information
val schema = table.getSchema
val fieldNames = schema.getFieldNames
val fieldTypes = schema.getFieldTypes

// Print schema for debugging
table.printSchema()
```

### Output Operations

Write table results to registered sinks or external systems.

```scala { .api }
/**
 * Writes table to a table sink
 * @param sink Table sink for output
 */
def writeToSink[T](sink: TableSink[T]): Unit

/**
 * Inserts table data into a registered sink
 * @param tableName Name of registered table sink
 */
def insertInto(tableName: String): Unit
```

**Usage Examples:**

```scala
// Write to custom sink
val csvSink = new CsvTableSink("/path/to/output.csv")
table.writeToSink(csvSink)

// Insert into registered sink
// (fieldNames and fieldTypes describe the sink schema, e.g. taken from table.getSchema)
tEnv.registerTableSink("OutputTable", fieldNames, fieldTypes, csvSink)
table.insertInto("OutputTable")
```

### Result Limiting Operations

Control the number of rows returned from sorted results.

```scala { .api }
/**
 * Limits a sorted result from an offset position
 * @param offset Number of records to skip
 * @returns Table with offset applied
 */
def offset(offset: Int): Table

/**
 * Limits a sorted result to the first n rows
 * @param fetch Number of records to return (must be >= 0)
 * @returns Table limited to first n rows
 */
def fetch(fetch: Int): Table

/**
 * Limits a sorted result (deprecated - use offset/fetch instead)
 * @param offset Number of records to skip
 * @returns Table with limit applied
 */
@deprecated("Use offset() and fetch() instead")
def limit(offset: Int): Table

/**
 * Limits a sorted result with offset and fetch (deprecated)
 * @param offset Number of records to skip
 * @param fetch Number of records to return
 * @returns Table with limit applied
 */
@deprecated("Use offset() and fetch() instead")
def limit(offset: Int, fetch: Int): Table
```

**Usage Examples:**

```scala
// Skip first 5 rows
val offsetResult = table.orderBy('name).offset(5)

// Return first 10 rows
val fetchResult = table.orderBy('salary.desc).fetch(10)

// Skip 5 rows and return next 10
val combined = table.orderBy('name).offset(5).fetch(10)

// Deprecated limit usage (still available)
val limited = table.orderBy('name).limit(5, 10)
```
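Offset and fetch only have a well-defined meaning on an ordered result, which is why the examples above sort first. A minimal plain-Scala analogue, using `drop` and `take` on a sorted sequence, is sketched below; it is not Flink API, and `OffsetFetchSketch` and its data are hypothetical.

```scala
// Plain-Scala analogue of orderBy(...).offset(n).fetch(m); names and data are hypothetical.
object OffsetFetchSketch {
  val names: Seq[String] = Seq("dee", "bob", "ann", "cy", "eve")

  // Sort first so that "the first n rows" is deterministic.
  val sorted: Seq[String] = names.sorted

  val offsetResult: Seq[String] = sorted.drop(2)        // skip the first 2 rows
  val fetchResult: Seq[String] = sorted.take(3)         // keep the first 3 rows
  val page: Seq[String] = sorted.drop(2).take(2)        // skip 2, then keep the next 2

  def main(args: Array[String]): Unit = println(page)
}
```

Applying the offset before the fetch, as in `page`, is what makes the pair usable for pagination.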

### Internal API Access

Access internal table representations for advanced use cases.

```scala { .api }
/**
 * Returns the RelNode representation of this table
 * @returns RelNode for advanced query operations
 */
def getRelNode: RelNode

/**
 * Access to the relation builder for advanced operations
 * @returns FlinkRelBuilder instance
 */
def relBuilder: FlinkRelBuilder
```

**Usage Examples:**

```scala
// Access internal RelNode (advanced usage)
val relNode = table.getRelNode

// Access relation builder for complex operations
val builder = table.relBuilder
```

## Window Operations

Apply windowing operations for time-based aggregations.

```scala { .api }
/**
 * Applies time or count-based windows to the table
 * @param window Window specification (Tumble, Slide, or Session)
 * @returns WindowedTable for window-based operations
 */
def window(window: Window): WindowedTable

/**
 * Applies over-windows for row-based calculations
 * @param overWindows Over-window specifications
 * @returns OverWindowedTable for over-window operations
 */
def window(overWindows: OverWindow*): OverWindowedTable
```

**Usage Examples:**

```scala
import org.apache.flink.table.api.Tumble

// Tumbling window
val windowedTable = table
  .window(Tumble over 10.minutes on 'rowtime as 'w)
  .groupBy('w, 'department)
  .select('department, 'w.start, 'w.end, 'salary.avg)

// Over window
val overResult = table
  .window(Over partitionBy 'department orderBy 'salary.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
  .select('name, 'salary, 'salary.sum over 'w)
```
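To illustrate what the two window styles compute, here is a plain-Scala sketch on standard collections. It assumes epoch-aligned tumbling windows and non-negative millisecond timestamps, and it models the over window as a running sum from the first row up to the current row in time order. It is not Flink API; `Event`, `WindowSketch`, and the sample data are hypothetical.

```scala
// Plain-Scala analogue of a 10-minute tumbling window and an unbounded-preceding over window.
// Assumes epoch-aligned windows and non-negative timestamps; all names and data are hypothetical.
final case class Event(department: String, rowtimeMillis: Long, salary: Long)

object WindowSketch {
  val windowSizeMillis: Long = 10 * 60 * 1000L

  // Every timestamp falls into exactly one window, identified by its start.
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMillis)

  val events: Seq[Event] = Seq(
    Event("Engineering", 0L, 100L),
    Event("Engineering", 5 * 60 * 1000L, 300L),
    Event("Engineering", 12 * 60 * 1000L, 500L)
  )

  // Tumbling window: group by (department, window start), then average per group.
  val avgPerWindow: Map[(String, Long), Double] =
    events.groupBy(e => (e.department, windowStart(e.rowtimeMillis))).map {
      case (key, es) => key -> (es.map(_.salary).sum.toDouble / es.size)
    }

  // Over window: one output per input row, each a sum of all rows up to and including it.
  val runningSum: Seq[Long] =
    events.sortBy(_.rowtimeMillis).map(_.salary).scanLeft(0L)(_ + _).tail

  def main(args: Array[String]): Unit = {
    avgPerWindow.foreach(println)
    println(runningSum)
  }
}
```

The tumbling window collapses rows into one result per window, while the over window keeps one result per input row.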

## Types

```scala { .api }
class Table {
  def tableEnv: TableEnvironment
}

class GroupedTable
class WindowedTable
class WindowGroupedTable
class OverWindowedTable

class TableSchema {
  def getFieldNames: Array[String]
  def getFieldTypes: Array[TypeInformation[_]]
  def getFieldCount: Int
}
```