# Row Data Integration

## Capabilities

### RowData Deserialization

Integration with Flink's internal RowData format for table API and SQL processing.

```java { .api }
/**
 * Deserialization schema that converts Avro records to Flink RowData
 * Marked as @PublicEvolving API
 */
public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData> {

    /**
     * Creates an AvroRowDataDeserializationSchema with default settings
     * @param rowType The Flink RowType describing the target schema
     * @param typeInfo Type information for the RowData
     */
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> typeInfo);

    /**
     * Creates an AvroRowDataDeserializationSchema with custom encoding
     * @param rowType The Flink RowType describing the target schema
     * @param typeInfo Type information for the RowData
     * @param encoding Avro encoding type (BINARY or JSON)
     */
    public AvroRowDataDeserializationSchema(
            RowType rowType,
            TypeInformation<RowData> typeInfo,
            AvroEncoding encoding);

    /**
     * Creates an AvroRowDataDeserializationSchema with full configuration
     * @param rowType The Flink RowType describing the target schema
     * @param typeInfo Type information for the RowData
     * @param encoding Avro encoding type (BINARY or JSON)
     * @param legacyTimestampMapping Whether to use legacy timestamp mapping
     */
    public AvroRowDataDeserializationSchema(
            RowType rowType,
            TypeInformation<RowData> typeInfo,
            AvroEncoding encoding,
            boolean legacyTimestampMapping);

    /**
     * Creates an AvroRowDataDeserializationSchema with custom nested schema
     * @param nestedSchema The nested Avro deserialization schema
     * @param runtimeConverter Runtime converter for Avro to RowData conversion
     * @param typeInfo Type information for the RowData
     */
    public AvroRowDataDeserializationSchema(
            DeserializationSchema<GenericRecord> nestedSchema,
            AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter,
            TypeInformation<RowData> typeInfo);

    /**
     * Deserializes byte array to RowData
     * @param message Serialized Avro message bytes
     * @return RowData instance, or null if message is null
     * @throws IOException If deserialization fails
     */
    public RowData deserialize(byte[] message) throws IOException;

    /**
     * Checks if element signals end of stream
     * @param nextElement The RowData element to check
     * @return Always false for Avro records
     */
    public boolean isEndOfStream(RowData nextElement);

    /**
     * Gets the type information for produced RowData
     * @return TypeInformation for RowData
     */
    public TypeInformation<RowData> getProducedType();
}
```

### RowData Serialization

```java { .api }
/**
 * Serialization schema that converts Flink RowData to Avro format
 */
public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {

    /**
     * Creates an AvroRowDataSerializationSchema with default settings
     * @param rowType The Flink RowType describing the source schema
     */
    public AvroRowDataSerializationSchema(RowType rowType);

    /**
     * Creates an AvroRowDataSerializationSchema with custom encoding
     * @param rowType The Flink RowType describing the source schema
     * @param encoding Avro encoding type (BINARY or JSON)
     */
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding);

    /**
     * Creates an AvroRowDataSerializationSchema with full configuration
     * @param rowType The Flink RowType describing the source schema
     * @param encoding Avro encoding type (BINARY or JSON)
     * @param legacyTimestampMapping Whether to use legacy timestamp mapping
     */
    public AvroRowDataSerializationSchema(
            RowType rowType,
            AvroEncoding encoding,
            boolean legacyTimestampMapping);

    /**
     * Creates an AvroRowDataSerializationSchema with custom nested schema
     * @param rowType The Flink RowType describing the source schema
     * @param nestedSchema The nested Avro serialization schema
     * @param runtimeConverter Runtime converter for RowData to Avro conversion
     */
    public AvroRowDataSerializationSchema(
            RowType rowType,
            SerializationSchema<GenericRecord> nestedSchema,
            RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter);

    /**
     * Serializes RowData to byte array
     * @param rowData The RowData to serialize
     * @return Serialized byte array
     */
    public byte[] serialize(RowData rowData);

    /**
     * Opens the serializer for initialization
     * @param context Initialization context
     * @throws Exception If initialization fails
     */
    public void open(InitializationContext context) throws Exception;
}
```

### Type Conversion Utilities

```java { .api }
/**
 * Utilities for converting Avro records to Flink RowData
 */
public class AvroToRowDataConverters {

    /**
     * Interface for converting Avro records to RowData
     */
    public interface AvroToRowDataConverter {
        /**
         * Converts an Avro record to RowData
         * @param avroObject The Avro record to convert
         * @return Converted RowData
         */
        Object convert(Object avroObject);
    }

    /**
     * Creates a converter for the given RowType
     * @param rowType The target Flink RowType
     * @return Converter instance
     */
    public static AvroToRowDataConverter createRowConverter(RowType rowType);
}

/**
 * Utilities for converting Flink RowData to Avro records
 */
public class RowDataToAvroConverters {

    /**
     * Interface for converting RowData to Avro records
     */
    public interface RowDataToAvroConverter {
        /**
         * Converts RowData to an Avro record
         * @param schema Target Avro schema
         * @param rowData The RowData to convert
         * @return Converted Avro record
         */
        Object convert(Schema schema, RowData rowData);
    }

    /**
     * Creates a converter for the given RowType
     * @param rowType The source Flink RowType
     * @return Converter instance
     */
    public static RowDataToAvroConverter createConverter(RowType rowType);
}
```

