or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-system.md datastream-integration.md index.md sql-execution.md table-environment.md table-operations.md type-system.md user-defined-functions.md

docs/table-operations.md

0

# Table Operations

1

2

Table is the core abstraction of the Table API, representing a pipeline of data transformations. It provides fluent API methods for selection, filtering, aggregation, joins, and window operations on both bounded and unbounded data streams.

3

4

## Capabilities

5

6

### Basic Transformations

7

8

Core operations for selecting fields, filtering data, and basic data manipulation.

9

10

```java { .api }

11

/**

12

* Selects the given fields from the table

13

* @param fields Expressions defining the selected fields

14

* @return New Table with selected fields

15

*/

16

Table select(Expression... fields);

17

18

/**

19

* Filters rows based on the given predicate

20

* @param predicate Boolean expression for row filtering

21

* @return New Table with filtered rows

22

*/

23

Table filter(Expression predicate);

24

25

/**

26

* Alias for filter() - filters rows based on the given predicate

27

* @param predicate Boolean expression for row filtering

28

* @return New Table with filtered rows

29

*/

30

Table where(Expression predicate);

31

32

/**

33

* Renames fields of the table

34

* @param fields Expressions defining new field names

35

* @return New Table with renamed fields

36

*/

37

Table as(Expression... fields);

38

39

/**

40

* Adds additional columns to the table

41

* @param fields Expressions defining new columns

42

* @return New Table with additional columns

43

*/

44

Table addColumns(Expression... fields);

45

46

/**

47

* Adds columns or replaces existing ones

48

* @param fields Expressions defining columns to add or replace

49

* @return New Table with added/replaced columns

50

*/

51

Table addOrReplaceColumns(Expression... fields);

52

53

/**

54

* Drops columns from the table

55

* @param fields Expressions defining columns to drop

56

* @return New Table without the specified columns

57

*/

58

Table dropColumns(Expression... fields);

59

60

/**

61

* Returns distinct rows from the table

62

* @return New Table with distinct rows

63

*/

64

Table distinct();

65

```

66

67

**Usage Examples:**

68

69

```java

70

import static org.apache.flink.table.api.Expressions.*;

71

72

// Basic selection and filtering

73

Table customers = tableEnv.from("customers");

74

Table result = customers

75

.select($("customer_id"), $("name"), $("email"))

76

.filter($("age").isGreater(18))

77

.where($("active").isEqual(true));

78

79

// Column manipulation

80

Table enhanced = customers

81

.addColumns($("name").upperCase().as("name_upper"))

82

.dropColumns($("internal_notes"))

83

.as("customer_id", "full_name", "email_address", "age", "is_active", "name_upper");

84

85

// Distinct records

86

Table uniqueCategories = products

87

.select($("category"))

88

.distinct();

89

```

90

91

### Grouping and Aggregation

92

93

Operations for grouping data and computing aggregations.

94

95

```java { .api }

96

/**

97

* Groups the table by the given expressions

98

* @param fields Expressions defining grouping keys

99

* @return GroupedTable for applying aggregations

100

*/

101

GroupedTable groupBy(Expression... fields);

102

103

/**

104

* Applies aggregation functions to grouped or ungrouped table

105

* @param aggregateExpression Aggregate expression (sum, count, avg, etc.)

106

* @param moreAggregateExpressions Additional aggregate expressions

107

* @return AggregatedTable with aggregation results

108

*/

109

AggregatedTable aggregate(Expression aggregateExpression, Expression... moreAggregateExpressions);

110

111

/**

112

* Applies flat aggregation using a table aggregate function

113

* @param tableAggFunction Table aggregate function

114

* @return FlatAggregateTable with flattened results

115

*/

116

FlatAggregateTable flatAggregate(Expression tableAggFunction);

117

```

118

119

**Usage Examples:**

120

121

