# Type System and Utilities

## Capabilities

### Schema Conversion Utilities

Utilities for converting between Flink types and Avro schemas.

```java { .api }
/**
 * Utility class for converting between Flink RowType and Avro Schema.
 * Located in the org.apache.flink.formats.avro.typeutils package.
 */
public class AvroSchemaConverter {

    /**
     * Converts a Flink LogicalType to an Avro Schema.
     * @param logicalType The Flink LogicalType to convert
     * @return Corresponding Avro Schema
     */
    public static Schema convertToSchema(LogicalType logicalType);

    /**
     * Converts a Flink LogicalType to an Avro Schema with timestamp mapping control.
     * @param logicalType The Flink LogicalType to convert
     * @param legacyTimestampMapping Whether to use legacy timestamp mapping
     * @return Corresponding Avro Schema
     */
    public static Schema convertToSchema(LogicalType logicalType, boolean legacyTimestampMapping);

    /**
     * Converts a Flink LogicalType to an Avro Schema with a custom row name.
     * @param logicalType The Flink LogicalType to convert
     * @param rowName Name for the root record schema
     * @return Corresponding Avro Schema
     */
    public static Schema convertToSchema(LogicalType logicalType, String rowName);

    /**
     * Converts an Avro schema string to a Flink DataType.
     * @param avroSchemaString The Avro schema definition string
     * @return Corresponding Flink DataType
     */
    public static DataType convertToDataType(String avroSchemaString);

    /**
     * Converts an Avro schema string to a Flink DataType with timestamp mapping control.
     * @param avroSchemaString The Avro schema definition string
     * @param legacyTimestampMapping Whether to use legacy timestamp mapping
     * @return Corresponding Flink DataType
     */
    public static DataType convertToDataType(String avroSchemaString, boolean legacyTimestampMapping);

    /**
     * Converts an Avro class into Flink TypeInformation.
     * @param avroClass Avro specific record class
     * @return TypeInformation matching the schema
     */
    public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(Class<T> avroClass);

    /**
     * Converts an Avro schema string into Flink TypeInformation.
     * @param avroSchemaString Avro schema definition string
     * @return TypeInformation matching the schema
     */
    public static <T> TypeInformation<T> convertToTypeInfo(String avroSchemaString);
}
```

### Type Information Classes

Type information classes for Avro record types in Flink's type system.

```java { .api }
/**
 * Type information for Avro records.
 * @param <T> The Avro record type
 */
public class AvroTypeInfo<T> extends TypeInformation<T> {

    /**
     * Creates AvroTypeInfo for a specific record class.
     * @param recordClazz The Avro record class
     */
    public AvroTypeInfo(Class<T> recordClazz);

    /**
     * Creates AvroTypeInfo with an explicit schema.
     * @param recordClazz The Avro record class
     * @param schema The Avro schema
     */
    public AvroTypeInfo(Class<T> recordClazz, Schema schema);

    /**
     * Gets the Avro schema for this type.
     * @return The Avro schema
     */
    public Schema getAvroSchema();

    /**
     * Gets the record class.
     * @return The Java class for the Avro record
     */
    public Class<T> getRecordClazz();

    /**
     * Creates a serializer for this type.
     * @param config Execution configuration
     * @return TypeSerializer for this Avro type
     */
    public TypeSerializer<T> createSerializer(ExecutionConfig config);
}

/**
 * Specialized type information for Avro GenericRecord.
 */
public class GenericRecordAvroTypeInfo extends AvroTypeInfo<GenericRecord> {

    /**
     * Creates type information for GenericRecord with a schema.
     * @param schema The Avro schema for the GenericRecord
     */
    public GenericRecordAvroTypeInfo(Schema schema);
}
```

### Schema Utilities

```java { .api }
/**
 * Utility for encoding and decoding Avro schemas.
 */
public class SchemaCoder {

    /**
     * Encodes an Avro schema to a string representation.
     * @param schema The schema to encode
     * @return Encoded schema string
     */
    public static String encode(Schema schema);

    /**
     * Decodes a schema from its string representation.
     * @param encodedSchema The encoded schema string
     * @return Decoded Avro schema
     */
    public static Schema decode(String encodedSchema);

    /**
     * Validates schema compatibility between reader and writer schemas.
     * @param writerSchema The schema used to write data
     * @param readerSchema The schema used to read data
     * @return true if schemas are compatible
     */
    public static boolean isCompatible(Schema writerSchema, Schema readerSchema);
}

/**
 * Serializable wrapper for Avro Schema objects.
 */
public class SerializableAvroSchema implements Serializable {

    /**
     * Creates a serializable schema wrapper.
     * @param schema The Avro schema to wrap
     */
    public SerializableAvroSchema(Schema schema);

    /**
     * Gets the wrapped Avro schema.
     * @return The Avro schema
     */
    public Schema getAvroSchema();

    /**
     * Creates from a schema string.
     * @param schemaString JSON representation of the schema
     * @return SerializableAvroSchema instance
     */
    public static SerializableAvroSchema fromString(String schemaString);
}
```

