or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregation-grouping.md catalog-management.md expressions.md index.md sql-integration.md table-environment.md table-operations.md user-defined-functions.md window-operations.md

table-environment.md docs/

0

# Table Environment and Setup

1

2

The TableEnvironment is the main entry point and central context for all Table API operations. It handles the creation of tables, execution of SQL statements, catalog management, and configuration of the execution environment.

3

4

## Capabilities

5

6

### TableEnvironment Creation

7

8

Creates table environments configured for specific execution modes and settings.

9

10

```java { .api }

11

/**

12

* Creates a table environment based on the provided settings

13

* @param settings Configuration settings for the table environment

14

* @return Configured TableEnvironment instance

15

*/

16

public static TableEnvironment create(EnvironmentSettings settings);

17

```

18

19

**Usage Examples:**

20

21

```java

22

// Streaming mode environment

23

EnvironmentSettings streamSettings = EnvironmentSettings

24

.newInstance()

25

.inStreamingMode()

26

.build();

27

TableEnvironment streamEnv = TableEnvironment.create(streamSettings);

28

29

// Batch mode environment

30

EnvironmentSettings batchSettings = EnvironmentSettings

31

.newInstance()

32

.inBatchMode()

33

.build();

34

TableEnvironment batchEnv = TableEnvironment.create(batchSettings);

35

```

36

37

### Table Access and Creation

38

39

Methods for accessing existing tables and creating new table definitions.

40

41

```java { .api }

42

/**

43

* Gets a table from the catalog by path

44

* @param path Table path in format [[catalog.]database.]table

45

* @return Table instance for the specified path

46

*/

47

public Table from(String path);

48

49

/**

50

* Creates a table in the catalog with the specified descriptor

51

* @param path Table path where to create the table

52

* @param descriptor Table descriptor defining schema and properties

53

*/

54

public void createTable(String path, TableDescriptor descriptor);

55

56

/**

57

* Creates a temporary view from a Table object

58

* @param path View name/path

59

* @param view Table object to create view from

60

*/

61

public void createTemporaryView(String path, Table view);

62

63

/**

64

* Creates a temporary table from a descriptor

65

* @param path Table path

66

* @param descriptor Table descriptor

67

*/

68

public void createTemporaryTable(String path, TableDescriptor descriptor);

69

```

70

71

**Usage Examples:**

72

73

```java

74

// Access existing table

75

Table orders = tableEnv.from("catalog1.database1.orders");

76

Table customers = tableEnv.from("customers"); // uses current catalog/database

77

78

// Create table with descriptor

79

TableDescriptor descriptor = TableDescriptor

80

.forConnector("filesystem")

81

.schema(Schema.newBuilder()

82

.column("id", DataTypes.BIGINT())

83

.column("name", DataTypes.STRING())

84

.column("price", DataTypes.DECIMAL(10, 2))

85

.build())

86

.option("path", "/path/to/data")

87

.option("format", "csv")

88

.build();

89

90

tableEnv.createTable("my_table", descriptor);

91

92

// Create temporary view

93

Table filteredOrders = orders.filter($("amount").isGreater(100));

94

tableEnv.createTemporaryView("large_orders", filteredOrders);

95

```

96

97

### SQL Execution

98

99

Run SQL queries that return a Table, or execute arbitrary SQL statements (DDL, DML, or queries) that return a TableResult.

100

101

```java { .api }

102

/**

103

* Executes a SQL query and returns the result as a Table

104

* @param query SQL SELECT query

105

* @return Table containing query results

106

*/

107

public Table sqlQuery(String query);

108

109

/**

110

* Executes a SQL statement (DDL, DML, or query)

111

* @param statement SQL statement to execute

112

* @return TableResult containing execution results

113

*/

114

public TableResult executeSql(String statement);

115

```

116

117

**Usage Examples:**

118

119

```java

120

// SQL query

121

Table result = tableEnv.sqlQuery(

122

"SELECT customer_id, SUM(amount) as total " +

123

"FROM orders " +

124

"WHERE order_date >= '2023-01-01' " +

125

"GROUP BY customer_id"

126

);

127

128

// SQL DDL statement

129

tableEnv.executeSql(

130

"CREATE TABLE user_behavior (" +

131

" user_id BIGINT," +

132

" item_id BIGINT," +

133

" category_id BIGINT," +

134

" behavior STRING," +

135

" ts TIMESTAMP(3)" +

136

") WITH (" +

137

" 'connector' = 'kafka'," +

138

" 'topic' = 'user_behavior'," +

139

" 'properties.bootstrap.servers' = 'localhost:9092'" +

140

")"

141

);

142

143

// SQL DML statement

144

tableEnv.executeSql(

145

"INSERT INTO result_table " +

146

"SELECT user_id, COUNT(*) as behavior_cnt " +

147

"FROM user_behavior " +

148

"GROUP BY user_id"

149

);

150

```

151

152

### Catalog Operations

153

154

Manage catalogs, databases, and metadata contexts.

155

156

