or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md sources-sinks.md sql-integration.md table-environment.md table-operations.md type-system.md user-defined-functions.md window-operations.md

docs/table-environment.md

0

# Table Environment

1

2

The TableEnvironment is the main entry point of the Table API, providing methods for registering tables, executing SQL queries, and configuring the table program.

3

4

## Capabilities

5

6

### Environment Creation

7

8

Factory methods for creating batch and streaming table environments.

9

10

```scala { .api }

11

object TableEnvironment {

12

/**

13

* Creates a batch table environment with an ExecutionEnvironment

14

* @param executionEnvironment The batch execution environment

15

* @return BatchTableEnvironment for batch processing

16

*/

17

def getTableEnvironment(executionEnvironment: ExecutionEnvironment): BatchTableEnvironment

18

19

/**

20

* Creates a streaming table environment with a StreamExecutionEnvironment

21

* @param executionEnvironment The stream execution environment

22

* @return StreamTableEnvironment for stream processing

23

*/

24

def getTableEnvironment(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment

25

}

26

```

27

28

**Usage Examples:**

29

30

```scala

31

import org.apache.flink.api.scala._

32

import org.apache.flink.streaming.api.scala._

33

import org.apache.flink.table.api.scala._

34

35

// Batch environment

36

val batchEnv = ExecutionEnvironment.getExecutionEnvironment

37

val batchTableEnv = TableEnvironment.getTableEnvironment(batchEnv)

38

39

// Streaming environment

40

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

41

val streamTableEnv = TableEnvironment.getTableEnvironment(streamEnv)

42

```

43

44

### Table Registration and Management

45

46

Register and manage tables within the table environment.

47

48

```scala { .api }

49

/**

50

* Registers a table in the catalog for later reference

51

* @param name Name to register the table under

52

* @param table Table instance to register

53

*/

54

def registerTable(name: String, table: Table): Unit

55

56

/**

57

* Registers a table source in the catalog

58

* @param name Name to register the source under

59

* @param tableSource The table source to register

60

*/

61

def registerTableSource(name: String, tableSource: TableSource[_]): Unit

62

63

/**

64

* Registers a table sink in the catalog

65

* @param name Name to register the sink under

66

* @param fieldNames Field names of the sink

67

* @param fieldTypes Field types of the sink

68

* @param tableSink The table sink to register

69

*/

70

def registerTableSink(

71

name: String,

72

fieldNames: Array[String],

73

fieldTypes: Array[TypeInformation[_]],

74

tableSink: TableSink[_]

75

): Unit

76

77

/**

78

* Scans a registered table by name

79

* @param tablePath Table name or path components

80

* @return Table instance for the registered table

81

*/

82

def scan(tablePath: String*): Table

83

84

/**

85

* Creates a table from a table source

86

* @param source The table source to create table from

87

* @return Table instance wrapping the source

88

*/

89

def fromTableSource(source: TableSource[_]): Table

90

91

/**

92

* Lists all registered tables in the environment

93

* @return Array of table names

94

*/

95

def listTables(): Array[String]

96

```

97

98

**Usage Examples:**

99

100

```scala

101

// Register a table

102

val dataSet = env.fromElements((1, "Alice"), (2, "Bob"))

103

val table = dataSet.toTable(tEnv, 'id, 'name)

104

tEnv.registerTable("Users", table)

105

106

// Scan registered table

107

val users = tEnv.scan("Users")

108

109

// Register and use table source

110

val csvSource = new CsvTableSource("/path/to/file.csv", Array("id", "name"), Array(Types.INT, Types.STRING))

111

tEnv.registerTableSource("CsvUsers", csvSource)

112

val csvTable = tEnv.scan("CsvUsers")

113

```

114

115

### SQL Execution

116

117

Execute SQL queries and statements directly on the table environment.

118

119

```scala { .api }

120

/**

121

* Executes a SQL query and returns the result as a Table

122

* @param query The SQL query string

123

* @return Table containing query results

124

*/

125

def sqlQuery(query: String): Table

126

127

/**

128

* Executes a SQL statement (DDL/DML)

129

* @param stmt The SQL statement string

130

*/

131

def sqlUpdate(stmt: String): Unit

132

```

133

134

**Usage Examples:**

135

136

```scala

137

// SQL queries

138

val result = tEnv.sqlQuery("SELECT id, name FROM Users WHERE id > 1")

139

val aggregated = tEnv.sqlQuery("SELECT COUNT(*) as user_count FROM Users")

140

141

// SQL DDL/DML

142

tEnv.sqlUpdate("CREATE TABLE OutputTable (id INT, name STRING)")

143

tEnv.sqlUpdate("INSERT INTO OutputTable SELECT * FROM Users")

144

```

145

146

### Function Registration

147

148

Register user-defined functions for use in queries.

149

150

