# DataStream Integration

Bi-directional conversion between DataStream and Table APIs with support for custom schemas, changelog processing, and type mapping. This enables seamless integration between stream processing and SQL operations.

## Capabilities

### DataStream to Table Conversion

Converts DataStream instances to Table objects for SQL processing.
```java { .api }
/**
 * Converts DataStream to Table with automatic schema derivation
 * @param dataStream The input DataStream to convert
 * @return Table representation of the DataStream
 */
<T> Table fromDataStream(DataStream<T> dataStream);

/**
 * Converts DataStream to Table with custom schema
 * @param dataStream The input DataStream to convert
 * @param schema Custom schema for column mapping and metadata
 * @return Table representation with specified schema
 */
<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);
```
**Usage Examples:**

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;

// Automatic schema derivation
DataStream<Row> orderStream = env.fromElements(
    Row.of(1L, "Alice", 100.0),
    Row.of(2L, "Bob", 200.0)
);
Table orderTable = tableEnv.fromDataStream(orderStream);

// Custom schema with computed columns
Schema orderSchema = Schema.newBuilder()
    .column("order_id", DataTypes.BIGINT())
    .column("customer", DataTypes.STRING())
    .column("amount", DataTypes.DOUBLE())
    .columnByExpression("tax", "amount * 0.1")
    .columnByExpression("total", "amount + tax")
    .build();

Table enrichedTable = tableEnv.fromDataStream(orderStream, orderSchema);
```
### Changelog Stream Processing

Processes a DataStream of Row records with RowKind information for handling insertions, updates, and deletions.

```java { .api }
/**
 * Converts changelog DataStream to Table with all RowKind changes
 * @param dataStream DataStream of Row records with RowKind information
 * @return Table supporting all changelog operations
 */
Table fromChangelogStream(DataStream<Row> dataStream);

/**
 * Converts changelog DataStream to Table with custom schema
 * @param dataStream DataStream of Row records with RowKind information
 * @param schema Custom schema for the resulting table
 * @return Table with specified schema supporting changelog operations
 */
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

/**
 * Converts changelog DataStream to Table with explicit changelog mode
 * @param dataStream DataStream of Row records with RowKind information
 * @param schema Custom schema for the resulting table
 * @param changelogMode Explicit changelog mode defining allowed operations
 * @return Table with specified schema and changelog constraints
 */
Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode);
```
**Usage Examples:**

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

// Create changelog stream
DataStream<Row> changelogStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, 1L, "Alice", 100.0),
    Row.ofKind(RowKind.UPDATE_AFTER, 1L, "Alice", 150.0),
    Row.ofKind(RowKind.DELETE, 2L, "Bob", 200.0)
);

// Convert with automatic changelog mode
Table changelogTable = tableEnv.fromChangelogStream(changelogStream);

// Convert with upsert mode (no UPDATE_BEFORE required);
// the schema passed here must declare a primary key for upsert semantics
Table upsertTable = tableEnv.fromChangelogStream(
    changelogStream,
    schema,
    ChangelogMode.upsert()
);
```
### Table to DataStream Conversion

Converts Table objects back to DataStream for further stream processing.

```java { .api }
/**
 * Converts insert-only Table to DataStream of Row
 * @param table The Table to convert (must be insert-only)
 * @return DataStream of Row records
 */
DataStream<Row> toDataStream(Table table);

/**
 * Converts insert-only Table to DataStream of specific type
 * @param table The Table to convert (must be insert-only)
 * @param targetClass Target class for type mapping
 * @return DataStream of specified type
 */
<T> DataStream<T> toDataStream(Table table, Class<T> targetClass);

/**
 * Converts insert-only Table to DataStream with custom data type
 * @param table The Table to convert (must be insert-only)
 * @param targetDataType Target data type for conversion
 * @return DataStream of specified data type
 */
<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
```
**Usage Examples:**

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;

// Convert to Row DataStream
Table resultTable = tableEnv.sqlQuery("SELECT customer, SUM(amount) as total FROM orders GROUP BY customer");
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

// Convert to POJO class
public static class CustomerTotal {
    public String customer;
    public Double total;

    public CustomerTotal() {}
    public CustomerTotal(String customer, Double total) {
        this.customer = customer;
        this.total = total;
    }
}

DataStream<CustomerTotal> pojoStream = tableEnv.toDataStream(resultTable, CustomerTotal.class);

// Convert with explicit data type
DataStream<CustomerTotal> typedStream = tableEnv.toDataStream(
    resultTable,
    DataTypes.of(CustomerTotal.class)
);
```
### Changelog DataStream Generation

Converts updating tables to changelog DataStream with RowKind information.

```java { .api }
/**
 * Converts Table to changelog DataStream with all RowKind changes
 * @param table The Table to convert (can be updating or insert-only)
 * @return DataStream of Row with RowKind information
 */
DataStream<Row> toChangelogStream(Table table);

/**
 * Converts Table to changelog DataStream with custom schema
 * @param table The Table to convert
 * @param targetSchema Schema for output format customization
 * @return DataStream of Row with custom schema and RowKind
 */
DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

