# DataStream Integration

Bridge between Table API and DataStream API enabling conversion between tables and data streams for hybrid stream/batch processing applications.

## Capabilities

### StreamTableEnvironment

Specialized TableEnvironment that provides seamless integration between Table API and DataStream API for streaming applications.

```java { .api }
/**
 * Create StreamTableEnvironment from StreamExecutionEnvironment
 * @param streamEnv Existing StreamExecutionEnvironment
 * @return StreamTableEnvironment instance
 */
public static StreamTableEnvironment create(StreamExecutionEnvironment streamEnv);

/**
 * Create StreamTableEnvironment with specific settings
 * @param streamEnv Existing StreamExecutionEnvironment
 * @param settings Table environment configuration
 * @return StreamTableEnvironment instance
 */
public static StreamTableEnvironment create(StreamExecutionEnvironment streamEnv,
        EnvironmentSettings settings);

/**
 * Get the underlying StreamExecutionEnvironment
 * @return StreamExecutionEnvironment instance
 */
public StreamExecutionEnvironment getStreamExecutionEnvironment();
```

**Basic Setup:**

```java
// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(10000);

// Create table environment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Configure table environment
tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
```

### DataStream to Table Conversion

Convert DataStream instances to Table for SQL and Table API operations.

```java { .api }
/**
 * Convert DataStream to Table using automatic schema inference
 * @param dataStream DataStream to convert
 * @return Table representing the stream data
 */
public <T> Table fromDataStream(DataStream<T> dataStream);

/**
 * Convert DataStream to Table with explicit field selection
 * @param dataStream DataStream to convert
 * @param fields Expressions defining the table schema
 * @return Table with specified schema
 */
public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);

/**
 * Convert DataStream to Table with explicit schema
 * @param dataStream DataStream to convert
 * @param schema Complete schema definition including watermarks
 * @return Table with specified schema
 */
public <T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

/**
 * Create a temporary view from DataStream
 * @param name View name for SQL queries
 * @param dataStream DataStream to expose as view
 */
public <T> void createTemporaryView(String name, DataStream<T> dataStream);

/**
 * Create a temporary view from DataStream with schema
 * @param name View name for SQL queries
 * @param dataStream DataStream to expose as view
 * @param schema Schema definition for the view
 */
public <T> void createTemporaryView(String name, DataStream<T> dataStream, Schema schema);
```

**DataStream to Table Examples:**

```java
// Simple POJO conversion
DataStream<Order> orderStream = env.addSource(new OrderSource());

// Automatic schema inference from POJO
Table orders = tEnv.fromDataStream(orderStream);

// Explicit field selection and aliasing
Table ordersWithAlias = tEnv.fromDataStream(orderStream,
    $("orderId").as("id"),
    $("customerId"),
    $("amount"),
    $("orderTime"));

// Complex schema with watermarks for event time
Schema orderSchema = Schema.newBuilder()
    .column("orderId", DataTypes.BIGINT())
    .column("customerId", DataTypes.BIGINT())
    .column("amount", DataTypes.DECIMAL(10, 2))
    .column("orderTime", DataTypes.TIMESTAMP_LTZ(3))
    .watermark("orderTime", $("orderTime").minus(lit(5).seconds()))
    .build();

Table ordersWithWatermark = tEnv.fromDataStream(orderStream, orderSchema);

// Create temporary view for SQL access
tEnv.createTemporaryView("orders", orderStream, orderSchema);
tEnv.executeSql("SELECT customerId, SUM(amount) FROM orders " +
    "WHERE orderTime > CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +
    "GROUP BY customerId").print();
```

### Table to DataStream Conversion

Convert Table instances back to DataStream for stream processing operations.

