or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-table-api-scala_2-12

Scala API for Apache Flink's Table & SQL ecosystem with type-safe bindings and comprehensive support for Scala-specific types

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-table-api-scala_2.12@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-scala_2-12@2.1.0

0

# Apache Flink Scala Table API

1

2

The Apache Flink Scala Table API provides Scala-specific bindings for Flink's Table & SQL ecosystem. It enables type-safe Scala programming with implicit conversions, operator overloading, and comprehensive support for Scala-specific types like case classes, Option, Either, and collections.

3

4

**⚠️ Deprecation Notice**: All Flink Scala APIs are deprecated as of Flink 1.18.0 (FLIP-265) and will be removed in a future major version. Users should migrate to the Java Table API while continuing to use Scala as their application language.

5

6

## Package Information

7

8

- **Package Name**: flink-table-api-scala_2.12

9

- **Package Type**: maven

10

- **Language**: Scala (2.12)

11

- **Group ID**: org.apache.flink

12

- **Installation**: Add to `pom.xml`:

13

```xml

14

<dependency>

15

<groupId>org.apache.flink</groupId>

16

<artifactId>flink-table-api-scala_2.12</artifactId>

17

<version>2.1.0</version>

18

</dependency>

19

```

20

21

## Core Imports

22

23

```scala

24

import org.apache.flink.table.api._

25

import org.apache.flink.table.api.bridge.scala._

26

```

27

28

The first import provides access to:

29

- Implicit conversions from Scala literals to Expressions

30

- Expression operations and operators

31

- Type information creation macros

32

- Comprehensive type system for Scala types

33

34

The bridge import adds:

35

- DataStream integration capabilities

36

- Conversion utilities between Table and DataStream

37

- StreamTableEnvironment for streaming applications

38

39

## Basic Usage

40

41

```scala

42

import org.apache.flink.table.api._

43

import org.apache.flink.streaming.api.scala._

44

import org.apache.flink.table.api.bridge.scala._

45

46

// Create execution environment and table environment

47

val env = StreamExecutionEnvironment.getExecutionEnvironment

48

val tEnv = StreamTableEnvironment.create(env)

49

50

// Create table from data with case class

51

case class Order(id: Int, product: String, amount: Double)

52

val orders = env.fromElements(

53

Order(1, "laptop", 999.99),

54

Order(2, "mouse", 29.99)

55

)

56

57

val ordersTable = tEnv.fromDataStream(orders)

58

59

// Use Scala-specific syntax with implicit conversions

60

val result = ordersTable

61

.select($"id", $"product", $"amount" * 1.1 as "amountWithTax")

62

.where($"amount" > 50.0)

63

64

// Convert back to DataStream

65

val resultStream = tEnv.toDataStream(result)

66

```

67

68

## Architecture

69

70

The Flink Scala Table API is built around several key components:

71

72

- **Implicit Conversions**: Seamless conversion between Scala values and Flink expressions using trait mixins

73

- **Expression DSL**: Rich domain-specific language with operator overloading for natural Scala syntax

74

- **Type System**: Macro-based type information generation for compile-time type safety

75

- **Scala Type Support**: Comprehensive support for Option, Either, Try, case classes, and collections

76

- **Serialization Layer**: Efficient Kryo-based serialization for all Scala types

77

