# Implicit Conversions

The Flink Table API Scala Bridge provides Scala-idiomatic implicit conversions for seamless integration between DataStream and Table APIs. These conversions are available through the package object and enable fluent, type-safe operations.

## Available Implicit Conversions

All implicit conversions are provided through the `org.apache.flink.table.api.bridge.scala` package object and are automatically available when you import the package.

### Import Statement

```scala
import org.apache.flink.table.api.bridge.scala._
```

This import makes all implicit conversions available in scope.
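
To see why a single wildcard import is enough, here is a minimal, self-contained sketch of the package-object pattern the bridge uses. All names in it (`MiniTable`, `MiniTableConversions`, `bridge`) are hypothetical illustrations, not Flink classes:

```scala
import scala.language.implicitConversions

// Hypothetical analogue of the Table class
case class MiniTable(rows: Seq[String])

// Hypothetical analogue of TableConversions: a wrapper adding fluent methods
class MiniTableConversions(table: MiniTable) {
  def toMiniStream: Seq[String] = table.rows
}

// Hypothetical analogue of the bridge package object
object bridge {
  // shaped like: implicit def tableConversions(table: Table): TableConversions
  implicit def miniTableConversions(t: MiniTable): MiniTableConversions =
    new MiniTableConversions(t)
}

import bridge._ // analogue of: import org.apache.flink.table.api.bridge.scala._

val table = MiniTable(Seq("Alice", "Bob"))
println(table.toMiniStream) // prints List(Alice, Bob)
```

Because the implicit `def` lives in an importable object, bringing it into scope is all the compiler needs to rewrite `table.toMiniStream` into `miniTableConversions(table).toMiniStream`.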

## Capabilities

### Table to TableConversions

Implicit conversion from `Table` to `TableConversions`, which provides fluent DataStream conversion methods.

```scala { .api }
/**
 * Conversions from Table to DataStream.
 *
 * @param table The table to provide conversion methods for
 * @return TableConversions wrapper with conversion methods
 */
implicit def tableConversions(table: Table): TableConversions
```

**Usage Example:**

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val table = tableEnv.fromDataStream(dataStream)

// These methods are available via implicit conversion:
val rowStream = table.toDataStream           // Returns DataStream[Row]
val changelogStream = table.toChangelogStream // Returns DataStream[Row]

// Type-safe conversion
case class Person(name: String, age: Int)
val personStream = table.toDataStream(classOf[Person])
```

### Table to DataStream Direct Conversion

Direct implicit conversion from `Table` to a changelog `DataStream[Row]`, covering the most common use case.

```scala { .api }
/**
 * Conversions from Table to DataStream of changelog entries.
 * Provides direct conversion to a changelog DataStream for convenience.
 *
 * @param table The table to convert
 * @return DataStream[Row] containing changelog entries
 */
implicit def tableToChangelogDataStream(table: Table): DataStream[Row]
```

**Usage Example:**

```scala
import org.apache.flink.table.api.bridge.scala._

val table = tableEnv.sqlQuery("SELECT name, COUNT(*) FROM events GROUP BY name")

// Direct implicit conversion to a changelog stream
val changelogStream: DataStream[Row] = table

// Process the changelog stream
changelogStream.map { row =>
  s"${row.getKind}: ${row.getField(0)} -> ${row.getField(1)}"
}
```

**Important Note:** This conversion only works with tables that are part of a Scala `StreamTableEnvironment`. It will throw a `ValidationException` if used with tables from other environments.
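
The mechanics of that failure can be sketched with a plain-Scala analogue: an implicit conversion is ordinary code, so it can validate its argument and throw at the exact point where the conversion is triggered. Everything below (`FakeTable`, `FakeValidationException`, `analogueBridge`) is hypothetical, not Flink API:

```scala
import scala.language.implicitConversions

// Hypothetical stand-ins for Table and ValidationException
case class FakeTable(fromStreamEnv: Boolean)
class FakeValidationException(msg: String) extends RuntimeException(msg)

object analogueBridge {
  // shaped like: implicit def tableToChangelogDataStream(table: Table): DataStream[Row]
  implicit def toChangelogSeq(t: FakeTable): Seq[String] = {
    if (!t.fromStreamEnv)
      throw new FakeValidationException("table is not backed by a StreamTableEnvironment")
    Seq("changelog-row")
  }
}

import analogueBridge._

val ok: Seq[String] = FakeTable(fromStreamEnv = true) // conversion body runs here
// val bad: Seq[String] = FakeTable(fromStreamEnv = false) // would throw here
```

This is why the exception surfaces at the assignment itself rather than at a later method call: the implicit function body executes as soon as the compiler inserts the conversion.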

### DataStream to DataStreamConversions

Implicit conversion from `DataStream` to `DataStreamConversions`, which provides fluent Table conversion methods.

```scala { .api }
/**
 * Conversions from DataStream to Table.
 *
 * @param set The DataStream to provide conversion methods for
 * @return DataStreamConversions wrapper with conversion methods
 */
implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T]
```

**Usage Example:**

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.{Row, RowKind}

val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))

// These methods are available via implicit conversion:
val table = dataStream.toTable(tableEnv)
val tableWithSchema = dataStream.toTable(tableEnv, schema)
dataStream.createTemporaryView(tableEnv, "my_view")

// For changelog streams, set the change flag with Row.ofKind
val changelogStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25))
)
val changelogTable = changelogStream.toChangelogTable(tableEnv)
```

## Complete Usage Examples

### Basic Conversion Example

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Create a DataStream
val dataStream = env.fromElements(
  ("Alice", 25, "Engineer"),
  ("Bob", 30, "Manager"),
  ("Charlie", 35, "Developer")
)

// Convert to Table using the implicit conversion
// (tuple fields are exposed as _1, _2, _3)
val table = dataStream.toTable(tableEnv)

// Apply table operations
val filteredTable = table.filter($"_2" > 28) // age > 28

// Convert back to DataStream using the implicit conversion
val resultStream = filteredTable.toDataStream

// Or use the direct implicit conversion to a changelog stream
val changelogResult: DataStream[Row] = filteredTable

resultStream.print("Results")
env.execute("Implicit Conversions Example")
```

### Advanced Schema Example

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

case class Employee(name: String, age: Int, department: String)

val dataStream = env.fromElements(
  Employee("Alice", 25, "Engineering"),
  Employee("Bob", 30, "Marketing"),
  Employee("Charlie", 35, "Engineering")
)

// Define a custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .column("department", DataTypes.STRING())
  .build()

// Convert with the schema using the implicit conversion
val table = dataStream.toTable(tableEnv, schema)

// Create a temporary view using the implicit conversion
dataStream.createTemporaryView(tableEnv, "employees", schema)

// Query the view; select all three columns so the result
// maps back onto the Employee case class
val engineeringTable = tableEnv.sqlQuery("""
  SELECT name, age, department
  FROM employees
  WHERE department = 'Engineering' AND age > 30
""")

// Convert back to a typed DataStream using the implicit conversion
val engineeringStream = engineeringTable.toDataStream(classOf[Employee])

engineeringStream.print("Engineering Results")
env.execute("Schema Example")
```

### Changelog Processing Example

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.{Row, RowKind}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Create a changelog stream; Row.ofKind sets the change flag of each row
val changelogStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Integer.valueOf(26)),
  Row.ofKind(RowKind.DELETE, "Bob", Integer.valueOf(30)),
  Row.ofKind(RowKind.INSERT, "Charlie", Integer.valueOf(35))
)

// Convert to a table using the implicit conversion (columns default to f0, f1)
val changelogTable = changelogStream.toChangelogTable(tableEnv)

// Apply an aggregation (will produce more changelog events);
// derive the grouping key first, then group by it
val aggregatedTable = changelogTable
  .select($"f0" as "name", ($"f1" >= 30) as "age_group")
  .groupBy($"age_group")
  .select($"age_group", $"name".count as "cnt")

// Convert back to a changelog stream using the direct implicit conversion
val resultChangelogStream: DataStream[Row] = aggregatedTable

// Process changelog events
resultChangelogStream.map { row =>
  val kind = row.getKind
  val ageGroup = row.getField(0).asInstanceOf[Boolean]
  val count = row.getField(1).asInstanceOf[Long]

  val group = if (ageGroup) "30+" else "under 30"
  s"$kind: $group has $count people"
}.print("Changelog Results")

env.execute("Changelog Example")
```

## Error Handling

### ValidationException for Invalid Conversions

The direct `tableToChangelogDataStream` implicit conversion performs validation:

```scala
try {
  val invalidTable = batchTableEnv.fromValues(1, 2, 3) // Not from a StreamTableEnvironment
  val stream: DataStream[Row] = invalidTable // This will throw ValidationException
} catch {
  case e: ValidationException =>
    println(s"Cannot convert table: ${e.getMessage}")
}
```

### Type Safety

Implicit conversions maintain type safety:

```scala
val dataStream: DataStream[(String, Int)] = env.fromElements(("Alice", 25))
val table = dataStream.toTable(tableEnv) // Type information is preserved

// This works because the schema matches the case class fields:
case class Person(name: String, age: Int)
val personStream = table.toDataStream(classOf[Person])

// This fails at runtime because the types don't match:
case class WrongType(id: Int, value: String)
// val wrongStream = table.toDataStream(classOf[WrongType]) // Runtime error
```

## Best Practices

1. **Always import the package**: `import org.apache.flink.table.api.bridge.scala._`
2. **Use type-safe conversions**: Prefer strongly typed conversions when possible
3. **Handle changelogs appropriately**: Use changelog streams for operations that produce updates or deletions
4. **Check schema compatibility**: Ensure target types match table schemas
5. **Keep environments consistent**: Keep tables and streams within the same environment type