# DataStream Integration

## Overview

The DataStream integration capabilities enable seamless conversion between Flink's DataStream API and Table API. This allows developers to combine low-level stream processing and relational queries within the same application.
## Core API

### DataStream to Table Conversion

```scala { .api }
trait StreamTableEnvironment {
  def fromDataStream[T](dataStream: DataStream[T]): Table
  def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table
}
```
### Table to DataStream Conversion

```scala { .api }
trait StreamTableEnvironment {
  def toDataStream(table: Table): DataStream[Row]
  def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
  def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T]
}
```
### Conversion Utility Classes

```scala { .api }
class DataStreamConversions[T](dataStream: DataStream[T]) {
  def toTable(tableEnv: StreamTableEnvironment): Table
  def toTable(tableEnv: StreamTableEnvironment, schema: Schema): Table
}

class TableConversions(table: Table) {
  def toDataStream: DataStream[Row]
  def toDataStream[T](targetClass: Class[T]): DataStream[T]
  def toDataStream[T](targetDataType: AbstractDataType[_]): DataStream[T]
}
```
## DataStream to Table Conversion

### Automatic Schema Derivation

```scala
case class User(name: String, age: Int, email: String)

val userStream: DataStream[User] = env.fromCollection(Seq(
  User("Alice", 25, "alice@example.com"),
  User("Bob", 30, "bob@example.com")
))

// Automatic schema derivation from the case class fields
val userTable: Table = tableEnv.fromDataStream(userStream)
```
### Custom Schema Definition

```scala
import org.apache.flink.table.api.{DataTypes, Schema}

// Physical columns must match the field names of the stream's type;
// additional columns can be declared by expression.
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .column("email", DataTypes.STRING())
  .columnByExpression("process_time", "PROCTIME()")
  .build()

val userTable: Table = tableEnv.fromDataStream(userStream, schema)
```
### Event-Time Processing

```scala
val schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  // Expose the stream-record timestamp as an event-time column
  .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
  // Propagate the watermarks of the underlying DataStream
  .watermark("rowtime", "SOURCE_WATERMARK()")
  .build()

val eventTimeTable: Table = tableEnv.fromDataStream(timestampedStream, schema)
```
### Using Implicit Conversions

```scala
import org.apache.flink.table.api.bridge.scala._

// Implicit conversion from DataStream to DataStreamConversions
val userTable: Table = userStream.toTable(tableEnv)

// With custom schema
val userTableWithSchema: Table = userStream.toTable(tableEnv, schema)
```
## Table to DataStream Conversion

### Basic Conversion to Row

```scala
// The "users" view must be registered before it can be queried
tableEnv.createTemporaryView("users", userTable)

val resultTable: Table = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 25")

// Convert to DataStream[Row]
val resultStream: DataStream[Row] = tableEnv.toDataStream(resultTable)

// Using implicit conversion
val resultStream2: DataStream[Row] = resultTable.toDataStream
```
### Conversion to Typed DataStream

```scala
case class UserResult(name: String, age: Int)

// Convert to a specific type; column names must match the case class fields
val typedStream: DataStream[UserResult] = tableEnv.toDataStream(resultTable, classOf[UserResult])

// Using TableConversions
val typedStream2: DataStream[UserResult] = resultTable.toDataStream(classOf[UserResult])
```
### Custom Data Type Conversion

```scala
import org.apache.flink.table.api.DataTypes

// The target ROW type must be compatible with the table's columns
val targetDataType = DataTypes.ROW(
  DataTypes.FIELD("name", DataTypes.STRING()),
  DataTypes.FIELD("age", DataTypes.INT())
)

val customStream: DataStream[Row] = tableEnv.toDataStream(resultTable, targetDataType)
```
## Advanced Schema Handling

### Complex Types

```scala
case class Address(street: String, city: String)
case class UserWithAddress(name: String, age: Int, address: Address)

val complexStream: DataStream[UserWithAddress] = // ... source

// Nested case classes are mapped to structured ROW columns
val complexTable: Table = tableEnv.fromDataStream(complexStream)
// Results in columns: name, age, address ROW<street STRING, city STRING>;
// nested fields are accessed as address.street and address.city
```
### Array and Map Types

```scala
case class UserWithTags(name: String, tags: Array[String], metadata: Map[String, String])

val userWithTagsStream: DataStream[UserWithTags] = // ... source
val tagsTable: Table = tableEnv.fromDataStream(userWithTagsStream)
tableEnv.createTemporaryView("user_tags", tagsTable)

// Query array elements (SQL array indexing is 1-based) and map entries
val queryResult = tableEnv.sqlQuery("""
  SELECT name, tags[1] AS first_tag, metadata['category'] AS category
  FROM user_tags
""")
```
### Timestamp and Watermark Handling

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// DataStream with timestamps and watermarks
// (assumes User carries a `timestamp: Long` field in epoch milliseconds)
val timestampedStream: DataStream[User] = userStream
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[User](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[User] {
        override def extractTimestamp(user: User, recordTimestamp: Long): Long = user.timestamp
      })
  )

// Schema that propagates the stream's timestamps and watermarks
val schema = Schema.newBuilder()
  .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
  .watermark("rowtime", "SOURCE_WATERMARK()")
  .build()

val eventTimeTable: Table = tableEnv.fromDataStream(timestampedStream, schema)
```
## Type System Integration

### Supported Scala Types

The bridge supports conversion of these Scala types:

- **Basic types**: Int, Long, Float, Double, Boolean, String
- **Case classes**: Automatically mapped to structured ROW types
- **Collections**: Array, List, Map (converted to Flink array/map types)
- **Option types**: Handled as nullable fields
- **Timestamps**: java.time.LocalDateTime, java.sql.Timestamp
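As a rough illustration of how these kinds of fields combine, here is a hypothetical event type (the `Event` class and its field names are invented for this sketch, not part of Flink); the comments note the column type each field would approximately map to:

```scala
import java.time.LocalDateTime

// Hypothetical case class combining the supported field kinds.
case class Event(
  id: Long,                        // BIGINT
  name: String,                    // STRING
  score: Option[Double],           // DOUBLE, nullable (None becomes SQL NULL)
  tags: Array[String],             // ARRAY<STRING>
  attributes: Map[String, String], // MAP<STRING, STRING>
  createdAt: LocalDateTime         // TIMESTAMP
)

val event = Event(
  id = 1L,
  name = "login",
  score = None,
  tags = Array("web", "mobile"),
  attributes = Map("region" -> "eu"),
  createdAt = LocalDateTime.of(2024, 1, 1, 12, 0)
)

// Option fields surface as nullable values on the Table side
val scoreOrDefault: Double = event.score.getOrElse(0.0)
```

A stream of such events could then be converted with `tableEnv.fromDataStream` as in the earlier examples.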
### Type Information Requirements

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
// The Scala API import provides implicit TypeInformation derivation
import org.apache.flink.api.scala._

case class CustomType(id: Long, data: String)

val data = Seq(CustomType(1L, "a"), CustomType(2L, "b"))

// TypeInformation is picked up implicitly from the import above
val customStream: DataStream[CustomType] = env.fromCollection(data)
val customTable: Table = tableEnv.fromDataStream(customStream)
```
## Error Handling

Common exceptions and their causes:

```scala
import org.apache.flink.table.api.{TableException, ValidationException}

try {
  val table = tableEnv.fromDataStream(dataStream)
} catch {
  case e: ValidationException =>
    // Schema validation failed or the type is unsupported
  case e: TableException =>
    // Table creation or conversion error
}
```
## Performance Considerations

1. **Schema Caching**: Reuse `Schema` objects when possible
2. **Type Conversion**: Direct conversions to a target class are more efficient than going through `Row`
3. **Memory Usage**: Large case classes increase serialization and state size
4. **Serialization**: Ensure custom types are serializable
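The first two points can be sketched as follows; this is only an illustration, and `streamA`, `streamB`, and `tableEnv` are assumed to be defined as in the earlier examples:

```scala
import org.apache.flink.table.api.{DataTypes, Schema}

// Point 1: build the schema once and reuse it for every conversion.
val sharedSchema: Schema = Schema.newBuilder()
  .column("name", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()

val tableA = tableEnv.fromDataStream(streamA, sharedSchema)
val tableB = tableEnv.fromDataStream(streamB, sharedSchema)

// Point 2: prefer a direct typed conversion over going through Row.
case class UserResult(name: String, age: Int)
val typed = tableEnv.toDataStream(tableA, classOf[UserResult])
```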
## Best Practices

1. **Use Case Classes**: Leverage Scala case classes for type safety
2. **Define Schemas Explicitly**: For production use, explicitly define schemas
3. **Handle Time Correctly**: Use proper time attributes for event-time processing
4. **Validate Types**: Ensure all types are supported by Flink's type system
5. **Consider Performance**: Choose appropriate conversion methods based on use case