```java { .api }
/**
 * Convert Table to DataStream with automatic type inference
 * @param table Table to convert
 * @return DataStream with Row type
 */
public DataStream<Row> toDataStream(Table table);

/**
 * Convert Table to DataStream with specific target type
 * @param table Table to convert
 * @param targetClass Target Java class for stream elements
 * @return DataStream with specified type
 */
public <T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

/**
 * Convert Table to DataStream with type information
 * @param table Table to convert
 * @param targetType TypeInformation for stream elements
 * @return DataStream with specified type
 */
public <T> DataStream<T> toDataStream(Table table, TypeInformation<T> targetType);

/**
 * Convert changing table to changelog stream
 * @param table Table to convert (may contain updates/deletes)
 * @return DataStream of Row with change flags
 */
public DataStream<Row> toChangelogStream(Table table);

/**
 * Convert changing table to changelog stream with specific type
 * @param table Table to convert
 * @param targetClass Target Java class for stream elements
 * @return DataStream with change information
 */
public <T> DataStream<T> toChangelogStream(Table table, Class<T> targetClass);

/**
 * Convert changing table to retract stream
 * @param table Table to convert
 * @return DataStream of Tuple2<Boolean, Row> where Boolean indicates add/retract
 */
public DataStream<Tuple2<Boolean, Row>> toRetractStream(Table table);

/**
 * Convert changing table to retract stream with specific type
 * @param table Table to convert
 * @param targetClass Target Java class for stream elements
 * @return DataStream of Tuple2<Boolean, T> with retract information
 */
public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> targetClass);
```

**Table to DataStream Examples:**

```java
// Basic conversion to Row DataStream
Table filteredOrders = tEnv.from("orders")
    .filter($("amount").isGreater(lit(100)));
DataStream<Row> resultStream = tEnv.toDataStream(filteredOrders);

// Convert to specific POJO type
DataStream<OrderSummary> summaryStream = tEnv.toDataStream(
    tEnv.from("orders")
        .groupBy($("customerId"))
        .select($("customerId"), $("amount").sum().as("totalAmount")),
    OrderSummary.class
);

// Handle updates with changelog stream
Table customerTotals = tEnv.from("orders")
    .groupBy($("customerId"))
    .select($("customerId"), $("amount").sum().as("total"));

DataStream<Row> changelogStream = tEnv.toChangelogStream(customerTotals);
changelogStream.process(new ProcessFunction<Row, String>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<String> out) {
        RowKind kind = row.getKind();
        switch (kind) {
            case INSERT:
                out.collect("New customer total: " + row);
                break;
            case UPDATE_AFTER:
                out.collect("Updated customer total: " + row);
                break;
            case DELETE:
                out.collect("Removed customer: " + row);
                break;
        }
    }
});

// Use retract stream for legacy compatibility
DataStream<Tuple2<Boolean, OrderSummary>> retractStream =
    tEnv.toRetractStream(customerTotals, OrderSummary.class);
```

### Event Time and Watermarks

Handle event time processing and watermark propagation between DataStream and Table API.

```java { .api }
// Event time assignment in DataStream before conversion
DataStream<Order> ordersWithEventTime = orderStream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((order, timestamp) -> order.getOrderTime())
    );

// Convert to Table preserving event time
Table ordersTable = tEnv.fromDataStream(ordersWithEventTime,
    Schema.newBuilder()
        .column("orderId", DataTypes.BIGINT())
        .column("customerId", DataTypes.BIGINT())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("orderTime", DataTypes.TIMESTAMP_LTZ(3))
        .watermark("orderTime", $("orderTime").minus(lit(5).seconds()))
        .build()
);
```

### Type System Integration

Handle type conversions and mappings between DataStream and Table type systems.

```java { .api }
/**
 * Type mapping utilities for DataStream-Table integration
 */
public class TypeConversions {
    /**
     * Convert DataStream TypeInformation to Table DataType
     * @param typeInfo DataStream type information
     * @return Equivalent Table API data type
     */
    public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo);

    /**
     * Convert Table DataType to DataStream TypeInformation
     * @param dataType Table API data type
     * @return Equivalent DataStream type information
     */
    public static TypeInformation<?> fromDataTypeToLegacyInfo(DataType dataType);
}
```

**Type Mapping Examples:**

