or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-system.md datastream-integration.md index.md sql-execution.md table-environment.md table-operations.md type-system.md user-defined-functions.md

docs/table-environment.md

0

# Table Environment

1

2

TableEnvironment is the entry point and central context for creating Table and SQL API programs. It provides a unified interface for both streaming and batch processing, managing catalogs, databases, functions, and configuration.

3

4

## Capabilities

5

6

### Environment Creation

7

8

Creates table environments with specific settings for streaming or batch processing.

9

10

```java { .api }

11

/**

12

* Creates a table environment that is the entry point for Table and SQL API programs

13

* @param settings The environment settings used to instantiate the TableEnvironment

14

* @return TableEnvironment instance

15

*/

16

static TableEnvironment create(EnvironmentSettings settings);

17

18

/**

19

* Creates a table environment with configuration

20

* @param configuration The configuration for the table environment

21

* @return TableEnvironment instance

22

*/

23

static TableEnvironment create(Configuration configuration);

24

```

25

26

**Usage Examples:**

27

28

```java

29

// Streaming environment

30

EnvironmentSettings streamingSettings = EnvironmentSettings

31

.newInstance()

32

.inStreamingMode()

33

.build();

34

TableEnvironment streamingEnv = TableEnvironment.create(streamingSettings);

35

36

// Batch environment

37

EnvironmentSettings batchSettings = EnvironmentSettings

38

.newInstance()

39

.inBatchMode()

40

.build();

41

TableEnvironment batchEnv = TableEnvironment.create(batchSettings);

42

43

// With custom configuration

44

Configuration config = new Configuration();

45

config.setString("table.exec.mini-batch.enabled", "true");

46

TableEnvironment configEnv = TableEnvironment.create(config);

47

```

48

49

### SQL Query Execution

50

51

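Executes SQL against the table environment: `sqlQuery` evaluates a query and returns the result as a `Table`, while `executeSql` runs a single statement (DDL, DML, or query) and returns a `TableResult`.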

52

53

```java { .api }

54

/**

55

* Evaluates a SQL query on registered tables and returns the result as a Table

56

* @param query SQL query string

57

* @return Table representing the query result

58

*/

59

Table sqlQuery(String query);

60

61

/**

62

* Executes a single SQL statement and returns the execution result

63

* @param statement SQL statement (DDL, DML, or query)

64

* @return TableResult containing execution information and data

65

*/

66

TableResult executeSql(String statement);

67

```

68

69

**Usage Examples:**

70

71

```java

72

// Query execution

73

Table result = tableEnv.sqlQuery(

74

"SELECT customer_id, SUM(amount) as total " +

75

"FROM orders " +

76

"WHERE order_date >= '2024-01-01' " +

77

"GROUP BY customer_id"

78

);

79

80

// DDL execution

81

tableEnv.executeSql(

82

"CREATE TABLE orders (" +

83

" order_id BIGINT," +

84

" customer_id BIGINT," +

85

" amount DECIMAL(10,2)," +

86

" order_date DATE" +

87

") WITH (" +

88

" 'connector' = 'kafka'," +

89

" 'topic' = 'orders'" +

90

")"

91

);

92

93

// DML execution

94

TableResult insertResult = tableEnv.executeSql(

95

"INSERT INTO target_table SELECT * FROM source_table WHERE active = true"

96

);

97

```

98

99

### Table Registration and Access

100

101

Manages table creation, registration, and access within the table environment.

102

103

```java { .api }

104

/**

105

* Creates a Table from a registered table path

106

* @param path Table path in catalog.database.table format

107

* @return Table instance for the specified path

108

*/

109

Table from(String path);

110

111

/**

112

* Creates a Table from a table descriptor

113

* @param descriptor TableDescriptor defining the table structure and properties

114

* @return Table instance based on the descriptor

115

*/

116

Table from(TableDescriptor descriptor);

117

118

/**

119

* Creates a temporary table in the current catalog and database

120

* @param path Table path (can be simple name or catalog.database.table)

121

* @param descriptor TableDescriptor defining the table

122

*/

123

void createTemporaryTable(String path, TableDescriptor descriptor);

124

125

/**

126

* Creates a persistent table in the catalog

127

* @param path Table path in catalog.database.table format

128

* @param descriptor TableDescriptor defining the table

129

*/

130

void createTable(String path, TableDescriptor descriptor);

131

132

/**

133

* Creates a temporary view from a Table

134

* @param path View name or path

135

* @param table Table to register as a view

136

*/

137

void createTemporaryView(String path, Table table);

138

```

139

140

**Usage Examples:**

141

142

```java

143

// Access existing table

144

Table orders = tableEnv.from("default_catalog.orders_db.orders");

145

146

// Create table from descriptor

147

TableDescriptor descriptor = TableDescriptor

148

.forConnector("kafka")

149

.schema(Schema.newBuilder()

150

.column("id", DataTypes.BIGINT())

151

.column("name", DataTypes.STRING())

152

.build())

153

.option("topic", "users")

154

.build();

155

156

tableEnv.createTemporaryTable("users", descriptor);

157

Table users = tableEnv.from("users");

158

159

// Create view from existing table

160

Table activeUsers = users.filter($("active").isEqual(true));

161

tableEnv.createTemporaryView("active_users", activeUsers);

162

```

163

164

### Catalog and Database Management

165

166

Manages catalog and database contexts for table operations.

167

168

```java { .api }

169

/**

170

* Sets the current catalog for table operations

171

* @param catalogName Name of the catalog to use

172

*/

173

void useCatalog(String catalogName);

174

175

/**

176

* Sets the current database within the current catalog

177

* @param databaseName Name of the database to use

178

*/

179

void useDatabase(String databaseName);

180

181

/**

182

* Lists all available catalogs

183

* @return Array of catalog names

184

*/

185

String[] listCatalogs();

186

187

/**

188

* Lists all databases in the current catalog

189

* @return Array of database names

190

*/

191

String[] listDatabases();

192

193

/**

194

* Lists all tables in the current database

195

* @return Array of table names

196

*/

197

String[] listTables();

198

199

/**

200

* Lists all user-defined functions in the current database

201

* @return Array of function names

202

*/

203

String[] listUserDefinedFunctions();

204

205

/**

206

* Lists all temporary tables in the current session

207

* @return Array of temporary table names

208

*/

209

String[] listTemporaryTables();

210

211

/**

212

* Lists all temporary views in the current session

213

* @return Array of temporary view names

214

*/

215

String[] listTemporaryViews();

216

```

217

218

**Usage Examples:**

219

220

```java

221

// Catalog and database navigation

222

tableEnv.useCatalog("my_catalog");

223

tableEnv.useDatabase("analytics");

224

225

// List available resources

226

String[] catalogs = tableEnv.listCatalogs();

227

String[] databases = tableEnv.listDatabases();

228

String[] tables = tableEnv.listTables();

229

230

// Current context information

231

String currentCatalog = tableEnv.getCurrentCatalog();

232

String currentDatabase = tableEnv.getCurrentDatabase();

233

```

234

235

### Table Creation from Values

236

237

Creates tables directly from values without external data sources.

238

239

```java { .api }

240

/**

241

* Creates a table from a list of rows with automatic schema inference

242

* @param rows List of Row objects containing the data

243

* @return Table containing the specified data

244

*/

245

Table fromValues(Row... rows);

246

247

/**

248

* Creates a table from a list of rows with automatic schema inference

249

* @param rows Collection of Row objects containing the data

250

* @return Table containing the specified data

251

*/

252

Table fromValues(Collection<Row> rows);

253

254

/**

255

* Creates a table from values with explicit schema

256

* @param rowDataType DataType defining the schema for the rows

257

* @param rows List of Row objects containing the data

258

* @return Table with specified schema and data

259

*/

260

Table fromValues(AbstractDataType<?> rowDataType, Row... rows);

261

262

/**

263

* Creates a table from values with explicit schema

264

* @param rowDataType DataType defining the schema for the rows

265

* @param rows Collection of Row objects containing the data

266

* @return Table with specified schema and data

267

*/

268

Table fromValues(AbstractDataType<?> rowDataType, Collection<Row> rows);

269

270

/**

271

* Creates a table from expression values

272

* @param expressions List of expressions representing row values

273

* @return Table containing the expression values

274

*/

275

Table fromValues(Expression... expressions);

276

277

/**

278

* Creates a table from expression values

279

* @param expressions Collection of expressions representing row values

280

* @return Table containing the expression values

281

*/

282

Table fromValues(Collection<Expression> expressions);

283

284

/**

285

* Creates a table from expression values with explicit data type

286

* @param rowDataType DataType defining the schema for the expressions

287

* @param expressions List of expressions representing row values

288

* @return Table with specified schema and expression values

289

*/

290

Table fromValues(AbstractDataType<?> rowDataType, Expression... expressions);

291

292

/**

293

* Creates a table from expression values with explicit data type

294

* @param rowDataType DataType defining the schema for the expressions

295

* @param expressions Collection of expressions representing row values

296

* @return Table with specified schema and expression values

297

*/

298

Table fromValues(AbstractDataType<?> rowDataType, Collection<Expression> expressions);

299

```

300

301

**Usage Examples:**

302

303

```java

304

import static org.apache.flink.table.api.Expressions.*;

305

306

// Create table from Row objects

307

Row row1 = Row.of(1L, "Alice", 25);

308

Row row2 = Row.of(2L, "Bob", 30);

309

Row row3 = Row.of(3L, "Charlie", 35);

310

311

Table fromRowsTable = tableEnv.fromValues(row1, row2, row3);

312

313

// Create table with explicit schema

314

AbstractDataType<?> rowType = DataTypes.ROW(

315

DataTypes.FIELD("id", DataTypes.BIGINT()),

316

DataTypes.FIELD("name", DataTypes.STRING()),

317

DataTypes.FIELD("age", DataTypes.INT())

318

);

319

320

Table typedTable = tableEnv.fromValues(rowType, row1, row2, row3);

321

322

// Create table from expressions

323

Table expressionTable = tableEnv.fromValues(

324

row(1, "Alice", 25),

325

row(2, "Bob", 30),

326

row(3, "Charlie", 35)

327

);

328

329

// Create table with explicit data type for expressions

330

Table typedExpressionTable = tableEnv.fromValues(

331

rowType,

332

row(1, "Alice", 25),

333

row(2, "Bob", 30),

334

row(3, "Charlie", 35)

335

);

336

337

// Use in complex queries

338

Table result = expressionTable

339

.filter($("age").isGreater(25))

340

.select($("name"), $("age"));

341

```

342

343

### Function Management

344

345

Manages user-defined functions registration and lifecycle.

346

347

```java { .api }

348

/**

349

* Registers a temporary system function that can be used in SQL and Table API

350

* @param name Function name for SQL usage

351

* @param function UserDefinedFunction implementation

352

*/

353

void createTemporarySystemFunction(String name, UserDefinedFunction function);

354

355

/**

356

* Creates a persistent function in the catalog

357

* @param path Function path in catalog.database.function format

358

* @param functionClass Class implementing the function

359

*/

360

void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);

361

362

/**

363

* Drops a temporary system function

364

* @param name Function name to drop

365

* @return true if function existed and was dropped

366

*/

367

boolean dropTemporarySystemFunction(String name);

368

369

/**

370

* Drops a persistent function from the catalog

371

* @param path Function path in catalog.database.function format

372

*/

373

void dropFunction(String path);

374

```

375

376

**Usage Examples:**

377

378

```java

379

// Register scalar function

380

ScalarFunction myUpper = new ScalarFunction() {

381

public String eval(String input) {

382

return input != null ? input.toUpperCase() : null;

383

}

384

};

385

tableEnv.createTemporarySystemFunction("my_upper", myUpper);

386

387

// Use in SQL

388

Table result = tableEnv.sqlQuery("SELECT my_upper(name) FROM users");

389

390

// Register aggregate function

391

tableEnv.createTemporarySystemFunction("my_avg", new MyAverageFunction());

392

393

// Create persistent function

394

tableEnv.createFunction("my_catalog.my_db.my_function", MyFunction.class);

395

```

396

397

### Configuration and Context

398

399

Manages table environment configuration and context information.

400

401

```java { .api }

402

/**

403

* Gets the table configuration for this environment

404

* @return TableConfig instance for accessing and modifying settings

405

*/

406

TableConfig getConfig();

407

408

/**

409

* Gets the current catalog name

410

* @return Current catalog name

411

*/

412

String getCurrentCatalog();

413

414

/**

415

* Gets the current database name

416

* @return Current database name

417

*/

418

String getCurrentDatabase();

419

```

420

421

**Usage Examples:**

422

423

```java

424

// Access configuration

425

TableConfig config = tableEnv.getConfig();

426

config.getConfiguration().setString("table.exec.mini-batch.enabled", "true");

427

428

// Get current context

429

String catalog = tableEnv.getCurrentCatalog();

430

String database = tableEnv.getCurrentDatabase();

431

```

432

433

### Statement Set Operations

434

435

Creates statement sets, which collect multiple INSERT operations so that they can be executed together with a single `execute()` call.

436

437

```java { .api }

438

/**

439

* Creates a StatementSet for batch execution of multiple statements

440

* @return StatementSet instance for adding multiple operations

441

*/

442

StatementSet createStatementSet();

443

```

444

445

**Usage Examples:**

446

447

```java

448

// Batch multiple operations

449

StatementSet stmtSet = tableEnv.createStatementSet();

450

stmtSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source WHERE type = 'A'");

451

stmtSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source WHERE type = 'B'");

452

stmtSet.addInsert("sink3", processedTable);

453

454

// Execute all statements together

455

TableResult result = stmtSet.execute();

456

```

457

458

## Types

459

460

### Environment Settings

461

462

```java { .api }

463

class EnvironmentSettings {

464

static Builder newInstance();

465

466

interface Builder {

467

Builder useBlinkPlanner();

468

Builder useAnyPlanner();

469

Builder inStreamingMode();

470

Builder inBatchMode();

471

Builder withConfiguration(Configuration configuration);

472

EnvironmentSettings build();

473

}

474

}

475

```

476

477

### Table Configuration

478

479

```java { .api }

480

class TableConfig {

481

Configuration getConfiguration();

482

void setSqlDialect(SqlDialect sqlDialect);

483

SqlDialect getSqlDialect();

484

void setLocalTimeZone(ZoneId zoneId);

485

ZoneId getLocalTimeZone();

486

}

487

```

488

489

### Statement Set

490

491

```java { .api }

492

interface StatementSet {

493

StatementSet addInsertSql(String statement);

494

StatementSet addInsert(String targetPath, Table table);

495

StatementSet add(ModifyOperation modifyOperation);

496

String explain();

497

TableResult execute();

498

}

499

```