Apache Flink Avro format support library providing serialization and deserialization capabilities for Apache Avro data format within Flink streaming and batch processing applications
—
Type information classes and utilities for seamless integration with Flink's type system. Provides efficient serialization, type safety, and schema conversion between Flink and Avro type systems.
Type information class for Avro specific record types that extend SpecificRecordBase.
public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
// Constructor
public AvroTypeInfo(Class<T> typeClass);
// Serializer creation
public TypeSerializer<T> createSerializer(SerializerConfig config);
}
Creating Type Information:
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
// Create type information for specific record
AvroTypeInfo<User> userTypeInfo = new AvroTypeInfo<>(User.class);
// Use in DataStream operations
DataStream<User> userStream = env.fromCollection(users, userTypeInfo);
// Explicit type hints
DataStream<User> transformedStream = rawStream
.map(data -> parseUser(data))
.returns(userTypeInfo);
With DataSet API:
// Create DataSet with explicit type information
DataSet<User> userDataSet = env.fromCollection(userList, userTypeInfo);
// Type-safe operations
DataSet<String> names = userDataSet
.map(user -> user.getName().toString())
.returns(Types.STRING);
Type information class for Avro generic records with schema information.
public class GenericRecordAvroTypeInfo extends TypeInformation<GenericRecord> {
// Constructor
public GenericRecordAvroTypeInfo(Schema schema);
// Type system integration
public TypeSerializer<GenericRecord> createSerializer(SerializerConfig config);
public boolean isBasicType();
public boolean isTupleType();
public int getArity();
public int getTotalFields();
}
Generic Record Processing:
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
// Parse schema
Schema schema = new Schema.Parser().parse(schemaString);
// Create type information
GenericRecordAvroTypeInfo typeInfo = new GenericRecordAvroTypeInfo(schema);
// Use in streaming
DataStream<GenericRecord> recordStream = env
.fromCollection(genericRecords, typeInfo);
// Type-safe transformations
DataStream<String> nameStream = recordStream
.map(record -> record.get("name").toString())
.name("extract-names");
Schema Evolution Support:
// Handle multiple schema versions
Schema readerSchema = getReaderSchema();
Schema writerSchema = getWriterSchema();
GenericRecordAvroTypeInfo readerTypeInfo = new GenericRecordAvroTypeInfo(readerSchema);
// Process with schema evolution
DataStream<GenericRecord> evolvedStream = rawStream
.map(new SchemaEvolutionMapper(writerSchema, readerSchema))
.returns(readerTypeInfo);
High-performance serializer for Avro types with schema evolution support.
public class AvroSerializer<T> extends TypeSerializer<T> {
// Constructor
public AvroSerializer(Class<T> type);
// Serialization operations
public boolean isImmutableType();
public TypeSerializer<T> duplicate();
public T createInstance();
public T copy(T from);
public T copy(T from, T reuse);
public int getLength();
public void serialize(T record, DataOutputView target) throws IOException;
public T deserialize(DataInputView source) throws IOException;
public T deserialize(T reuse, DataInputView source) throws IOException;
public void copy(DataInputView source, DataOutputView target) throws IOException;
}
Object Reuse:
// Serializer supports object reuse for better performance
AvroSerializer<User> serializer = new AvroSerializer<>(User.class);
// Reuse instance across deserialization calls
User reusableUser = serializer.createInstance();
User deserializedUser = serializer.deserialize(reusableUser, dataInput);
Memory Efficiency:
// Efficient copying without full deserialization
serializer.copy(inputView, outputView); // Direct byte copying when possible
Utility class for converting between Flink and Avro type systems.
public class AvroSchemaConverter {
// Schema conversion methods
public static Schema convertToSchema(RowType rowType);
public static Schema convertToSchema(RowType rowType, boolean legacyTimestampMapping);
public static LogicalType convertToLogicalType(Schema schema);
public static RowType convertToRowType(Schema schema);
public static RowType convertToRowType(Schema schema, boolean legacyTimestampMapping);
}
Flink RowType to Avro Schema:
import org.apache.flink.table.types.logical.*;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
// Define Flink row type
RowType rowType = RowType.of(
new LogicalType[] {
new VarCharType(50), // name
new IntType(), // age
new BooleanType(), // active
new TimestampType(3), // created_at
new DecimalType(10, 2), // salary
new ArrayType(new VarCharType(20)) // tags
},
new String[] {"name", "age", "active", "created_at", "salary", "tags"}
);
// Convert to Avro schema
Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
// With legacy timestamp mapping
Schema legacySchema = AvroSchemaConverter.convertToSchema(rowType, true);
Avro Schema to Flink RowType:
// Parse Avro schema
String schemaJson = "{ \"type\": \"record\", \"name\": \"User\", ... }";
Schema schema = new Schema.Parser().parse(schemaJson);
// Convert to Flink row type
RowType flinkRowType = AvroSchemaConverter.convertToRowType(schema);
// Use in Table API
Table table = tableEnv.fromDataStream(stream, Schema.of(flinkRowType));
Primitive Types:
| Flink Type | Avro Type | Notes |
|---|---|---|
| BooleanType | boolean | Direct mapping |
| TinyIntType | int | Promoted to int |
| SmallIntType | int | Promoted to int |
| IntType | int | Direct mapping |
| BigIntType | long | Direct mapping |
| FloatType | float | Direct mapping |
| DoubleType | double | Direct mapping |
| VarCharType/CharType | string | UTF-8 encoding |
| VarBinaryType/BinaryType | bytes | Direct mapping |
Complex Types:
| Flink Type | Avro Type | Notes |
|---|---|---|
| ArrayType | array | Element type converted recursively |
| MapType | map | Key must be string, value converted |
| RowType | record | Fields converted recursively |
| MultisetType | map | Special map with int values |
Temporal Types:
| Flink Type | Avro Type | Logical Type |
|---|---|---|
| DateType | int | date |
| TimeType | int | time-millis |
| TimestampType | long | timestamp-millis (legacy) |
| TimestampType | long | local-timestamp-millis (correct) |
| LocalZonedTimestampType | long | timestamp-millis |
Decimal Types:
| Flink Type | Avro Type | Logical Type |
|---|---|---|
| DecimalType | bytes | decimal with precision/scale |
| DecimalType | fixed | decimal with precision/scale |
Wrapper class for making Avro Schema objects serializable in Flink operations.
public class SerializableAvroSchema implements Serializable {
// Constructor
public SerializableAvroSchema(Schema schema);
// Schema access
public Schema getAvroSchema();
// Serialization support
private void writeObject(ObjectOutputStream out) throws IOException;
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException;
}
// Wrap schema for serialization
Schema schema = new Schema.Parser().parse(schemaString);
SerializableAvroSchema serializableSchema = new SerializableAvroSchema(schema);
// Use in map function
DataStream<GenericRecord> processed = recordStream.map(new RichMapFunction<GenericRecord, GenericRecord>() {
private transient Schema schema;
@Override
public void open(Configuration parameters) {
this.schema = serializableSchema.getAvroSchema();
}
@Override
public GenericRecord map(GenericRecord record) throws Exception {
// Use schema in processing
return processRecord(record, schema);
}
});
Factory utilities for creating Avro-specific data structures and serializers.
public class AvroFactory {
// Schema extraction
public static Schema extractAvroSpecificSchema(Class<?> specificRecordClass, SpecificData specificData);
// SpecificData creation
public static SpecificData getSpecificDataForClass(Class<? extends SpecificData> specificDataClass, ClassLoader classLoader);
}
// Extract schema from specific record class
Class<User> userClass = User.class;
SpecificData specificData = SpecificData.get();
Schema userSchema = AvroFactory.extractAvroSpecificSchema(userClass, specificData);
// Custom SpecificData with class loader
ClassLoader customClassLoader = Thread.currentThread().getContextClassLoader();
SpecificData customSpecificData = AvroFactory.getSpecificDataForClass(
SpecificData.class,
customClassLoader
);
Enable Object Reuse:
// Configure execution environment for object reuse
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse(); // Better performance with Avro serializers
Type Hint Usage:
// Provide explicit type information to avoid reflection
DataStream<User> typedStream = rawStream
.map(data -> parseUser(data))
.returns(new AvroTypeInfo<>(User.class)); // Avoid type erasure
// Cache converted schemas to avoid repeated conversion
private static final ConcurrentHashMap<String, Schema> SCHEMA_CACHE = new ConcurrentHashMap<>();
public Schema getCachedSchema(RowType rowType) {
String key = rowType.toString();
return SCHEMA_CACHE.computeIfAbsent(key,
k -> AvroSchemaConverter.convertToSchema(rowType));
}
Type Mismatch Errors:
try {
Schema convertedSchema = AvroSchemaConverter.convertToSchema(rowType);
} catch (IllegalArgumentException e) {
logger.error("Unsupported type conversion: " + e.getMessage());
// Handle unsupported type
}
Serialization Errors:
try {
serializer.serialize(record, output);
} catch (IOException e) {
logger.error("Serialization failed", e);
// Implement error recovery
}
Schema Evolution Errors:
// Handle schema evolution gracefully
try {
GenericRecord evolved = SchemaEvolution.evolve(record, oldSchema, newSchema);
} catch (AvroRuntimeException e) {
logger.warn("Schema evolution failed, using default values", e);
// Apply default values or skip record
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-avro