```java

122

// Basic grouping and aggregation

123

Table sales = tableEnv.from("sales");

124

Table salesByRegion = sales

125

.groupBy($("region"))

126

.select($("region"), $("amount").sum().as("total_sales"));

127

128

// Multiple aggregations

129

Table summary = sales

130

.groupBy($("region"), $("product_category"))

131

.select(

132

$("region"),

133

$("product_category"),

134

$("amount").sum().as("total_sales"),

135

$("amount").avg().as("avg_sale"),

136

$("order_id").count().as("num_orders")

137

);

138

139

// Aggregate without grouping

140

Table overallStats = sales

141

.select(

142

$("amount").sum().as("total"),

143

$("amount").avg().as("average"),

144

$("order_id").count().as("count")

145

);

146

```

147

148

### Join Operations

149

150

Various types of joins for combining data from multiple tables.

151

152

```java { .api }

153

/**

154

* Inner join with another table

155

* @param right Right table to join with

156

* @return New Table with joined results

157

*/

158

Table join(Table right);

159

160

/**

161

* Inner join with explicit join condition

162

* @param right Right table to join with

163

* @param joinPredicate Boolean expression defining join condition

164

* @return New Table with joined results

165

*/

166

Table join(Table right, Expression joinPredicate);

167

168

/**

169

* Left outer join with another table

170

* @param right Right table to join with

171

* @return New Table with left outer join results

172

*/

173

Table leftOuterJoin(Table right);

174

175

/**

176

* Left outer join with explicit join condition

177

* @param right Right table to join with

178

* @param joinPredicate Boolean expression defining join condition

179

* @return New Table with left outer join results

180

*/

181

Table leftOuterJoin(Table right, Expression joinPredicate);

182

183

/**

184

* Right outer join with another table

185

* @param right Right table to join with

186

* @return New Table with right outer join results

187

*/

188

Table rightOuterJoin(Table right);

189

190

/**

191

* Right outer join with explicit join condition

192

* @param right Right table to join with

193

* @param joinPredicate Boolean expression defining join condition

194

* @return New Table with right outer join results

195

*/

196

Table rightOuterJoin(Table right, Expression joinPredicate);

197

198

/**

199

* Full outer join with another table

200

* @param right Right table to join with

201

* @return New Table with full outer join results

202

*/

203

Table fullOuterJoin(Table right);

204

205

/**

206

* Full outer join with explicit join condition

207

* @param right Right table to join with

208

* @param joinPredicate Boolean expression defining join condition

209

* @return New Table with full outer join results

210

*/

211

Table fullOuterJoin(Table right, Expression joinPredicate);

212

```

213

214

**Usage Examples:**

215

216

```java

217

Table customers = tableEnv.from("customers");

218

Table orders = tableEnv.from("orders");

219

220

// Inner join with implicit condition (requires common column names)

221

Table customerOrders = customers.join(orders);

222

223

// Inner join with explicit condition

224

Table explicitJoin = customers

225

.join(orders, $("customer_id").isEqual($("cust_id")))

226

.select($("name"), $("order_id"), $("amount"));

227

228

// Left outer join to include all customers

229

Table allCustomers = customers

230

.leftOuterJoin(orders, $("customer_id").isEqual($("cust_id")))

231

.select($("name"), $("order_id").isNull().as("no_orders"));

232

233

// Complex join with multiple conditions

234

Table complexJoin = customers

235

.join(orders,

236

$("customer_id").isEqual($("cust_id"))

237

.and($("status").isEqual("active")))

238

.select($("name"), $("order_date"), $("total"));

239

```

240

241

### Window Operations

242

243

Time-based window operations for streaming data processing.

244

245

```java { .api }

246

/**

247

* Groups records into windows based on time attributes

248

* @param window Window specification (tumbling, sliding, session)

249

* @return GroupWindowedTable; call groupBy() with the window alias on it to obtain a WindowGroupedTable for window-based aggregations

249

*/

250

GroupWindowedTable window(GroupWindow window);

252

253

/**

254

* Applies over window aggregations

255

* @param overWindows Over window specifications

256

* @return OverWindowedTable whose select() evaluates the over window aggregations

257

*/

258

OverWindowedTable window(OverWindow... overWindows);

259

```

260

261

**Usage Examples:**

262

263

