# Flink Table API

Apache Flink's Table API is a high-level, declarative API for both stream and batch processing that supports SQL-like queries and operations. It offers a unified programming model: developers write queries either in the Table API (a language-embedded query API for Scala and Java) or in SQL, and apply operations such as filtering, joining, aggregating, and windowing to structured data streams and datasets.

## Package Information

- **Package Name**: flink-table_2.11
- **Package Type**: maven
- **Language**: Scala
- **Installation**: `maven: org.apache.flink/flink-table_2.11/1.5.1`
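
The installation coordinates above translate to a standard Maven dependency declaration:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>1.5.1</version>
</dependency>
```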

## Core Imports

**Scala:**

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
```

**Java:**

```java
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
```

## Basic Usage

**Scala Batch Example:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val input: DataSet[(String, Int)] = env.fromElements(
  ("Hello", 2), ("Hello", 5), ("Ciao", 3)
)

val result = input
  .toTable(tEnv, 'word, 'count)
  .groupBy('word)
  .select('word, 'count.avg)

// A Table must be converted back to a DataSet before printing
result.toDataSet[Row].print()
```

**Scala Streaming Example:**

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val input: DataStream[(String, Int)] = env.fromElements(
  ("Hello", 2), ("Hello", 5), ("Ciao", 3)
)

val result = input
  .toTable(tEnv, 'word, 'count)
  .select('word, 'count * 2)

tEnv.toAppendStream[Row](result).print()

// Streaming jobs only run once the environment is executed
env.execute()
```

## Architecture

The Flink Table API is built around several key components:

- **TableEnvironment**: Main entry point providing table registration, SQL execution, and configuration
- **Table**: Core abstraction representing relational data with fluent query operations
- **Type System**: Rich type definitions supporting primitive, complex, and temporal types
- **Expression System**: Type-safe expression building for queries and transformations
- **Source/Sink Integration**: Pluggable connectors for external data systems
- **Function Framework**: User-defined scalar, table, and aggregate functions
- **SQL Integration**: Apache Calcite-based SQL parser and optimizer

## Capabilities

### Table Environment Management

Central management for table operations, SQL execution, and resource configuration. Essential for initializing both batch and streaming table environments.

```scala { .api }
abstract class TableEnvironment {
  def getConfig: TableConfig
  def scan(tablePath: String*): Table
  def fromTableSource(source: TableSource[_]): Table
  def registerTable(name: String, table: Table): Unit
  def registerTableSource(name: String, tableSource: TableSource[_]): Unit
  def registerFunction(name: String, function: ScalarFunction): Unit
  def sqlQuery(query: String): Table
  def sqlUpdate(stmt: String): Unit
  def listTables(): Array[String]
  def explain(table: Table): String
}

object TableEnvironment {
  def getTableEnvironment(executionEnvironment: ExecutionEnvironment): BatchTableEnvironment
  def getTableEnvironment(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
}
```

[Table Environment](./table-environment.md)
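
A minimal sketch of this lifecycle, assuming a Scala batch setup (the table name `Orders` and its columns are hypothetical): register a table under a name, reference it with `scan`, and inspect the catalog.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

val env  = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// Register a DataSet-backed table under a name...
val orders = env
  .fromElements(("alice", 10), ("bob", 20))
  .toTable(tEnv, 'user, 'amount)
tEnv.registerTable("Orders", orders)

// ...then reference it by name and inspect the catalog
val large = tEnv.scan("Orders").where('amount > 15)
println(tEnv.listTables().mkString(", "))
```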

### Table Operations and Queries

Core table abstraction providing SQL-like operations for data transformation, filtering, aggregation, and joining.

```scala { .api }
class Table {
  def select(fields: Expression*): Table
  def select(fields: String): Table
  def filter(predicate: Expression): Table
  def where(predicate: Expression): Table
  def groupBy(fields: Expression*): GroupedTable
  def orderBy(fields: Expression*): Table
  def distinct(): Table
  def join(right: Table): Table
  def join(right: Table, joinPredicate: Expression): Table
  def leftOuterJoin(right: Table, joinPredicate: Expression): Table
  def union(right: Table): Table
  def window(window: Window): WindowedTable
  def as(fields: Expression*): Table
  def getSchema: TableSchema
  def insertInto(tableName: String): Unit
}
```

[Table Operations](./table-operations.md)
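
A brief sketch combining several of these operations (assumes `env` and `tEnv` as in Basic Usage; the table contents are hypothetical):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

val users  = env.fromElements((1, "alice"), (2, "bob")).toTable(tEnv, 'id, 'name)
val orders = env.fromElements((1, 10), (1, 25), (2, 5)).toTable(tEnv, 'userId, 'amount)

// Join on the key, aggregate per user, then filter on the aggregate
val spend = users
  .join(orders, 'id === 'userId)   // expression equality uses ===
  .groupBy('name)
  .select('name, 'amount.sum as 'total)
  .where('total > 10)
```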

### Type System and Schema Management

Rich type system supporting primitive, complex, and temporal types with schema definition and validation.

```scala { .api }
object Types {
  val STRING: TypeInformation[String]
  val BOOLEAN: TypeInformation[java.lang.Boolean]
  val INT: TypeInformation[java.lang.Integer]
  val LONG: TypeInformation[java.lang.Long]
  val DOUBLE: TypeInformation[java.lang.Double]
  val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
  def ROW(types: TypeInformation[_]*): TypeInformation[Row]
  def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row]
  def MAP[K, V](keyType: TypeInformation[K], valueType: TypeInformation[V]): TypeInformation[java.util.Map[K, V]]
}

class TableSchema {
  def getFieldNames: Array[String]
  def getFieldTypes: Array[TypeInformation[_]]
}
```

[Type System](./type-system.md)
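
For example, `Types.ROW` with explicit field names can describe the schema a table source returns (a sketch; the field names are illustrative):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.types.Row

// A two-field row type: name: STRING, age: INT
val rowType: TypeInformation[Row] = Types.ROW(
  Array("name", "age"),
  Array[TypeInformation[_]](Types.STRING, Types.INT)
)
```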

### User-Defined Functions

Framework for creating custom scalar, table, and aggregate functions with lifecycle management and context access.

```scala { .api }
abstract class UserDefinedFunction {
  def open(context: FunctionContext): Unit
  def close(): Unit
  def isDeterministic: Boolean
}

abstract class ScalarFunction extends UserDefinedFunction {
  def getResultType(signature: Array[Class[_]]): TypeInformation[_]
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]]
}

abstract class TableFunction[T] extends UserDefinedFunction {
  protected def collect(result: T): Unit
}

abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
  def createAccumulator(): ACC
  def getValue(accumulator: ACC): T
}
```

[User-Defined Functions](./user-defined-functions.md)
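
A minimal `ScalarFunction` sketch (the class name and the registered tables are illustrative): a scalar function's callable signatures are defined by the `eval` methods it declares.

```scala
import org.apache.flink.table.functions.ScalarFunction

// Multiplies a string's hash code by a configurable factor
class HashCode(factor: Int) extends ScalarFunction {
  def eval(s: String): Int = s.hashCode * factor
}

// Register for use in SQL and Table API queries
tEnv.registerFunction("hashCode", new HashCode(12))
val hashed = tEnv.sqlQuery("SELECT word, hashCode(word) FROM WordTable")
```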

### Data Sources and Sinks

Pluggable interfaces for integrating external data systems with support for projection and filter pushdown.

```scala { .api }
trait TableSource[T] {
  def getReturnType: TypeInformation[T]
  def getTableSchema: TableSchema
  def explainSource(): String
}

trait BatchTableSource[T] extends TableSource[T] {
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}

trait StreamTableSource[T] extends TableSource[T] {
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}

trait TableSink[T] {
  def getOutputType: TypeInformation[T]
  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
```

[Sources and Sinks](./sources-sinks.md)

### Window Operations

Time- and count-based windowing operations for stream processing with tumbling, sliding, and session window support.

```scala { .api }
sealed trait Window

case class TumbleWithSize(size: Expression) extends Window
case class SlideWithSize(size: Expression) extends Window
case class SessionWithGap(gap: Expression) extends Window

class WindowedTable {
  def groupBy(fields: Expression*): WindowGroupedTable
}

case class OverWindow(
  partitionBy: Seq[Expression],
  orderBy: Expression,
  preceding: Expression,
  following: Expression
)
```

[Window Operations](./window-operations.md)
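
In practice, windows are built with the `Tumble`, `Slide`, and `Session` builders and aliased for use in `groupBy`/`select`. A sketch over a hypothetical `sensors` table with an event-time attribute `'rowtime`:

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._

// Tumbling 10-second windows keyed by sensor id
val avgTemp = sensors
  .window(Tumble over 10.seconds on 'rowtime as 'w)
  .groupBy('w, 'id)
  .select('id, 'temp.avg, 'w.start, 'w.end)
```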

### SQL Integration

Direct SQL query execution leveraging Apache Calcite for parsing and optimization. `sqlQuery` evaluates SELECT statements against registered tables; `sqlUpdate` executes INSERT INTO statements (DDL such as CREATE TABLE is not supported in this version).

```scala { .api }
// Available on TableEnvironment
def sqlQuery(query: String): Table
def sqlUpdate(stmt: String): Unit
```

**Usage Examples:**

```scala
// Query execution
val result = tEnv.sqlQuery("SELECT word, COUNT(*) FROM WordTable GROUP BY word")

// DML: insert query results into a registered table
tEnv.sqlUpdate("INSERT INTO MyTable SELECT name, age FROM SourceTable")
```

[SQL Integration](./sql-integration.md)

## Types

```scala { .api }
case class Row(values: Any*)

class TableConfig {
  def getTimeZone: TimeZone
  def setTimeZone(timeZone: TimeZone): Unit
}

trait FunctionContext {
  def getMetricGroup: MetricGroup
  def getCachedFile(name: String): File
}

abstract class QueryConfig
abstract class BatchQueryConfig extends QueryConfig
abstract class StreamQueryConfig extends QueryConfig

trait ExternalCatalog {
  def getTable(tablePath: String*): Table
  def listTables(): java.util.List[String]
  def getDatabase(databaseName: String): ExternalCatalogDatabase
}

trait ExternalCatalogDatabase {
  def getTable(tableName: String): Table
  def listTables(): java.util.List[String]
}

object ExpressionParser {
  def parseExpression(expression: String): Expression
  def parseExpressionList(expression: String): Seq[Expression]
}

class ValidationException(message: String) extends TableException(message)
class TableException(message: String) extends RuntimeException(message)
```