## Usage Examples

### Table API Integration

```java
// Define Flink table schema
RowType rowType = RowType.of(
    new DataType[] {
        DataTypes.BIGINT(),
        DataTypes.STRING(),
        DataTypes.STRING(),
        DataTypes.TIMESTAMP(3)
    },
    new String[] {"user_id", "username", "email", "created_at"}
);

// Create type information
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);

// Create deserializer for table processing
AvroRowDataDeserializationSchema deserializer =
    new AvroRowDataDeserializationSchema(
        rowType,
        typeInfo,
        AvroEncoding.BINARY,
        false // Use correct timestamp mapping
    );

// Create serializer for output
AvroRowDataSerializationSchema serializer =
    new AvroRowDataSerializationSchema(
        rowType,
        AvroEncoding.BINARY,
        false
    );
```

### DataStream to Table Conversion

```java
// Convert byte stream to RowData
DataStream<byte[]> avroStream = // ... your Avro byte stream

// Note: RichMapFunction is required here — plain MapFunction has no open() lifecycle hook.
DataStream<RowData> rowDataStream = avroStream
    .map(new RichMapFunction<byte[], RowData>() {
        private AvroRowDataDeserializationSchema deserializer;

        @Override
        public void open(Configuration parameters) throws Exception {
            RowType rowType = RowType.of(
                new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
                new String[] {"id", "name"}
            );
            TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);

            deserializer = new AvroRowDataDeserializationSchema(
                rowType, typeInfo, AvroEncoding.BINARY, false
            );
        }

        @Override
        public RowData map(byte[] value) throws Exception {
            return deserializer.deserialize(value);
        }
    });

// Convert to Table for SQL processing
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv.fromDataStream(rowDataStream);

// Register as temporary table
tableEnv.createTemporaryView("my_table", table);

// Process with SQL
Table result = tableEnv.sqlQuery("SELECT id, UPPER(name) as name FROM my_table WHERE id > 100");
```

### Custom Type Conversions

```java
// Create custom converter for complex types
RowType complexRowType = RowType.of(
    new DataType[] {
        DataTypes.ROW(
            DataTypes.FIELD("street", DataTypes.STRING()),
            DataTypes.FIELD("city", DataTypes.STRING()),
            DataTypes.FIELD("zipcode", DataTypes.STRING())
        ),
        DataTypes.ARRAY(DataTypes.STRING()),
        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())
    },
    new String[] {"address", "tags", "metadata"}
);

// Create converter
AvroToRowDataConverters.AvroToRowDataConverter converter =
    AvroToRowDataConverters.createRowConverter(complexRowType);

// Use converter manually
GenericRecord avroRecord = // ... your Avro record
RowData rowData = (RowData) converter.convert(avroRecord);

// For serialization direction
RowDataToAvroConverters.RowDataToAvroConverter toAvroConverter =
    RowDataToAvroConverters.createConverter(complexRowType);

Schema avroSchema = AvroSchemaConverter.convertToSchema(complexRowType);
GenericRecord converted = (GenericRecord) toAvroConverter.convert(avroSchema, rowData);
```

### SQL Table with Custom Schema

```java
// Create table with Avro format and specific configuration
String createTableSQL = """
    CREATE TABLE user_events (
        user_id BIGINT,
        event_type STRING,
        event_data ROW<
            action STRING,
            target STRING,
            metadata MAP<STRING, STRING>
        >,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro',
        'avro.encoding' = 'binary',
        'avro.timestamp_mapping.legacy' = 'false'
    )
    """;

tableEnv.executeSql(createTableSQL);

// Query the table
Table result = tableEnv.sqlQuery("""
    SELECT
        user_id,
        event_type,
        event_data.action,
        COUNT(*) as event_count,
        TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end
    FROM user_events
    WHERE event_data.action = 'click'
    GROUP BY
        user_id,
        event_type,
        event_data.action,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
    """);
```

### Handling Nullable Fields

```java
// Schema with nullable fields
RowType nullableRowType = RowType.of(
    new DataType[] {
        DataTypes.BIGINT().notNull(), // Required field
        DataTypes.STRING(),           // Nullable string
        DataTypes.INT(),              // Nullable int
        DataTypes.TIMESTAMP(3)        // Nullable timestamp
    },
    new String[] {"id", "name", "age", "last_login"}
);

// The deserializer automatically handles null values according to Avro schema
AvroRowDataDeserializationSchema deserializer =
    new AvroRowDataDeserializationSchema(
        nullableRowType,
        InternalTypeInfo.of(nullableRowType),
        AvroEncoding.BINARY,
        false
    );

// Access nullable fields safely
DataStream<RowData> processed = rowDataStream
    .map(new MapFunction<RowData, RowData>() {
        @Override
        public RowData map(RowData row) throws Exception {
            // Check if name field is null (field index 1)
            if (!row.isNullAt(1)) {
                String name = row.getString(1).toString();
                // Process non-null name
            }
            return row;
        }
    });
```