npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-api-scala-bridge_2-12@1.20.00
# Flink Table API Scala Bridge

## Overview

The Flink Table API Scala Bridge provides seamless integration between Apache Flink's Table/SQL API and the Scala-specific DataStream API. This bridge library enables developers to convert DataStreams to Tables and vice versa while leveraging Scala's type system and functional programming paradigms.

**Key Features:**

- Bidirectional conversion between DataStream and Table APIs
- Support for both bounded and unbounded data processing
- Integration with Flink's SQL engine for complex queries
- Scala-idiomatic APIs with implicit conversions
- Event-time processing and watermark propagation
- Changelog stream processing for updating tables

**⚠️ Deprecation Notice:** All Flink Scala APIs are deprecated as of version 1.18.0 and will be removed in a future major version. Users should migrate to the Java APIs. See [FLIP-265](https://s.apache.org/flip-265) for details.

## Package Information

- **Package:** `org.apache.flink:flink-table-api-scala-bridge_2.12`
- **Version:** 1.20.2
- **Language:** Scala 2.12
- **License:** Apache-2.0

### Installation

Add to your `build.sbt`:

```scala
libraryDependencies += "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.20.2"
```

Or in Maven `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
  <version>1.20.2</version>
</dependency>
```

## Core Imports

```scala
// Essential imports for Table API and DataStream integration
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

// Common type imports
import org.apache.flink.types.Row
import org.apache.flink.table.types.DataType
import org.apache.flink.table.connector.ChangelogMode
```

## Basic Usage

### Environment Setup

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

// Create execution environment and its table environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
```

### DataStream to Table Conversion

```scala
case class User(name: String, age: Int)

val users: DataStream[User] = env.fromCollection(Seq(
  User("Alice", 25),
  User("Bob", 30)
))

// Convert DataStream to Table (automatic schema derivation)
val userTable: Table = tableEnv.fromDataStream(users)

// Or use the implicit conversion
import org.apache.flink.table.api.bridge.scala._
val userTable2: Table = users.toTable(tableEnv)
```

### Table to DataStream Conversion

```scala
// Convert Table back to DataStream
val resultStream: DataStream[Row] = tableEnv.toDataStream(userTable)

// Or use the implicit conversion
val resultStream2: DataStream[Row] = userTable.toDataStream
```

### SQL Operations

```scala
// Register the table for SQL queries
tableEnv.createTemporaryView("users", userTable)

// Execute a SQL query
val sqlResult: Table = tableEnv.sqlQuery(
  "SELECT name, age FROM users WHERE age > 25"
)

val resultStream: DataStream[Row] = tableEnv.toDataStream(sqlResult)
```

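As a plain-Scala sanity check of the query's semantics (ordinary collections here, not Flink execution), the `WHERE age > 25` filter above keeps only Bob's row:

```scala
// Plain Scala collections standing in for the streaming query above;
// an illustration of the filter's semantics, not Flink code.
case class User(name: String, age: Int)

val users = Seq(User("Alice", 25), User("Bob", 30))

// SELECT name, age FROM users WHERE age > 25
val selected = users.filter(_.age > 25).map(u => (u.name, u.age))
```

Note that the comparison is strict, so Alice (exactly 25) is filtered out.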
## Architecture

The Flink Table API Scala Bridge consists of several key components:

- **StreamTableEnvironment**: Main entry point for table operations with DataStream integration
- **Conversion Utilities**: Classes providing DataStream ↔ Table conversion methods
- **Implicit Conversions**: Package-level implicits for seamless API integration
- **Statement Sets**: Batch execution of multiple table operations
- **Schema System**: Type-safe schema definitions and transformations

## Capabilities

### [Environment and Setup](./environment-setup.md)
Configure and create StreamTableEnvironment instances with various settings and execution modes.

```scala { .api }
object StreamTableEnvironment {
  def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
  def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
}
```

### [DataStream Integration](./datastream-integration.md)
Convert between DataStreams and Tables with automatic schema derivation and custom schema definitions.

```scala { .api }
trait StreamTableEnvironment {
  def fromDataStream[T](dataStream: DataStream[T]): Table
  def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table
  def toDataStream(table: Table): DataStream[Row]
  def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
}
```

### [Changelog Processing](./changelog-processing.md)
Handle updating tables and changelog streams for complex event processing scenarios.

```scala { .api }
trait StreamTableEnvironment {
  def fromChangelogStream(dataStream: DataStream[Row]): Table
  def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table
  def toChangelogStream(table: Table): DataStream[Row]
  def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]
}
```

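To build intuition for how a changelog stream materializes into an updating table, here is a minimal plain-Scala model. The `Kind` and `Change` types are made up for illustration; in Flink the change kind is the `RowKind` carried by each `Row`:

```scala
// Each change carries a RowKind-like tag and is folded into keyed state.
sealed trait Kind
case object Insert extends Kind
case object UpdateBefore extends Kind
case object UpdateAfter extends Kind
case object Delete extends Kind

final case class Change(kind: Kind, key: String, value: Int)

def materialize(changes: Seq[Change]): Map[String, Int] =
  changes.foldLeft(Map.empty[String, Int]) { (table, c) =>
    c.kind match {
      case Insert | UpdateAfter => table + (c.key -> c.value) // upsert the row
      case UpdateBefore         => table                      // retraction of the old value; no-op in this keyed model
      case Delete               => table - c.key              // remove the row
    }
  }

val state = materialize(Seq(
  Change(Insert, "Alice", 25),
  Change(UpdateBefore, "Alice", 25),
  Change(UpdateAfter, "Alice", 26),
  Change(Insert, "Bob", 30),
  Change(Delete, "Bob", 30)
))
// state: Map(Alice -> 26)
```

This is the behavior `toChangelogStream` exposes in reverse: an updating table is emitted as exactly such a sequence of tagged rows.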
### [Table Operations](./table-operations.md)
Create, register, and manage tables and views within the table environment.

```scala { .api }
trait StreamTableEnvironment {
  def createTemporaryView[T](path: String, dataStream: DataStream[T]): Unit
  def createTemporaryView[T](path: String, dataStream: DataStream[T], schema: Schema): Unit
}
```

### [Statement Sets](./statement-sets.md)
Batch multiple table operations together for optimized execution planning.

```scala { .api }
trait StreamTableEnvironment {
  def createStatementSet(): StreamStatementSet
}

trait StreamStatementSet {
  def addInsert(targetPath: String, table: Table): StreamStatementSet
  def addInsertSql(statement: String): StreamStatementSet
  def execute(): TableResult
}
```

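The pattern behind a statement set can be sketched in plain Scala (the `FakeStatementSet` name and its `Seq[String]` result are hypothetical, not the Flink API): operations are collected first and only submitted when `execute()` is called, which gives the planner one combined job to optimize:

```scala
// Plain-Scala sketch of the collect-then-execute pattern.
final class FakeStatementSet {
  private val statements = scala.collection.mutable.ListBuffer.empty[String]

  def addInsertSql(stmt: String): FakeStatementSet = {
    statements += stmt // collected, not executed yet
    this               // fluent style, like StreamStatementSet
  }

  def execute(): Seq[String] = statements.toList // one combined submission
}

val executed = new FakeStatementSet()
  .addInsertSql("INSERT INTO sinkA SELECT * FROM users")
  .addInsertSql("INSERT INTO sinkB SELECT name FROM users")
  .execute()
```

In Flink, submitting both inserts in one `StreamStatementSet.execute()` lets the shared `users` scan be computed once instead of per sink.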
### [Implicit Conversions](./implicit-conversions.md)
Package-level implicit conversions for seamless integration between DataStream and Table APIs.

```scala { .api }
package object scala {
  implicit def tableConversions(table: Table): TableConversions
  implicit def dataStreamConversions[T](dataStream: DataStream[T]): DataStreamConversions[T]
  implicit def tableToChangelogDataStream(table: Table): DataStream[Row]
}
```

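The mechanism behind these implicits is ordinary Scala enrichment: an implicit conversion wraps an existing type so extra methods (like `toTable` or `toDataStream`) appear to live on it. A self-contained sketch with made-up `FakeTable` types (not the bridge's classes):

```scala
import scala.language.implicitConversions

final case class FakeTable(rows: Seq[String])

// The wrapper class supplies the "extension" methods.
final class FakeTableOps(table: FakeTable) {
  def toUpperRows: Seq[String] = table.rows.map(_.toUpperCase)
}

// The implicit conversion makes the wrapper transparent at call sites.
implicit def fakeTableConversions(table: FakeTable): FakeTableOps =
  new FakeTableOps(table)

// toUpperRows is not defined on FakeTable, yet this compiles:
val upper = FakeTable(Seq("alice", "bob")).toUpperRows
```

This is exactly why `userTable.toDataStream` works after `import org.apache.flink.table.api.bridge.scala._`: the import brings `tableConversions` into scope.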
## Common Types

```scala { .api }
// Core Flink types
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.Table
import org.apache.flink.types.Row

// Schema and type system
import org.apache.flink.table.api.Schema
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.AbstractDataType

// Execution environment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings

// Changelog processing
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.RowKind
```

## Error Handling

The bridge API throws several types of exceptions:

- **ValidationException**: Invalid operations or incompatible types
- **TableException**: General table processing errors
- **UnsupportedOperationException**: Operations not supported in streaming mode

```scala
import org.apache.flink.table.api.{TableException, ValidationException}

try {
  val table = tableEnv.fromDataStream(dataStream)
  val result = tableEnv.toDataStream(table)
} catch {
  case e: ValidationException =>
    // Handle validation errors (e.g. an incompatible schema)
    println(s"Validation failed: ${e.getMessage}")
  case e: TableException =>
    // Handle general table processing errors
    println(s"Table operation failed: ${e.getMessage}")
}
```

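A more idiomatic Scala alternative to try/catch is `scala.util.Try`; the same shape applies when the body calls `fromDataStream` or `toDataStream`. A generic, self-contained sketch (the `ValidationError` type and `convert` function are made up for illustration):

```scala
import scala.util.{Failure, Success, Try}

// Stand-in for a Flink ValidationException.
final case class ValidationError(msg: String) extends RuntimeException(msg)

// Stand-in for an operation that may fail validation.
def convert(input: String): Try[Int] =
  Try {
    if (input.nonEmpty && input.forall(_.isDigit)) input.toInt
    else throw ValidationError(s"not a number: $input")
  }

// Pattern-match on the outcome instead of catching exceptions.
val results = Seq("42", "oops").map(s => convert(s) match {
  case Success(n)                  => s"ok:$n"
  case Failure(e: ValidationError) => s"invalid:${e.msg}"
  case Failure(e)                  => s"error:${e.getMessage}"
})
// results: List(ok:42, invalid:not a number: oops)
```

Wrapping the conversion in `Try` keeps failure handling in the type system rather than in control flow, which composes better with `map`/`flatMap` pipelines.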
## Migration Guide

Since this API is deprecated, consider migrating to the Java Table API:

```java
// Java equivalent
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
```