Apache Flink Avro format support library providing serialization and deserialization capabilities for Apache Avro data format within Flink streaming and batch processing applications
Installation: npx @tessl/cli install tessl/maven-org-apache-flink--flink-avro@2.1.0

Apache Flink Avro format support library providing comprehensive serialization and deserialization capabilities for Apache Avro data format within Flink streaming and batch processing applications. It enables developers to work with Avro data using both generic and specific record types, with support for schema evolution, logical types, and schema-registry integration through the pluggable `SchemaCoder` abstraction (Confluent Schema Registry support builds on it and ships in the separate `flink-avro-confluent-registry` module).

Maven coordinates: org.apache.flink:flink-avro:2.1.0

import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

// Quick start example
import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
// For generic records: parse the Avro schema JSON (schemaString is assumed to be defined by the caller).
Schema schema = new Schema.Parser().parse(schemaString);
AvroSerializationSchema<GenericRecord> serializer = AvroSerializationSchema.forGeneric(schema);
AvroDeserializationSchema<GenericRecord> deserializer = AvroDeserializationSchema.forGeneric(schema);
// For specific records (User is a class generated from an Avro schema)
AvroSerializationSchema<User> userSerializer = AvroSerializationSchema.forSpecific(User.class);
AvroDeserializationSchema<User> userDeserializer = AvroDeserializationSchema.forSpecific(User.class);
// Use in DataStream. map() returns a NEW stream rather than changing `stream`,
// so keep the result; otherwise the serialized bytes are never used downstream.
// NOTE(review): AvroSerializationSchema exposes open(InitializationContext); confirm whether
// serialize() works without open() having been called when used directly inside map().
DataStream<GenericRecord> stream = ...;
DataStream<byte[]> serialized = stream.map(serializer::serialize);

Flink Avro is organized around several key components:
Core serialization and deserialization schemas for converting between Java objects and Avro binary/JSON format in streaming applications.
/**
 * Converts Java objects of type {@code T} into Avro-encoded bytes (binary or JSON, selected by
 * an {@code AvroEncoding}). Instances are obtained through the static {@code forSpecific} /
 * {@code forGeneric} factories; no public constructor is listed here.
 */
public class AvroSerializationSchema<T> implements SerializationSchema<T> {
/**
 * Creates a serializer for the generated {@link SpecificRecord} class {@code tClass}.
 * NOTE(review): the encoding used by this overload is not shown here — presumably binary; confirm.
 */
public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> tClass);
/**
 * Creates a serializer for {@link GenericRecord}s described by the Avro {@code schema}.
 * NOTE(review): the encoding used by this overload is not shown here — presumably binary; confirm.
 */
public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema);
/** Same as {@link #forSpecific(Class)}, but with an explicit {@code encoding}. */
public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> tClass, AvroEncoding encoding);
/** Same as {@link #forGeneric(Schema)}, but with an explicit {@code encoding}. */
public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);
/** Serializes {@code object} and returns the encoded bytes. */
public byte[] serialize(T object);
/** Returns the Avro {@link Schema} associated with this serializer. */
public Schema getSchema();
/**
 * Lifecycle hook inherited from {@link SerializationSchema}.
 * NOTE(review): confirm whether {@link #serialize} requires this to have been called first.
 */
public void open(InitializationContext context) throws Exception;
}
/**
 * Converts Avro-encoded bytes (binary or JSON, selected by an {@code AvroEncoding}) back into
 * objects of type {@code T}. Instances are obtained through the static {@code forGeneric} /
 * {@code forSpecific} factories; no public constructor is listed here.
 */
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
/**
 * Creates a deserializer producing {@link GenericRecord}s for the Avro {@code schema}.
 * NOTE(review): the encoding used by this overload is not shown here — presumably binary; confirm.
 */
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema);
/** Same as {@link #forGeneric(Schema)}, but with an explicit {@code encoding}. */
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);
/**
 * Creates a deserializer producing instances of the generated {@link SpecificRecord} class
 * {@code tClass}.
 */
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass);
/** Same as {@link #forSpecific(Class)}, but with an explicit {@code encoding}. */
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass, AvroEncoding encoding);
/** Decodes one {@code message}; declared to throw {@link IOException}. */
public T deserialize(byte[] message) throws IOException;
/** Returns the Flink {@link TypeInformation} of the produced records. */
public TypeInformation<T> getProducedType();
/**
 * Part of the {@link DeserializationSchema} contract.
 * NOTE(review): presumably reports whether {@code nextElement} ends the stream; confirm the
 * implementation's behaviour before relying on it.
 */