```java { .api }

157

/**

158

* Register a catalog with the given name

159

* @param catalogName Name to register the catalog under

160

* @param catalog Catalog instance to register

161

*/

162

public void registerCatalog(String catalogName, Catalog catalog);

163

164

/**

165

* Set the current catalog for resolving unqualified table references

166

* @param catalogName Name of catalog to use as current

167

*/

168

public void useCatalog(String catalogName);

169

170

/**

171

* Set the current database within the current catalog

172

* @param databaseName Name of database to use as current

173

*/

174

public void useDatabase(String databaseName);

175

176

/**

177

* Get the current catalog name

178

* @return Name of the current catalog

179

*/

180

public String getCurrentCatalog();

181

182

/**

183

* Get the current database name

184

* @return Name of the current database

185

*/

186

public String getCurrentDatabase();

187

188

/**

189

* List all registered catalog names

190

* @return Array of catalog names

191

*/

192

public String[] listCatalogs();

193

194

/**

195

* List all databases in the current catalog

196

* @return Array of database names

197

*/

198

public String[] listDatabases();

199

200

/**

201

* List all tables in the current database

202

* @return Array of table names

203

*/

204

public String[] listTables();

205

```

206

207

### Function Registration

208

209

Register user-defined functions for use in Table API and SQL.

210

211

```java { .api }

212

/**

213

* Register a scalar function under the given name

214

* @param name Function name to register under

215

* @param function ScalarFunction instance

216

*/

217

public void registerFunction(String name, ScalarFunction function);

218

219

/**

220

* Create a temporary system function from a class name

221

* @param name Function name

222

* @param functionClass Function class name

223

*/

224

public void createTemporarySystemFunction(String name, String functionClass);

225

226

/**

227

* Create a temporary function from a function instance

228

* @param path Function path/name

229

* @param function Function instance

230

*/

231

public void createTemporaryFunction(String path, UserDefinedFunction function);

232

```

233

234

### Configuration Management

235

236

Access and modify table environment configuration.

237

238

```java { .api }

239

/**

240

* Get the table configuration for this environment

241

* @return TableConfig instance

242

*/

243

public TableConfig getConfig();

244

```

245

246

### Statement Sets

247

248

Create statement sets for batching multiple DML operations.

249

250

```java { .api }

251

/**

252

* Create a StatementSet for executing multiple statements together

253

* @return StatementSet instance

254

*/

255

public StatementSet createStatementSet();

256

```

257

258

## Environment Settings

259

260

Configuration class for creating table environments with specific settings.

261

262

```java { .api }

263

/**

264

* Creates a new EnvironmentSettings builder

265

* @return Builder for configuring environment settings

266

*/

267

public static EnvironmentSettings.Builder newInstance();

268

269

public static class Builder {

270

/**

271

* Selects the Blink planner. In newer versions Blink is the default and only planner, so calling this method is redundant

272

* @return Builder instance for chaining

273

*/

274

public Builder useBlinkPlanner();

275

276

/**

277

* Configures environment for streaming mode execution

278

* @return Builder instance for chaining

279

*/

280

public Builder inStreamingMode();

281

282

/**

283

* Configures environment for batch mode execution

284

* @return Builder instance for chaining

285

*/

286

public Builder inBatchMode();

287

288

/**

289

* Sets a custom class loader for the environment

290

* @param classLoader Custom class loader

291

* @return Builder instance for chaining

292

*/

293

public Builder withClassLoader(ClassLoader classLoader);

294

295

/**

296

* Builds the EnvironmentSettings with configured options

297

* @return EnvironmentSettings instance

298

*/

299

public EnvironmentSettings build();

300

}

301

```

302

303

**Usage Examples:**

304

305

```java

306

// Default streaming environment

307

EnvironmentSettings settings = EnvironmentSettings

308

.newInstance()

309

.inStreamingMode()

310

.build();

311

312

// Batch environment with custom class loader

313

ClassLoader customClassLoader = // ... custom class loader

314

EnvironmentSettings batchSettings = EnvironmentSettings

315

.newInstance()

316

.inBatchMode()

317

.withClassLoader(customClassLoader)

318

.build();

319

```

320

321

## Configuration Types

322

323

```java { .api }

324

public final class TableConfig implements WritableConfig, ReadableConfig {

325

/**

326

* Get the underlying configuration object

327

* @return Configuration instance

328

*/

329

public Configuration getConfiguration();

330

331

/**

332

* Set the SQL dialect for parsing SQL statements

333

* @param dialect SQL dialect to use

334

*/

335

public void setSqlDialect(SqlDialect dialect);

336

337

/**

338

* Set the parallelism for table operations

339

* @param parallelism Parallelism level

340

*/

341

public void setParallelism(int parallelism);

342

343

/**

344

* Get the current parallelism setting

345

* @return Current parallelism level

346

*/

347

public int getParallelism();

348

}

349

350

public enum SqlDialect {

351

/** Default Flink SQL dialect */

352

DEFAULT,

353

/** Hive-compatible SQL dialect */

354

HIVE

355

}

356

```