```java

264

// Tumbling window aggregation

265

Table events = tableEnv.from("events");

266

Table windowedStats = events

267

.window(Tumble.over(lit(5).minutes()).on($("event_time")).as("w"))

268

.groupBy($("user_id"), $("w"))

269

.select(

270

$("user_id"),

271

$("w").start().as("window_start"),

272

$("w").end().as("window_end"),

273

$("event_count").sum().as("total_events")

274

);

275

276

// Over window for running calculations

277

Table runningTotals = sales

278

.select(

279

$("product_id"),

280

$("sale_time"),

281

$("amount"),

282

$("amount").sum().over(

283

partitionBy($("product_id"))

284

.orderBy($("sale_time"))

285

.rows().unboundedPreceding().toCurrentRow()

286

).as("running_total")

287

);

288

```

289

290

### Ordering and Limiting

291

292

Operations for sorting and limiting result sets.

293

294

```java { .api }

295

/**

296

* Orders the table by the given expressions

297

* @param fields Expressions defining sort order

298

* @return New Table with ordered rows

299

*/

300

Table orderBy(Expression... fields);

301

302

/**

303

* Limits the number of returned rows

304

* @param fetch Maximum number of rows to return

305

* @return New Table with limited rows

306

*/

307

Table limit(int fetch);

308

309

/**

310

* Limits with offset and fetch

311

* @param offset Number of rows to skip

312

* @param fetch Maximum number of rows to return

313

* @return New Table with limited rows

314

*/

315

Table limit(int offset, int fetch);

316

```

317

318

**Usage Examples:**

319

320

```java

321

// Order by multiple fields

322

Table sortedCustomers = customers

323

.orderBy($("registration_date").desc(), $("name").asc());

324

325

// Top N results

326

Table topSellers = sales

327

.groupBy($("seller_id"))

328

.select($("seller_id"), $("amount").sum().as("total_sales"))

329

.orderBy($("total_sales").desc())

330

.limit(10);

331

332

// Pagination

333

Table page2 = products

334

.orderBy($("product_id"))

335

.limit(20, 10); // Skip 20, take 10

336

```

337

338

### Schema Access and Metadata

339

340

Methods for accessing table schema and metadata information.

341

342

```java { .api }

343

/**

344

* Gets the resolved schema of this table

345

* @return ResolvedSchema containing column information

346

*/

347

ResolvedSchema getResolvedSchema();

348

349

/**

350

* Gets the query operation that defines this table

351

* @return QueryOperation representing the table pipeline

352

*/

353

QueryOperation getQueryOperation();

354

```

355

356

**Usage Examples:**

357

358

```java

359

Table myTable = tableEnv.from("products");

360

361

// Access schema information

362

ResolvedSchema schema = myTable.getResolvedSchema();

363

List<Column> columns = schema.getColumns();

364

for (Column column : columns) {

365

System.out.println(column.getName() + ": " + column.getDataType());

366

}

367

368

// Check column existence

369

boolean hasCategory = schema.getColumn("category").isPresent();

370

```

371

372

### Execution and Explanation

373

374

Methods for executing table operations and examining execution plans.

375

376

```java { .api }

377

/**

378

* Executes the table and returns results

379

* @return TableResult containing execution information and data

380

*/

381

TableResult execute();

382

383

/**

384

* Returns the execution plan as a string

385

* @return String representation of the execution plan

386

*/

387

String explain();

388

389

/**

390

* Returns detailed execution plan with specified format

391

* @param format Format for the explanation (TEXT or JSON)

392

* @param details Additional details to include in the plan

393

* @return String representation of the detailed execution plan

394

*/

395

String explain(ExplainFormat format, ExplainDetail... details);

396

397

/**

398

* Inserts the table contents into a target table

399

* @param tablePath Target table path

400

* @return TablePipeline that performs the insert when execute() is called on it

401

*/

402

TablePipeline insertInto(String tablePath);

403

```

404

405

**Usage Examples:**

406

407

```java

408

Table result = customers

409

.filter($("age").isGreater(25))

410

.select($("name"), $("email"));

411

412

// Examine execution plan

413

System.out.println(result.explain());

414

415

// Detailed plan with JSON format

416

String detailedPlan = result.explain(

417

ExplainFormat.JSON,

418

ExplainDetail.CHANGELOG_MODE,

419

ExplainDetail.ESTIMATED_COST

420

);

421

422

// Execute and process results

423

TableResult tableResult = result.execute();

424

try (CloseableIterator<Row> iterator = tableResult.collect()) {

425

while (iterator.hasNext()) {

426

Row row = iterator.next();

427

System.out.println(row);

428

}

429

}

430

431

// Insert into target table

432

result.insertInto("customer_summary").execute();

433

```

