Apache Flink Avro format support library, providing serialization and deserialization for the Apache Avro data format in Flink streaming and batch processing applications.
---
Core serialization and deserialization functionality for converting between Java objects and Avro binary/JSON format in Flink streaming applications. Supports both generic and specific record types with configurable encoding options.
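As a quick orientation before the API reference below, here is a minimal round-trip sketch using the generic-record path. It assumes `flink-avro` and Avro are on the classpath; the `User` schema is made up for illustration. Note that both schemas initialize lazily, so no explicit `open()` call is needed in this standalone setting.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.formats.avro.AvroSerializationSchema;

public class AvroRoundTrip {
    public static void main(String[] args) throws Exception {
        // Hypothetical "User" schema, defined inline for this sketch
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"}]}");

        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "alice");
        user.put("age", 30);

        // Serializer and deserializer share the same writer/reader schema
        AvroSerializationSchema<GenericRecord> ser =
            AvroSerializationSchema.forGeneric(schema);
        AvroDeserializationSchema<GenericRecord> de =
            AvroDeserializationSchema.forGeneric(schema);

        // Round-trip: object -> Avro binary -> object
        byte[] bytes = ser.serialize(user);
        GenericRecord back = de.deserialize(bytes);
        System.out.println(back.get("name") + " " + back.get("age"));
    }
}
```

Avro decodes string fields as `org.apache.avro.util.Utf8`; call `toString()` on them if you need a `java.lang.String`.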
Serializes Java objects to Avro binary or JSON format for use in Flink streaming operations.
```java
public class AvroSerializationSchema<T> implements SerializationSchema<T> {

    // Static factory methods for specific records
    public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> tClass);
    public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> tClass, AvroEncoding encoding);

    // Static factory methods for generic records
    public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema);
    public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);

    // Instance methods
    public byte[] serialize(T object);
    public Schema getSchema();
    public void open(InitializationContext context) throws Exception;
}
```

For Specific Records (generated from an Avro schema):
```java
import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;

// Binary encoding (default)
AvroSerializationSchema<User> userSerializer =
    AvroSerializationSchema.forSpecific(User.class);

// JSON encoding
AvroSerializationSchema<User> jsonSerializer =
    AvroSerializationSchema.forSpecific(User.class, AvroEncoding.JSON);

// Use in a DataStream
DataStream<User> userStream = ...;
DataStream<byte[]> serializedStream = userStream.map(userSerializer::serialize);
```

For Generic Records:
```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Parse schema from a string
Schema schema = new Schema.Parser().parse(schemaString);

// Create serializer
AvroSerializationSchema<GenericRecord> genericSerializer =
    AvroSerializationSchema.forGeneric(schema);

// Use with generic records
DataStream<GenericRecord> recordStream = ...;
DataStream<byte[]> serializedStream = recordStream.map(genericSerializer::serialize);
```

Deserializes Avro binary or JSON format back to Java objects for use in Flink streaming operations.
```java
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    // Static factory methods for generic records
    public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema);
    public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);

    // Static factory methods for specific records
    public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass);
    public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass, AvroEncoding encoding);

    // Instance methods
    public T deserialize(byte[] message) throws IOException;
    public TypeInformation<T> getProducedType();
    public boolean isEndOfStream(T nextElement);
}
```

For Specific Records:
```java
// Create deserializer for a specific record type
AvroDeserializationSchema<User> userDeserializer =
    AvroDeserializationSchema.forSpecific(User.class);

// Use in a Kafka source
// (FlinkKafkaConsumer is deprecated in newer Flink releases in favor of KafkaSource)
FlinkKafkaConsumer<User> consumer = new FlinkKafkaConsumer<>(
    "user-topic",
    userDeserializer,
    properties
);
DataStream<User> userStream = env.addSource(consumer);
```

For Generic Records:
```java
// Create deserializer for generic records
AvroDeserializationSchema<GenericRecord> genericDeserializer =
    AvroDeserializationSchema.forGeneric(schema);

// Use in a streaming context
DataStream<byte[]> byteStream = ...;
DataStream<GenericRecord> recordStream = byteStream.map(data -> {
    try {
        return genericDeserializer.deserialize(data);
    } catch (IOException e) {
        throw new RuntimeException("Deserialization failed", e);
    }
});
```

Both serialization and deserialization schemas support two encoding formats:
```java
public enum AvroEncoding implements DescribedEnum {
    BINARY("binary", "Use binary encoding for serialization and deserialization."),
    JSON("json", "Use JSON encoding for serialization and deserialization.");
}
```

Serialization Errors:
- Returns `null` for null input objects
- Throws `WrappingRuntimeException` for schema or encoding failures

Deserialization Errors:

- Returns `null` for null input bytes
- Throws `IOException` for malformed data or schema mismatches

The deserialization schema automatically provides type information to Flink:
```java
// Type information is automatically inferred
TypeInformation<User> typeInfo = userDeserializer.getProducedType();

// For generic records, includes schema information
TypeInformation<GenericRecord> genericTypeInfo = genericDeserializer.getProducedType();
```

Install with Tessl CLI:

```shell
npx tessl i tessl/maven-org-apache-flink--flink-avro
```
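If you pull the library from Maven Central directly instead, the coordinates are `org.apache.flink:flink-avro`; the version below is a placeholder property you would set to match your Flink distribution:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${flink.version}</version>
</dependency>
```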