tessl/maven-org-apache-flink--flink-table

Apache Flink's Table API and SQL module for unified stream and batch processing

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/org.apache.flink/flink-table@1.20.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-table@1.20.0

# Apache Flink Table API

Apache Flink's Table API and SQL module provides unified stream and batch processing capabilities through both Table API and SQL interfaces. It offers language-integrated query APIs for Java, Scala, and Python with intuitive composition of queries using relational operators such as selection, filter, and join.

The Table API is built around the core concept of Tables - pipeline descriptions that can be optimized and executed on either bounded or unbounded data streams. This enables both real-time streaming analytics and traditional batch processing with the same unified API.

## Package Information

- **Package Name**: org.apache.flink:flink-table
- **Package Type**: maven
- **Language**: Java (with Scala support)
- **Version**: 1.20.2
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>1.20.2</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
```

For DataStream integration:

```java
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
```

## Basic Usage

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import static org.apache.flink.table.api.Expressions.*;

// Create table environment
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Execute SQL queries
tableEnv.executeSql("CREATE TABLE source_table (id INT, name STRING) WITH (...)");
Table result = tableEnv.sqlQuery("SELECT id, UPPER(name) as name FROM source_table WHERE id > 10");

// Table API operations
Table filtered = result
    .select($("id"), $("name"))
    .where($("id").isGreater(5));

// Execute and print results
filtered.execute().print();
```

## Architecture

Apache Flink Table API is built around several key architectural components:

- **Table Environment**: Central context for creating Table and SQL API programs, managing catalogs and configuration
- **Table Abstraction**: Core abstraction representing data transformation pipelines (not actual data)
- **Type System**: Comprehensive data type system supporting primitives, complex types, and user-defined types
- **Expression API**: Type-safe expression system for table transformations and computations
- **Catalog System**: Pluggable metadata management for tables, functions, and databases
- **Connector Framework**: Extensible source/sink architecture for data integration
- **Function Framework**: Support for scalar, table, and aggregate user-defined functions
- **Query Planning**: Advanced query optimization and execution planning with Calcite integration

## Capabilities

### Table Environment

Core entry point and central context for creating Table and SQL API programs. Handles catalog management, SQL execution, and table operations.

```java { .api }
interface TableEnvironment {
    // Factory methods
    static TableEnvironment create(EnvironmentSettings settings);
    static TableEnvironment create(Configuration configuration);

    // SQL execution
    Table sqlQuery(String query);
    TableResult executeSql(String statement);

    // Table creation and access
    Table from(String path);
    Table from(TableDescriptor descriptor);
    void createTemporaryTable(String path, TableDescriptor descriptor);
    void createTable(String path, TableDescriptor descriptor);

    // Catalog and database management
    void useCatalog(String catalogName);
    void useDatabase(String databaseName);
    String[] listTables();
    String[] listCatalogs();
    String[] listDatabases();
}
```

[Table Environment](./table-environment.md)

### Table Operations

Core table abstraction providing fluent API for data transformations, joins, aggregations, and window operations.

```java { .api }
interface Table extends Explainable<Table>, Executable {
    // Schema access
    ResolvedSchema getResolvedSchema();

    // Basic transformations
    Table select(Expression... fields);
    Table filter(Expression predicate);
    Table where(Expression predicate);
    Table as(Expression... fields);
    Table as(String field, String... fields);
    Table distinct();

    // Column operations
    Table addColumns(Expression... fields);
    Table addOrReplaceColumns(Expression... fields);
    Table renameColumns(Expression... fields);
    Table dropColumns(Expression... fields);

    // Aggregations
    GroupedTable groupBy(Expression... fields);
    AggregatedTable aggregate(Expression aggregateFunction);
    FlatAggregateTable flatAggregate(Expression tableAggregateFunction);

    // Joins
    Table join(Table right);
    Table join(Table right, Expression joinPredicate);
    Table leftOuterJoin(Table right);
    Table leftOuterJoin(Table right, Expression joinPredicate);
    Table rightOuterJoin(Table right);
    Table rightOuterJoin(Table right, Expression joinPredicate);
    Table fullOuterJoin(Table right);
    Table fullOuterJoin(Table right, Expression joinPredicate);

    // Lateral joins
    Table joinLateral(Expression tableFunctionCall);
    Table joinLateral(Expression tableFunctionCall, Expression joinPredicate);
    Table leftOuterJoinLateral(Expression tableFunctionCall);
    Table leftOuterJoinLateral(Expression tableFunctionCall, Expression joinPredicate);

    // Set operations
    Table union(Table right);
    Table unionAll(Table right);
    Table intersect(Table right);
    Table intersectAll(Table right);
    Table minus(Table right);
    Table minusAll(Table right);

    // Function operations
    Table map(Expression mapFunction);
    Table flatMap(Expression tableFunction);

    // Ordering and limiting
    Table orderBy(Expression... fields);
    Table offset(int offset);
    Table fetch(int fetch);
    Table limit(int fetch);
    Table limit(int offset, int fetch);

    // Window operations
    GroupWindowedTable window(GroupWindow groupWindow);
    OverWindowedTable window(OverWindow... overWindows);

    // Temporal operations
    TemporalTableFunction createTemporalTableFunction(Expression timeAttribute, Expression primaryKey);

    // Insert operations
    TablePipeline insertInto(String tablePath);
    TablePipeline insertInto(String tablePath, boolean overwrite);
    TablePipeline insertInto(TableDescriptor descriptor);
    TablePipeline insertInto(TableDescriptor descriptor, boolean overwrite);
    TableResult executeInsert(String tablePath);
    TableResult executeInsert(String tablePath, boolean overwrite);
    TableResult executeInsert(TableDescriptor descriptor);
    TableResult executeInsert(TableDescriptor descriptor, boolean overwrite);

    // Execution
    TableResult execute();
    String explain();
}
```

[Table Operations](./table-operations.md)
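
The set operations above come in a distinct variant and an `All` variant, and the difference is easy to miss. The following is a minimal, Flink-free sketch of that difference for `minus` and `minusAll`, assuming rows can be modeled as plain strings. The class `SetOpsSketch` and its methods are hypothetical helpers for illustration, not Flink API. Under SQL `EXCEPT ALL` semantics, a row present n times on the left and m times on the right is returned n - m times.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical helper, not part of Flink: models table rows as plain strings.
final class SetOpsSketch {

    // EXCEPT semantics: distinct left rows that never occur on the right.
    static List<String> minus(List<String> left, List<String> right) {
        LinkedHashSet<String> result = new LinkedHashSet<>(left);
        result.removeAll(right);
        return new ArrayList<>(result);
    }

    // EXCEPT ALL semantics: each right row cancels one matching left row.
    static List<String> minusAll(List<String> left, List<String> right) {
        List<String> result = new ArrayList<>(left);
        for (String row : right) {
            result.remove(row); // removes only the first occurrence, if any
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a", "a", "a", "b");
        List<String> right = Arrays.asList("a", "c");
        System.out.println(minus(left, right));    // [b]
        System.out.println(minusAll(left, right)); // [a, a, b]
    }
}
```

The same distinct-versus-bag distinction applies to `union`/`unionAll` and `intersect`/`intersectAll`.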

### DataStream Integration

Integration layer between Table API and Flink's DataStream API, enabling conversion between tables and data streams for complex pipelines.

```java { .api }
interface StreamTableEnvironment extends TableEnvironment {
    // Factory methods
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);

    // DataStream conversion
    <T> Table fromDataStream(DataStream<T> dataStream);
    <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
    DataStream<Row> toDataStream(Table table);
    <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
    <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);

    // Changelog conversion
    Table fromChangelogStream(DataStream<Row> dataStream);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
    DataStream<Row> toChangelogStream(Table table);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);

    // Temporary view creation
    <T> void createTemporaryView(String path, DataStream<T> dataStream);
    <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);

    // Statement set creation
    StreamStatementSet createStatementSet();
}
```

[DataStream Integration](./datastream-integration.md)
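
The changelog conversion methods treat a stream as a sequence of inserts, updates, and deletes rather than as append-only records. As a rough illustration of what that means, the sketch below replays such a sequence into a keyed table using only the JDK. The `Kind` enum mirrors the four values of Flink's `RowKind` (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`); the `Change` class, the single-key layout, and `materialize` are assumptions made for this example and are not how Flink represents rows internally.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Flink-free model of a changelog stream; all names here are hypothetical.
final class ChangelogSketch {

    // Mirrors the four row kinds a changelog row can carry.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // One change event: a row kind plus a (key, value) payload.
    static final class Change {
        final Kind kind;
        final String key;
        final int value;

        Change(Kind kind, String key, int value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    // Replays the changes in order and returns the resulting table contents.
    static Map<String, Integer> materialize(List<Change> changes) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.put(c.key, c.value);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    table.remove(c.key);
                    break;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
            new Change(Kind.INSERT, "alice", 1),
            new Change(Kind.INSERT, "bob", 5),
            new Change(Kind.UPDATE_BEFORE, "alice", 1),
            new Change(Kind.UPDATE_AFTER, "alice", 2),
            new Change(Kind.DELETE, "bob", 5));
        System.out.println(materialize(changes)); // {alice=2}
    }
}
```

An insert-only stream is the special case in which every event has kind `INSERT`, which is the case the plain `fromDataStream` and `toDataStream` methods cover.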

### Type System

Comprehensive type system supporting primitive types, complex nested structures, and user-defined types with full serialization support.

```java { .api }
class DataTypes {
    // Primitive types
    static DataType BOOLEAN();
    static DataType TINYINT();
    static DataType SMALLINT();
    static DataType INT();
    static DataType BIGINT();
    static DataType FLOAT();
    static DataType DOUBLE();
    static DataType DECIMAL(int precision, int scale);

    // String and binary types
    static DataType CHAR(int length);
    static DataType VARCHAR(int length);
    static DataType STRING();
    static DataType BINARY(int length);
    static DataType VARBINARY(int length);
    static DataType BYTES();

    // Temporal types
    static DataType DATE();
    static DataType TIME();
    static DataType TIMESTAMP();
    static DataType TIMESTAMP_WITH_TIME_ZONE();
    static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE();

    // Complex types
    static DataType ARRAY(DataType elementType);
    static DataType MAP(DataType keyType, DataType valueType);
    static DataType ROW(Field... fields);
}

class Schema {
    static Builder newBuilder();
    List<UnresolvedColumn> getColumns();
    Optional<UnresolvedPrimaryKey> getPrimaryKey();
    List<UnresolvedWatermarkSpec> getWatermarkSpecs();
}
```

[Type System](./type-system.md)
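
The two arguments of `DECIMAL(int precision, int scale)` are easy to confuse: precision is the total number of digits, and scale is how many of those digits sit after the decimal point. The sketch below checks whether a `java.math.BigDecimal` value fits a given precision and scale without rounding. It uses only the JDK; `DecimalSketch` and `fits` are hypothetical names introduced for this illustration, and Flink's own validation and rounding behavior may differ in detail.

```java
import java.math.BigDecimal;

// Hypothetical helper, not part of Flink.
final class DecimalSketch {

    // True if value can be stored as DECIMAL(precision, scale) without rounding.
    static boolean fits(BigDecimal value, int precision, int scale) {
        BigDecimal v = value.stripTrailingZeros();
        int integerDigits = v.precision() - v.scale();
        return v.scale() <= scale && integerDigits <= precision - scale;
    }

    public static void main(String[] args) {
        // DECIMAL(10, 2): up to 8 digits before and 2 digits after the point.
        System.out.println(fits(new BigDecimal("12345.67"), 10, 2));    // true
        System.out.println(fits(new BigDecimal("0.001"), 10, 2));       // false
        System.out.println(fits(new BigDecimal("123456789.5"), 10, 2)); // false
    }
}
```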

### User-Defined Functions

Framework for creating custom scalar, table, and aggregate functions with support for multiple programming languages.

```java { .api }
abstract class UserDefinedFunction implements FunctionDefinition {
    // Lifecycle hooks; open() receives the FunctionContext
    void open(FunctionContext context) throws Exception;
    void close() throws Exception;
}

abstract class ScalarFunction extends UserDefinedFunction {
    // Implementation provided by user
    // public ReturnType eval(InputType1 input1, InputType2 input2, ...);
}

abstract class TableFunction<T> extends UserDefinedFunction {
    // Emit results using collect()
    // public void eval(InputType1 input1, InputType2 input2, ...);
    protected void collect(T result);
}

abstract class AggregateFunction<T, ACC> extends ImperativeAggregateFunction<T, ACC> {
    public abstract ACC createAccumulator();
    public abstract T getValue(ACC accumulator);
    // public void accumulate(ACC accumulator, InputType1 input1, InputType2 input2, ...);
}
```

[User-Defined Functions](./user-defined-functions.md)
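
The aggregate function contract above has three parts: create an accumulator, fold each input row into it, and read the final value out. A minimal sketch of that life cycle for a weighted average follows. To keep it runnable without Flink on the classpath, `WeightedAvgSketch` is a standalone class; in a real job it would presumably extend `AggregateFunction<Long, WeightedAvgSketch.Acc>`. The class name, the `Acc` layout, and the `long` parameter types are assumptions made for this example.

```java
// Standalone sketch of the accumulator pattern; it deliberately has no Flink base class.
final class WeightedAvgSketch {

    // Mutable accumulator holding the intermediate aggregation state.
    static final class Acc {
        long sum = 0;
        long count = 0;
    }

    // Step 1: a fresh accumulator per aggregation group.
    Acc createAccumulator() {
        return new Acc();
    }

    // Step 2: called once per input row to update the state.
    void accumulate(Acc acc, long value, long weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }

    // Step 3: derive the result; null when no rows were seen.
    Long getValue(Acc acc) {
        return acc.count == 0 ? null : acc.sum / acc.count;
    }

    public static void main(String[] args) {
        WeightedAvgSketch f = new WeightedAvgSketch();
        Acc acc = f.createAccumulator();
        f.accumulate(acc, 10, 1);
        f.accumulate(acc, 20, 3);
        System.out.println(f.getValue(acc)); // 17 (70 / 4 with integer division)
    }
}
```

Keeping all state in the accumulator rather than in fields of the function is what lets the runtime manage one accumulator per group.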

### SQL Execution

Direct SQL query execution with support for DDL, DML, and query operations, including statement batching and result handling.

```java { .api }
interface TableEnvironment {
    // SQL execution
    Table sqlQuery(String query);
    TableResult executeSql(String statement);
    StatementSet createStatementSet();

    // Function registration
    void createTemporarySystemFunction(String name, UserDefinedFunction function);
    void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);
}

interface TableResult {
    ResultKind getResultKind();
    ResolvedSchema getResolvedSchema();
    CloseableIterator<Row> collect();
    void print();
}

interface StatementSet {
    StatementSet addInsertSql(String statement);
    StatementSet addInsert(String targetPath, Table table);
    TableResult execute();
}
```

[SQL Execution](./sql-execution.md)

### Catalog System

Pluggable metadata management system for tables, functions, databases, and user-defined catalogs with persistent storage support.

```java { .api }
interface Catalog {
    // Database operations
    void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);
    void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);
    List<String> listDatabases();
    CatalogDatabase getDatabase(String databaseName);

    // Table operations
    void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
    List<String> listTables(String databaseName);
    CatalogBaseTable getTable(ObjectPath tablePath);

    // Function operations
    void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists);
    void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists);
    List<String> listFunctions(String databaseName);
}

class ObjectIdentifier {
    static ObjectIdentifier of(String catalogName, String databaseName, String objectName);
    String getCatalogName();
    String getDatabaseName();
    String getObjectName();
}
```

[Catalog System](./catalog-system.md)
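
An `ObjectIdentifier` always carries three parts (catalog, database, object), while the paths passed to methods such as `from(String path)` may be shorter. The sketch below shows how a one-, two-, or three-part path could be expanded against the current catalog and database, which is what `useCatalog` and `useDatabase` select. `PathSketch` and `resolve` are hypothetical, and the sketch ignores backtick-quoted identifiers that contain dots, which real path parsing has to handle.

```java
// Hypothetical helper, not part of Flink; simplified path expansion.
final class PathSketch {

    // Expands a dotted path to the fully qualified form catalog.database.object.
    static String resolve(String path, String currentCatalog, String currentDatabase) {
        String[] parts = path.split("\\.");
        switch (parts.length) {
            case 1:
                return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2:
                return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3:
                return path;
            default:
                throw new IllegalArgumentException("Expected 1 to 3 path parts: " + path);
        }
    }

    public static void main(String[] args) {
        String catalog = "default_catalog";
        String database = "default_database";
        System.out.println(resolve("orders", catalog, database));
        // default_catalog.default_database.orders
        System.out.println(resolve("sales.orders", catalog, database));
        // default_catalog.sales.orders
        System.out.println(resolve("hive.sales.orders", catalog, database));
        // hive.sales.orders
    }
}
```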

## Types

### Core Data Types

```java { .api }
abstract class DataType {
    LogicalType getLogicalType();
    Class<?> getConversionClass();
    DataType notNull();
    DataType nullable();
    DataType bridgedTo(Class<?> newConversionClass);
}

// Concrete DataType implementations
class AtomicDataType extends DataType { }
class CollectionDataType extends DataType { }
class FieldsDataType extends DataType { }
class KeyValueDataType extends DataType { }
```

### Configuration and Settings

```java { .api }
class EnvironmentSettings {
    static EnvironmentSettings.Builder newInstance();

    static class Builder {
        Builder inStreamingMode();
        Builder inBatchMode();
        Builder withBuiltInCatalogName(String builtInCatalogName);
        Builder withBuiltInDatabaseName(String builtInDatabaseName);
        Builder withConfiguration(Configuration configuration);
        EnvironmentSettings build();
    }
}

class TableConfig {
    Configuration getConfiguration();
    SqlDialect getSqlDialect();
    ZoneId getLocalTimeZone();
}
```

### Result and Execution Types

```java { .api }
enum ResultKind {
    SUCCESS,
    SUCCESS_WITH_CONTENT
}

enum SqlDialect {
    DEFAULT,
    HIVE
}

enum ExplainFormat {
    TEXT,
    JSON
}

class Row {
    Object getField(int pos);
    Object getField(String name);
    int getArity();
    RowKind getKind();
}
```

### Exception Types

```java { .api }
class TableException extends RuntimeException { }
class TableRuntimeException extends RuntimeException { }
class ValidationException extends RuntimeException { }
class TableNotExistException extends RuntimeException { }
class AmbiguousTableFactoryException extends RuntimeException { }
```