/**
 * Converts Table to changelog DataStream with explicit changelog mode
 * @param table The Table to convert
 * @param targetSchema Schema for output format
 * @param changelogMode Required changelog mode for validation
 * @return DataStream with specified changelog constraints
 */
DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode);
```
**Usage Examples:**

```java
// Convert updating table to changelog stream
Table aggregatedTable = tableEnv.sqlQuery(
    "SELECT customer, COUNT(*) as order_count FROM orders GROUP BY customer"
);
DataStream<Row> changelogStream = tableEnv.toChangelogStream(aggregatedTable);

// Process changelog events
changelogStream.map(row -> {
    RowKind kind = row.getKind();
    switch (kind) {
        case INSERT:
            return "New customer: " + row.getField(0);
        case UPDATE_AFTER:
            return "Updated customer: " + row.getField(0);
        case DELETE:
            return "Customer removed: " + row.getField(0);
        default:
            return "Unknown change: " + row.getField(0);
    }
});
```
### Legacy Conversion Methods (Deprecated)

Historical conversion methods maintained for backward compatibility.

```java { .api }
/**
 * @deprecated Use fromDataStream(DataStream, Schema) instead
 */
@Deprecated
<T> Table fromDataStream(DataStream<T> dataStream, Expression... fields);

/**
 * @deprecated Use toDataStream(Table, Class) instead
 */
@Deprecated
<T> DataStream<T> toAppendStream(Table table, Class<T> clazz);

/**
 * @deprecated Use toDataStream(Table, AbstractDataType) instead;
 *             a TypeInformation can be wrapped via DataTypes.of(typeInfo)
 */
@Deprecated
<T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo);

/**
 * @deprecated Use toChangelogStream(Table, Schema) instead
 */
@Deprecated
<T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz);
```
## Type Definitions

### Schema Configuration

```java { .api }
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.api.java.tuple.Tuple2;

// Advanced schema examples
Schema schema = Schema.newBuilder()
    // Physical columns from DataStream
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())

    // Computed columns
    .columnByExpression("name_length", "CHAR_LENGTH(name)")

    // Metadata columns (timestamp extraction)
    .columnByMetadata("proc_time", DataTypes.TIMESTAMP_LTZ(3), "timestamp")
    .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))

    // Watermark strategy
    .watermark("rowtime", "rowtime - INTERVAL '5' SECOND")

    // Primary key definition
    .primaryKey("id")
    .build();
```
### Changelog Modes

```java { .api }
import org.apache.flink.table.connector.ChangelogMode;

// Available changelog modes
ChangelogMode.all()        // INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
ChangelogMode.insertOnly() // INSERT only
ChangelogMode.upsert()     // INSERT, UPDATE_AFTER, DELETE (no UPDATE_BEFORE)
```
### Row Kind Operations

```java { .api }
import org.apache.flink.types.RowKind;

// RowKind enumeration
RowKind.INSERT        // New record insertion
RowKind.UPDATE_BEFORE // Previous version of updated record
RowKind.UPDATE_AFTER  // New version of updated record
RowKind.DELETE        // Record deletion
```
### Type Mapping

DataStream to Table type mapping follows these rules:

- **Primitive types**: Direct mapping (int → INT, String → STRING)
- **Composite types**: Flattened to top-level columns
- **Row types**: Field-by-field mapping with position or name matching
- **POJO types**: Field name-based mapping with type validation
- **Tuple types**: Position-based mapping (f0, f1, f2, ...)

Table to DataStream type mapping:

- **Row class**: Direct field access by position
- **POJO classes**: Field name matching with type conversion
- **Tuple types**: Position-based assignment with type checking
- **Primitive types**: Single-column table conversion
### Legacy Source/Sink Base Classes (Deprecated)

Abstract base classes for implementing legacy table sources and sinks with InputFormat and OutputFormat.

```java { .api }
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSink;

/**
 * Base class for bounded table sources using InputFormat
 * @deprecated Use DynamicTableSource instead
 */
@Deprecated
public abstract class InputFormatTableSource<T> implements StreamTableSource<T> {
    /**
     * Returns an InputFormat for reading bounded table data
     * @return InputFormat instance for data reading
     */
    public abstract InputFormat<T, ?> getInputFormat();

    /**
     * Always returns true indicating this is a bounded source
     * @return true for bounded sources
     */
    public final boolean isBounded();

    /**
     * Creates DataStream using InputFormat
     * @param execEnv StreamExecutionEnvironment for DataStream creation
     * @return DataStream of table data
     */
    public final DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}

/**
 * Base class for bounded table sinks using OutputFormat
 * @deprecated Use DynamicTableSink instead
 */
@Deprecated
public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> {
    /**
     * Returns an OutputFormat for writing bounded table data
     * @return OutputFormat instance for data writing
     */
    public abstract OutputFormat<T> getOutputFormat();

    /**
     * Consumes DataStream using OutputFormat
     * @param dataStream Input DataStream to consume
     * @return DataStreamSink transformation
     */
    public final DataStreamSink<T> consumeDataStream(DataStream<T> dataStream);
}
```

These legacy base classes are marked as both `@Deprecated` and `@Experimental`, indicating they are maintained for backward compatibility but should not be used in new implementations.