# DataStream Integration

This document covers the seamless conversion between Flink Tables and DataStreams, enabling hybrid processing workflows that combine Table API with DataStream API capabilities.

## StreamTableEnvironment

The StreamTableEnvironment provides integration between Table API and DataStream API.

### Environment Creation

```java { .api }
class StreamTableEnvironment {
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings);
    static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig);
}
```

**Usage:**

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create with default settings
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create with custom settings
// (useBlinkPlanner() is no longer needed: since Flink 1.14 the Blink planner is the only planner)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .build();
StreamTableEnvironment customTableEnv = StreamTableEnvironment.create(env, settings);
```
## DataStream to Table Conversion

### Basic Conversion

```java { .api }
interface StreamTableEnvironment {
    Table fromDataStream(DataStream<?> dataStream);
    Table fromDataStream(DataStream<?> dataStream, Expression... fields);
    Table fromDataStream(DataStream<?> dataStream, Schema schema);

    // Changelog stream conversion
    Table fromChangelogStream(DataStream<Row> dataStream);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
    Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
}
```

**Usage:**

```java
DataStream<Tuple3<String, Integer, Long>> dataStream = env.fromElements(
    Tuple3.of("Alice", 25, 1000L),
    Tuple3.of("Bob", 30, 2000L)
);

// Convert with automatic field inference
Table table1 = tableEnv.fromDataStream(dataStream);

// Convert with field mapping
Table table2 = tableEnv.fromDataStream(dataStream, $("name"), $("age"), $("salary"));

// Convert with explicit schema
Schema schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .column("salary", DataTypes.BIGINT())
    .build();
Table table3 = tableEnv.fromDataStream(dataStream, schema);
```
### Complex Type Conversion

```java
// POJO conversion
public static class User {
    public String name;
    public int age;
    public long timestamp;
    // constructors, getters, setters
}

DataStream<User> userStream = env.addSource(new UserSource());
Table userTable = tableEnv.fromDataStream(userStream);

// Row type conversion with an explicit schema
DataStream<Row> rowStream = env.addSource(new RowSource());
Table rowTable = tableEnv.fromDataStream(rowStream,
    Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .column("event_time", DataTypes.TIMESTAMP(3))
        .column("event_type", DataTypes.STRING())
        .build()
);
```
### Time Attribute Mapping

```java
DataStream<Event> eventStream = env.addSource(new EventSource());

// Event time and processing time attributes
Table table = tableEnv.fromDataStream(eventStream,
    $("user_id"),
    $("event_time").rowtime(),   // Event time from field
    $("event_type"),
    $("proc_time").proctime()    // Processing time
);

// Event time with watermarks
DataStream<Event> watermarkedStream = eventStream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.timestamp)
    );

Table eventTable = tableEnv.fromDataStream(watermarkedStream,
    $("user_id"),
    $("timestamp").rowtime(),
    $("event_type")
);
```
## Table to DataStream Conversion

### Basic Conversion

```java { .api }
interface StreamTableEnvironment {
    <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
    <T> DataStream<T> toDataStream(Table table, AbstractDataType<T> targetDataType);
    DataStream<Row> toDataStream(Table table);

    // Legacy methods (deprecated)
    <T> DataStream<T> toAppendStream(Table table, Class<T> clazz);
    <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);
    <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
    <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo);
}
```

**Usage:**

```java
Table table = tableEnv.sqlQuery("SELECT name, age FROM users WHERE age > 18");

// Convert to Row (default)
DataStream<Row> rowStream = tableEnv.toDataStream(table);

// Convert to POJO
DataStream<User> userStream = tableEnv.toDataStream(table, User.class);

// Convert to Tuple (TypeInformation is accepted by the legacy API, not by toDataStream)
DataStream<Tuple2<String, Integer>> tupleStream = tableEnv.toAppendStream(table,
    Types.TUPLE(Types.STRING, Types.INT));
```
### Changelog Stream Conversion

```java { .api }
interface StreamTableEnvironment {
    DataStream<Row> toChangelogStream(Table table);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
    DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
}
```

**Usage:**

```java
Table aggregatedTable = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as cnt FROM events GROUP BY user_id"
);

// Convert to changelog stream (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE)
DataStream<Row> changelogStream = tableEnv.toChangelogStream(aggregatedTable);

changelogStream.process(new ProcessFunction<Row, String>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<String> out) {
        RowKind kind = row.getKind();
        String message = String.format("Kind: %s, Data: %s", kind, row);
        out.collect(message);
    }
});
```
## Temporary Views from DataStreams

```java { .api }
interface StreamTableEnvironment {
    void createTemporaryView(String path, DataStream<?> dataStream);
    void createTemporaryView(String path, DataStream<?> dataStream, Expression... fields);
    void createTemporaryView(String path, DataStream<?> dataStream, Schema schema);
}
```

**Usage:**

```java
DataStream<Order> orderStream = env.addSource(new OrderSource());

// Create temporary view
tableEnv.createTemporaryView("orders", orderStream,
    $("order_id"),
    $("customer_id"),
    $("amount"),
    $("order_time").rowtime()
);