434

435

### Set Operations

436

437

Set operations (union, intersect, minus) for combining the rows of two tables.

438

439

```java { .api }

440

/**

441

* Returns the union of this table and the given table

442

* @param right The table to union with

443

* @return New Table containing union of both tables (with duplicates removed)

444

*/

445

Table union(Table right);

446

447

/**

448

* Returns the union of this table and the given table including duplicates

449

* @param right The table to union with

450

* @return New Table containing union of both tables (with duplicates)

451

*/

452

Table unionAll(Table right);

453

454

/**

455

* Returns the intersection of this table and the given table

456

* @param right The table to intersect with

457

* @return New Table containing only rows present in both tables

458

*/

459

Table intersect(Table right);

460

461

/**

462

* Returns the intersection of this table and the given table including duplicates

463

* @param right The table to intersect with

464

* @return New Table containing intersection with duplicates

465

*/

466

Table intersectAll(Table right);

467

468

/**

469

* Returns the minus operation (difference) of this table and the given table

470

* @param right The table to subtract from this table

471

* @return New Table containing rows in this table but not in the right table

472

*/

473

Table minus(Table right);

474

475

/**

476

* Returns the minus operation including duplicates

477

* @param right The table to subtract from this table

478

* @return New Table containing difference with duplicates

479

*/

480

Table minusAll(Table right);

481

```

482

483

**Usage Examples:**

484

485

```java

486

Table europeanCustomers = tableEnv.from("customers_europe");

487

Table americanCustomers = tableEnv.from("customers_america");

488

489

// Union all customers

490

Table allCustomers = europeanCustomers.unionAll(americanCustomers);

491

492

// Find common customer IDs between regions (for validation)

493

Table commonIds = europeanCustomers

494

.select($("customer_id"))

495

.intersect(americanCustomers.select($("customer_id")));

496

497

// Find customers only in Europe

498

Table europeOnly = europeanCustomers

499

.select($("customer_id"))

500

.minus(americanCustomers.select($("customer_id")));

501

```

502

503

### Lateral Join Operations

504

505

Join operations with table-valued functions for dynamic table expansion.

506

507

```java { .api }

508

/**

509

* Performs a lateral join with a table function

510

* @param tableFunctionCall Expression calling a table function

511

* @return New Table with lateral join results

512

*/

513

Table joinLateral(Expression tableFunctionCall);

514

515

/**

516

* Performs a lateral join with a table function and join condition

517

* @param tableFunctionCall Expression calling a table function

518

* @param joinPredicate Join condition expression

519

* @return New Table with lateral join results

520

*/

521

Table joinLateral(Expression tableFunctionCall, Expression joinPredicate);

522

523

/**

524

* Performs a left outer lateral join with a table function

525

* @param tableFunctionCall Expression calling a table function

526

* @return New Table with left outer lateral join results

527

*/

528

Table leftOuterJoinLateral(Expression tableFunctionCall);

529

530

/**

531

* Performs a left outer lateral join with a table function and join condition

532

* @param tableFunctionCall Expression calling a table function

533

* @param joinPredicate Join condition expression

534

* @return New Table with left outer lateral join results

535

*/

536

Table leftOuterJoinLateral(Expression tableFunctionCall, Expression joinPredicate);

537

```

538

539

**Usage Examples:**

540

541

```java

542

// Lateral join with a table function to split comma-separated values

543

Table orders = tableEnv.from("orders");

544

Table expandedOrders = orders

545

.joinLateral(call("split_string", $("item_list"), ",").as("item_id"))

546

.select($("order_id"), $("customer_id"), $("item_id"));

547

548

// Left outer lateral join to handle orders with no items

549

Table allOrders = orders

550

.leftOuterJoinLateral(call("split_string", $("item_list"), ",").as("item_id"))

551

.select($("order_id"), $("customer_id"), $("item_id"));

552

```

553

554

### Function Operations

555

556

Operations using scalar and table functions for data transformation.

557

558

