Apache Flink SQL Avro format library that provides bundled and shaded Apache Avro dependencies for SQL usage in Flink applications.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
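The Apache Flink SQL Avro format library provides SQL support for the Apache Avro data format in Flink applications. It is a bundling and shading module: it packages the Apache Avro dependencies under relocated package names to avoid classpath conflicts, so the Avro format can be used in Flink SQL pipelines (for example via `'format' = 'avro'` in a table definition). Because the `org.apache.avro` classes are relocated inside this jar, it is intended for SQL usage such as the SQL Client. To use the Java APIs shown below programmatically together with regular `org.apache.avro` types (e.g. `Schema`, `GenericRecord`), depend on the unshaded `flink-avro` artifact instead.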
pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-avro</artifactId>
    <version>2.1.0</version>
</dependency>

import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.formats.avro.AvroFormatOptions;
import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
// Avro and Flink Table types also referenced by the examples below
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

// Create table with Avro format in Flink SQL
// Flink SQL DDL declaring a Kafka-backed table ('connector' = 'kafka') whose
// records use the Avro format ('format' = 'avro').
String createTableSQL = """
CREATE TABLE avro_table (
id INT,
name STRING,
timestamp_col TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro'
)
""";
// Create deserializer for GenericRecord
// Avro record schema with two required fields: an int "id" and a string "name".
Schema avroSchema = SchemaBuilder.record("MyRecord")
    .fields()
    .name("id").type().intType().noDefault()
    .name("name").type().stringType().noDefault()
    .endRecord();
// Deserializer that produces GenericRecord values for avroSchema.
AvroDeserializationSchema<GenericRecord> deserializer =
    AvroDeserializationSchema.forGeneric(avroSchema);
// Create serializer for RowData
// RowType.of(...) accepts LogicalType[] (not DataType[]), so build the row with
// DataTypes.ROW/FIELD and take its logical type, which is a RowType.
RowType rowType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.INT()),
        DataTypes.FIELD("name", DataTypes.STRING()))
    .getLogicalType();
AvroRowDataSerializationSchema serializer =
new AvroRowDataSerializationSchema(rowType, AvroEncoding.BINARY, false);

The library provides several layers of Avro integration:
All Apache Avro, Jackson, and Commons Compress dependencies are relocated to prevent classpath conflicts:

- `org.apache.avro.*` → `org.apache.flink.avro.shaded.org.apache.avro.*`
- `com.fasterxml.jackson.*` → `org.apache.flink.avro.shaded.com.fasterxml.jackson.*`
- `org.apache.commons.compress.*` → `org.apache.flink.avro.shaded.org.apache.commons.compress.*`

Configuration options for controlling Avro format behavior.
/**
 * Encoding used by the Avro format, for both serialization and deserialization.
 */
public enum AvroEncoding {

    /** Avro binary encoding. */
    BINARY,

    /** Avro JSON encoding. */
    JSON
}
public static final ConfigOption<String> AVRO_OUTPUT_CODEC;
public static final ConfigOption<AvroEncoding> AVRO_ENCODING;
public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING;

Core functionality for converting between Java objects and Avro format.
// Static factory methods on AvroDeserializationSchema and AvroSerializationSchema
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema);
public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema);
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> recordClazz);
public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> recordClazz);

Schema-based Serialization and Deserialization
Integration with Flink's internal RowData format for table processing.
public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData>;
public class AvroRowDataSerializationSchema implements SerializationSchema<RowData>;

Bulk reading and writing of Avro files for batch processing.
public class AvroInputFormat<T> extends FileInputFormat<T>;
public class AvroOutputFormat<T> extends FileOutputFormat<T>;
public class AvroBulkWriter<T> implements BulkWriter<T>;
public interface AvroBuilder<T>;

Support for messages with embedded schema information using configurable schema coders.
public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T>;
public class RegistryAvroSerializationSchema<T> extends AvroSerializationSchema<T>;
public interface SchemaCoder;

Schema-Encoded Message Support
Utilities for converting between Flink types and Avro schemas.
public class AvroSchemaConverter {
public static Schema convertToSchema(LogicalType logicalType);
public static DataType convertToDataType(String avroSchemaString);
public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(Class<T> avroClass);
}
public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T>;
public class GenericRecordAvroTypeInfo extends TypeInformation<GenericRecord>;