# DataStream Conversions

The `DataStreamConversions` class provides utilities for converting DataStreams to Tables with various schema and conversion options. It is a fluent wrapper around `DataStream` instances that makes table conversion straightforward.

## Capabilities

### DataStreamConversions Construction

A wrapper class for DataStream conversion operations.

```scala { .api }
/**
 * Creates conversion utilities for a DataStream
 * @param dataStream The DataStream to provide conversion methods for
 */
class DataStreamConversions[T](dataStream: DataStream[T])
```

This class is typically accessed through implicit conversions from the package object:

```scala
import org.apache.flink.table.api.bridge.scala._

val dataStream = env.fromElements(("Alice", 25), ("Bob", 30))
// dataStream now has conversion methods available via implicit conversion
val table = dataStream.toTable(tableEnv)
```

### Table Conversion Methods

Convert DataStreams to Tables with an automatically derived or custom schema.

```scala { .api }
/**
 * Convert to Table with auto-derived schema
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @return Table representation of the DataStream
 */
def toTable(tableEnv: StreamTableEnvironment): Table

/**
 * Convert to Table with custom schema
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @param schema Custom schema definition
 * @return Table representation of the DataStream
 */
def toTable(tableEnv: StreamTableEnvironment, schema: Schema): Table
```

**Usage Examples:**

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// A case class whose field names match the schema columns below
case class User(name: String, age: Int)

// Auto-derived schema
val dataStream = env.fromElements(User("Alice", 25), User("Bob", 30))
val table = dataStream.toTable(tableEnv)

// Custom schema (column names must match the element type's fields)
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val tableWithSchema = dataStream.toTable(tableEnv, schema)
```

### Changelog Table Conversion

Convert changelog DataStreams to Tables for handling streams with insert/update/delete operations.

```scala { .api }
/**
 * Convert changelog DataStream to Table
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @return Table representation of the changelog stream
 */
def toChangelogTable(tableEnv: StreamTableEnvironment): Table

/**
 * Convert changelog DataStream to Table with custom schema
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @param schema Custom schema definition
 * @return Table representation of the changelog stream
 */
def toChangelogTable(tableEnv: StreamTableEnvironment, schema: Schema): Table

/**
 * Convert changelog DataStream to Table with custom schema and changelog mode
 * @param tableEnv The StreamTableEnvironment to use for conversion
 * @param schema Custom schema definition
 * @param changelogMode Changelog mode configuration
 * @return Table representation of the changelog stream
 */
def toChangelogTable(tableEnv: StreamTableEnvironment, schema: Schema, changelogMode: ChangelogMode): Table
```

**Usage Examples:**

```scala
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

// Changelog stream with Row elements; the row kind is set via Row.ofKind
val changelogStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Integer.valueOf(26)),
  Row.ofKind(RowKind.DELETE, "Bob", Integer.valueOf(30))
)

// Convert to changelog table
val changelogTable = changelogStream.toChangelogTable(tableEnv)

// With custom schema (columns reference the Row's default fields f0, f1)
val schema = Schema.newBuilder()
  .column("f0", DataTypes.STRING())
  .column("f1", DataTypes.INT())
  .build()
val changelogTableWithSchema = changelogStream.toChangelogTable(tableEnv, schema)

// With an explicit changelog mode; ChangelogMode.all() accepts every row kind,
// matching the stream above, which contains updates and deletes
val changelogMode = ChangelogMode.all()
val changelogTableWithMode = changelogStream.toChangelogTable(tableEnv, schema, changelogMode)
```

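For upsert streams, the changelog mode can be narrowed so the runtime does not expect `UPDATE_BEFORE` rows. The following is a minimal sketch, assuming Flink's `ChangelogMode.upsert()` and `Schema.Builder.primaryKey`, plus the `env` and `tableEnv` from the examples above:

```scala
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

// Upsert stream: only INSERT, UPDATE_AFTER, and DELETE row kinds appear
val upsertStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Integer.valueOf(25)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Integer.valueOf(26))
)

// A primary key tells the planner which column updates are keyed by;
// primary-key columns must be declared NOT NULL
val upsertSchema = Schema.newBuilder()
  .column("f0", DataTypes.STRING().notNull())
  .column("f1", DataTypes.INT())
  .primaryKey("f0")
  .build()

val upsertTable = upsertStream.toChangelogTable(tableEnv, upsertSchema, ChangelogMode.upsert())
```

With an upsert mode, consecutive updates for the same key do not need a matching `UPDATE_BEFORE`, which can reduce the volume of changelog traffic.
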
### Temporary View Creation

Create temporary views from DataStreams for use in SQL queries.

```scala { .api }
/**
 * Create temporary view from the DataStream
 * @param tableEnv The StreamTableEnvironment to use for view creation
 * @param path The view path/name
 */
def createTemporaryView(tableEnv: StreamTableEnvironment, path: String): Unit

/**
 * Create temporary view from the DataStream with custom schema
 * @param tableEnv The StreamTableEnvironment to use for view creation
 * @param path The view path/name
 * @param schema Custom schema definition
 */
def createTemporaryView(tableEnv: StreamTableEnvironment, path: String, schema: Schema): Unit
```

**Usage Examples:**

```scala
// A case class so the derived columns are named name and age
case class User(name: String, age: Int)
val dataStream = env.fromElements(User("Alice", 25), User("Bob", 30))

// Create temporary view
dataStream.createTemporaryView(tableEnv, "users")

// Create temporary view with custom schema
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
dataStream.createTemporaryView(tableEnv, "users_with_schema", schema)

// The views can now be queried with SQL
val result = tableEnv.sqlQuery("SELECT * FROM users WHERE age > 25")
```

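Query results over such views can be brought back into the DataStream API. A minimal sketch, assuming `StreamTableEnvironment.toDataStream` from the same bridge package and the `users` view registered above:

```scala
import org.apache.flink.types.Row

// Run SQL over the registered view and return to the DataStream API
val adults = tableEnv.sqlQuery("SELECT name FROM users WHERE age > 25")
val adultStream: DataStream[Row] = tableEnv.toDataStream(adults)

adultStream.print()
```
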
### Legacy Methods

Deprecated methods that should not be used in new code:

```scala { .api }
/**
 * Convert with field expressions (legacy)
 * @deprecated Use toTable with Schema instead
 */
@deprecated("Use toTable with Schema", "1.18.0")
def toTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table
```

The legacy method allows specifying field expressions directly, but the new Schema-based approach is preferred:

```scala
// Legacy approach (deprecated): name the fields with expressions
val legacyTable = dataStream.toTable(tableEnv, $"name", $"age")

// Preferred approach: declare the schema explicitly
// (column names must match the fields of the stream's element type)
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()
val preferredTable = dataStream.toTable(tableEnv, schema)
```

## Type Requirements

For `DataStreamConversions` to work properly, the DataStream element type `T` must be one of:

- **Scala case classes** with public fields
- **Scala tuples** (up to `Tuple22`)
- **Row types** for changelog streams
- **POJOs** with public fields and a default constructor
- **Basic types** (`String`, `Int`, `Long`, etc.)

The type information is automatically derived through Flink's `TypeInformation` system for Scala types.
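
The case-class path can be illustrated as follows: the fields analyzed by the Scala API's implicit `TypeInformation` derivation become the table's columns. A sketch assuming the `env` and `tableEnv` from the earlier examples:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

// TypeInformation for Order is derived at compile time by the Scala API's
// implicit macros; the field names become the table's column names
case class Order(id: Long, product: String, amount: Int)

val orders = env.fromElements(
  Order(1L, "beer", 3),
  Order(2L, "diapers", 4)
)

// Derived columns: id BIGINT, product STRING, amount INT
val orderTable = orders.toTable(tableEnv)
```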