Scala API for Apache Flink's Table & SQL ecosystem with type-safe bindings and comprehensive support for Scala-specific types
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-scala_2-12@2.1.0
# Apache Flink Scala Table API

The Apache Flink Scala Table API provides Scala-specific bindings for Flink's Table & SQL ecosystem. It enables type-safe Scala programming with implicit conversions, operator overloading, and comprehensive support for Scala-specific types such as case classes, Option, Either, and collections.

**⚠️ Deprecation Notice**: All Flink Scala APIs are deprecated as of Flink 1.18.0 (FLIP-265) and will be removed in a future major version. Users should migrate to the Java Table API while continuing to use Scala as their application language.

## Package Information

- **Package Name**: flink-table-api-scala_2.12
- **Package Type**: Maven
- **Language**: Scala (2.12)
- **Group ID**: org.apache.flink
- **Installation**: Add to `pom.xml`:
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala_2.12</artifactId>
  <version>2.1.0</version>
</dependency>
```

## Core Imports

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
```

The first import provides access to:
- Implicit conversions from Scala literals to Expressions
- Expression operations and operators
- Type information creation macros
- Comprehensive type system for Scala types

The bridge import adds:
- DataStream integration capabilities
- Conversion utilities between Table and DataStream
- StreamTableEnvironment for streaming applications

## Basic Usage

```scala
import org.apache.flink.table.api._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

// Create execution environment and table environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// Create a table from data with a case class
case class Order(id: Int, product: String, amount: Double)
val orders = env.fromElements(
  Order(1, "laptop", 999.99),
  Order(2, "mouse", 29.99)
)

val ordersTable = tEnv.fromDataStream(orders)

// Use Scala-specific syntax with implicit conversions
val result = ordersTable
  .select($"id", $"product", $"amount" * 1.1 as "amountWithTax")
  .where($"amount" > 50.0)

// Convert back to a DataStream
val resultStream = tEnv.toDataStream(result)
```

## Architecture

The Flink Scala Table API is built around several key components:

- **Implicit Conversions**: Seamless conversion between Scala values and Flink expressions using trait mixins
- **Expression DSL**: Rich domain-specific language with operator overloading for natural Scala syntax
- **Type System**: Macro-based type information generation for compile-time type safety
- **Scala Type Support**: Comprehensive support for Option, Either, Try, case classes, and collections
- **Serialization Layer**: Efficient Kryo-based serialization for all Scala types
- **Field Access**: Symbol-based (`'field`), string-interpolated (`$"field"`), and string-based field references

## Capabilities

### Expression and Conversion System

Core expression creation and implicit conversions that enable natural Scala syntax for table operations. Includes literal conversions, field references, and operator overloading.

```scala { .api }
trait ImplicitExpressionConversions {
  // Constants for window operations
  implicit val UNBOUNDED_ROW: Expression
  implicit val UNBOUNDED_RANGE: Expression
  implicit val CURRENT_ROW: Expression
  implicit val CURRENT_RANGE: Expression

  // Field reference creation
  def $(name: String): Expression
  def col(name: String): Expression

  // Literal creation
  def lit(v: Any): Expression
  def lit(v: Any, dataType: DataType): Expression

  // Function calls
  def call(path: String, params: Expression*): Expression
  def call(function: UserDefinedFunction, params: Expression*): Expression
  def call(function: Class[_ <: UserDefinedFunction], params: Expression*): Expression
  def callSql(sqlExpression: String): Expression
}

// Implicit classes for expression operations
implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations
implicit class UnresolvedFieldExpression(s: Symbol) extends ImplicitExpressionOperations
```

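A brief usage sketch of the conversions above. `MyUpper` is a hypothetical registered function name, and a column `name` is assumed to exist:

```scala
import org.apache.flink.table.api._

// Field references and literals via the implicit conversions
val greeting = concat(lit("Hello, "), $"name")

// Calling functions: by registered name, and as an inline SQL fragment
val byName = call("MyUpper", $"name")
val inline = callSql("UPPER(`name`)")
```

All three values are unresolved `Expression` trees; they are only validated once used inside a table operation such as `select`.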
[Expression and Conversions](./expressions.md)

### Type System and Type Information

Comprehensive type system providing TypeInformation for all Scala types, including case classes, collections, Option, Either, and Try. Uses macros for automatic type inference.

```scala { .api }
object Types {
  // Generic type creation
  def of[T: TypeInformation]: TypeInformation[T]

  // Scala-specific types
  val UNIT: TypeInformation[Unit]
  val NOTHING: TypeInformation[Nothing]

  // Factory methods for complex types
  def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
  def TUPLE[T: TypeInformation]: TypeInformation[T]
  def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
  def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
  def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T]

  // Collection types
  def TRAVERSABLE[T: TypeInformation]: TypeInformation[T]
  def OBJECT_ARRAY[E <: AnyRef](elementType: TypeInformation[E]): TypeInformation[Array[E]]
}

trait ImplicitTypeConversions {
  // Macro-based type inference
  implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
  implicit val scalaNothingTypeInfo: TypeInformation[Nothing]
}
```

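A minimal sketch of the macro in action, assuming the imports shown earlier bring `createTypeInformation` into implicit scope; `Order` is a hypothetical case class:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api._

case class Order(id: Int, product: String, amount: Double)

// The macro derives TypeInformation at compile time, so these resolve implicitly
val orderInfo: TypeInformation[Order] = implicitly[TypeInformation[Order]]
val optionInfo: TypeInformation[Option[String]] = implicitly[TypeInformation[Option[String]]]
```

Because derivation happens at compile time, a type the macro cannot analyze fails the build rather than failing at job submission.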
[Type System](./types.md)

### Expression Operations and Operators

Rich set of expression operations including arithmetic, comparison, and logical operators, plus specialized operations for table transformations. Provides natural Scala operator syntax.

```scala { .api }
trait ImplicitExpressionOperations {
  // Field aliasing
  def as(name: Symbol, extraNames: Symbol*): Expression

  // Comparison operators
  def >(other: Expression): Expression
  def >=(other: Expression): Expression
  def <(other: Expression): Expression
  def <=(other: Expression): Expression
  def ===(other: Expression): Expression
  def !==(other: Expression): Expression

  // Logical operators
  def &&(other: Expression): Expression
  def ||(other: Expression): Expression
  def unary_! : Expression

  // Arithmetic operators
  def +(other: Expression): Expression
  def -(other: Expression): Expression
  def *(other: Expression): Expression
  def /(other: Expression): Expression
  def %(other: Expression): Expression
  def unary_- : Expression
  def unary_+ : Expression

  // Specialized operations
  def to(other: Expression): Expression // Range for column selection
  def ?(ifTrue: Expression, ifFalse: Expression): Expression // Ternary conditional
  def rows: Expression // Row interval for windowing
}
```

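A hedged sketch combining these operators; the columns `a` and `b` are hypothetical. Note that `?` takes both branches as arguments, since Scala has no `? :` ternary syntax:

```scala
import org.apache.flink.table.api._

val sum    = ($"a" + $"b") * lit(2) as "doubledSum"   // arithmetic + aliasing
val cond   = $"a" > 10 && $"b" <= 100                  // comparison + logical operators
val larger = ($"a" >= $"b") ? ($"a", $"b")             // pick the larger of two columns
```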
[Expression Operations](./operations.md)

### Built-in Functions Library

Extensive collection of built-in functions for date/time operations, mathematical calculations, string manipulation, JSON processing, and utility operations.

```scala { .api }
// Date and time functions
def currentDate(): Expression
def currentTime(): Expression
def currentTimestamp(): Expression
def localTime(): Expression
def localTimestamp(): Expression

// Mathematical functions
def pi(): Expression
def e(): Expression
def rand(): Expression
def randInteger(bound: Expression): Expression
def atan2(y: Expression, x: Expression): Expression
def log(base: Expression, antilogarithm: Expression): Expression
def exp(exponent: Expression): Expression
def power(base: Expression, exponent: Expression): Expression
def mod(numeric1: Expression, numeric2: Expression): Expression

// String functions
def concat(string: Expression*): Expression
def concatWs(separator: Expression, string: Expression*): Expression
def uuid(): Expression
def upper(string: Expression): Expression
def lower(string: Expression): Expression
def length(string: Expression): Expression
def position(string: Expression, substring: Expression): Expression

// JSON functions
def jsonString(string: Expression): Expression
def jsonObject(keyValue: Expression*): Expression
def jsonArray(element: Expression*): Expression
def jsonValue(jsonString: Expression, path: Expression): Expression
def jsonQuery(jsonString: Expression, path: Expression): Expression

// Utility functions
def nullOf(dataType: DataType): Expression
def ifThenElse(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression
def coalesce(expr: Expression*): Expression
def isNull(expr: Expression): Expression
def isNotNull(expr: Expression): Expression
```

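A sketch combining a few of the functions above; the columns `id`, `category`, `product`, and `discount` are hypothetical:

```scala
import org.apache.flink.table.api._

val enriched = table.select(
  $"id",
  concatWs(lit("-"), $"category", $"product") as "sku",    // "electronics-laptop"
  coalesce($"discount", lit(0.0)) as "discountOrZero",     // null-safe default
  currentTimestamp() as "processedAt"
)
```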
[Built-in Functions](./functions.md)

### Scala Type Information Classes

Specialized TypeInformation implementations for Scala-specific types, providing efficient serialization and type handling for case classes, Option, Either, Try, and collections.

```scala { .api }
// Case class type information
abstract class CaseClassTypeInfo[T](
  clazz: Class[T],
  typeParamTypeInfos: Array[TypeInformation[_]],
  fieldTypes: Seq[TypeInformation[_]],
  fieldNames: Seq[String]
) extends TypeInformation[T] {
  def getFieldNames: Array[String]
  def getFieldIndex(fieldName: String): Int
  def getFieldIndices(fields: Array[String]): Array[Int]
  def getTypeAt[X](fieldExpression: String): TypeInformation[X]
  def getFlatFields(): List[FlatFieldDescriptor]
}

// Option type information
class OptionTypeInfo[A, T <: Option[A]](elemTypeInfo: TypeInformation[A])
  extends TypeInformation[T]

// Either type information
class EitherTypeInfo[A, B, T <: Either[A, B]](
  clazz: Class[T],
  leftTypeInfo: TypeInformation[A],
  rightTypeInfo: TypeInformation[B]
) extends TypeInformation[T]

// Try type information
class TryTypeInfo[A, T <: Try[A]](valueType: TypeInformation[A])
  extends TypeInformation[T]

// Collection type information
abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
  clazz: Class[T],
  elementTypeInfo: TypeInformation[E]
) extends TypeInformation[T]

// Enumeration type information
class EnumValueTypeInfo[E <: Enumeration](enum: E, clazz: Class[E#Value])
  extends TypeInformation[E#Value] with AtomicType[E#Value]
```

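`CaseClassTypeInfo` builds on the compiler-generated `Product` interface of case classes for arity and positional field access. A plain-Scala illustration with no Flink dependency (`Order` is a hypothetical case class):

```scala
// Case classes implement Product, which is what CaseClassTypeInfo exploits
case class Order(id: Int, product: String, amount: Double)

val o = Order(1, "laptop", 999.99)
println(o.productArity)                   // number of fields: 3
println(o.productElement(1))              // positional access: laptop
println(o.productIterator.mkString(", ")) // 1, laptop, 999.99
```

This positional access is what lets the serializer read fields without reflection at runtime.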
[Type Information Classes](./typeinfo.md)

### DataStream Integration (Bridge API)

Integration layer between the Table API and the DataStream API, enabling seamless conversion between Table and DataStream objects with streaming-specific operations.

```scala { .api }
trait StreamTableEnvironment extends TableEnvironment {
  // DataStream to Table conversion
  def fromDataStream[T](dataStream: DataStream[T]): Table
  def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table

  // Table to DataStream conversion
  def toDataStream(table: Table): DataStream[Row]
  def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
  def toChangelogStream(table: Table): DataStream[Row]
}

// Implicit conversion classes
class TableConversions(table: Table)
class DataStreamConversions[T](dataStream: DataStream[T])
[DataStream Integration](./bridge.md)

### Runtime Serialization Support

Kryo-based serialization configuration and specialized serializers for efficient handling of all Scala types, including collections, tuples, and special types, in distributed Flink execution.

```scala { .api }
// Kryo configuration for Scala types
class FlinkScalaKryoInstantiator extends KryoInstantiator {
  def newKryo: Kryo // Pre-configured with Scala serializers
}

// Specialized serializers for runtime efficiency
class CaseClassSerializer[T <: Product](/* parameters */) extends TypeSerializer[T]
class OptionSerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]]
class EitherSerializer[A, B](/* parameters */) extends TypeSerializer[Either[A, B]]
class TrySerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]]
class TraversableSerializer[T <: TraversableOnce[E], E](/* parameters */) extends TypeSerializer[T]
class EnumValueSerializer[E <: Enumeration](/* parameters */) extends TypeSerializer[E#Value]
```

[Runtime Serialization](./serialization.md)

## Migration Guide

Since all Scala APIs are deprecated, consider these migration approaches:

1. **Immediate**: Continue using the Scala APIs, accepting the deprecation warnings
2. **Hybrid**: Use the Java Table API together with the Scala DataStream API
3. **Full Migration**: Move to the Java Table API entirely

The Java Table API provides equivalent functionality without Scala-specific syntax conveniences.
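A sketch of the hybrid approach: driving the Java Table API from Scala code via the static methods on `Expressions`, instead of the Scala implicits. `$` is renamed to `col` on import to avoid clashing with Scala string interpolation; a registered table named `Orders` is assumed:

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.Expressions.{$ => col, lit}

val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)

// Java-style expression DSL: method calls instead of overloaded operators
val result = tEnv
  .from("Orders")
  .select(col("id"), col("amount").times(lit(1.1)).as("amountWithTax"))
  .filter(col("amount").isGreater(lit(50.0)))
```

The application stays in Scala; only the table expressions switch to the method-call style.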

## Common Patterns

### Case Class Integration
```scala
case class User(id: Int, name: String, email: Option[String])
val userTable = tEnv.fromDataStream(users) // users: DataStream[User]; types inferred automatically
```

### Option Type Handling
```scala
// The ? operation takes both branches as arguments (Scala has no `? :` ternary)
val result = table.select(($"email".isNotNull ? ($"email", lit("N/A"))) as "emailDisplay")
```

### Expression Chaining
```scala
val processed = table
  .select($"category", $"amount" * 1.1 + $"tax" as "total")
  .where($"total" > 100.0)
  .groupBy($"category")
  .select($"category", $"total".sum as "categoryTotal")
```