public boolean isEndOfStream(T nextElement);
}
Serialization and Deserialization
Row-based serialization and deserialization for integration with Flink's Table API and SQL layer.
/**
 * Serializes Flink {@link RowData} (Table API / SQL rows described by a {@link RowType}) into
 * Avro-encoded bytes.
 */
public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {
/**
 * Creates a serializer for rows of {@code rowType}.
 * NOTE(review): the encoding used by this overload is not shown here; confirm.
 */
public AvroRowDataSerializationSchema(RowType rowType);
/** Creates a serializer for rows of {@code rowType} using the given {@code encoding}. */
public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding);
/**
 * Same as above, plus {@code legacyTimestampMapping}.
 * NOTE(review): presumably mirrors the AVRO_TIMESTAMP_LEGACY_MAPPING format option; confirm semantics.
 */
public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding, boolean legacyTimestampMapping);
/**
 * Low-level constructor taking a {@link GenericRecord} serializer ({@code nestedSchema}) and a
 * {@code RowDataToAvroConverter} ({@code runtimeConverter}).
 * NOTE(review): presumably rows are converted by runtimeConverter, then serialized by nestedSchema; confirm.
 */
public AvroRowDataSerializationSchema(RowType rowType, SerializationSchema<GenericRecord> nestedSchema, RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter);
/** Serializes {@code row} and returns the encoded bytes. */
public byte[] serialize(RowData row);
}
/**
 * Deserializes Avro-encoded bytes into Flink {@link RowData} (Table API / SQL rows described by
 * a {@link RowType}).
 */
public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData> {
/**
 * Creates a deserializer for rows of {@code rowType}.
 * NOTE(review): presumably {@code producedTypeInfo} is what {@link #getProducedType()} returns, and
 * the encoding used by this overload is not shown here; confirm.
 */
public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo);
/** Same as above, using the given {@code encoding}. */
public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo, AvroEncoding encoding);
/**
 * Same as above, plus {@code legacyTimestampMapping}.
 * NOTE(review): presumably mirrors the AVRO_TIMESTAMP_LEGACY_MAPPING format option; confirm semantics.
 */
public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo, AvroEncoding encoding, boolean legacyTimestampMapping);
/**
 * Low-level constructor taking a {@link GenericRecord} deserializer ({@code nestedSchema}) and an
 * {@code AvroToRowDataConverter} ({@code runtimeConverter}).
 * NOTE(review): presumably bytes are decoded by nestedSchema, then converted by runtimeConverter; confirm.
 */
public AvroRowDataDeserializationSchema(DeserializationSchema<GenericRecord> nestedSchema, AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter, TypeInformation<RowData> typeInfo);
/** Lifecycle hook inherited from {@link DeserializationSchema}. */
public void open(InitializationContext context) throws Exception;
/** Decodes one {@code message} into a {@link RowData}; declared to throw {@link IOException}. */
public RowData deserialize(byte[] message) throws IOException;
/** Returns the Flink {@link TypeInformation} of the produced rows. */
public TypeInformation<RowData> getProducedType();
}
Input and output formats for reading and writing Avro files in batch processing scenarios.
/**
 * File input format that reads Avro files and emits records of type {@code E}. It is
 * checkpointable per {@link FileInputSplit}, with state typed as {@code Tuple2<Long, Long>}.
 * NOTE(review): the meaning of the two Long values is not shown here.
 */
public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>, CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
/** Reads the Avro data at {@code filePath} as records of class {@code type}. */
public AvroInputFormat(Path filePath, Class<E> type);
/**
 * Controls whether record objects are reused between reads.
 * NOTE(review): presumably trades allocation for object-reuse hazards; confirm the default.
 */