- **Field Access**: Symbol-based ($'field) and string-based field references

78

79

## Capabilities

80

81

### Expression and Conversion System

82

83

Core expression creation and implicit conversions that enable natural Scala syntax for table operations. Includes literal conversions, field references, and operator overloading.

84

85

```scala { .api }

86

trait ImplicitExpressionConversions {

87

// Constants for window operations

88

implicit val UNBOUNDED_ROW: Expression

89

implicit val UNBOUNDED_RANGE: Expression

90

implicit val CURRENT_ROW: Expression

91

implicit val CURRENT_RANGE: Expression

92

93

// Field reference creation

94

def $(name: String): Expression

95

def col(name: String): Expression

96

97

// Literal creation

98

def lit(v: Any): Expression

99

def lit(v: Any, dataType: DataType): Expression

100

101

// Function calls

102

def call(path: String, params: Expression*): Expression

103

def call(function: UserDefinedFunction, params: Expression*): Expression

104

def call(function: Class[_ <: UserDefinedFunction], params: Expression*): Expression

105

def callSql(sqlExpression: String): Expression

106

}

107

108

// Implicit classes for expression operations

109

implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations

110

implicit class UnresolvedFieldExpression(s: Symbol) extends ImplicitExpressionOperations

111

```

112

113

[Expression and Conversions](./expressions.md)

114

115

### Type System and Type Information

116

117

Comprehensive type system providing TypeInformation for all Scala types including case classes, collections, Option, Either, and Try. Uses macros for automatic type inference.

118

119

```scala { .api }

120

object Types {

121

// Generic type creation

122

def of[T: TypeInformation]: TypeInformation[T]

123

124

// Scala-specific types

125

val UNIT: TypeInformation[Unit]

126

val NOTHING: TypeInformation[Nothing]

127

128

// Factory methods for complex types

129

def CASE_CLASS[T: TypeInformation]: TypeInformation[T]

130

def TUPLE[T: TypeInformation]: TypeInformation[T]

131

def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]

132

def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]

133

def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T]

134

135

// Collection types

136

def TRAVERSABLE[T: TypeInformation]: TypeInformation[T]

137

def OBJECT_ARRAY[E <: AnyRef](elementType: TypeInformation[E]): TypeInformation[Array[E]]

138

}

139

140

trait ImplicitTypeConversions {

141

// Macro-based type inference

142

implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]

143

implicit val scalaNothingTypeInfo: TypeInformation[Nothing]

144

}

145

```

146

147

[Type System](./types.md)

148

149

### Expression Operations and Operators

150

151

Rich set of expression operations including arithmetic, comparison, logical operators, and specialized operations for table transformations. Provides natural Scala operator syntax.

152

153

```scala { .api }

154

trait ImplicitExpressionOperations {

155

// Field aliasing

156

def as(name: Symbol, extraNames: Symbol*): Expression

157

158

// Comparison operators

159

def >(other: Expression): Expression

160

def >=(other: Expression): Expression

161

def <(other: Expression): Expression

162

def <=(other: Expression): Expression

163

def ===(other: Expression): Expression

164

def !==(other: Expression): Expression

165

166

// Logical operators

167

def &&(other: Expression): Expression

168

def ||(other: Expression): Expression

169

def unary_!: Expression

170

171

// Arithmetic operators

172

def +(other: Expression): Expression

173

def -(other: Expression): Expression

174

def *(other: Expression): Expression

175

def /(other: Expression): Expression

176

def %(other: Expression): Expression

177

def unary_-: Expression

178

def unary_+: Expression

179

180

// Specialized operations

181

def to(other: Expression): Expression // Range for column selection

182

def ?(ifTrue: Expression, ifFalse: Expression): Expression // Ternary conditional

183

def rows: Expression // Row interval for windowing

184

}

185

```

186

187

[Expression Operations](./operations.md)

188

189

### Built-in Functions Library

190

191

Extensive collection of built-in functions for date/time operations, mathematical calculations, string manipulation, JSON processing, and utility operations.

192

193

```scala { .api }

194

// Date and time functions

195

def currentDate(): Expression

196

def currentTime(): Expression

197

def currentTimestamp(): Expression

198

def localTime(): Expression

199

def localTimestamp(): Expression

200

201

// Mathematical functions

202

def pi(): Expression

203

def e(): Expression

204

def rand(): Expression

205

def randInteger(bound: Expression): Expression

206

def atan2(y: Expression, x: Expression): Expression

207

def log(base: Expression, antilogarithm: Expression): Expression

208

def exp(base: Expression): Expression

209

def power(base: Expression, exponent: Expression): Expression

210

def mod(numeric1: Expression, numeric2: Expression): Expression

211

212

// String functions

213

def concat(string: Expression*): Expression

214

def concatWs(separator: Expression, string: Expression*): Expression

215

def uuid(): Expression

216

def upper(string: Expression): Expression

217

def lower(string: Expression): Expression

218

def length(string: Expression): Expression

219

def position(string: Expression, substring: Expression): Expression

220

221

// JSON functions

222

def jsonString(string: Expression): Expression

223

def jsonObject(keyValue: Expression*): Expression

224

def jsonArray(element: Expression*): Expression

225

def jsonValue(jsonString: Expression, path: Expression): Expression

226

def jsonQuery(jsonString: Expression, path: Expression): Expression

227

228

// Utility functions

229

def nullOf(dataType: DataType): Expression

230

def ifThenElse(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression

231

def coalesce(expr: Expression*): Expression

232

def isnull(expr: Expression): Expression

233

def isNotNull(expr: Expression): Expression

234

```

235

236

[Built-in Functions](./functions.md)

237

238

### Scala Type Information Classes

239

240

Specialized TypeInformation implementations for Scala-specific types providing efficient serialization and type handling for case classes, Option, Either, Try, and collections.

241

242

```scala { .api }

243

// Case class type information

244

abstract class CaseClassTypeInfo[T](

245

clazz: Class[T],

246

typeParamTypeInfos: Array[TypeInformation[_]],

247

fieldTypes: Seq[TypeInformation[_]],

248

fieldNames: Seq[String]

249

) extends TypeInformation[T] {

250

def getFieldNames: Array[String]

251

def getFieldIndex(fieldName: String): Int

252

def getFieldIndices(fields: Array[String]): Array[Int]

253

def getTypeAt[X](fieldExpression: String): TypeInformation[X]

254

def getFlatFields(): List[FlatFieldDescriptor]

255

}

256

257

// Option type information

258

class OptionTypeInfo[A, T <: Option[A]](elemTypeInfo: TypeInformation[A])

259

extends TypeInformation[T]

260

261

// Either type information

262

class EitherTypeInfo[A, B, T <: Either[A, B]](

263

clazz: Class[T],

264

leftTypeInfo: TypeInformation[A],

265

rightTypeInfo: TypeInformation[B]

266

) extends TypeInformation[T]

267

268

// Try type information

269

class TryTypeInfo[A, T <: Try[A]](valueType: TypeInformation[A])

270

extends TypeInformation[T]

271

272

// Collection type information

273

abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](

274

clazz: Class[T],

275

elementTypeInfo: TypeInformation[E]

276

) extends TypeInformation[T]

277

278

// Enumeration type information

279

class EnumValueTypeInfo[E <: Enumeration](enum: E, clazz: Class[E#Value])

280

extends TypeInformation[E#Value] with AtomicType[E#Value]

281

```

282

283

[Type Information Classes](./typeinfo.md)

284

285

### DataStream Integration (Bridge API)

286

287

Integration layer between Table API and DataStream API enabling seamless conversion between Table and DataStream objects with streaming-specific operations.

288

289

```scala { .api }

290

trait StreamTableEnvironment extends TableEnvironment {

291

// DataStream to Table conversion

292

def fromDataStream[T](dataStream: DataStream[T]): Table

293

def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table

294

295

// Table to DataStream conversion

296

def toDataStream(table: Table): DataStream[Row]

297

def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]

298

def toChangelogStream(table: Table): DataStream[Row]

299

}

300

301

// Implicit conversion classes

302

class TableConversions(table: Table)

303

class DataStreamConversions[T](dataStream: DataStream[T])

304

```

305

306

[DataStream Integration](./bridge.md)

307

308

### Runtime Serialization Support

309

310

Kryo-based serialization configuration and specialized serializers for efficient handling of all Scala types including collections, tuples, and special types in distributed Flink execution.

311

312

```scala { .api }

313

// Kryo configuration for Scala types

314

class FlinkScalaKryoInstantiator extends KryoInstantiator {

315

def newKryo: Kryo // Pre-configured with Scala serializers

316

}

317

318

// Specialized serializers for runtime efficiency

319

class CaseClassSerializer[T <: Product](/* parameters */) extends TypeSerializer[T]

320

class OptionSerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]]

321

class EitherSerializer[A, B](/* parameters */) extends TypeSerializer[Either[A, B]]

322

class TrySerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]]

323

class TraversableSerializer[T <: TraversableOnce[E], E](/* parameters */) extends TypeSerializer[T]

324

class EnumValueSerializer[E <: Enumeration](/* parameters */) extends TypeSerializer[E#Value]

325

```

326

327

[Runtime Serialization](./serialization.md)

328

329

## Migration Guide

330

331

Since all Scala APIs are deprecated, consider these migration approaches:

332

333

1. **Immediate**: Continue using Scala APIs with deprecation warnings

334

2. **Hybrid**: Use Java Table API with Scala DataStream API

335

3. **Full Migration**: Move to Java Table API entirely

336

337

The Java Table API provides equivalent functionality without Scala-specific syntax conveniences.

338

339

## Common Patterns

340

341

### Case Class Integration

342

```scala

343

case class User(id: Int, name: String, email: Option[String])

344

val userTable = tEnv.fromDataStream(users) // Automatic type inference

345

```

346

347

### Option Type Handling

348

```scala

349

val result = table.select($"email".isNotNull ? $"email" : lit("N/A") as "emailDisplay")

350

```

351

352

### Expression Chaining

353

```scala

354

val processed = table

355

.select($"amount" * 1.1 + $"tax" as "total")

356

.where($"total" > 100.0)

357

.groupBy($"category")

358

.select($"category", $"total".sum as "categoryTotal")

359

```