# DataStream Integration (Bridge API)

The bridge API integrates Flink's Table API with the DataStream API, enabling conversion between Table and DataStream objects as well as streaming-specific table operations.

## Core Classes

### StreamTableEnvironment

The main entry point for DataStream-Table integration, providing methods to convert between DataStream and Table objects.
```scala { .api }
trait StreamTableEnvironment extends TableEnvironment {
  // DataStream to Table conversion
  def fromDataStream[T](dataStream: DataStream[T]): Table
  def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table
  def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table

  // Table to DataStream conversion
  def toDataStream(table: Table): DataStream[Row]
  def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T]
  def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T]

  // Changelog stream conversion
  def toChangelogStream(table: Table): DataStream[Row]
  def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row]
  def toChangelogStream(
      table: Table,
      targetSchema: Schema,
      changelogMode: ChangelogMode
  ): DataStream[Row]
}
```

### Factory Methods

```scala { .api }
object StreamTableEnvironment {
  // Create from an existing StreamExecutionEnvironment
  def create(
      executionEnvironment: StreamExecutionEnvironment
  ): StreamTableEnvironment

  def create(
      executionEnvironment: StreamExecutionEnvironment,
      settings: EnvironmentSettings
  ): StreamTableEnvironment

  def create(settings: EnvironmentSettings): StreamTableEnvironment
}
```
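For example, an environment is typically created from an existing `StreamExecutionEnvironment`, optionally with explicit settings. A minimal sketch (streaming mode is the default and is shown here only for clarity):

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Explicit settings: select streaming mode rather than batch mode
val settings = EnvironmentSettings.newInstance()
  .inStreamingMode()
  .build()

val tableEnv = StreamTableEnvironment.create(env, settings)
```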

## Implicit Conversion Classes

### TableConversions

Provides implicit conversion methods for Table objects to DataStream.

```scala { .api }
class TableConversions(table: Table) {
  /**
   * Converts the Table to a DataStream of Row objects.
   * Equivalent to StreamTableEnvironment.toDataStream(table)
   */
  def toDataStream(): DataStream[Row]

  /**
   * Converts the Table to a typed DataStream.
   * Equivalent to StreamTableEnvironment.toDataStream(table, targetClass)
   */
  def toDataStream[T](targetClass: Class[T]): DataStream[T]

  /**
   * Converts the Table to a typed DataStream with the specified data type.
   * Equivalent to StreamTableEnvironment.toDataStream(table, targetDataType)
   */
  def toDataStream[T](targetDataType: AbstractDataType[_]): DataStream[T]

  /**
   * Converts the Table to a changelog DataStream.
   * Equivalent to StreamTableEnvironment.toChangelogStream(table)
   */
  def toChangelogStream(): DataStream[Row]

  /**
   * Converts the Table to a changelog DataStream with schema.
   * Equivalent to StreamTableEnvironment.toChangelogStream(table, targetSchema)
   */
  def toChangelogStream(targetSchema: Schema): DataStream[Row]

  /**
   * Converts the Table to a changelog DataStream with target schema and changelog mode.
   */
  def toChangelogStream(
      targetSchema: Schema,
      changelogMode: ChangelogMode
  ): DataStream[Row]
}
```

### DataStreamConversions

Provides implicit conversion methods for DataStream objects to Table.

```scala { .api }
class DataStreamConversions[T](dataStream: DataStream[T]) {
  /**
   * Converts the DataStream to a Table.
   * Equivalent to StreamTableEnvironment.fromDataStream(dataStream)
   */
  def toTable()(implicit tEnv: StreamTableEnvironment): Table

  /**
   * Converts the DataStream to a Table with schema.
   * Equivalent to StreamTableEnvironment.fromDataStream(dataStream, schema)
   */
  def toTable(schema: Schema)(implicit tEnv: StreamTableEnvironment): Table

  /**
   * Converts the DataStream to a Table with field expressions.
   * Equivalent to StreamTableEnvironment.fromDataStream(dataStream, fields)
   */
  def toTable(fields: Expression*)(implicit tEnv: StreamTableEnvironment): Table
}
```
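The `Expression*` variant of `toTable` can rename columns during conversion. A short sketch, assuming an implicit `StreamTableEnvironment` in scope and a hypothetical `clicks: DataStream[(Long, String)]`:

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// Tuple fields _1/_2 are renamed to userId/url during conversion
val clicksTable: Table = clicks.toTable($"userId", $"url")
```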

## Package Object Implicits

The `org.apache.flink.table.api.bridge.scala` package object provides automatic implicit conversions:

```scala { .api }
package object scala {
  // Automatic Table to TableConversions
  implicit def tableConversions(table: Table): TableConversions

  // Automatic Table to DataStream[Row] conversion
  implicit def tableToChangelogDataStream(table: Table): DataStream[Row]

  // Automatic DataStream to DataStreamConversions
  implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T]
}
```
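Because `tableToChangelogDataStream` is implicit, a Table can be passed directly wherever a `DataStream[Row]` is expected once the bridge package is imported. A sketch, where `writeRows` is a hypothetical helper and `resultTable` an assumed existing Table:

```scala
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Hypothetical sink helper that only knows about DataStream[Row]
def writeRows(rows: DataStream[Row]): Unit = rows.print()

// resultTable is converted implicitly by tableToChangelogDataStream
// at the call site
writeRows(resultTable)
```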

## Usage Examples

### Environment Setup

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
```

### DataStream to Table Conversion

```scala
case class Order(id: Int, product: String, amount: Double)

val orders: DataStream[Order] = env.fromElements(
  Order(1, "laptop", 999.99),
  Order(2, "mouse", 29.99)
)

// Direct conversion
val ordersTable = tableEnv.fromDataStream(orders)

// With an explicit schema
val ordersTableWithSchema = tableEnv.fromDataStream(
  orders,
  Schema.newBuilder()
    .column("id", DataTypes.INT())
    .column("product", DataTypes.STRING())
    .column("amount", DataTypes.DOUBLE())
    .build()
)

// Using the implicit conversion; toTable needs a StreamTableEnvironment,
// either implicit in scope or passed explicitly as here
val ordersTableImplicit = orders.toTable()(tableEnv)
```

### Table to DataStream Conversion

```scala
// Filter before projecting, since the select below drops the amount column
val processedTable = ordersTable
  .where($"amount" > 50.0)
  .select($"id", $"product", ($"amount" * 1.1).as("amountWithTax"))

// Direct conversion
val resultStream: DataStream[Row] = tableEnv.toDataStream(processedTable)

// Typed conversion: column names and types must match the target class,
// so convert the original table rather than the renamed one
val typedStream: DataStream[Order] = tableEnv.toDataStream(ordersTable, classOf[Order])

// Using the implicit conversion
val implicitStream: DataStream[Row] = processedTable.toDataStream()
```

### Changelog Streams

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// For tables with updates/deletes
val changelogStream = tableEnv.toChangelogStream(processedTable)

// Process changelog entries; each Row carries its change type as a RowKind
changelogStream.process(new ProcessFunction[Row, String] {
  override def processElement(
      value: Row,
      ctx: ProcessFunction[Row, String]#Context,
      out: Collector[String]
  ): Unit = {
    val rowKind = value.getKind
    val data = value.toString
    out.collect(s"$rowKind: $data")
  }
})
```
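The three-argument variant of `toChangelogStream` restricts which row kinds the resulting stream may contain. A minimal sketch for upsert mode, assuming a hypothetical `usersTable` whose non-null `id` column can serve as the primary key that upsert mode requires:

```scala
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.connector.ChangelogMode

// Upsert mode carries only INSERT, UPDATE_AFTER and DELETE rows;
// the primary key tells Flink how to key the updates
val upsertStream = tableEnv.toChangelogStream(
  usersTable,
  Schema.newBuilder()
    .column("id", DataTypes.INT().notNull())
    .column("name", DataTypes.STRING())
    .primaryKey("id")
    .build(),
  ChangelogMode.upsert()
)
```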

## Integration Patterns

### Hybrid Processing Pipeline

```scala
// DataStream processing
val rawStream = env.addSource(new MySourceFunction())
val cleanedStream = rawStream.filter(_.isValid)

// Convert to Table and register a view so SQL can reference it by name
val cleanedTable = tableEnv.fromDataStream(cleanedStream)
tableEnv.createTemporaryView("cleaned", cleanedTable)

val aggregatedTable = tableEnv.sqlQuery(
  """SELECT category, COUNT(*) AS cnt, AVG(amount) AS avg_amount
    |FROM cleaned
    |GROUP BY category""".stripMargin
)

// A grouped aggregation is an updating table, so convert it back
// as a changelog stream rather than an insert-only stream
val aggregatedStream = tableEnv.toChangelogStream(aggregatedTable)
aggregatedStream.addSink(new MySinkFunction())
```

## Notes

- All Scala bridge API components are marked `@deprecated` as part of FLIP-265, which deprecates Flink's Scala API support
- Implicit conversions are automatically available when importing `org.apache.flink.table.api.bridge.scala._`
- Schema inference works automatically for case classes and basic types
- Changelog streams preserve row-level change information as a `RowKind` (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE)
- The bridge package requires both the table and streaming dependencies