```java { .api }

559

/**

560

* Applies a scalar function to each row

561

* @param mapFunction Scalar function expression

562

* @return New Table with function results

563

*/

564

Table map(Expression mapFunction);

565

566

/**

567

* Applies a table function that can produce multiple rows per input row

568

* @param tableFunction Table function expression

569

* @return New Table with flattened results

570

*/

571

Table flatMap(Expression tableFunction);

572

```

573

574

**Usage Examples:**

575

576

```java

577

Table events = tableEnv.from("events");

578

579

// Map operation to transform each row

580

Table transformedEvents = events

581

.map(call("parse_json", $("json_data")).as("parsed_data"))

582

.select($("event_id"), $("parsed_data"));

583

584

// FlatMap operation to explode arrays

585

Table expandedEvents = events

586

.flatMap(call("explode_array", $("tag_array")).as("tag"))

587

.select($("event_id"), $("event_time"), $("tag"));

588

```

589

590

### Temporal Operations

591

592

Operations for creating temporal table functions from tables.

593

594

```java { .api }

595

/**

596

* Creates a temporal table function from this table

597

* @param timeAttribute Expression identifying the time attribute

598

* @param primaryKey Expression identifying the primary key

599

* @return TemporalTableFunction for temporal joins

600

*/

601

TemporalTableFunction createTemporalTableFunction(Expression timeAttribute, Expression primaryKey);

602

```

603

604

**Usage Examples:**

605

606

```java

607

Table exchangeRates = tableEnv.from("exchange_rates");

608

609

// Create temporal table function for exchange rates

610

TemporalTableFunction ratesFunction = exchangeRates

611

.createTemporalTableFunction($("rate_time"), $("currency"));

612

613

// Register for use in temporal joins

614

tableEnv.createTemporarySystemFunction("rates", ratesFunction);

615

616

// Use in temporal join

617

Table orders = tableEnv.from("orders");

618

Table ordersWithRates = orders

619

.joinLateral(call("rates", $("order_time")).as("rate_currency", "exchange_rate"))

620

.select($("order_id"), $("amount"), $("exchange_rate"),

621

$("amount").times($("exchange_rate")).as("amount_usd"));

622

```

623

624

### Column Manipulation

625

626

Additional operations for renaming columns, plus offset/fetch operations for skipping and limiting rows.

627

628

```java { .api }

629

/**

630

* Renames columns of the table

631

* @param fields Expressions defining new column names

632

* @return New Table with renamed columns

633

*/

634

Table renameColumns(Expression... fields);

635

636

/**

637

* Skips the first n rows

638

* @param offset Number of rows to skip

639

* @return New Table with offset applied

640

*/

641

Table offset(int offset);

642

643

/**

644

* Takes the first n rows after any offset

645

* @param fetch Number of rows to take

646

* @return New Table with fetch applied

647

*/

648

Table fetch(int fetch);

649

650

/**

651

* Renames the fields of the table; the given names are applied to the columns in order (this does not alias the table itself)

652

* @param field First field name

653

* @param fields Additional field names

654

* @return New Table with alias applied

655

*/

656

Table as(String field, String... fields);

657

```

658

659

**Usage Examples:**

660

661

```java

662

Table products = tableEnv.from("products");

663

664

// Rename columns

665

Table renamedProducts = products

666

.renameColumns($("prod_id").as("product_id"), $("prod_name").as("product_name"));

667

668

// Pagination using offset and fetch

669

Table page3Products = products

670

.orderBy($("product_id"))

671

.offset(20)

672

.fetch(10);

673

674

// Rename columns positionally (as() assigns the given names to the columns in order; it does not alias the table)

675

Table aliasedProducts = products.as("p", "id", "name", "price", "category");

676

```

677

678

### Insert Operations

679

680

Operations for inserting table data into target tables.

681

682

