Apache Flink SQL Avro format library that provides bundled and shaded Apache Avro dependencies for SQL usage in Flink applications.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-avro@2.1.0The Apache Flink SQL Avro format library provides SQL support for Apache Avro data format within Flink applications. This library is a bundling and shading module that packages Apache Avro dependencies with appropriate relocations to avoid classpath conflicts, enabling seamless Avro format processing in Flink SQL pipelines.
pom.xml:<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-avro</artifactId>
<version>2.1.0</version>
</dependency>import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.formats.avro.AvroFormatOptions;
import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;// Create table with Avro format in Flink SQL
String createTableSQL = """
CREATE TABLE avro_table (
id INT,
name STRING,
timestamp_col TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro'
)
""";// Create deserializer for GenericRecord
Schema avroSchema = SchemaBuilder.record("MyRecord")
.fields()
.name("id").type().intType().noDefault()
.name("name").type().stringType().noDefault()
.endRecord();
AvroDeserializationSchema<GenericRecord> deserializer =
AvroDeserializationSchema.forGeneric(avroSchema);
// Create serializer for RowData
RowType rowType = RowType.of(
new DataType[] {DataTypes.INT(), DataTypes.STRING()},
new String[] {"id", "name"}
);
AvroRowDataSerializationSchema serializer =
new AvroRowDataSerializationSchema(rowType, AvroEncoding.BINARY, false);The library provides several layers of Avro integration:
All Apache Avro, Jackson, and Commons Compress dependencies are relocated to prevent classpath conflicts:
org.apache.avro.* → org.apache.flink.avro.shaded.org.apache.avro.*com.fasterxml.jackson.* → org.apache.flink.avro.shaded.com.fasterxml.jackson.*org.apache.commons.compress.* → org.apache.flink.avro.shaded.org.apache.commons.compress.*Configuration options for controlling Avro format behavior.
public enum AvroEncoding {
BINARY, // Binary encoding for serialization/deserialization
JSON // JSON encoding for serialization/deserialization
}
public static final ConfigOption<String> AVRO_OUTPUT_CODEC;
public static final ConfigOption<AvroEncoding> AVRO_ENCODING;
public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING;Core functionality for converting between Java objects and Avro format.
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema);
public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema);
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> recordClazz);
public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> recordClazz);Schema-based Serialization and Deserialization
Integration with Flink's internal RowData format for table processing.
public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData>;
public class AvroRowDataSerializationSchema implements SerializationSchema<RowData>;Bulk reading and writing of Avro files for batch processing.
public class AvroInputFormat<T> extends FileInputFormat<T>;
public class AvroOutputFormat<T> extends FileOutputFormat<T>;
public class AvroBulkWriter<T> implements BulkWriter<T>;
public interface AvroBuilder<T>;Support for messages with embedded schema information using configurable schema coders.
public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T>;
public class RegistryAvroSerializationSchema<T> extends AvroSerializationSchema<T>;
public interface SchemaCoder;Schema-Encoded Message Support
Utilities for converting between Flink types and Avro schemas.
public class AvroSchemaConverter {
public static Schema convertToSchema(LogicalType logicalType);
public static DataType convertToDataType(String avroSchemaString);
public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(Class<T> avroClass);
}
public class AvroTypeInfo<T> extends TypeInformation<T>;
public class GenericRecordAvroTypeInfo extends AvroTypeInfo<GenericRecord>;