public void setReuseAvroValue(boolean reuseAvroValue);
/** NOTE(review): presumably, when {@code true}, each file is read as a single split; confirm. */
public void setUnsplittable(boolean unsplittable);
/** Returns the Flink {@link TypeInformation} of the produced records. */
public TypeInformation<E> getProducedType();
}
/** File output format that writes records of type {@code E} to Avro files. */
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
/** Writes records of class {@code type} to {@code filePath}. */
public AvroOutputFormat(Path filePath, Class<E> type);
/**
 * Creates the format for records of class {@code type} without an output path.
 * NOTE(review): presumably the path is set later through the FileOutputFormat API; confirm.
 */
public AvroOutputFormat(Class<E> type);
/**
 * Sets the Avro {@link Schema} to write with.
 * NOTE(review): confirm when an explicit schema is required (e.g. for non-specific record types).
 */
public void setSchema(Schema schema);
/** Sets the compression {@code Codec} (NULL, SNAPPY, BZIP2, DEFLATE or XZ). */
public void setCodec(Codec codec);
}
Factory and writer classes for efficient bulk writing of Avro files with various record types and compression options.
/** Static factories for {@link AvroWriterFactory} instances, one per Avro record flavour. */
public class AvroWriters {
/** Writer factory for generated {@link SpecificRecordBase} subclasses of class {@code type}. */
public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type);
/** Writer factory for {@link GenericRecord}s described by the Avro {@code schema}. */
public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema);
/**
 * Writer factory for arbitrary classes {@code type}.
 * NOTE(review): presumably derives the schema via Avro reflection; confirm supported types.
 */
public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type);
}
/** {@link BulkWriter.Factory} that creates Avro bulk writers for records of type {@code T}. */
public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {
/** {@code builder} supplies the Avro {@link DataFileWriter} for each output stream. */
public AvroWriterFactory(AvroBuilder<T> builder);
/** Creates a {@link BulkWriter} that writes to {@code out}. */
public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
/** Serializable callback that creates an Avro {@link DataFileWriter} for records of type {@code T}. */
public interface AvroBuilder<T> extends Serializable {
/** Creates a {@link DataFileWriter} that writes to {@code outputStream}. */
DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;
}
/**
 * {@link BulkWriter} for Avro records of type {@code T}.
 * NOTE(review): no public constructor is listed here; presumably instances come from
 * {@code AvroWriterFactory#create}.
 */
public class AvroBulkWriter<T> implements BulkWriter<T> {
/** Writes one {@code element}. */
public void addElement(T element) throws IOException;
/** Flushes buffered data. */
public void flush() throws IOException;
/** Finishes writing. NOTE(review): confirm whether this also closes the underlying stream. */
public void finish() throws IOException;
}
Type information classes and utilities for seamless integration with Flink's type system.
/** Flink type information for generated Avro {@link SpecificRecordBase} classes, built on {@link PojoTypeInfo}. */
public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
/** Creates type information for {@code typeClass}. */
public AvroTypeInfo(Class<T> typeClass);
/** Creates the {@link TypeSerializer} for {@code T} under the given {@link SerializerConfig}. */
public TypeSerializer<T> createSerializer(SerializerConfig config);
}
/** Flink type information for Avro {@link GenericRecord}s. */
public class GenericRecordAvroTypeInfo extends TypeInformation<GenericRecord> {
/** Creates type information for generic records described by the Avro {@code schema}. */
public GenericRecordAvroTypeInfo(Schema schema);
}
/** Converts between Flink logical types ({@link RowType} / {@link LogicalType}) and Avro {@link Schema}s. */
public class AvroSchemaConverter {
/** Derives an Avro {@link Schema} from {@code rowType}. */
public static Schema convertToSchema(RowType rowType);
/**
 * Same as {@link #convertToSchema(RowType)}, plus {@code legacyTimestampMapping}.
 * NOTE(review): presumably mirrors the AVRO_TIMESTAMP_LEGACY_MAPPING format option; confirm semantics.
 */
public static Schema convertToSchema(RowType rowType, boolean legacyTimestampMapping);
/** Derives a Flink {@link LogicalType} from the Avro {@code schema}. */
public static LogicalType convertToLogicalType(Schema schema);
}
Extended serialization and deserialization schemas with schema registry support using SchemaCoder.
/**
 * {@link AvroSerializationSchema} variant with schema-registry support through a {@code SchemaCoder}.
 * NOTE(review): presumably the coder's writeSchema is used to emit schema information with each
 * message; confirm the wire format.
 */