### Kryo Serialization Support

```java { .api }
/**
 * Utilities for Kryo serialization of Avro objects.
 */
public class AvroKryoSerializerUtils {

    /**
     * Registers Avro types with Kryo for efficient serialization.
     * @param kryo The Kryo instance to register with
     */
    public static void registerAvroTypes(Kryo kryo);

    /**
     * Creates an optimized Kryo serializer for a specific Avro record type.
     * @param recordClass The Avro record class
     * @return Configured Kryo serializer
     */
    public static <T> Serializer<T> createAvroSerializer(Class<T> recordClass);
}
```

## Usage Examples

### Schema Conversion

```java
// Convert Flink RowType to Avro Schema
RowType flinkRowType = RowType.of(
    new DataType[] {
        DataTypes.BIGINT(),
        DataTypes.STRING(),
        DataTypes.ARRAY(DataTypes.STRING()),
        DataTypes.ROW(
            DataTypes.FIELD("street", DataTypes.STRING()),
            DataTypes.FIELD("city", DataTypes.STRING()),
            DataTypes.FIELD("zipcode", DataTypes.STRING())
        ),
        DataTypes.TIMESTAMP(3)
    },
    new String[] {"user_id", "username", "tags", "address", "created_at"}
);

// Convert to Avro schema
Schema avroSchema = AvroSchemaConverter.convertToSchema(flinkRowType);

System.out.println("Generated Avro Schema:");
System.out.println(avroSchema.toString(true));

// Convert back to Flink RowType
RowType convertedRowType = AvroSchemaConverter.convertToRowType(avroSchema);

// Convert to DataType for table processing
// (convertToDataType takes the schema definition string, per the API above)
DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());
```

### Type Information Usage

```java
// Create type information for SpecificRecord
AvroTypeInfo<User> userTypeInfo = new AvroTypeInfo<>(User.class);

// Use in DataStream
DataStream<User> userStream = env
    .fromCollection(userList, userTypeInfo)
    .map(new MapFunction<User, User>() {
        @Override
        public User map(User user) throws Exception {
            // Process user
            return user;
        }
    });

// Create type information for GenericRecord
Schema schema = new Schema.Parser().parse(schemaString);
GenericRecordAvroTypeInfo genericTypeInfo = new GenericRecordAvroTypeInfo(schema);

DataStream<GenericRecord> genericStream = env
    .fromCollection(genericRecordList, genericTypeInfo);

// Get schema from type info
Schema retrievedSchema = userTypeInfo.getAvroSchema();
Class<User> recordClass = userTypeInfo.getRecordClazz();
```

### Schema Validation and Compatibility

```java
// Schema evolution example
Schema v1Schema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("name").type().stringType().noDefault()
    .endRecord();

Schema v2Schema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("name").type().stringType().noDefault()
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .endRecord();

// Check compatibility
boolean compatible = SchemaCoder.isCompatible(v1Schema, v2Schema);
System.out.println("Schemas compatible: " + compatible);

// Encode schema for storage/transmission
String encodedSchema = SchemaCoder.encode(v2Schema);

// Decode schema
Schema decodedSchema = SchemaCoder.decode(encodedSchema);

// Create serializable schema for distributed processing
SerializableAvroSchema serializableSchema = new SerializableAvroSchema(v2Schema);
```

### Complex Type Mapping

