# DataStream Conversions

The Flink Table API Java Bridge provides seamless conversion between `DataStream` and `Table` representations, enabling developers to combine the expressiveness of the SQL/Table API with the flexibility of the DataStream API.

## Conversion Overview

The bridge supports bidirectional conversions:

- **DataStream → Table**: convert streaming data to table format for SQL operations
- **Table → DataStream**: convert table results back to streaming format for further processing

## DataStream to Table Conversions

### Basic Conversion

Convert a `DataStream` to a `Table` using automatic schema inference:

```java { .api }
<T> Table fromDataStream(DataStream<T> dataStream);
```

**Usage Example:**

```java
// Create a DataStream from POJO objects
DataStream<Person> personStream = env.fromElements(
    new Person("Alice", 25, "Engineer"),
    new Person("Bob", 30, "Manager"),
    new Person("Charlie", 28, "Developer")
);

// Convert to a Table (schema inferred from the POJO structure)
Table personTable = tableEnv.fromDataStream(personStream);

// Now you can use SQL on the table (embedding the Table object
// registers it under a unique name in the TableEnvironment)
Table result = tableEnv.sqlQuery("SELECT name, age FROM " + personTable + " WHERE age > 26");
```

### Conversion with Custom Schema

Convert a `DataStream` to a `Table` with an explicit schema definition:

```java { .api }
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
```

**Usage Example:**

```java
DataStream<Row> rowStream = env.fromElements(
    Row.of("Alice", 25, "2023-01-15"),
    Row.of("Bob", 30, "2023-02-20"),
    Row.of("Charlie", 28, "2023-03-10")
);

// Define a custom schema with explicit data types
Schema schema = Schema.newBuilder()
    .column("name", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .column("hire_date", DataTypes.STRING())
    .build();

Table employeeTable = tableEnv.fromDataStream(rowStream, schema);
```

### Changelog Stream Conversion

Convert changelog streams (insert/update/delete operations) to tables:

```java { .api }
Table fromChangelogStream(DataStream<Row> dataStream);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
```

**Usage Example:**

```java
// Changelog stream whose rows carry RowKind information
DataStream<Row> changelogStream = env.addSource(new ChangelogSourceFunction());

// Basic changelog conversion
Table changelogTable = tableEnv.fromChangelogStream(changelogStream);

// With a custom schema
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("value", DataTypes.DOUBLE())
    .build();

Table schemaTable = tableEnv.fromChangelogStream(changelogStream, schema);

// With a specific changelog mode
ChangelogMode insertUpdateMode = ChangelogMode.insertUpdate();
Table modeTable = tableEnv.fromChangelogStream(changelogStream, schema, insertUpdateMode);
```

## Table to DataStream Conversions

### Basic Conversion to Row

Convert a `Table` to a `DataStream<Row>`:

```java { .api }
DataStream<Row> toDataStream(Table table);
```

**Usage Example:**

```java
// Create a table from a SQL query
Table resultTable = tableEnv.sqlQuery(
    "SELECT name, age, salary FROM employees WHERE department = 'Engineering'"
);

// Convert to a DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

// Process the stream further
resultStream.map(row -> "Employee: " + row.getField(0) + ", Age: " + row.getField(1))
    .print();
```

### Conversion to Specific Type

Convert a `Table` to a `DataStream` of a specific Java class:

```java { .api }
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);
```

**Usage Example:**

```java
// Define the result POJO (needs a public no-argument constructor and
// public fields or getters/setters so columns can be mapped to it)
public static class EmployeeSummary {
    public String name;
    public Integer age;
    public Double salary;

    // Constructors, getters, setters
}

Table summaryTable = tableEnv.sqlQuery("SELECT name, age, salary FROM employees");

// Convert directly to the POJO type
DataStream<EmployeeSummary> summaryStream = tableEnv.toDataStream(summaryTable, EmployeeSummary.class);

summaryStream.filter(emp -> emp.salary > 50000.0)
    .map(emp -> emp.name + " earns " + emp.salary)
    .print();
```

### Conversion with Data Type

Convert a `Table` to a `DataStream` using an explicit data type specification:

```java { .api }
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
```

**Usage Example:**

```java
Table aggregatedTable = tableEnv.sqlQuery(
    "SELECT department, COUNT(*) as emp_count FROM employees GROUP BY department"
);

// Define the target data type
AbstractDataType<?> rowType = DataTypes.ROW(
    DataTypes.FIELD("department", DataTypes.STRING()),
    DataTypes.FIELD("emp_count", DataTypes.BIGINT())
);

// Note: toDataStream handles insert-only tables; in streaming mode an
// updating aggregate like this must use toChangelogStream instead
DataStream<Row> typedStream = tableEnv.toDataStream(aggregatedTable, rowType);
```

### Changelog Stream Conversion

Convert tables to changelog streams to capture insert/update/delete operations:

```java { .api }
DataStream<Row> toChangelogStream(Table table);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
```

**Usage Example:**

```java
// Create a table that produces updates (e.g., from a GROUP BY query)
Table dynamicTable = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as event_count FROM user_events GROUP BY user_id"
);

// Convert to a changelog stream
DataStream<Row> changelogStream = tableEnv.toChangelogStream(dynamicTable);

// Process changelog events by inspecting each row's kind
changelogStream.process(new ProcessFunction<Row, String>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<String> out) {
        RowKind kind = row.getKind();
        switch (kind) {
            case INSERT:
                out.collect("New user: " + row.getField(0) + " with " + row.getField(1) + " events");
                break;
            case UPDATE_AFTER:
                out.collect("Updated user: " + row.getField(0) + " now has " + row.getField(1) + " events");
                break;
            case DELETE:
                out.collect("Removed user: " + row.getField(0));
                break;
            default:
                break; // ignore UPDATE_BEFORE
        }
    }
}).print();

// With a target schema
Schema targetSchema = Schema.newBuilder()
    .column("user_id", DataTypes.STRING())
    .column("event_count", DataTypes.BIGINT())
    .build();

DataStream<Row> schemaChangelogStream = tableEnv.toChangelogStream(dynamicTable, targetSchema);

// With a specific changelog mode
ChangelogMode upsertMode = ChangelogMode.upsert();
DataStream<Row> upsertStream = tableEnv.toChangelogStream(dynamicTable, targetSchema, upsertMode);
```

## Schema Definition

When working with custom schemas, use the `Schema` builder:

```java { .api }
public class Schema {
    public static Builder newBuilder();

    public static class Builder {
        public Builder column(String name, AbstractDataType<?> dataType);
        public Builder columnByExpression(String name, String expression);
        public Builder columnByMetadata(String name, AbstractDataType<?> dataType);
        public Builder primaryKey(String... columnNames);
        public Builder watermark(String columnName, String watermarkExpression);
        public Schema build();
    }
}
```

**Usage Example:**

```java
Schema complexSchema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .column("timestamp_col", DataTypes.TIMESTAMP(3))
    .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
    .columnByExpression("proc_time", "PROCTIME()")
    .watermark("event_time", "event_time - INTERVAL '5' SECOND")
    .primaryKey("id")
    .build();
```

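The watermark expression above subtracts a fixed delay from the event time. The idea it encodes can be sketched without any Flink dependency; the class below is a hypothetical illustration (not a Flink API) of bounded-out-of-orderness watermarking:

```java
// Hypothetical, framework-free sketch: a bounded-out-of-orderness watermark
// tracks the maximum timestamp seen so far and subtracts a fixed delay.
public class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs = Long.MIN_VALUE;

    public BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    public void onEvent(long eventTimestampMs) {
        // Late (out-of-order) events never move the maximum backwards
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    // Current watermark: no event earlier than this is expected anymore
    public long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(5000); // 5 seconds, as in the schema above
        wm.onEvent(10_000);
        wm.onEvent(8_000); // out-of-order event does not lower the watermark
        System.out.println(wm.currentWatermark()); // prints 5000
    }
}
```

This mirrors what `watermark("event_time", "event_time - INTERVAL '5' SECOND")` declares: records more than five seconds behind the highest observed event time are considered late.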
## Changelog Modes

The bridge supports different changelog modes for streaming tables:

```java { .api }
public final class ChangelogMode {
    // Available factory methods
    public static ChangelogMode insertOnly();
    public static ChangelogMode insertUpdate();
    public static ChangelogMode insertUpdateDelete();
    public static ChangelogMode upsert();
    public static ChangelogMode all();
}
```

**Mode Descriptions:**

- **insertOnly()**: only INSERT operations (append-only stream)
- **insertUpdate()**: INSERT and UPDATE_AFTER operations
- **insertUpdateDelete()**: INSERT, UPDATE_AFTER, and DELETE operations
- **upsert()**: INSERT, UPDATE_AFTER, and DELETE operations keyed by a primary key; no UPDATE_BEFORE is emitted
- **all()**: all possible row kinds, including UPDATE_BEFORE

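To build intuition for what these modes mean to a downstream consumer, here is a minimal, framework-free sketch (plain Java, no Flink dependency; the kind names mirror Flink's `RowKind`, but the classes are hypothetical) that folds an upsert-style changelog into a materialized view:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Flink API: folding an upsert changelog
// (INSERT / UPDATE_AFTER / DELETE, keyed by primary key) into current state.
public class UpsertFoldSketch {
    enum Kind { INSERT, UPDATE_AFTER, DELETE }

    record Change(Kind kind, String key, Long value) {}

    public static Map<String, Long> fold(Change... changes) {
        Map<String, Long> view = new LinkedHashMap<>();
        for (Change c : changes) {
            switch (c.kind()) {
                case INSERT, UPDATE_AFTER -> view.put(c.key(), c.value()); // upsert on the key
                case DELETE -> view.remove(c.key());                       // retract the key
            }
        }
        return view;
    }

    public static void main(String[] args) {
        Map<String, Long> view = fold(
            new Change(Kind.INSERT, "alice", 1L),
            new Change(Kind.UPDATE_AFTER, "alice", 2L),
            new Change(Kind.INSERT, "bob", 1L),
            new Change(Kind.DELETE, "bob", null)
        );
        System.out.println(view); // prints {alice=2}
    }
}
```

In `all()` mode a consumer would additionally see UPDATE_BEFORE rows, which allow exact retraction of the previous value instead of relying on the key.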
## Common Patterns

### Stream Processing with SQL

Combine DataStream processing with SQL queries:

```java
// Start with a DataStream
DataStream<Event> eventStream = env.addSource(new EventSource());

// Convert to a Table for SQL processing
Table eventTable = tableEnv.fromDataStream(eventStream);
tableEnv.createTemporaryView("events", eventTable);

// Apply SQL transformations
Table filteredEvents = tableEnv.sqlQuery(
    "SELECT user_id, event_type, event_time " +
    "FROM events " +
    "WHERE event_type IN ('click', 'purchase') " +
    "AND event_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR"
);

// Convert back to a DataStream for further processing
DataStream<Row> processedStream = tableEnv.toDataStream(filteredEvents);

// Continue with DataStream operations
processedStream.keyBy(row -> row.getField(0))
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .apply(new MyWindowFunction())
    .addSink(new MySink());
```

### Aggregation with State

Use changelog streams for stateful aggregations:

```java
Table aggregationTable = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) as event_count, MAX(event_time) as last_event " +
    "FROM events " +
    "GROUP BY user_id"
);

DataStream<Row> aggregationChangelog = tableEnv.toChangelogStream(aggregationTable);

// Handle aggregation changes (the stream must be keyed before keyed state can be used)
aggregationChangelog
    .keyBy(row -> (String) row.getField(0))
    .process(new KeyedProcessFunction<String, Row, Alert>() {
        private transient ValueState<Long> lastCount;

        @Override
        public void open(Configuration parameters) {
            lastCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastCount", Long.class));
        }

        @Override
        public void processElement(Row row, Context ctx, Collector<Alert> out) throws Exception {
            String userId = row.getFieldAs(0);
            Long newCount = row.getFieldAs(1);
            Long previousCount = lastCount.value();

            if (previousCount != null && newCount > previousCount * 2) {
                out.collect(new Alert(userId, "Activity spike detected"));
            }

            lastCount.update(newCount);
        }
    });
```

## Error Handling

Handle common conversion errors:

```java
try {
    // Schema that may not match the stream's actual field types
    Schema schema = Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .build();

    Table table = tableEnv.fromDataStream(invalidDataStream, schema);

} catch (ValidationException e) {
    // Schema validation errors (e.g., column or type mismatch)
    logger.error("Schema validation failed: {}", e.getMessage());

} catch (TableException e) {
    // General table conversion errors
    logger.error("Table conversion failed: {}", e.getMessage());

} catch (IllegalArgumentException e) {
    // Invalid arguments passed to the conversion
    logger.error("Invalid argument: {}", e.getMessage());
}
```

## Performance Considerations

1. **Schema inference**: defining the schema explicitly is cheaper and more predictable than automatic inference
2. **Type conversion**: converting directly to a POJO avoids the overhead of generic `Row` handling
3. **Changelog overhead**: changelog streams carry additional per-record metadata (the row kind)
4. **Watermark propagation**: ensure watermarks are carried through conversions (e.g., declared via `Schema.watermark(...)`)
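
Point 2 can be illustrated without Flink: a generic row stores fields as untyped objects that must be cast on every access, while a dedicated POJO gives direct, compile-time-checked field access. The classes below are hypothetical stand-ins, not Flink's `Row`:

```java
// Hypothetical illustration: why typed POJO access is cheaper and safer
// than generic, index-based row access.
public class RowVsPojoSketch {
    // Generic "row": untyped storage, positional access, runtime casts.
    static class GenericRow {
        private final Object[] fields;
        GenericRow(Object... fields) { this.fields = fields; }
        Object getField(int pos) { return fields[pos]; }
    }

    // Dedicated POJO: typed fields, no casts, no boxing on access.
    static class Employee {
        final String name;
        final int age;
        Employee(String name, int age) { this.name = name; this.age = age; }
    }

    public static void main(String[] args) {
        GenericRow row = new GenericRow("Alice", 25);
        // Every access needs a cast; a wrong index or type only fails at runtime.
        String nameFromRow = (String) row.getField(0);
        int ageFromRow = (Integer) row.getField(1);

        Employee emp = new Employee("Alice", 25);
        // Typed access is checked at compile time.
        System.out.println(nameFromRow.equals(emp.name) && ageFromRow == emp.age); // prints true
    }
}
```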