```java
// Complex POJO with nested fields
@Data
public class ComplexOrder {
    public Long orderId;
    public CustomerInfo customer;
    public List<OrderItem> items;
    public Instant orderTime;

    @Data
    public static class CustomerInfo {
        public Long customerId;
        public String name;
        public String email;
    }

    @Data
    public static class OrderItem {
        public String productId;
        public Integer quantity;
        public BigDecimal price;
    }
}

// DataStream with complex type
DataStream<ComplexOrder> complexOrderStream = env.addSource(new ComplexOrderSource());

// Convert with nested field access
Table complexOrders = tEnv.fromDataStream(complexOrderStream,
    $("orderId"),
    $("customer.customerId").as("customerId"),
    $("customer.name").as("customerName"),
    $("items").cardinality().as("itemCount"),
    $("orderTime"));

// Flatten nested structure with Table API
Table flattenedOrders = complexOrders
    .joinLateral(call("EXPLODE", $("items")).as("item"))
    .select($("orderId"),
        $("customerId"),
        $("item.productId").as("productId"),
        $("item.quantity").as("quantity"));
```

### Stream Processing Patterns

Common patterns for combining DataStream and Table API operations.

```java { .api }
// Pattern 1: Stream -> Table -> Stream pipeline
DataStream<RawEvent> rawEvents = env.addSource(new EventSource());

// Data cleaning and enrichment with Table API
Table cleanedEvents = tEnv.fromDataStream(rawEvents)
    .filter($("value").isNotNull())
    .select($("eventId"),
        $("value").upperCase().as("cleanValue"),
        $("timestamp"))
    .join(tEnv.from("reference_data"),
        $("eventId").isEqual($("reference_data.id")));

// Continue processing with DataStream API
DataStream<EnrichedEvent> enrichedStream = tEnv.toDataStream(cleanedEvents, EnrichedEvent.class);
enrichedStream
    .keyBy(EnrichedEvent::getCategory)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new EventAggregator())
    .addSink(new ResultSink());

// Pattern 2: Hybrid aggregation with state
Table continuousAggregates = tEnv.from("event_stream")
    .window(Tumble.over(lit(1).minutes()).on($("eventTime")).as("w"))
    .groupBy($("w"), $("category"))
    .select($("category"),
        $("w").start().as("windowStart"),
        $("value").sum().as("total"));

DataStream<WindowResult> aggregateStream = tEnv.toDataStream(continuousAggregates, WindowResult.class);

// Add custom stateful processing
aggregateStream
    .keyBy(WindowResult::getCategory)
    .process(new StatefulProcessor())
    .addSink(new AlertSink());
```

### Configuration and Optimization

Configuration options for optimizing DataStream-Table integration.

```java { .api }
// Configure table environment for stream processing
Configuration config = tEnv.getConfig().getConfiguration();

// Enable mini-batch optimization for better throughput
config.setString("table.exec.mini-batch.enabled", "true");
config.setString("table.exec.mini-batch.allow-latency", "1s");
config.setString("table.exec.mini-batch.size", "1000");

// Configure state backend for table operations
config.setString("table.exec.state.backend", "rocksdb");
config.setString("table.exec.state.checkpoint-interval", "10s");

// Optimize for low latency vs high throughput
config.setString("table.exec.streaming.prefer-append-only", "true");
config.setString("table.exec.emit.early-fire.enabled", "true");
config.setString("table.exec.emit.early-fire.delay", "1000ms");
```

### Error Handling and Monitoring

Best practices for handling errors and monitoring DataStream-Table integration.

```java { .api }
// Error handling in conversions
try {
    Table result = tEnv.fromDataStream(dataStream);
    DataStream<Row> output = tEnv.toDataStream(result);
} catch (ValidationException e) {
    // Handle schema validation errors
    log.error("Schema validation failed: " + e.getMessage());
} catch (TableException e) {
    // Handle table operation errors
    log.error("Table operation failed: " + e.getMessage());
}

// Add monitoring to DataStream operations.
// Note: RichMapFunction (not the plain MapFunction interface) is required
// to get access to open() and getRuntimeContext() for metrics registration.
DataStream<Order> monitoredStream = orderStream
    .map(new RichMapFunction<Order, Order>() {
        private transient Counter recordCounter;

        @Override
        public void open(Configuration parameters) {
            recordCounter = getRuntimeContext()
                .getMetricGroup()
                .counter("records_processed");
        }

        @Override
        public Order map(Order order) {
            recordCounter.inc();
            return order;
        }
    });
```