public class RegistryAvroSerializationSchema<T> extends AvroSerializationSchema<T> {
/**
 * @param recordClazz class of the records to serialize
 * @param schema Avro schema of the records
 * @param schemaCoderProvider supplies the {@code SchemaCoder} instance
 */
public RegistryAvroSerializationSchema(Class<T> recordClazz, Schema schema, SchemaCoder.SchemaCoderProvider schemaCoderProvider);
/** Same as above, using the given {@code encoding}. */
public RegistryAvroSerializationSchema(Class<T> recordClazz, Schema schema, SchemaCoder.SchemaCoderProvider schemaCoderProvider, AvroEncoding encoding);
}
/**
 * {@link AvroDeserializationSchema} variant with schema-registry support through a {@code SchemaCoder}.
 * NOTE(review): presumably the writer schema is obtained via the coder's readSchema and resolved
 * against the {@code reader} schema; confirm.
 */
public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
/**
 * @param recordClazz class of the records to produce
 * @param reader Avro reader schema
 * @param schemaCoderProvider supplies the {@code SchemaCoder} instance
 */
public RegistryAvroDeserializationSchema(Class<T> recordClazz, Schema reader, SchemaCoder.SchemaCoderProvider schemaCoderProvider);
/** Same as above, using the given {@code encoding}. */
public RegistryAvroDeserializationSchema(Class<T> recordClazz, Schema reader, SchemaCoder.SchemaCoderProvider schemaCoderProvider, AvroEncoding encoding);
}

/** Configuration options of the Avro format. */
public class AvroFormatOptions {
/** Output compression codec, configured as a string. NOTE(review): accepted values not shown here. */
public static final ConfigOption<String> AVRO_OUTPUT_CODEC;
/** Encoding used for (de)serialization; see {@link AvroEncoding}. */
public static final ConfigOption<AvroEncoding> AVRO_ENCODING;
/** Boolean switch for legacy timestamp mapping. NOTE(review): exact semantics not shown here. */
public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING;
/** Encodings available for Avro serialization and deserialization. */
public enum AvroEncoding implements DescribedEnum {
BINARY("binary", "Use binary encoding for serialization and deserialization."),
JSON("json", "Use JSON encoding for serialization and deserialization.");
/** NOTE(review): presumably returns the option value ("binary" / "json"); confirm. */
String toString();
/** Returns the human-readable description of this encoding. */
InlineElement getDescription();
}
}

/** Reads and writes Avro schemas from/to byte streams (used by the registry-aware schemas). */
public interface SchemaCoder {
/** Reads a schema from {@code in}. */
Schema readSchema(InputStream in) throws IOException;
/** Writes {@code schema} to {@code out}. */
void writeSchema(Schema schema, OutputStream out) throws IOException;
/** Serializable factory for {@link SchemaCoder} instances. */
interface SchemaCoderProvider extends Serializable {
SchemaCoder get();
}
}

/**
 * Converts an Avro object into Flink's internal data representation.
 * Referenced elsewhere as {@code AvroToRowDataConverters.AvroToRowDataConverter}.
 */
public interface AvroToRowDataConverter extends Serializable {
Object convert(Object object);
}
/**
 * Converts a Flink internal value into an Avro object for the given Avro {@code schema}.
 * Referenced elsewhere as {@code RowDataToAvroConverters.RowDataToAvroConverter}.
 */
public interface RowDataToAvroConverter extends Serializable {
Object convert(Schema schema, Object object);
}

/**
 * Compression codecs accepted by {@code AvroOutputFormat#setCodec}.
 * NOTE(review): presumably nested in AvroOutputFormat; confirm.
 */
public enum Codec {
NULL, SNAPPY, BZIP2, DEFLATE, XZ
}