Apache Flink SQL Avro format library that provides bundled and shaded Apache Avro dependencies for SQL usage in Flink applications. It includes utilities for converting between Flink types and Avro schemas.
/**
 * Utility class for converting between Flink's type system (LogicalType, DataType,
 * TypeInformation) and Avro Schema definitions.
 * Located in the org.apache.flink.formats.avro.typeutils package.
 *
 * <p>NOTE(review): this is a signature summary; method bodies are omitted here.
 */
public class AvroSchemaConverter {
/**
 * Converts a Flink LogicalType to an Avro Schema.
 * @param logicalType the Flink LogicalType to convert
 * @return the corresponding Avro Schema
 */
public static Schema convertToSchema(LogicalType logicalType);
/**
 * Converts a Flink LogicalType to an Avro Schema, with explicit control over how
 * timestamp types are mapped to Avro logical types.
 * @param logicalType the Flink LogicalType to convert
 * @param legacyTimestampMapping whether to use the legacy timestamp mapping
 * @return the corresponding Avro Schema
 */
public static Schema convertToSchema(LogicalType logicalType, boolean legacyTimestampMapping);
/**
 * Converts a Flink LogicalType to an Avro Schema, naming the root record schema.
 * @param logicalType the Flink LogicalType to convert
 * @param rowName the name used for the root record schema
 * @return the corresponding Avro Schema
 */
public static Schema convertToSchema(LogicalType logicalType, String rowName);
/**
 * Converts an Avro schema, given as its JSON string form, to a Flink DataType.
 * @param avroSchemaString the Avro schema definition as a JSON string
 * @return the corresponding Flink DataType
 */
public static DataType convertToDataType(String avroSchemaString);
/**
 * Converts an Avro schema JSON string to a Flink DataType, with explicit control
 * over timestamp mapping.
 * @param avroSchemaString the Avro schema definition as a JSON string
 * @param legacyTimestampMapping whether to use the legacy timestamp mapping
 * @return the corresponding Flink DataType
 */
public static DataType convertToDataType(String avroSchemaString, boolean legacyTimestampMapping);
/**
 * Derives Flink TypeInformation from a generated Avro SpecificRecord class.
 * @param avroClass the Avro specific record class
 * @return row-based TypeInformation matching the class's schema
 */
public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(Class<T> avroClass);
/**
 * Derives Flink TypeInformation from an Avro schema JSON string.
 * @param avroSchemaString the Avro schema definition as a JSON string
 * @return TypeInformation matching the schema
 */
public static <T> TypeInformation<T> convertToTypeInfo(String avroSchemaString);
}Type information classes for Avro record types in Flink's type system.
/**
 * TypeInformation implementation for Avro record types, enabling Flink to use
 * Avro-aware serialization instead of falling back to generic Kryo.
 * @param <T> the Avro record type
 */
public class AvroTypeInfo<T> extends TypeInformation<T> {
/**
 * Creates AvroTypeInfo for a specific record class.
 * @param recordClazz the Avro record class
 */
public AvroTypeInfo(Class<T> recordClazz);
/**
 * Creates AvroTypeInfo with an explicitly supplied schema (e.g. a reader schema
 * that differs from the one embedded in the class — TODO confirm intended use).
 * @param recordClazz the Avro record class
 * @param schema the Avro schema to associate with this type
 */
public AvroTypeInfo(Class<T> recordClazz, Schema schema);
/**
 * Gets the Avro schema associated with this type.
 * @return the Avro schema
 */
public Schema getAvroSchema();
/**
 * Gets the Java class of the Avro record.
 * @return the Java class for the Avro record
 */
public Class<T> getRecordClazz();
/**
 * Creates a serializer for this type under the given execution configuration.
 * @param config the execution configuration
 * @return a TypeSerializer for this Avro type
 */
public TypeSerializer<T> createSerializer(ExecutionConfig config);
}
/**
 * Specialized type information for Avro GenericRecord elements. Because
 * GenericRecord carries no compile-time schema, the schema must be supplied
 * explicitly at construction time.
 */
public class GenericRecordAvroTypeInfo extends AvroTypeInfo<GenericRecord> {
/**
 * Creates type information for GenericRecord bound to the given schema.
 * @param schema the Avro schema describing the GenericRecord elements
 */
public GenericRecordAvroTypeInfo(Schema schema);
}
/**
 * Utility for encoding and decoding Avro schemas, presumably via their JSON
 * string representation — TODO confirm the encoding format.
 */
