Apache Flink SQL Avro format library that provides bundled and shaded Apache Avro dependencies for SQL usage in Flink applications.

Support for working with Avro GenericRecord objects using runtime schemas.
/**
 * Creates a deserialization schema for Avro GenericRecord using binary encoding.
 * @param schema The Avro schema to use for deserialization
 * @return AvroDeserializationSchema instance for GenericRecord
 */
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema);
/**
 * Creates a deserialization schema for Avro GenericRecord with a custom encoding.
 * @param schema The Avro schema to use for deserialization
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroDeserializationSchema instance for GenericRecord
 */
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);
/**
 * Creates a serialization schema for Avro GenericRecord using binary encoding.
 * @param schema The Avro schema to use for serialization
 * @return AvroSerializationSchema instance for GenericRecord
 */
public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema);
/**
 * Creates a serialization schema for Avro GenericRecord with a custom encoding.
 * @param schema The Avro schema to use for serialization
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroSerializationSchema instance for GenericRecord
 */
public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);

Support for working with strongly-typed Avro SpecificRecord classes.
/**
 * Creates a deserialization schema for an Avro SpecificRecord class using binary encoding.
 * @param recordClazz The SpecificRecord class to deserialize to
 * @return AvroDeserializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> recordClazz);
/**
 * Creates a deserialization schema for an Avro SpecificRecord class with a custom encoding.
 * @param recordClazz The SpecificRecord class to deserialize to
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroDeserializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> recordClazz, AvroEncoding encoding);
/**
 * Creates a serialization schema for an Avro SpecificRecord class using binary encoding.
 * @param recordClazz The SpecificRecord class to serialize from
 * @return AvroSerializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> recordClazz);
/**
 * Creates a serialization schema for an Avro SpecificRecord class with a custom encoding.
 * @param recordClazz The SpecificRecord class to serialize from
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroSerializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> recordClazz, AvroEncoding encoding);

/**
* Deserialization schema that deserializes from Avro format
* @param <T> Type of record it produces
*/
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
/**
 * Deserializes the byte message to an object.
 * @param message Serialized message bytes; may be null
 * @return Deserialized object, or null if {@code message} was null
 * @throws IOException If the bytes cannot be decoded against the configured schema
 */
public T deserialize(byte[] message) throws IOException;
/**
 * Checks if the given element signals end of the stream.
 * Avro records never signal end of stream, so this is constant.
 * @param nextElement The element to check
 * @return Always false
 */
public boolean isEndOfStream(T nextElement);
/**
 * Gets the data type produced by this deserializer.
 * @return TypeInformation for the produced type
 */
public TypeInformation<T> getProducedType();
}
/**
* Serialization schema that serializes objects to Avro format
* @param <T> Type of record it consumes
*/
public class AvroSerializationSchema<T> implements SerializationSchema<T> {
/**
 * Serializes the given object to a byte array.
 * @param object Object to serialize
 * @return Serialized byte array
 */
public byte[] serialize(T object);
/**
 * Gets the Avro schema used by this serializer.
 * @return The Avro schema
 */
public Schema getSchema();
/**
 * Opens the serializer for initialization.
 * @param context Initialization context
 * @throws Exception If initialization fails
 */
public void open(InitializationContext context) throws Exception;
}

