CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-avro

Apache Flink SQL Avro format library that provides bundled and shaded Apache Avro dependencies for SQL usage in Flink applications.

Pending
Overview
Eval results
Files

docs/schemas.md

Schema-based Serialization and Deserialization

Capabilities

Generic Record Support

Support for working with Avro GenericRecord objects using runtime schemas.

/**
 * Deserialization schema for Avro GenericRecord with binary encoding
 * @param schema The Avro schema to use for deserialization
 * @return AvroDeserializationSchema instance for GenericRecord
 */
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema);

/**
 * Deserialization schema for Avro GenericRecord with custom encoding
 * @param schema The Avro schema to use for deserialization  
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroDeserializationSchema instance for GenericRecord
 */
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);

/**
 * Serialization schema for Avro GenericRecord with binary encoding
 * @param schema The Avro schema to use for serialization
 * @return AvroSerializationSchema instance for GenericRecord
 */
public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema);

/**
 * Serialization schema for Avro GenericRecord with custom encoding
 * @param schema The Avro schema to use for serialization
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroSerializationSchema instance for GenericRecord
 */
public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);

Specific Record Support

Support for working with strongly-typed Avro SpecificRecord classes.

/**
 * Deserialization schema for Avro SpecificRecord with binary encoding
 * @param recordClazz The SpecificRecord class to deserialize to
 * @return AvroDeserializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> recordClazz);

/**
 * Deserialization schema for Avro SpecificRecord with custom encoding
 * @param recordClazz The SpecificRecord class to deserialize to
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroDeserializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> recordClazz, AvroEncoding encoding);

/**
 * Serialization schema for Avro SpecificRecord with binary encoding
 * @param recordClazz The SpecificRecord class to serialize from
 * @return AvroSerializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> recordClazz);

/**
 * Serialization schema for Avro SpecificRecord with custom encoding
 * @param recordClazz The SpecificRecord class to serialize from
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroSerializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> recordClazz, AvroEncoding encoding);

Core Schema Operations

/**
 * Deserialization schema that deserializes from Avro format
 * @param <T> Type of record it produces
 */
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    
    /**
     * Deserializes the byte message to an object
     * @param message Serialized message bytes, may be null
     * @return Deserialized object, or null if message was null
     * @throws IOException If deserialization fails
     */
    public T deserialize(byte[] message) throws IOException;
    
    /**
     * Checks if the given element signals end of the stream
     * @param nextElement The element to check
     * @return Always false (Avro records don't signal end of stream)
     */
    public boolean isEndOfStream(T nextElement);
    
    /**
     * Gets the data type produced by this deserializer
     * @return TypeInformation for the produced type
     */
    public TypeInformation<T> getProducedType();
}

/**
 * Serialization schema that serializes objects to Avro format
 * @param <T> Type of record it consumes
 */
public class AvroSerializationSchema<T> implements SerializationSchema<T> {
    
    /**
     * Serializes the given object to a byte array
     * @param object Object to serialize
     * @return Serialized byte array
     */
    public byte[] serialize(T object);
    
    /**
     * Gets the Avro schema used by this serializer
     * @return The Avro schema
     */
    public Schema getSchema();
    
    /**
     * Opens the serializer for initialization
     * @param context Initialization context
     * @throws Exception If initialization fails
     */
    public void open(InitializationContext context) throws Exception;
}

Usage Examples

Generic Record Usage

// Define Avro schema
Schema schema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("username").type().stringType().noDefault()
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .name("created_at").type().longType().noDefault()
    .endRecord();

// Create deserializer
AvroDeserializationSchema<GenericRecord> deserializer = 
    AvroDeserializationSchema.forGeneric(schema);

// Create serializer with JSON encoding
AvroSerializationSchema<GenericRecord> serializer = 
    AvroSerializationSchema.forGeneric(schema, AvroEncoding.JSON);

// Use in Flink DataStream
DataStream<byte[]> inputStream = // ... your byte stream
DataStream<GenericRecord> records = inputStream
    .map(new MapFunction<byte[], GenericRecord>() {
        @Override
        public GenericRecord map(byte[] value) throws Exception {
            return deserializer.deserialize(value);
        }
    });

// Serialize records back to bytes
DataStream<byte[]> outputStream = records
    .map(new MapFunction<GenericRecord, byte[]>() {
        @Override
        public byte[] map(GenericRecord record) throws Exception {
            return serializer.serialize(record);
        }
    });

Specific Record Usage

// Assuming you have a generated Avro class (SpecificRecord is an interface;
// generated classes extend SpecificRecordBase and implement it)
public class User extends SpecificRecordBase implements SpecificRecord {
    public Long id;
    public String username;
    public String email;
    public Long createdAt;
    
    // Generated methods...
}

// Create type-safe deserializer
AvroDeserializationSchema<User> deserializer = 
    AvroDeserializationSchema.forSpecific(User.class);

// Create type-safe serializer
AvroSerializationSchema<User> serializer = 
    AvroSerializationSchema.forSpecific(User.class, AvroEncoding.BINARY);

// Use in Flink DataStream with strong typing
DataStream<byte[]> inputStream = // ... your byte stream
DataStream<User> users = inputStream
    .map(new MapFunction<byte[], User>() {
        @Override
        public User map(byte[] value) throws Exception {
            return deserializer.deserialize(value);
        }
    });

// Process with type safety
DataStream<User> processedUsers = users
    .filter(user -> user.email != null)
    .map(user -> {
        user.username = user.username.toLowerCase();
        return user;
    });

Schema Evolution

// Reader schema (what you expect to read)
Schema readerSchema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("username").type().stringType().noDefault()
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .name("full_name").type().unionOf().nullType().and().stringType().endUnion().nullDefault() // New field
    .endRecord();

// Writer schema (what was written to the data)
Schema writerSchema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("username").type().stringType().noDefault()
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .endRecord();

// Avro resolves reader/writer schema differences during deserialization.
// Note: forGeneric(readerSchema) uses the supplied schema directly, so the
// writer schema must be recoverable by the decoder (e.g., embedded in the data
// or provided via a schema-registry-backed variant) for evolution to apply.
AvroDeserializationSchema<GenericRecord> deserializer = 
    AvroDeserializationSchema.forGeneric(readerSchema);

// Per Avro's schema-resolution rules, fields present only in the reader schema
// are filled with their declared defaults (null here), and fields present only
// in the writer schema are skipped during deserialization.

Error Handling

AvroDeserializationSchema<GenericRecord> deserializer = 
    AvroDeserializationSchema.forGeneric(schema);

DataStream<GenericRecord> records = inputStream
    .map(new MapFunction<byte[], GenericRecord>() {
        @Override
        public GenericRecord map(byte[] value) throws Exception {
            try {
                return deserializer.deserialize(value);
            } catch (IOException e) {
                // Handle deserialization errors
                System.err.println("Failed to deserialize record: " + e.getMessage());
                return null; // Or throw runtime exception based on requirements
            }
        }
    })
    .filter(Objects::nonNull); // Filter out failed deserializations

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-avro

docs

configuration.md

filesystem.md

index.md

registry.md

rowdata.md

schemas.md

utilities.md

tile.json