# DataStream Integration

StreamTableEnvironment bridges Flink's Table API and DataStream API, converting between tables and data streams so that a single pipeline can combine relational operations with low-level stream processing.

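As a minimal end-to-end sketch of the round trip (assuming a hypothetical `Order` POJO with public `orderId`, `customerId`, and `amount` fields plus a no-arg constructor, and the usual static import of `Expressions.$`):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// DataStream -> Table
DataStream<Order> orderStream = env.fromElements(new Order(1L, 7L, 120.50));
Table orders = tableEnv.fromDataStream(orderStream);

// Relational logic on the Table side
Table bigOrders = orders.filter($("amount").isGreater(100));

// Table -> DataStream for further low-level processing
DataStream<Row> result = tableEnv.toDataStream(bigOrders);
result.print();

env.execute("round-trip");
```
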
## Capabilities

### Environment Creation

Creates streaming table environments with DataStream integration capabilities.

```java { .api }
/**
 * Creates a StreamTableEnvironment from a StreamExecutionEnvironment
 * @param executionEnvironment The StreamExecutionEnvironment for stream processing
 * @return StreamTableEnvironment with DataStream integration
 */
static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment);

/**
 * Creates a StreamTableEnvironment with custom settings
 * @param executionEnvironment The StreamExecutionEnvironment for stream processing
 * @param settings Environment settings for the table environment
 * @return StreamTableEnvironment with specified settings
 */
static StreamTableEnvironment create(
    StreamExecutionEnvironment executionEnvironment,
    EnvironmentSettings settings
);
```

**Usage Examples:**

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Create from execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// With custom settings
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();
StreamTableEnvironment customTableEnv = StreamTableEnvironment.create(env, settings);
```

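Table-layer options can then be tuned through the environment's `TableConfig`. A small sketch, assuming a Flink version whose `TableConfig` accepts string keys (the TTL value is an arbitrary example):

```java
// Idle state retention is a common knob for unbounded aggregations;
// "1 h" here is an arbitrary example value
tableEnv.getConfig().set("table.exec.state.ttl", "1 h");
```
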
### DataStream to Table Conversion

Converts DataStream objects to Table objects for SQL and Table API operations.

```java { .api }
/**
 * Creates a Table from a DataStream with automatic schema inference
 * @param dataStream The DataStream to convert
 * @return Table representing the DataStream
 */
<T> Table fromDataStream(DataStream<T> dataStream);

/**
 * Creates a Table from a DataStream with explicit schema
 * @param dataStream The DataStream to convert
 * @param schema Schema definition for the resulting Table
 * @return Table with specified schema
 */
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

/**
 * Creates a Table from a DataStream with field expressions
 * @param dataStream The DataStream to convert
 * @param fields Expressions defining field mappings and types
 * @return Table with specified field mappings
 */
<T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);
```

**Usage Examples:**

```java
// Simple POJO DataStream
DataStream<Order> orderStream = env.fromSource(orderSource, watermarkStrategy, "orders");

// Automatic schema inference
Table ordersTable = tableEnv.fromDataStream(orderStream);

// With explicit schema
Schema orderSchema = Schema.newBuilder()
    .column("orderId", DataTypes.BIGINT())
    .column("customerId", DataTypes.BIGINT())
    .column("amount", DataTypes.DECIMAL(10, 2))
    .column("orderTime", DataTypes.TIMESTAMP(3))
    .watermark("orderTime", $("orderTime").minus(lit(5).seconds()))
    .build();

Table ordersWithSchema = tableEnv.fromDataStream(orderStream, orderSchema);

// With field expressions
Table ordersWithFields = tableEnv.fromDataStream(
    orderStream,
    $("orderId"),
    $("customerId"),
    $("amount"),
    $("orderTime").rowtime()
);
```

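If the DataStream already carries timestamps and watermarks from a `WatermarkStrategy`, a documented pattern propagates them into the table instead of redeclaring them: the `rowtime` metadata column exposes the record timestamp, and `SOURCE_WATERMARK()` adopts the stream's watermarks as-is:

```java
// Reuse the stream's own timestamps and watermarks
Table ordersWithStreamTime = tableEnv.fromDataStream(
    orderStream,
    Schema.newBuilder()
        .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build()
);
```
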
### Table to DataStream Conversion

Converts Table objects back to DataStream objects for further stream processing. Note that toDataStream() only supports insert-only tables; updating tables, such as the result of a non-windowed GROUP BY, must be converted with toChangelogStream() instead.

```java { .api }
/**
 * Converts a Table to a DataStream of Row objects
 * @param table The Table to convert (must be insert-only)
 * @return DataStream<Row> representing the Table data
 */
DataStream<Row> toDataStream(Table table);

/**
 * Converts a Table to a DataStream with explicit target class
 * @param table The Table to convert
 * @param targetClass Target class for DataStream elements
 * @return DataStream with specified element type
 */
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

/**
 * Converts a Table to a DataStream with specific data type
 * @param table The Table to convert
 * @param targetDataType Target data type for DataStream elements
 * @return DataStream with specified data type
 */
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
```

**Usage Examples:**

```java
// Insert-only table: projections and filters keep the stream append-only
Table largeOrders = tableEnv
    .from("orders")
    .filter($("amount").isGreater(lit(100)))
    .select($("customerId"), $("amount"));

// Convert to DataStream of Rows with automatic type inference
DataStream<Row> resultStream = tableEnv.toDataStream(largeOrders);

// Convert to a specific POJO class (public fields and a no-arg constructor)
public static class CustomerOrder {
    public Long customerId;
    public BigDecimal amount;
}

DataStream<CustomerOrder> pojoStream = tableEnv.toDataStream(
    largeOrders,
    CustomerOrder.class
);

// Convert with explicit data type
DataStream<Row> typedStream = tableEnv.toDataStream(
    largeOrders,
    DataTypes.ROW(
        DataTypes.FIELD("customerId", DataTypes.BIGINT()),
        DataTypes.FIELD("amount", DataTypes.DECIMAL(10, 2))
    )
);
```

### Changelog Stream Operations

Exposes a table's change log as a DataStream of Row objects whose RowKind flags mark each event as an insert, update, or delete. This is the required conversion path for updating tables, such as non-windowed aggregation results, and is useful for change data capture (CDC) style processing.

```java { .api }
/**
 * Converts a Table to a changelog DataStream
 * @param table The Table to convert (may be inserting, updating, or deleting)
 * @return DataStream of Row with RowKind information
 */
DataStream<Row> toChangelogStream(Table table);

/**
 * Converts a Table to a changelog DataStream with explicit schema
 * @param table The Table to convert
 * @param targetSchema Schema for the resulting changelog stream
 * @return DataStream of Row with RowKind information
 */
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

/**
 * Converts a Table to a changelog DataStream with explicit schema and changelog mode
 * @param table The Table to convert
 * @param targetSchema Schema for the resulting changelog stream
 * @param changelogMode Changelog mode specifying supported operations
 * @return DataStream of Row with RowKind information
 */
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
```

**Usage Examples:**

```java
// Table with updates and deletes
Table updatingTable = tableEnv
    .from("user_updates")
    .groupBy($("userId"))
    .select($("userId"), $("lastUpdate").max().as("latestUpdate"));

// Convert to changelog stream
DataStream<Row> changelogStream = tableEnv.toChangelogStream(updatingTable);

// Process changelog events
changelogStream.map(new MapFunction<Row, String>() {
    @Override
    public String map(Row row) throws Exception {
        RowKind kind = row.getKind();
        switch (kind) {
            case INSERT:
                return "New user: " + row.getField(0);
            case UPDATE_BEFORE:
                return "Retracted value for user: " + row.getField(0);
            case UPDATE_AFTER:
                return "Updated user: " + row.getField(0);
            case DELETE:
                return "Deleted user: " + row.getField(0);
            default:
                return "Unknown change: " + row.getField(0);
        }
    }
});
```

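For downstream systems keyed by a unique field, the three-argument overload can request upsert semantics, which omits `UPDATE_BEFORE` rows. The target schema must then declare a primary key; the column types below are assumptions matching the example above:

```java
// Upsert changelog: INSERT, UPDATE_AFTER, and DELETE only, keyed by userId
DataStream<Row> upsertStream = tableEnv.toChangelogStream(
    updatingTable,
    Schema.newBuilder()
        .column("userId", DataTypes.BIGINT())
        .column("latestUpdate", DataTypes.TIMESTAMP(3))
        .primaryKey("userId")
        .build(),
    ChangelogMode.upsert()
);
```
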
### Changelog to Table Conversion

Converts changelog DataStreams back to Table objects for further table operations.

```java { .api }
/**
 * Converts a changelog DataStream to a Table
 * @param dataStream Changelog DataStream with RowKind information
 * @return Table representing the changelog stream
 */
Table fromChangelogStream(DataStream<Row> dataStream);

/**
 * Converts a changelog DataStream to a Table with explicit schema
 * @param dataStream Changelog DataStream with RowKind information
 * @param schema Schema for the resulting table
 * @return Table with specified schema
 */
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

/**
 * Converts a changelog DataStream to a Table with schema and changelog mode
 * @param dataStream Changelog DataStream with RowKind information
 * @param schema Schema for the resulting table
 * @param changelogMode Changelog mode specifying supported operations
 * @return Table with specified schema and changelog mode
 */
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
```

**Usage Examples:**

```java
// DataStream with changelog semantics
DataStream<Row> changelogStream = env.addSource(new ChangelogSource());

// Convert to table with automatic schema inference
Table changelogTable = tableEnv.fromChangelogStream(changelogStream);

// Convert with explicit schema; the primary key is required further below,
// where the changelog mode has no UPDATE_BEFORE events (upsert semantics)
Schema explicitSchema = Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("user_name", DataTypes.STRING())
    .column("last_login", DataTypes.TIMESTAMP(3))
    .primaryKey("user_id")
    .build();

Table typedChangelogTable = tableEnv.fromChangelogStream(changelogStream, explicitSchema);

// With changelog mode specification: INSERT + UPDATE_AFTER + DELETE without
// UPDATE_BEFORE is upsert semantics, so the schema must declare a primary key
ChangelogMode updateMode = ChangelogMode.newBuilder()
    .addContainedKind(RowKind.INSERT)
    .addContainedKind(RowKind.UPDATE_AFTER)
    .addContainedKind(RowKind.DELETE)
    .build();

Table modeAwareTable = tableEnv.fromChangelogStream(
    changelogStream,
    explicitSchema,
    updateMode
);
```

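The same semantics are available through the `ChangelogMode.upsert()` shorthand, which matches the hand-built mode above:

```java
// Shorthand for the INSERT + UPDATE_AFTER + DELETE mode built above
Table upsertTable = tableEnv.fromChangelogStream(
    changelogStream,
    explicitSchema,
    ChangelogMode.upsert()
);
```
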
### Temporary View Creation

Creates temporary views from DataStream objects for SQL query access.

```java { .api }
/**
 * Creates a temporary view from a DataStream
 * @param path View name/path
 * @param dataStream DataStream to create view from
 */
<T> void createTemporaryView(String path, DataStream<T> dataStream);

/**
 * Creates a temporary view from a DataStream with explicit schema
 * @param path View name/path
 * @param dataStream DataStream to create view from
 * @param schema Schema for the view
 */
<T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema);
```

**Usage Examples:**

```java
DataStream<Order> orderStream = env.addSource(new OrderSource());

// Create temporary view with automatic schema inference
tableEnv.createTemporaryView("orders", orderStream);

// Create view with explicit schema
Schema orderSchema = Schema.newBuilder()
    .column("order_id", DataTypes.BIGINT())
    .column("customer_id", DataTypes.BIGINT())
    .column("amount", DataTypes.DECIMAL(10, 2))
    .column("order_time", DataTypes.TIMESTAMP(3))
    .watermark("order_time", $("order_time").minus(lit(5).seconds()))
    .build();

tableEnv.createTemporaryView("orders_with_watermark", orderStream, orderSchema);

// Use in SQL queries
Table results = tableEnv.sqlQuery(
    "SELECT customer_id, SUM(amount) AS total_amount " +
    "FROM orders_with_watermark " +
    "GROUP BY customer_id"
);
```

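Because the GROUP BY result above is an updating table, bringing it back to the DataStream side requires the changelog conversion. Note that these conversions attach their pipelines to the StreamExecutionEnvironment, so `env.execute()` launches them:

```java
// The aggregate emits updates, so use the changelog conversion
DataStream<Row> totals = tableEnv.toChangelogStream(results);
totals.print();

env.execute("customer-totals");
```
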
### Statement Set Creation

Creates statement sets for bundling multiple streaming INSERT statements into a single Flink job.

```java { .api }
/**
 * Creates a StreamStatementSet that groups multiple statements and executes them as one job
 * @return StreamStatementSet for adding multiple statements
 */
StreamStatementSet createStatementSet();
```

**Usage Examples:**

```java
// Create statement set
StreamStatementSet statementSet = tableEnv.createStatementSet();

// Add multiple insert statements
Table processedOrders = tableEnv.from("orders")
    .filter($("status").isEqual("processed"));

statementSet.addInsert("processed_orders_sink", processedOrders);

Table failedOrders = tableEnv.from("orders")
    .filter($("status").isEqual("failed"));

statementSet.addInsert("failed_orders_sink", failedOrders);

// SQL INSERT statements can be added as well
statementSet.addInsertSql(
    "INSERT INTO daily_summary " +
    "SELECT CAST(order_time AS DATE) AS order_date, COUNT(*) AS total_orders " +
    "FROM orders " +
    "GROUP BY CAST(order_time AS DATE)"
);

// Execute all statements together as a single job
TableResult result = statementSet.execute();
```

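Alternatively, in recent Flink versions StreamStatementSet offers `attachAsDataStream()`, which, instead of submitting immediately via `execute()`, attaches the set's pipelines to the StreamExecutionEnvironment so table inserts and plain DataStream sinks run in one job (a sketch, version permitting):

```java
// Attach the table pipelines to the DataStream job instead of calling execute()
statementSet.attachAsDataStream();

// Plain DataStream sinks join the same job; env.execute() launches everything
orderStream.print();
env.execute("combined-job");
```
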
### Legacy Conversion Methods (Deprecated)

Older methods for DataStream conversion that are now deprecated but still available for backward compatibility.

```java { .api }
/**
 * @deprecated Use createTemporaryView() instead
 * Registers a DataStream as a temporary table
 */
@Deprecated
<T> void registerDataStream(String name, DataStream<T> dataStream);

/**
 * @deprecated Use toDataStream() instead
 * Converts a Table to an append-only DataStream
 */
@Deprecated
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);

/**
 * @deprecated Use toChangelogStream() instead
 * Converts a Table to a retract DataStream of (isAccumulate, record) pairs
 */
@Deprecated
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
```

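A before/after migration sketch: the Boolean flag that toRetractStream() pairs with each record maps onto the RowKind carried by every Row in the new API:

```java
// Before: accumulate/retract encoded as a Boolean flag
DataStream<Tuple2<Boolean, Row>> legacy = tableEnv.toRetractStream(updatingTable, Row.class);

// After: the same information is carried by Row.getKind()
// (true ~ INSERT/UPDATE_AFTER, false ~ UPDATE_BEFORE/DELETE)
DataStream<Row> modern = tableEnv.toChangelogStream(updatingTable);
```
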
### Schema Definition for Conversion

Explicit schema definitions for controlling DataStream to Table conversions.

```java { .api }
class Schema {
    /**
     * Creates a new schema builder
     * @return Builder for constructing schemas
     */
    static Builder newBuilder();

    class Builder {
        Builder column(String columnName, AbstractDataType<?> dataType);
        Builder columnByExpression(String columnName, String expression);
        Builder columnByMetadata(String columnName, AbstractDataType<?> dataType);
        Builder columnByMetadata(String columnName, AbstractDataType<?> dataType, String metadataKey);
        Builder watermark(String columnName, Expression watermarkExpression);
        Builder primaryKey(String... columnNames);
        Schema build();
    }
}
```

**Usage Examples:**

```java
// Complex schema with computed columns and a watermark
Schema complexSchema = Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("event_time", DataTypes.TIMESTAMP(3))
    .column("event_data", DataTypes.STRING())
    .columnByExpression("hour_of_day", "EXTRACT(HOUR FROM event_time)")
    // In fromDataStream, "rowtime" (the stream record's timestamp) is the only
    // available metadata key; connector keys such as Kafka offsets apply to
    // connector-backed tables, not to converted streams
    .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
    .watermark("event_time", $("event_time").minus(lit(30).seconds()))
    .build();

// Apply schema during conversion; primaryKey() matters mainly for the
// changelog conversions shown above
Table enrichedEvents = tableEnv.fromDataStream(eventStream, complexSchema);
```

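A processing-time attribute is declared the same way, as a computed column; physical columns not mentioned in the schema are derived automatically from the stream's type:

```java
// Processing-time attribute via a computed column
Table withProcTime = tableEnv.fromDataStream(
    eventStream,
    Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        .build()
);
```
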
## Types

### Stream Table Environment

```java { .api }
interface StreamTableEnvironment extends TableEnvironment {
    // Inherits all TableEnvironment methods,
    // plus the DataStream integration methods listed above
}
```

### Temporal Table Function

```java { .api }
class TemporalTableFunction extends TableFunction<Row> {
    // Used for temporal joins in streaming scenarios;
    // created via Table.createTemporalTableFunction()
}
```

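As a hedged sketch of how such a function is created and registered (the `rates_history` table and its `update_time`/`currency` columns are assumptions for illustration):

```java
// Build a temporal table function over a history table, versioned by
// update_time and keyed by currency
Table ratesHistory = tableEnv.from("rates_history");
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
    $("update_time"),
    $("currency")
);

// Register it for use in temporal joins from SQL and the Table API
tableEnv.createTemporarySystemFunction("rates", rates);
```
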
### Row and RowKind

```java { .api }
class Row {
    Object getField(int pos);
    Object getField(String name);
    int getArity();
    RowKind getKind();
    void setKind(RowKind kind);
}

enum RowKind {
    INSERT,        // +I: insert operation
    UPDATE_BEFORE, // -U: update, old value
    UPDATE_AFTER,  // +U: update, new value
    DELETE         // -D: delete operation
}
```

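For testing changelog conversions end to end, rows with explicit kinds can be built via `Row.ofKind()`; the field values below are arbitrary, and the row type must be given explicitly because it cannot be inferred from Row instances:

```java
import org.apache.flink.api.common.typeinfo.Types;

// Hand-built retract changelog for testing
DataStream<Row> testChangelog = env
    .fromElements(
        Row.ofKind(RowKind.INSERT, 1L, "Alice"),
        Row.ofKind(RowKind.UPDATE_BEFORE, 1L, "Alice"),
        Row.ofKind(RowKind.UPDATE_AFTER, 1L, "Alicia"))
    .returns(Types.ROW(Types.LONG, Types.STRING));

Table testTable = tableEnv.fromChangelogStream(testChangelog);
```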