// Use in SQL
Table result = tableEnv.sqlQuery(
    "SELECT customer_id, SUM(amount) " +
    "FROM orders " +
    "GROUP BY customer_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
);
```
## Batch Integration (Legacy)

For batch processing with DataSet API (deprecated in newer versions):

```java { .api }
class BatchTableEnvironment {
    static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment);

    Table fromDataSet(DataSet<?> dataSet);
    Table fromDataSet(DataSet<?> dataSet, Expression... fields);

    <T> DataSet<T> toDataSet(Table table, Class<T> clazz);
    DataSet<Row> toDataSet(Table table);
}
```

## Scala Integration

### StreamTableEnvironment (Scala)

```scala { .api }
object StreamTableEnvironment {
  def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment
  def create(executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings): StreamTableEnvironment
}

class StreamTableEnvironment {
  def fromDataStream[T](dataStream: DataStream[T]): Table
  def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table

  def toAppendStream[T: TypeInformation](table: Table): DataStream[T]
  def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)]
}
```
**Scala Usage:**

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// DataStream to Table
val dataStream: DataStream[(String, Int)] = env.fromElements(("Alice", 25), ("Bob", 30))
val table = tableEnv.fromDataStream(dataStream, 'name, 'age)

// Table to DataStream
val resultStream: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)
```
## Type System Integration

### Type Inference

```java { .api }
class TypeInference {
    static DataType inferDataType(Class<?> clazz);
    static DataType inferDataType(TypeInformation<?> typeInfo);
}
```

### Row Kind Handling

```java { .api }
enum RowKind {
    INSERT("+I"),
    UPDATE_BEFORE("-U"),
    UPDATE_AFTER("+U"),
    DELETE("-D");
}

class Row {
    RowKind getKind();
    void setKind(RowKind kind);
    Object getField(int pos);
    Object getField(String name);
}
```
## Configuration

### Execution Configuration

```java
// Configure execution environment
// (setStreamTimeCharacteristic is deprecated since Flink 1.12; event time is the default)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);

// Configure table environment
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
tableEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
```

### Memory Management

```java
// Configure state backend
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoints");

// Configure memory
Configuration config = new Configuration();
config.setString("taskmanager.memory.process.size", "1024m");
config.setString("taskmanager.memory.flink.size", "768m");
```
## Common Patterns

### Hybrid Processing Pipeline

```java
// Start with DataStream processing
DataStream<Event> eventStream = env
    .addSource(new KafkaSource())
    .filter(event -> event.isValid())
    .keyBy(Event::getUserId);

// Convert to Table for SQL processing
tableEnv.createTemporaryView("events", eventStream);

Table aggregated = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as event_count, " +
    "       TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start " +
    "FROM events " +
    "GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE)"
);

// Convert back to DataStream for complex processing
DataStream<UserStats> statsStream = tableEnv.toDataStream(aggregated, UserStats.class);

statsStream
    .keyBy(UserStats::getUserId)
    .process(new ComplexStatefulFunction())
    .addSink(new ElasticsearchSink<>());
```
### Real-time Feature Engineering

```java
// Raw events from multiple sources
DataStream<ClickEvent> clicks = env.addSource(new ClickEventSource());
DataStream<PurchaseEvent> purchases = env.addSource(new PurchaseEventSource());

// Register as tables
tableEnv.createTemporaryView("clicks", clicks);
tableEnv.createTemporaryView("purchases", purchases);

// Feature engineering with SQL
Table features = tableEnv.sqlQuery(
    "SELECT " +
    "  c.user_id," +
    "  COUNT(c.click_id) as clicks_last_hour," +
    "  COALESCE(SUM(p.amount), 0) as purchases_last_hour " +
    "FROM clicks c " +
    "LEFT JOIN purchases p ON c.user_id = p.user_id " +
    "  AND p.purchase_time BETWEEN c.click_time - INTERVAL '1' HOUR AND c.click_time " +
    "WHERE c.click_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +
    "GROUP BY c.user_id"
);

// Output as enriched stream
DataStream<UserFeatures> featureStream = tableEnv.toDataStream(features, UserFeatures.class);
```
## Error Handling

```java { .api }
class StreamTableException extends TableException;
class DataStreamConversionException extends StreamTableException;
```

## Types

```java { .api }
class Schema {
    static Schema.Builder newBuilder();

    interface Builder {
        Builder column(String columnName, DataType dataType);
        Builder columnByExpression(String columnName, String expression);
        Builder columnByMetadata(String columnName, DataType dataType);
        Builder watermark(String columnName, String watermarkExpression);
        Builder primaryKey(String... columnNames);
        Schema build();
    }
}

enum ChangelogMode {
    INSERT_ONLY,
    UPSERT,
    ALL
}

interface AbstractDataType<T>;
class DataTypes {
    static AbstractDataType<Row> ROW(AbstractDataType<?>... fieldDataTypes);
    static <T> AbstractDataType<T> of(Class<T> expectedClass);
}
```