public class SchemaCoder {
/**
 * Encodes an Avro schema to its string representation.
 * @param schema the schema to encode
 * @return the encoded schema string
 */
public static String encode(Schema schema);
/**
 * Decodes a schema from its string representation.
 * @param encodedSchema the encoded schema string
 * @return the decoded Avro schema
 */
public static Schema decode(String encodedSchema);
/**
 * Validates schema compatibility between a writer schema and a reader schema,
 * i.e. whether data written with {@code writerSchema} can be read with
 * {@code readerSchema}.
 * @param writerSchema the schema used to write data
 * @param readerSchema the schema used to read data
 * @return true if the schemas are compatible
 */
public static boolean isCompatible(Schema writerSchema, Schema readerSchema);
}
/**
 * Serializable wrapper for Avro Schema objects. Avro's Schema class does not
 * implement java.io.Serializable, so this wrapper is needed whenever a schema
 * must travel inside a Flink function closure or operator state.
 */
public class SerializableAvroSchema implements Serializable {
/**
 * Creates a serializable wrapper around the given schema.
 * @param schema the Avro schema to wrap
 */
public SerializableAvroSchema(Schema schema);
/**
 * Gets the wrapped Avro schema.
 * @return the Avro schema
 */
public Schema getAvroSchema();
/**
 * Creates a wrapper from a schema's JSON string form.
 * @param schemaString the JSON representation of the schema
 * @return a SerializableAvroSchema instance
 */
public static SerializableAvroSchema fromString(String schemaString);
}
/**
 * Utilities for Kryo serialization of Avro objects, used when Avro types fall
 * back to Kryo in Flink's type system.
 */
public class AvroKryoSerializerUtils {
/**
 * Registers Avro types with a Kryo instance for efficient serialization.
 * @param kryo the Kryo instance to register with
 */
public static void registerAvroTypes(Kryo kryo);
/**
 * Creates a Kryo serializer tailored to a specific Avro record type.
 * @param recordClass the Avro record class
 * @return a configured Kryo serializer
 */
public static <T> Serializer<T> createAvroSerializer(Class<T> recordClass);
}// Convert Flink RowType to Avro Schema
// RowType.of(...) takes LogicalType[] (not DataType[]), so each DataType built
// by the DataTypes factory must be unwrapped via getLogicalType().
RowType flinkRowType = RowType.of(
    new LogicalType[] {
        DataTypes.BIGINT().getLogicalType(),
        DataTypes.STRING().getLogicalType(),
        DataTypes.ARRAY(DataTypes.STRING()).getLogicalType(),
        DataTypes.ROW(
            DataTypes.FIELD("street", DataTypes.STRING()),
            DataTypes.FIELD("city", DataTypes.STRING()),
            DataTypes.FIELD("zipcode", DataTypes.STRING())
        ).getLogicalType(),
        DataTypes.TIMESTAMP(3).getLogicalType()
    },
    new String[] {"user_id", "username", "tags", "address", "created_at"}
);
// Convert to Avro schema
Schema avroSchema = AvroSchemaConverter.convertToSchema(flinkRowType);
System.out.println("Generated Avro Schema:");
System.out.println(avroSchema.toString(true));
// Convert back to a Flink type for table processing. AvroSchemaConverter exposes
// convertToDataType(String) — there is no convertToRowType(Schema) — so pass the
// schema's JSON string and unwrap the RowType from the resulting DataType.
DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());
RowType convertedRowType = (RowType) dataType.getLogicalType();
// Create type information for SpecificRecord
// Type information for a generated Avro SpecificRecord class (User).
AvroTypeInfo<User> userTypeInfo = new AvroTypeInfo<>(User.class);
// Use in DataStream: passing the explicit TypeInformation keeps Flink on the
// Avro serialization path instead of generic Kryo fallback.
// NOTE(review): the anonymous MapFunction is deliberate — converting it to a
// lambda would lose the generic type for Flink's type extraction.
DataStream<User> userStream = env
.fromCollection(userList, userTypeInfo)
.map(new MapFunction<User, User>() {
@Override
public User map(User user) throws Exception {
// Process user
return user;
}
});
// Create type information for GenericRecord: the schema must be supplied
// explicitly because GenericRecord carries no compile-time schema.
Schema schema = new Schema.Parser().parse(schemaString);
GenericRecordAvroTypeInfo genericTypeInfo = new GenericRecordAvroTypeInfo(schema);
DataStream<GenericRecord> genericStream = env
.fromCollection(genericRecordList, genericTypeInfo);
// The type info exposes both the schema and the record class for later use.
Schema retrievedSchema = userTypeInfo.getAvroSchema();
Class<User> recordClass = userTypeInfo.getRecordClazz();// Schema evolution example
// Schema evolution: the evolved schema adds an optional (nullable, defaulted)
// email field, which keeps it readable against data written with the original.
Schema initialSchema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("name").type().stringType().noDefault()
    .endRecord();