```scala { .api }

151

/**

152

* Registers a scalar function

153

* @param name Function name for SQL usage

154

* @param function ScalarFunction implementation

155

*/

156

def registerFunction(name: String, function: ScalarFunction): Unit

157

158

/**

159

* Lists all registered user-defined functions

160

* @return Array of function names

161

*/

162

def listUserDefinedFunctions(): Array[String]

163

```

164

165

**Usage Examples:**

166

167

```scala

168

// Register custom function

169

class AddOne extends ScalarFunction {

170

def eval(x: Int): Int = x + 1

171

}

172

173

tEnv.registerFunction("addOne", new AddOne())

174

175

// Use in SQL

176

val result = tEnv.sqlQuery("SELECT addOne(id) FROM Users")

177

```

178

179

### External Catalog Management

180

181

Register external catalogs for accessing external metadata stores.

182

183

```scala { .api }

184

/**

185

* Registers an external catalog

186

* @param name Catalog name

187

* @param externalCatalog External catalog implementation

188

*/

189

def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit

190

191

/**

192

* Gets the default catalog

193

* @return The default external catalog

194

*/

195

def getDefaultCatalog: ExternalCatalog

196

```

197

198

### Configuration and Utilities

199

200

Access configuration and utility methods.

201

202

```scala { .api }

203

/**

204

* Gets the table configuration

205

* @return TableConfig instance

206

*/

207

def getConfig: TableConfig

208

209

/**

210

* Explains the execution plan for a table

211

* @param table Table to explain

212

* @return String representation of execution plan

213

*/

214

def explain(table: Table): String

215

```

216

217

**Usage Examples:**

218

219

```scala

220

// Configure timezone

221

val config = tEnv.getConfig

222

config.setTimeZone(TimeZone.getTimeZone("UTC"))

223

224

// Explain query plan

225

val table = tEnv.scan("Users").select('name)

226

println(tEnv.explain(table))

227

```

228

229

## Specialized Environments

230

231

### BatchTableEnvironment

232

233

Batch-specific table environment with DataSet conversion capabilities.

234

235

```scala { .api }

236

abstract class BatchTableEnvironment extends TableEnvironment {

237

/**

238

* Converts a Table to a DataSet

239

* @param table Table to convert

240

* @return DataSet with elements of the requested type T

241

*/

242

def toDataSet[T](table: Table): DataSet[T]

243

244

/**

245

* Converts a Table to a DataSet with specific type

246

* @param table Table to convert

247

* @param typeInfo Type information for conversion

248

* @return Typed DataSet

249

*/

250

def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T]

251

}

252

```

253

254

### StreamTableEnvironment

255

256

Stream-specific table environment with DataStream conversion capabilities.

257

258

```scala { .api }

259

abstract class StreamTableEnvironment extends TableEnvironment {

260

/**

261

* Converts a Table to an append-only DataStream

262

* @param table Table to convert

263

* @return DataStream with elements of the requested type T

264

*/

265

def toAppendStream[T](table: Table): DataStream[T]

266

267

/**

268

* Converts a Table to a retract DataStream

269

* @param table Table to convert

270

* @return DataStream of (Boolean, T), where the Boolean is true for an add message and false for a retract message

271

*/

272

def toRetractStream[T](table: Table): DataStream[(Boolean, T)]

273

}

274

```

275

276

**Usage Examples:**

277

278

```scala

279

// Batch conversion

280

val batchResult: DataSet[Row] = batchTableEnv.toDataSet(table)

281

282

// Stream conversions

283

val appendStream: DataStream[Row] = streamTableEnv.toAppendStream(table)

284

val retractStream: DataStream[(Boolean, Row)] = streamTableEnv.toRetractStream(aggregatedTable)

285

```

286

287

## Types

288

289

```scala { .api }

290

abstract class TableEnvironment

291

abstract class BatchTableEnvironment extends TableEnvironment

292

abstract class StreamTableEnvironment extends TableEnvironment

293

294

trait ExternalCatalog {

295

/**

296

* Gets a table from the external catalog

297

* @param tablePath Table path components

298

* @return Table from the external catalog

299

*/

300

def getTable(tablePath: String*): Table

301

302

/**

303

* Lists all tables in the external catalog

304

* @return List of table names

305

*/

306

def listTables(): java.util.List[String]

307

308

/**

309

* Gets a database from the external catalog

310

* @param databaseName Name of the database

311

* @return External catalog database

312

*/

313

def getDatabase(databaseName: String): ExternalCatalogDatabase

314

}

315

316

trait ExternalCatalogDatabase {

317

/**

318

* Gets a table from the database

319

* @param tableName Name of the table

320

* @return Table from the database

321

*/

322

def getTable(tableName: String): Table

323

324

/**

325

* Lists all tables in the database

326

* @return List of table names in the database

327

*/

328

def listTables(): java.util.List[String]

329

}

330

```