# Type System Integration

Type information classes and utilities for integrating Avro with Flink's type system. Provides efficient serialization, type safety, and schema conversion between the Flink and Avro type systems.

## AvroTypeInfo

Type information class for Avro specific record types, i.e. generated classes that extend `SpecificRecordBase`.
```java { .api }
public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
    // Constructor
    public AvroTypeInfo(Class<T> typeClass);

    // Serializer creation
    public TypeSerializer<T> createSerializer(SerializerConfig config);
}
```
### Usage Examples

**Creating Type Information:**

```java
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

// Create type information for a specific record class
AvroTypeInfo<User> userTypeInfo = new AvroTypeInfo<>(User.class);

// Use in DataStream operations
DataStream<User> userStream = env.fromCollection(users, userTypeInfo);

// Explicit type hints
DataStream<User> transformedStream = rawStream
    .map(data -> parseUser(data))
    .returns(userTypeInfo);
```
**With the DataSet API (legacy, deprecated in recent Flink releases):**

```java
// Create a DataSet with explicit type information
DataSet<User> userDataSet = env.fromCollection(userList, userTypeInfo);

// Type-safe operations
DataSet<String> names = userDataSet
    .map(user -> user.getName().toString())
    .returns(Types.STRING);
```
## GenericRecordAvroTypeInfo

Type information class for Avro generic records, parameterized by an explicit schema.

```java { .api }
public class GenericRecordAvroTypeInfo extends TypeInformation<GenericRecord> {
    // Constructor
    public GenericRecordAvroTypeInfo(Schema schema);

    // Type system integration
    public TypeSerializer<GenericRecord> createSerializer(SerializerConfig config);
    public boolean isBasicType();
    public boolean isTupleType();
    public int getArity();
    public int getTotalFields();
}
```
### Usage Examples

**Generic Record Processing:**

```java
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Parse the schema
Schema schema = new Schema.Parser().parse(schemaString);

// Create type information
GenericRecordAvroTypeInfo typeInfo = new GenericRecordAvroTypeInfo(schema);

// Use in streaming
DataStream<GenericRecord> recordStream = env
    .fromCollection(genericRecords, typeInfo);

// Type-safe transformations
DataStream<String> nameStream = recordStream
    .map(record -> record.get("name").toString())
    .name("extract-names");
```
**Schema Evolution Support:**

```java
// Handle multiple schema versions
Schema readerSchema = getReaderSchema();
Schema writerSchema = getWriterSchema();

GenericRecordAvroTypeInfo readerTypeInfo = new GenericRecordAvroTypeInfo(readerSchema);

// Process with schema evolution; SchemaEvolutionMapper is a user-defined
// MapFunction that re-reads each record with the reader schema
DataStream<GenericRecord> evolvedStream = rawStream
    .map(new SchemaEvolutionMapper(writerSchema, readerSchema))
    .returns(readerTypeInfo);
```
## AvroSerializer

High-performance serializer for Avro types with schema evolution support.

```java { .api }
public class AvroSerializer<T> extends TypeSerializer<T> {
    // Constructor
    public AvroSerializer(Class<T> type);

    // Serialization operations
    public boolean isImmutableType();
    public TypeSerializer<T> duplicate();
    public T createInstance();
    public T copy(T from);
    public T copy(T from, T reuse);
    public int getLength();
    public void serialize(T record, DataOutputView target) throws IOException;
    public T deserialize(DataInputView source) throws IOException;
    public T deserialize(T reuse, DataInputView source) throws IOException;
    public void copy(DataInputView source, DataOutputView target) throws IOException;
}
```
### Performance Characteristics

**Object Reuse:**

```java
// The serializer supports object reuse for better performance
AvroSerializer<User> serializer = new AvroSerializer<>(User.class);

// Reuse one instance across deserialization calls to reduce allocation
User reusableUser = serializer.createInstance();
User deserializedUser = serializer.deserialize(reusableUser, dataInput);
```
**Memory Efficiency:**

```java
// Efficient copying without full deserialization
serializer.copy(inputView, outputView); // Direct byte copying when possible
```
## AvroSchemaConverter

Utility class for converting between the Flink and Avro type systems.

```java { .api }
public class AvroSchemaConverter {
    // Schema conversion methods
    public static Schema convertToSchema(RowType rowType);
    public static Schema convertToSchema(RowType rowType, boolean legacyTimestampMapping);
    public static LogicalType convertToLogicalType(Schema schema);
    public static RowType convertToRowType(Schema schema);
    public static RowType convertToRowType(Schema schema, boolean legacyTimestampMapping);
}
```
### Type Conversion Examples

**Flink RowType to Avro Schema:**

```java
import org.apache.flink.table.types.logical.*;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;

// Define the Flink row type
RowType rowType = RowType.of(
    new LogicalType[] {
        new VarCharType(50),                 // name
        new IntType(),                       // age
        new BooleanType(),                   // active
        new TimestampType(3),                // created_at
        new DecimalType(10, 2),              // salary
        new ArrayType(new VarCharType(20))   // tags
    },
    new String[] {"name", "age", "active", "created_at", "salary", "tags"}
);

// Convert to an Avro schema
Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);

// With legacy timestamp mapping
Schema legacySchema = AvroSchemaConverter.convertToSchema(rowType, true);
```
**Avro Schema to Flink RowType:**

```java
// Parse the Avro schema
String schemaJson = "{ \"type\": \"record\", \"name\": \"User\", ... }";
Schema schema = new Schema.Parser().parse(schemaJson);

// Convert to a Flink row type
RowType flinkRowType = AvroSchemaConverter.convertToRowType(schema);

// Bridge to the Table API's DataType where one is required
DataType rowDataType = TypeConversions.fromLogicalToDataType(flinkRowType);
```
### Type Mapping Reference

**Primitive Types:**

| Flink Type | Avro Type | Notes |
|------------|-----------|-------|
| BooleanType | boolean | Direct mapping |
| TinyIntType | int | Promoted to int |
| SmallIntType | int | Promoted to int |
| IntType | int | Direct mapping |
| BigIntType | long | Direct mapping |
| FloatType | float | Direct mapping |
| DoubleType | double | Direct mapping |
| VarCharType/CharType | string | UTF-8 encoding |
| VarBinaryType/BinaryType | bytes | Direct mapping |
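Avro has no 8- or 16-bit integer types, so the promotion rows above are lossless on write but need an explicit narrowing cast on read. A JDK-only sketch of that round trip (class and method names here are illustrative, not Flink API):

```java
public class PromotionDemo {
    // TINYINT travels as an Avro int: widening is implicit and lossless
    static int toAvro(byte flinkTinyInt) {
        return flinkTinyInt;
    }

    // Reading back requires an explicit narrowing cast
    static byte fromAvro(int avroInt) {
        return (byte) avroInt;
    }

    public static void main(String[] args) {
        byte original = 42;
        System.out.println(fromAvro(toAvro(original)) == original); // true
    }
}
```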
**Complex Types:**

| Flink Type | Avro Type | Notes |
|------------|-----------|-------|
| ArrayType | array | Element type converted recursively |
| MapType | map | Key must be string, value converted recursively |
| RowType | record | Fields converted recursively |
| MultisetType | map | Map whose int values are element counts |
**Temporal Types:**

| Flink Type | Avro Type | Logical Type |
|------------|-----------|--------------|
| DateType | int | date |
| TimeType | int | time-millis |
| TimestampType | long | timestamp-millis (with legacy timestamp mapping) |
| TimestampType | long | local-timestamp-millis (without legacy mapping) |
| LocalZonedTimestampType | long | timestamp-millis |
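As the table shows, the `date` logical type travels as an Avro int counting days since the Unix epoch. A JDK-only round trip of that encoding (`DateMappingDemo` is an illustrative name):

```java
import java.time.LocalDate;

public class DateMappingDemo {
    public static void main(String[] args) {
        // Avro's date logical type stores days since 1970-01-01 in an int
        int epochDays = (int) LocalDate.of(2024, 1, 1).toEpochDay();
        System.out.println(epochDays);                       // 19723

        // Reading side reconstructs the date from the day count
        System.out.println(LocalDate.ofEpochDay(epochDays)); // 2024-01-01
    }
}
```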
**Decimal Types:**

| Flink Type | Avro Type | Logical Type |
|------------|-----------|--------------|
| DecimalType | bytes | decimal with precision/scale |
| DecimalType | fixed | decimal with precision/scale |
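Avro's decimal logical type stores only the unscaled two's-complement value in the bytes; precision and scale live in the schema. A JDK-only round trip of that encoding for the `DECIMAL(10, 2)` salary field used earlier (`DecimalBytesDemo` is an illustrative name):

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalBytesDemo {
    public static void main(String[] args) {
        // DECIMAL(10, 2): only the unscaled value travels as bytes
        BigDecimal salary = new BigDecimal("12345.67");
        byte[] encoded = salary.unscaledValue().toByteArray();

        // The reader reattaches the scale declared in the schema (2 here)
        BigDecimal decoded = new BigDecimal(new BigInteger(encoded), 2);
        System.out.println(decoded); // 12345.67
    }
}
```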
## SerializableAvroSchema

Wrapper class that makes Avro `Schema` objects serializable for use in Flink operations.

```java { .api }
public class SerializableAvroSchema implements Serializable {
    // Constructor
    public SerializableAvroSchema(Schema schema);

    // Schema access
    public Schema getAvroSchema();

    // Serialization support
    private void writeObject(ObjectOutputStream out) throws IOException;
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException;
}
```
### Usage in Distributed Operations

```java
// Wrap the schema so it can be shipped with the function
Schema schema = new Schema.Parser().parse(schemaString);
SerializableAvroSchema serializableSchema = new SerializableAvroSchema(schema);

// Use in a map function; processRecord is a user-defined helper
DataStream<GenericRecord> processed = recordStream.map(
    new RichMapFunction<GenericRecord, GenericRecord>() {
        private transient Schema schema;

        @Override
        public void open(Configuration parameters) {
            this.schema = serializableSchema.getAvroSchema();
        }

        @Override
        public GenericRecord map(GenericRecord record) throws Exception {
            return processRecord(record, schema);
        }
    });
```
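Avro's `Schema` is not itself `java.io.Serializable`; a wrapper like this typically writes the schema's JSON form in `writeObject` and re-parses it on the reading side. A JDK-only sketch of that pattern, assuming nothing about Flink's actual implementation (names are illustrative, and a plain `String` stands in for the parsed `Schema`):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SchemaWrapperSketch {
    // Illustrative stand-in for a SerializableAvroSchema-style wrapper
    static class JsonBackedSchema implements Serializable {
        private final String json;        // only the JSON string is written out
        private transient Object parsed;  // real code: a cached org.apache.avro.Schema

        JsonBackedSchema(String json) {
            this.json = json;
        }

        Object get() {
            if (parsed == null) {
                parsed = json;            // real code: new Schema.Parser().parse(json)
            }
            return parsed;
        }
    }

    public static void main(String[] args) throws Exception {
        JsonBackedSchema original = new JsonBackedSchema("{\"type\":\"string\"}");

        // Round-trip through Java serialization
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(original);
        oos.flush();
        JsonBackedSchema copy = (JsonBackedSchema) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        // The transient parsed field is rebuilt lazily after deserialization
        System.out.println(copy.get()); // {"type":"string"}
    }
}
```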
## AvroFactory

Factory utilities for creating Avro-specific data structures and serializers.

```java { .api }
public class AvroFactory {
    // Schema extraction
    public static Schema extractAvroSpecificSchema(Class<?> specificRecordClass, SpecificData specificData);

    // SpecificData creation
    public static SpecificData getSpecificDataForClass(Class<? extends SpecificData> specificDataClass, ClassLoader classLoader);
}
```
### Advanced Usage

```java
// Extract the schema from a specific record class
Class<User> userClass = User.class;
SpecificData specificData = SpecificData.get();
Schema userSchema = AvroFactory.extractAvroSpecificSchema(userClass, specificData);

// SpecificData with a custom class loader
ClassLoader customClassLoader = Thread.currentThread().getContextClassLoader();
SpecificData customSpecificData = AvroFactory.getSpecificDataForClass(
    SpecificData.class,
    customClassLoader
);
```
## Performance Optimization

### Serializer Configuration

**Enable Object Reuse:**

```java
// Configure the execution environment for object reuse
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse(); // Better performance with Avro serializers
```
**Type Hint Usage:**

```java
// Provide explicit type information to avoid reflective type extraction
DataStream<User> typedStream = rawStream
    .map(data -> parseUser(data))
    .returns(new AvroTypeInfo<>(User.class)); // Works around type erasure
```
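The `.returns(...)` hint is needed because a lambda's generic return type is erased at runtime, so the framework cannot recover the element type on its own. A JDK-only illustration of erasure (`ErasureDemo` is an illustrative name):

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();

        // Both lists share one runtime class; the type argument is gone,
        // which is why explicit type information must be supplied
        System.out.println(strings.getClass() == ints.getClass()); // true
    }
}
```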
### Schema Caching

```java
// Cache converted schemas to avoid repeated conversion
private static final ConcurrentHashMap<String, Schema> SCHEMA_CACHE = new ConcurrentHashMap<>();

public Schema getCachedSchema(RowType rowType) {
    // asSerializableString() gives a stable, unambiguous cache key
    String key = rowType.asSerializableString();
    return SCHEMA_CACHE.computeIfAbsent(key,
        k -> AvroSchemaConverter.convertToSchema(rowType));
}
```
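The cache above relies on `computeIfAbsent` running the conversion at most once per key, even across repeated lookups. A JDK-only check of that property (`CacheDemo` and the key string are illustrative):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class CacheDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        AtomicInteger conversions = new AtomicInteger();

        // Three lookups for the same key...
        for (int i = 0; i < 3; i++) {
            cache.computeIfAbsent("ROW<`name` STRING>", k -> {
                conversions.incrementAndGet(); // ...but the converter runs once
                return "converted-schema";
            });
        }
        System.out.println(conversions.get()); // 1
    }
}
```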
## Error Handling

**Type Mismatch Errors:**

```java
try {
    Schema convertedSchema = AvroSchemaConverter.convertToSchema(rowType);
} catch (IllegalArgumentException e) {
    logger.error("Unsupported type conversion: " + e.getMessage());
    // Handle the unsupported type
}
```
**Serialization Errors:**

```java
try {
    serializer.serialize(record, output);
} catch (IOException e) {
    logger.error("Serialization failed", e);
    // Implement error recovery
}
```
**Schema Evolution Errors:**

```java
// Handle schema evolution gracefully; SchemaEvolution.evolve is an
// illustrative user-defined helper, not part of the Avro or Flink API
try {
    GenericRecord evolved = SchemaEvolution.evolve(record, oldSchema, newSchema);
} catch (AvroRuntimeException e) {
    logger.warn("Schema evolution failed, using default values", e);
    // Apply default values or skip the record
}
```