Schema evolvedSchema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("name").type().stringType().noDefault()
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .endRecord();
// Compatibility check: old schema as writer, evolved schema as reader.
boolean compatible = SchemaCoder.isCompatible(initialSchema, evolvedSchema);
System.out.println("Schemas compatible: " + compatible);
// Round-trip the evolved schema through its encoded string form
// (e.g. for storage or transmission).
String schemaJson = SchemaCoder.encode(evolvedSchema);
Schema restoredSchema = SchemaCoder.decode(schemaJson);
// Wrap the schema so it can cross serialization boundaries in distributed jobs.
SerializableAvroSchema wrappedSchema = new SerializableAvroSchema(evolvedSchema);// Complex Flink type with nested structures
// Complex Flink type with nested structures.
// RowType.of(...) expects LogicalType[] (not DataType[]); each DataType built by
// the DataTypes factory is unwrapped with getLogicalType().
RowType complexType = RowType.of(
    new LogicalType[] {
        DataTypes.ROW(
            DataTypes.FIELD("personal", DataTypes.ROW(
                DataTypes.FIELD("firstName", DataTypes.STRING()),
                DataTypes.FIELD("lastName", DataTypes.STRING()),
                DataTypes.FIELD("age", DataTypes.INT())
            )),
            DataTypes.FIELD("contact", DataTypes.ROW(
                DataTypes.FIELD("email", DataTypes.STRING()),
                DataTypes.FIELD("phones", DataTypes.ARRAY(DataTypes.STRING()))
            ))
        ).getLogicalType(),
        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()).getLogicalType(),
        DataTypes.ARRAY(DataTypes.ROW(
            DataTypes.FIELD("type", DataTypes.STRING()),
            DataTypes.FIELD("value", DataTypes.STRING())
        )).getLogicalType()
    },
    new String[] {"user_profile", "metadata", "preferences"}
);
// Convert complex type to Avro schema
Schema complexAvroSchema = AvroSchemaConverter.convertToSchema(complexType);
// The generated schema will have proper Avro types:
// - ROW becomes RECORD
// - ARRAY becomes ARRAY
// - MAP becomes MAP
// - Nested structures are properly represented
System.out.println("Complex Avro Schema:");
System.out.println(complexAvroSchema.toString(true));// Register custom Avro types with Flink's type system
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Register a dedicated Kryo serializer for GenericRecord so it is not handled
// by Kryo's slow reflective default.
env.getConfig().registerTypeWithKryoSerializer(
GenericRecord.class,
AvroKryoSerializerUtils.createAvroSerializer(GenericRecord.class)
);
// Register specific record types the job uses.
env.getConfig().registerTypeWithKryoSerializer(
User.class,
AvroKryoSerializerUtils.createAvroSerializer(User.class)
);
// Register all Avro types on the Kryo instance in one call.
// NOTE(review): ExecutionConfig's SerializerConfig does not obviously expose a
// Kryo instance via getKryo() — verify this accessor against the Flink version
// in use before relying on it.
AvroKryoSerializerUtils.registerAvroTypes(
env.getConfig().getSerializerConfig().getKryo()
);// Utility for managing schemas with registry
// Utility for managing Flink table schemas against a Confluent Schema Registry.
// NOTE(review): newer schema-registry clients take a ParsedSchema (e.g.
// new AvroSchema(schema)) instead of a raw Avro Schema in register(...) and
// testCompatibility(...) — verify against the client version in use.
public class AvroSchemaManager {
private final SchemaRegistryClient registryClient;
public AvroSchemaManager(String registryUrl) {
// Cache up to 100 schema versions client-side to limit registry round trips.
this.registryClient = new CachedSchemaRegistryClient(registryUrl, 100);
}
/**
 * Converts a Flink row type to an Avro schema and registers it under the subject.
 * @param subject registry subject name (e.g. "topic-value")
 * @param rowType Flink row type describing the table
 * @return the schema id assigned by the registry
 * @throws IOException on transport failure or registry rejection
 */
public int registerTableSchema(String subject, RowType rowType) throws IOException {
Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
try {
return registryClient.register(subject, avroSchema);
} catch (RestClientException e) {
// register(...) also throws the checked RestClientException; wrap it to
// preserve this method's throws-IOException signature.
throw new IOException("Failed to register schema for subject " + subject, e);
}
}
/**
 * Fetches the latest registered schema for a subject as a Flink RowType.
 * @throws IOException on transport failure or if the subject is unknown
 */
public RowType getRowTypeFromRegistry(String subject) throws IOException {
try {
// SchemaMetadata.getSchema() returns the schema as a JSON string, which is
// exactly what AvroSchemaConverter.convertToDataType(String) expects.
String schemaJson = registryClient.getLatestSchemaMetadata(subject).getSchema();
DataType dataType = AvroSchemaConverter.convertToDataType(schemaJson);
// There is no convertToRowType(Schema) on AvroSchemaConverter; unwrap the
// RowType from the converted DataType instead.
return (RowType) dataType.getLogicalType();
} catch (RestClientException e) {
throw new IOException("Failed to fetch schema for subject " + subject, e);
}
}
/**
 * Checks whether the new row type is a compatible evolution of the subject's
 * currently registered schema.
 */
public boolean validateEvolution(String subject, RowType newRowType) throws IOException {
Schema newSchema = AvroSchemaConverter.convertToSchema(newRowType);
try {
return registryClient.testCompatibility(subject, newSchema);
} catch (RestClientException e) {
throw new IOException("Compatibility check failed for subject " + subject, e);
}
}
}
// Usage
AvroSchemaManager schemaManager = new AvroSchemaManager("http://localhost:8081");
// Register table schema. RowType.of(...) takes LogicalType[], so unwrap the
// DataTypes factory results via getLogicalType().
RowType tableSchema = RowType.of(
new LogicalType[] {
DataTypes.BIGINT().getLogicalType(),
DataTypes.STRING().getLogicalType()
},
new String[] {"id", "name"}
);
int schemaId = schemaManager.registerTableSchema("users-value", tableSchema);
// Later, retrieve schema for processing
RowType retrievedSchema = schemaManager.getRowTypeFromRegistry("users-value");// Optimized processing with proper type information
// Optimized processing of Avro records with explicit type information.
public class OptimizedAvroProcessor {
/**
 * Deserializes a raw byte stream into Avro records, attaching explicit type
 * information so Flink does not fall back to generic Kryo serialization.
 *
 * @param rawStream stream of Avro-encoded byte arrays
 * @param typeInfo type information describing the target record type
 * @return stream of deserialized records typed by {@code typeInfo}
 */
public static <T> DataStream<T> processAvroStream(
DataStream<byte[]> rawStream,
AvroTypeInfo<T> typeInfo) {
// forGeneric(...) is declared to return AvroDeserializationSchema<GenericRecord>,
// so the original direct assignment to <T> did not type-check. The cast below is
// only safe when T is GenericRecord; for generated SpecificRecord classes prefer
// AvroDeserializationSchema.forSpecific(typeInfo.getRecordClazz()).
@SuppressWarnings("unchecked")
AvroDeserializationSchema<T> deserializer =
(AvroDeserializationSchema<T>)
AvroDeserializationSchema.forGeneric(typeInfo.getAvroSchema());
return rawStream
.map(new MapFunction<byte[], T>() {
@Override
public T map(byte[] value) throws Exception {
return deserializer.deserialize(value);
}
})
.returns(typeInfo); // Explicit type information for optimization
}
/**
 * Rebalances a GenericRecord stream and copies each record into a reusable
 * output record to reduce allocation pressure.
 */
public static DataStream<GenericRecord> optimizeGenericRecordProcessing(
DataStream<GenericRecord> stream,
Schema schema) {
// Use optimized type info for GenericRecord elements.
GenericRecordAvroTypeInfo typeInfo = new GenericRecordAvroTypeInfo(schema);
// Avro's Schema is NOT java.io.Serializable; capturing it directly inside the
// anonymous RichMapFunction would make Flink's closure serialization fail.
// Capture its JSON form instead and re-parse it in open().
final String schemaJson = schema.toString();
return stream
.rebalance() // Distribute load evenly
.map(new RichMapFunction<GenericRecord, GenericRecord>() {
private transient Schema runtimeSchema;
private transient GenericRecord reusableRecord;
@Override
public void open(Configuration parameters) {
runtimeSchema = new Schema.Parser().parse(schemaJson);
// Reusing one output record reduces GC pressure. This is only safe while
// Flink copies records between operators (the default) — do not combine
// with enableObjectReuse().
reusableRecord = new GenericData.Record(runtimeSchema);
}
@Override
public GenericRecord map(GenericRecord record) throws Exception {
for (Schema.Field field : runtimeSchema.getFields()) {
reusableRecord.put(field.name(), record.get(field.name()));
}
return reusableRecord;
}
})
.returns(typeInfo);
}
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-avro