0
# Table Conversions
1
2
The TableConversions class provides utilities for converting Tables back to DataStreams with support for different output modes, type specifications, and changelog handling. It serves as a fluent wrapper around Table instances.
3
4
## Capabilities
5
6
### TableConversions Construction
7
8
Wrapper class for Table conversion operations.
9
10
```scala { .api }
11
/**
12
* Creates conversion utilities for a Table
13
* @param table The Table to provide conversion methods for
14
*/
15
class TableConversions(table: Table)
16
```
17
18
This class is typically accessed through implicit conversions from the package object:
19
20
```scala
21
import org.apache.flink.table.api.bridge.scala._
22
23
val table = tableEnv.fromDataStream(dataStream)
24
// table now has conversion methods available via implicit conversion
25
val resultStream = table.toDataStream
26
```
27
28
### DataStream Conversion Methods
29
30
Convert Tables to DataStreams for insert-only operations with different type specifications.
31
32
```scala { .api }
33
/**
34
* Convert insert-only Table to DataStream of Row
35
* @return DataStream containing Row elements
36
*/
37
def toDataStream: DataStream[Row]
38
39
/**
40
* Convert insert-only Table to DataStream of specified class
41
* @param targetClass Target class for the DataStream elements
42
* @return DataStream containing elements of the specified class
43
*/
44
def toDataStream[T](targetClass: Class[T]): DataStream[T]
45
46
/**
47
* Convert insert-only Table to DataStream of specified data type
48
* @param targetDataType Target data type specification
49
* @return DataStream containing elements of the specified data type
50
*/
51
def toDataStream[T](targetDataType: AbstractDataType[_]): DataStream[T]
52
```
53
54
**Usage Examples:**
55
56
```scala
57
import org.apache.flink.table.api._
58
import org.apache.flink.table.api.bridge.scala._
59
import org.apache.flink.streaming.api.scala._
60
61
val env = StreamExecutionEnvironment.getExecutionEnvironment
62
val tableEnv = StreamTableEnvironment.create(env)
63
64
// Create a table
65
val table = tableEnv.fromDataStream(env.fromElements(("Alice", 25), ("Bob", 30)))
66
67
// Convert to Row DataStream
68
val rowStream = table.toDataStream
69
70
// Convert to typed DataStream using case class
71
case class Person(name: String, age: Int)
72
val personStream = table.toDataStream(classOf[Person])
73
74
// Convert using data type specification
75
val typedStream = table.toDataStream[Person](DataTypes.STRUCTURED(classOf[Person]))
76
```
77
78
### Changelog Stream Conversion
79
80
Convert Tables to changelog DataStreams for handling streams with insert/update/delete operations.
81
82
```scala { .api }
83
/**
84
* Convert Table to changelog DataStream
85
* @return DataStream containing Row elements with changelog information
86
*/
87
def toChangelogStream: DataStream[Row]
88
89
/**
90
* Convert Table to changelog DataStream with custom schema
91
* @param targetSchema Custom schema for the output stream
92
* @return DataStream containing Row elements with changelog information
93
*/
94
def toChangelogStream(targetSchema: Schema): DataStream[Row]
95
96
/**
97
* Convert Table to changelog DataStream with custom schema and changelog mode
98
* @param targetSchema Custom schema for the output stream
99
* @param changelogMode Changelog mode configuration
100
* @return DataStream containing Row elements with changelog information
101
*/
102
def toChangelogStream(targetSchema: Schema, changelogMode: ChangelogMode): DataStream[Row]
103
```
104
105
**Usage Examples:**
106
107
```scala
108
import org.apache.flink.types.{Row, RowKind}
109
110
// Convert to basic changelog stream
111
val changelogStream = table.toChangelogStream
112
113
// Process changelog stream
114
changelogStream.map { row =>
115
val kind = row.getKind
116
val name = row.getField(0).toString
117
val age = row.getField(1).asInstanceOf[Int]
118
119
kind match {
120
case RowKind.INSERT => s"Added: $name, $age"
121
case RowKind.UPDATE_AFTER => s"Updated: $name, $age"
122
case RowKind.DELETE => s"Deleted: $name, $age"
123
case _ => s"Other: $name, $age"
124
}
125
}
126
127
// Convert with custom schema
128
val targetSchema = Schema.newBuilder()
129
.column("name", DataTypes.STRING())
130
.column("age", DataTypes.INT())
131
.build()
132
val changelogStreamWithSchema = table.toChangelogStream(targetSchema)
133
134
// Convert with custom changelog mode
135
val changelogMode = ChangelogMode.insertOnly()
136
val changelogStreamWithMode = table.toChangelogStream(targetSchema, changelogMode)
137
```
138
139
### Legacy Methods
140
141
Deprecated methods that should not be used in new code:
142
143
```scala { .api }
144
/**
145
* Convert to append-only stream (deprecated)
146
* @deprecated Use toDataStream instead
147
*/
148
@deprecated("Use toDataStream", "1.18.0")
149
def toAppendStream[T: TypeInformation]: DataStream[T]
150
151
/**
152
* Convert to retract stream (deprecated)
153
* @deprecated Use toChangelogStream instead
154
*/
155
@deprecated("Use toChangelogStream", "1.18.0")
156
def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)]
157
```
158
159
**Migration Examples:**
160
161
```scala
162
// Legacy approach (deprecated)
163
val appendStream = table.toAppendStream[Person]
164
val retractStream = table.toRetractStream[Person]
165
166
// Preferred approach
167
val insertOnlyStream = table.toDataStream(classOf[Person])
168
val changelogStream = table.toChangelogStream
169
170
// Process changelog stream to get retract-style tuples if needed
171
val retractStyleStream = changelogStream.map { row =>
172
val isInsert = row.getKind == RowKind.INSERT || row.getKind == RowKind.UPDATE_AFTER
173
val person = Person(row.getField(0).toString, row.getField(1).asInstanceOf[Int])
174
(isInsert, person)
175
}
176
```
177
178
## Insert-Only vs Changelog Streams
179
180
### Insert-Only Streams
181
182
Use `toDataStream` methods when:
183
- Your table only contains INSERT operations
184
- You're working with bounded data or append-only streams
185
- You don't need to handle updates or deletes
186
187
```scala
188
// Good for append-only scenarios
189
val insertOnlyTable = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 18")
190
val resultStream = insertOnlyTable.toDataStream(classOf[Person])
191
```
192
193
### Changelog Streams
194
195
Use `toChangelogStream` methods when:
196
- Your table may contain UPDATE or DELETE operations
197
- You're working with aggregations or joins that produce updates
198
- You need to handle the full lifecycle of data changes
199
200
```scala
201
// Good for aggregation scenarios
202
val aggregatedTable = tableEnv.sqlQuery("SELECT name, COUNT(*) as count FROM events GROUP BY name")
203
val changelogStream = aggregatedTable.toChangelogStream
204
205
changelogStream.map { row =>
206
row.getKind match {
207
case RowKind.INSERT => s"New group: ${row.getField(0)}"
208
case RowKind.UPDATE_AFTER => s"Updated count for: ${row.getField(0)}"
209
case _ => s"Other change for: ${row.getField(0)}"
210
}
211
}
212
```
213
214
## Type Conversion Requirements
215
216
For successful Table to DataStream conversion:
217
218
- **Target types** must be supported by Flink's type system
219
- **Schema compatibility** between table and target type is required
220
- **Changelog mode compatibility** must match the table's characteristics
221
- **Null handling** should be considered for nullable fields
222
223
Common target types include:
224
- **Row**: Universal type that can represent any table schema
225
- **Case classes**: Type-safe conversion for structured data
226
- **Tuples**: Simple conversion for tuple-like data
227
- **POJOs**: Java bean-style classes with getters/setters