// Define Avro schema
// Build the runtime schema for a "User" record: required id/username/created_at,
// nullable email (union of null and string, defaulting to null)
Schema schema = SchemaBuilder.record("User")
.fields()
.name("id").type().longType().noDefault()
.name("username").type().stringType().noDefault()
.name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
.name("created_at").type().longType().noDefault()
.endRecord();
// Create deserializer (single-arg overload; binary encoding)
AvroDeserializationSchema<GenericRecord> deserializer =
AvroDeserializationSchema.forGeneric(schema);
// Create serializer with JSON encoding
AvroSerializationSchema<GenericRecord> serializer =
AvroSerializationSchema.forGeneric(schema, AvroEncoding.JSON);
// Use in Flink DataStream
// NOTE(review): the schemas are captured by the anonymous MapFunctions below,
// so they must be serializable for Flink to ship them to task managers — confirm
DataStream<byte[]> inputStream = // ... your byte stream
DataStream<GenericRecord> records = inputStream
.map(new MapFunction<byte[], GenericRecord>() {
@Override
public GenericRecord map(byte[] value) throws Exception {
return deserializer.deserialize(value);
}
});
// Serialize records back to bytes
DataStream<byte[]> outputStream = records
.map(new MapFunction<GenericRecord, byte[]>() {
@Override
public byte[] map(GenericRecord record) throws Exception {
return serializer.serialize(record);
}
});

// Assuming you have a generated SpecificRecord class
/**
 * Example of an Avro-generated specific record type.
 * SpecificRecord is an interface, so it cannot be extended; classes produced
 * by the Avro compiler extend SpecificRecordBase (which implements
 * SpecificRecord), so {@code forSpecific(User.class)} still type-checks.
 */
public class User extends SpecificRecordBase {
public Long id;
public String username;
public String email;
public Long createdAt;
// Generated methods (getSchema, get, put, ...)...
}
// Create type-safe deserializer (single-arg overload; binary encoding)
AvroDeserializationSchema<User> deserializer =
AvroDeserializationSchema.forSpecific(User.class);
// Create type-safe serializer with the encoding given explicitly
AvroSerializationSchema<User> serializer =
AvroSerializationSchema.forSpecific(User.class, AvroEncoding.BINARY);
// Use in Flink DataStream with strong typing
DataStream<byte[]> inputStream = // ... your byte stream
DataStream<User> users = inputStream
.map(new MapFunction<byte[], User>() {
@Override
public User map(byte[] value) throws Exception {
return deserializer.deserialize(value);
}
});
// Process with type safety: drop records without an email, normalize username
// NOTE(review): toLowerCase() uses the default locale; Locale.ROOT would be
// deterministic across JVMs — confirm intent
DataStream<User> processedUsers = users
.filter(user -> user.email != null)
.map(user -> {
user.username = user.username.toLowerCase();
return user;
});

// Reader schema (what you expect to read)
// Reader schema: what the consumer expects — includes a new optional field
Schema readerSchema = SchemaBuilder.record("User")
.fields()
.name("id").type().longType().noDefault()
.name("username").type().stringType().noDefault()
.name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
.name("full_name").type().unionOf().nullType().and().stringType().endUnion().nullDefault() // New field, nullable with null default so old data still resolves
.endRecord();
// Writer schema: what was actually written to the data (no full_name yet)
Schema writerSchema = SchemaBuilder.record("User")
.fields()
.name("id").type().longType().noDefault()
.name("username").type().stringType().noDefault()
.name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
.endRecord();
// Avro applies its schema-resolution rules during deserialization
AvroDeserializationSchema<GenericRecord> deserializer =
AvroDeserializationSchema.forGeneric(readerSchema);
// Records deserialized with reader schema will have null for new fields
// and missing fields will be ignored according to Avro resolution rules

AvroDeserializationSchema<GenericRecord> deserializer =
AvroDeserializationSchema.forGeneric(schema);
// Wrap deserialize() so a bad record is logged and skipped instead of
// failing the whole job
DataStream<GenericRecord> records = inputStream
.map(new MapFunction<byte[], GenericRecord>() {
@Override
public GenericRecord map(byte[] value) throws Exception {
try {
return deserializer.deserialize(value);
} catch (IOException e) {
// Handle deserialization errors: log and drop the record
System.err.println("Failed to deserialize record: " + e.getMessage());
return null; // Or rethrow as a runtime exception if records must not be lost
}
}
})
.filter(Objects::nonNull); // Filter out failed deserializations

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-avro