```java { .api }

683

/**

684

* Creates a pipeline to insert table data into the specified table

685

* @param tablePath Target table path

686

* @return TablePipeline for further configuration

687

*/

688

TablePipeline insertInto(String tablePath);

689

690

/**

691

* Creates a pipeline to insert table data with overwrite option

692

* @param tablePath Target table path

693

* @param overwrite Whether to overwrite existing data

694

* @return TablePipeline for further configuration

695

*/

696

TablePipeline insertInto(String tablePath, boolean overwrite);

697

698

/**

699

* Creates a pipeline to insert table data using table descriptor

700

* @param descriptor Table descriptor defining the target

701

* @return TablePipeline for further configuration

702

*/

703

TablePipeline insertInto(TableDescriptor descriptor);

704

705

/**

706

* Creates a pipeline to insert table data using table descriptor with overwrite

707

* @param descriptor Table descriptor defining the target

708

* @param overwrite Whether to overwrite existing data

709

* @return TablePipeline for further configuration

710

*/

711

TablePipeline insertInto(TableDescriptor descriptor, boolean overwrite);

712

713

/**

714

* Directly executes insert into the specified table

715

* @param tablePath Target table path

716

* @return TableResult with execution information

717

*/

718

TableResult executeInsert(String tablePath);

719

720

/**

721

* Directly executes insert with overwrite option

722

* @param tablePath Target table path

723

* @param overwrite Whether to overwrite existing data

724

* @return TableResult with execution information

725

*/

726

TableResult executeInsert(String tablePath, boolean overwrite);

727

728

/**

729

* Directly executes insert using table descriptor

730

* @param descriptor Table descriptor defining the target

731

* @return TableResult with execution information

732

*/

733

TableResult executeInsert(TableDescriptor descriptor);

734

735

/**

736

* Directly executes insert using table descriptor with overwrite

737

* @param descriptor Table descriptor defining the target

738

* @param overwrite Whether to overwrite existing data

739

* @return TableResult with execution information

740

*/

741

TableResult executeInsert(TableDescriptor descriptor, boolean overwrite);

742

```

743

744

**Usage Examples:**

745

746

```java

747

Table processedOrders = orders

748

.filter($("status").isEqual("processed"))

749

.select($("order_id"), $("customer_id"), $("total_amount"));

750

751

// Direct insert execution

752

TableResult result = processedOrders.executeInsert("processed_orders");

753

754

// Pipeline-based insert for more control

755

TablePipeline pipeline = processedOrders.insertInto("processed_orders", true);

756

TableResult pipelineResult = pipeline.execute();

757

758

// Insert using table descriptor

759

TableDescriptor targetDescriptor = TableDescriptor.forConnector("kafka")

760

.schema(Schema.newBuilder()

761

.column("order_id", DataTypes.BIGINT())

762

.column("customer_id", DataTypes.BIGINT())

763

.column("total_amount", DataTypes.DECIMAL(10, 2))

764

.build())

765

.option("topic", "processed-orders")

766

.build();

767

768

TableResult descriptorResult = processedOrders.executeInsert(targetDescriptor);

769

```

770

771

## Types

772

773

### Grouped Tables

774

775

```java { .api }

776

interface GroupedTable {

777

Table select(Expression... fields);

778

AggregatedTable aggregate(Expression aggregateExpression, Expression... moreAggregateExpressions);

779

FlatAggregateTable flatAggregate(Expression tableAggFunction);

780

}

781

782

interface WindowGroupedTable extends GroupedTable {

783

// Inherits all GroupedTable methods

784

}

785

```

786

787

### Aggregated Tables

788

789

```java { .api }

790

interface AggregatedTable {

791

Table select(Expression... fields);

792

AggregatedTable aggregate(Expression aggregateExpression, Expression... moreAggregateExpressions);

793

}

794

795

interface FlatAggregateTable {

796

Table select(Expression... fields);

797

FlatAggregateTable flatAggregate(Expression tableAggFunction);

798

}

799

```

800

801

### Schema Information

802

803

```java { .api }

804

class ResolvedSchema {

805

List<Column> getColumns();

806

Optional<Column> getColumn(String name);

807

Optional<Column> getColumn(int index);

808

List<String> getColumnNames();

809

List<DataType> getColumnDataTypes();

810

Optional<UniqueConstraint> getPrimaryKey();

811

List<WatermarkSpec> getWatermarkSpecs();

812

}

813

814

class Column {

815

String getName();

816

DataType getDataType();

817

String getComment();

818

boolean isPhysical();

819

boolean isComputed();

820

boolean isMetadata();

821

}

822

```