```java
// Complex Flink type with nested structures
RowType complexType = RowType.of(
    new DataType[] {
        DataTypes.ROW(
            DataTypes.FIELD("personal", DataTypes.ROW(
                DataTypes.FIELD("firstName", DataTypes.STRING()),
                DataTypes.FIELD("lastName", DataTypes.STRING()),
                DataTypes.FIELD("age", DataTypes.INT())
            )),
            DataTypes.FIELD("contact", DataTypes.ROW(
                DataTypes.FIELD("email", DataTypes.STRING()),
                DataTypes.FIELD("phones", DataTypes.ARRAY(DataTypes.STRING()))
            ))
        ),
        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
        DataTypes.ARRAY(DataTypes.ROW(
            DataTypes.FIELD("type", DataTypes.STRING()),
            DataTypes.FIELD("value", DataTypes.STRING())
        ))
    },
    new String[] {"user_profile", "metadata", "preferences"}
);

// Convert complex type to Avro schema
Schema complexAvroSchema = AvroSchemaConverter.convertToSchema(complexType);

// The generated schema will have proper Avro types:
// - ROW becomes RECORD
// - ARRAY becomes ARRAY
// - MAP becomes MAP
// - Nested structures are properly represented

System.out.println("Complex Avro Schema:");
System.out.println(complexAvroSchema.toString(true));
```

### Custom Type Registration

```java
// Register custom Avro types with Flink's type system
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Register Avro types for better performance
env.getConfig().registerTypeWithKryoSerializer(
    GenericRecord.class,
    AvroKryoSerializerUtils.createAvroSerializer(GenericRecord.class)
);

// Register specific record types
env.getConfig().registerTypeWithKryoSerializer(
    User.class,
    AvroKryoSerializerUtils.createAvroSerializer(User.class)
);

// Enable Kryo for all Avro types
AvroKryoSerializerUtils.registerAvroTypes(
    env.getConfig().getSerializerConfig().getKryo()
);
```

### Schema Registry Integration with Utilities

```java
// Utility for managing schemas with registry
public class AvroSchemaManager {

    private final SchemaRegistryClient registryClient;

    public AvroSchemaManager(String registryUrl) {
        this.registryClient = new CachedSchemaRegistryClient(registryUrl, 100);
    }

    /**
     * Convert Flink table to Avro schema and register it.
     */
    public int registerTableSchema(String subject, RowType rowType) throws IOException {
        Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
        return registryClient.register(subject, avroSchema);
    }

    /**
     * Get Flink RowType from a registered schema.
     */
    public RowType getRowTypeFromRegistry(String subject) throws IOException {
        Schema schema = registryClient.getLatestSchemaMetadata(subject).getSchema();
        return AvroSchemaConverter.convertToRowType(schema);
    }

    /**
     * Validate schema evolution.
     */
    public boolean validateEvolution(String subject, RowType newRowType) throws IOException {
        Schema newSchema = AvroSchemaConverter.convertToSchema(newRowType);
        return registryClient.testCompatibility(subject, newSchema);
    }
}

// Usage
AvroSchemaManager schemaManager = new AvroSchemaManager("http://localhost:8081");

// Register table schema
RowType tableSchema = RowType.of(
    new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
    new String[] {"id", "name"}
);

int schemaId = schemaManager.registerTableSchema("users-value", tableSchema);

// Later, retrieve schema for processing
RowType retrievedSchema = schemaManager.getRowTypeFromRegistry("users-value");
```

### Performance Optimization with Type Information

```java
// Optimized processing with proper type information
public class OptimizedAvroProcessor {

    public static <T> DataStream<T> processAvroStream(
            DataStream<byte[]> rawStream,
            AvroTypeInfo<T> typeInfo) {

        // Create efficient deserializer
        AvroDeserializationSchema<T> deserializer =
            AvroDeserializationSchema.forGeneric(typeInfo.getAvroSchema());

        return rawStream
            .map(new MapFunction<byte[], T>() {
                @Override
                public T map(byte[] value) throws Exception {
                    return deserializer.deserialize(value);
                }
            })
            .returns(typeInfo); // Explicit type information for optimization
    }

    public static DataStream<GenericRecord> optimizeGenericRecordProcessing(
            DataStream<GenericRecord> stream,
            Schema schema) {

        // Use optimized type info
        GenericRecordAvroTypeInfo typeInfo = new GenericRecordAvroTypeInfo(schema);

        return stream
            .rebalance() // Distribute load evenly
            .map(new RichMapFunction<GenericRecord, GenericRecord>() {
                private transient GenericRecord reusableRecord;

                @Override
                public void open(Configuration parameters) {
                    // Create reusable record for better performance
                    reusableRecord = new GenericData.Record(schema);
                }

                @Override
                public GenericRecord map(GenericRecord record) throws Exception {
                    // Reuse record object to reduce GC pressure
                    for (Schema.Field field : schema.getFields()) {
                        reusableRecord.put(field.name(), record.get(field.name()));
                    }
                    return reusableRecord;
                }
            })
            .returns(typeInfo);
    }
}
```