CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-avro

Apache Flink SQL Avro format library that provides bundled and shaded Apache Avro dependencies for SQL usage in Flink applications.

Pending
Overview
Eval results
Files

docs/utilities.md

Type System and Utilities

Capabilities

Schema Conversion Utilities

Utilities for converting between Flink types and Avro schemas.

/**
 * Utility class for converting between Flink RowType and Avro Schema
 * Located in org.apache.flink.formats.avro.typeutils package
 *
 * All members are static: this is a stateless conversion utility and is not
 * meant to be instantiated.
 */
public class AvroSchemaConverter {
    
    /**
     * Converts a Flink LogicalType to an Avro Schema
     * @param logicalType The Flink LogicalType to convert
     * @return Corresponding Avro Schema
     */
    public static Schema convertToSchema(LogicalType logicalType);
    
    /**
     * Converts a Flink LogicalType to an Avro Schema with timestamp mapping control
     * @param logicalType The Flink LogicalType to convert
     * @param legacyTimestampMapping Whether to use legacy timestamp mapping
     *        (exact mapping semantics not shown here — see Flink reference docs)
     * @return Corresponding Avro Schema
     */
    public static Schema convertToSchema(LogicalType logicalType, boolean legacyTimestampMapping);
    
    /**
     * Converts a Flink LogicalType to an Avro Schema with custom row name
     * @param logicalType The Flink LogicalType to convert
     * @param rowName Name for the root record schema
     * @return Corresponding Avro Schema
     */
    public static Schema convertToSchema(LogicalType logicalType, String rowName);
    
    /**
     * Converts an Avro schema string to a Flink DataType
     * Note: this overload (and the one below) accepts the schema's JSON string
     * form, not an org.apache.avro.Schema object.
     * @param avroSchemaString The Avro schema definition string
     * @return Corresponding Flink DataType
     */
    public static DataType convertToDataType(String avroSchemaString);
    
    /**
     * Converts an Avro schema string to a Flink DataType with timestamp mapping control
     * @param avroSchemaString The Avro schema definition string
     * @param legacyTimestampMapping Whether to use legacy timestamp mapping
     * @return Corresponding Flink DataType
     */
    public static DataType convertToDataType(String avroSchemaString, boolean legacyTimestampMapping);
    
    /**
     * Converts an Avro class into Flink TypeInformation
     * @param avroClass Avro specific record class
     * @return TypeInformation matching the schema
     */
    public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(Class<T> avroClass);
    
    /**
     * Converts an Avro schema string into Flink TypeInformation
     * @param <T> record type the caller expects; not checked against the schema string
     * @param avroSchemaString Avro schema definition string
     * @return TypeInformation matching the schema
     */
    public static <T> TypeInformation<T> convertToTypeInfo(String avroSchemaString);
}

Type Information Classes

Type information classes for Avro record types in Flink's type system.

/**
 * Type information for Avro records
 * @param <T> The Avro record type
 *        (presumably an Avro-generated SpecificRecord subtype — TODO confirm
 *        against the actual class bound in the Flink sources)
 */
public class AvroTypeInfo<T> extends TypeInformation<T> {
    
    /**
     * Creates AvroTypeInfo for a specific record class.
     * The schema is derived from the record class itself.
     * @param recordClazz The Avro record class
     */
    public AvroTypeInfo(Class<T> recordClazz);
    
    /**
     * Creates AvroTypeInfo with explicit schema, overriding the schema
     * embedded in the record class.
     * @param recordClazz The Avro record class
     * @param schema The Avro schema
     */
    public AvroTypeInfo(Class<T> recordClazz, Schema schema);
    
    /**
     * Gets the Avro schema for this type
     * @return The Avro schema
     */
    public Schema getAvroSchema();
    
    /**
     * Gets the record class
     * @return The Java class for the Avro record
     */
    public Class<T> getRecordClazz();
    
    /**
     * Creates a serializer for this type
     * @param config Execution configuration
     * @return TypeSerializer for this Avro type
     */
    public TypeSerializer<T> createSerializer(ExecutionConfig config);
}

/**
 * Specialized type information for Avro GenericRecord.
 * Unlike the specific-record case, the schema cannot be derived from the
 * class, so it must always be supplied explicitly.
 */
public class GenericRecordAvroTypeInfo extends AvroTypeInfo<GenericRecord> {
    
    /**
     * Creates type information for GenericRecord with schema
     * @param schema The Avro schema for the GenericRecord
     */
    public GenericRecordAvroTypeInfo(Schema schema);
}

Schema Utilities

/**
 * Utility for encoding and decoding Avro schemas.
 * encode/decode are inverses: decode(encode(s)) yields an equivalent schema.
 */
public class SchemaCoder {
    
    /**
     * Encodes an Avro schema to a string representation
     * @param schema The schema to encode
     * @return Encoded schema string
     */
    public static String encode(Schema schema);
    
    /**
     * Decodes a schema from string representation
     * @param encodedSchema The encoded schema string
     * @return Decoded Avro schema
     */
    public static Schema decode(String encodedSchema);
    
    /**
     * Validates schema compatibility between reader and writer schemas.
     * Note the argument order: the writer (producer) schema comes first,
     * the reader (consumer) schema second — compatibility is directional.
     * @param writerSchema The schema used to write data
     * @param readerSchema The schema used to read data
     * @return true if schemas are compatible
     */
    public static boolean isCompatible(Schema writerSchema, Schema readerSchema);
}

/**
 * Serializable wrapper for Avro Schema objects.
 * Exists because Avro's Schema class does not implement java.io.Serializable,
 * which prevents it from being shipped directly inside Flink operator state
 * or closures.
 */
public class SerializableAvroSchema implements Serializable {
    
    /**
     * Creates a serializable schema wrapper
     * @param schema The Avro schema to wrap
     */
    public SerializableAvroSchema(Schema schema);
    
    /**
     * Gets the wrapped Avro schema
     * @return The Avro schema
     */
    public Schema getAvroSchema();
    
    /**
     * Creates from schema string
     * @param schemaString JSON representation of the schema
     * @return SerializableAvroSchema instance
     */
    public static SerializableAvroSchema fromString(String schemaString);
}

Kryo Serialization Support

/**
 * Utilities for Kryo serialization of Avro objects.
 * Used when Avro types fall back to Flink's generic (Kryo) serialization path.
 */
public class AvroKryoSerializerUtils {
    
    /**
     * Registers Avro types with Kryo for efficient serialization
     * @param kryo The Kryo instance to register with
     */
    public static void registerAvroTypes(Kryo kryo);
    
    /**
     * Creates optimized Kryo serializer for specific Avro record type
     * @param <T> the record type handled by the returned serializer
     * @param recordClass The Avro record class
     * @return Configured Kryo serializer
     */
    public static <T> Serializer<T> createAvroSerializer(Class<T> recordClass);
}

Usage Examples

Schema Conversion

// Convert Flink RowType to Avro Schema
// NOTE(review): RowType.of is shown here taking DataType[] — confirm whether
// the fields need to be LogicalType (e.g. via DataType#getLogicalType).
RowType flinkRowType = RowType.of(
    new DataType[] {
        DataTypes.BIGINT(),
        DataTypes.STRING(),
        DataTypes.ARRAY(DataTypes.STRING()),
        DataTypes.ROW(
            DataTypes.FIELD("street", DataTypes.STRING()),
            DataTypes.FIELD("city", DataTypes.STRING()),
            DataTypes.FIELD("zipcode", DataTypes.STRING())
        ),
        DataTypes.TIMESTAMP(3)
    },
    new String[] {"user_id", "username", "tags", "address", "created_at"}
);

// Convert to Avro schema
Schema avroSchema = AvroSchemaConverter.convertToSchema(flinkRowType);

System.out.println("Generated Avro Schema:");
System.out.println(avroSchema.toString(true));

// Convert back to a Flink DataType for table processing.
// The documented API accepts the schema's JSON string form, not a Schema
// object, so serialize the schema first.
DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());

// AvroSchemaConverter exposes no convertToRowType method; recover the RowType
// from the DataType's logical type instead.
RowType convertedRowType = (RowType) dataType.getLogicalType();

Type Information Usage

// Create type information for SpecificRecord.
// The schema is derived from the generated User class itself.
AvroTypeInfo<User> userTypeInfo = new AvroTypeInfo<>(User.class);

// Use in DataStream — supplying the type info explicitly avoids Flink's
// reflective type extraction (and a possible Kryo fallback) for Avro records.
DataStream<User> userStream = env
    .fromCollection(userList, userTypeInfo)
    .map(new MapFunction<User, User>() {
        @Override
        public User map(User user) throws Exception {
            // Process user
            return user;
        }
    });

// Create type information for GenericRecord.
// GenericRecord carries no compile-time schema, so it must be passed in.
Schema schema = new Schema.Parser().parse(schemaString);
GenericRecordAvroTypeInfo genericTypeInfo = new GenericRecordAvroTypeInfo(schema);

DataStream<GenericRecord> genericStream = env
    .fromCollection(genericRecordList, genericTypeInfo);

// Get schema from type info
Schema retrievedSchema = userTypeInfo.getAvroSchema();
Class<User> recordClass = userTypeInfo.getRecordClazz();

Schema Validation and Compatibility

// Schema evolution example: v2 adds an optional (nullable, defaulted) field,
// which is the canonical backward-compatible Avro change.
Schema v1Schema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("name").type().stringType().noDefault()
    .endRecord();

Schema v2Schema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("name").type().stringType().noDefault()
    // Optional field: union of null/string with a null default, so v2 readers
    // can consume v1 data that lacks the field.
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .endRecord();

// Check compatibility (writer schema first, reader schema second).
boolean compatible = SchemaCoder.isCompatible(v1Schema, v2Schema);
System.out.println("Schemas compatible: " + compatible);

// Encode schema for storage/transmission
String encodedSchema = SchemaCoder.encode(v2Schema);

// Decode schema
Schema decodedSchema = SchemaCoder.decode(encodedSchema);

// Create serializable schema for distributed processing (Avro's Schema is not
// Serializable on its own).
SerializableAvroSchema serializableSchema = new SerializableAvroSchema(v2Schema);

Complex Type Mapping

// Complex Flink type with nested structures.
// NOTE(review): RowType.of is shown taking DataType[] — confirm whether the
// actual API expects LogicalType fields (via DataType#getLogicalType).
RowType complexType = RowType.of(
    new DataType[] {
        DataTypes.ROW(
            DataTypes.FIELD("personal", DataTypes.ROW(
                DataTypes.FIELD("firstName", DataTypes.STRING()),
                DataTypes.FIELD("lastName", DataTypes.STRING()),
                DataTypes.FIELD("age", DataTypes.INT())
            )),
            DataTypes.FIELD("contact", DataTypes.ROW(
                DataTypes.FIELD("email", DataTypes.STRING()),
                DataTypes.FIELD("phones", DataTypes.ARRAY(DataTypes.STRING()))
            ))
        ),
        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
        DataTypes.ARRAY(DataTypes.ROW(
            DataTypes.FIELD("type", DataTypes.STRING()),
            DataTypes.FIELD("value", DataTypes.STRING())
        ))
    },
    new String[] {"user_profile", "metadata", "preferences"}
);

// Convert complex type to Avro schema
Schema complexAvroSchema = AvroSchemaConverter.convertToSchema(complexType);

// The generated schema will have proper Avro types:
// - ROW becomes RECORD
// - ARRAY becomes ARRAY  
// - MAP becomes MAP
// - Nested structures are properly represented

System.out.println("Complex Avro Schema:");
System.out.println(complexAvroSchema.toString(true));

Custom Type Registration

// Register custom Avro types with Flink's type system
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Register Avro types for better performance — a registered serializer avoids
// Flink falling back to Kryo's slower default reflective serialization.
env.getConfig().registerTypeWithKryoSerializer(
    GenericRecord.class, 
    AvroKryoSerializerUtils.createAvroSerializer(GenericRecord.class)
);

// Register specific record types
env.getConfig().registerTypeWithKryoSerializer(
    User.class,
    AvroKryoSerializerUtils.createAvroSerializer(User.class)
);

// Enable Kryo for all Avro types by registering them on the underlying
// Kryo instance directly.
AvroKryoSerializerUtils.registerAvroTypes(
    env.getConfig().getSerializerConfig().getKryo()
);

Schema Registry Integration with Utilities

// Utility for managing schemas with registry
public class AvroSchemaManager {
    
    private final SchemaRegistryClient registryClient;
    
    public AvroSchemaManager(String registryUrl) {
        this.registryClient = new CachedSchemaRegistryClient(registryUrl, 100);
    }
    
    /**
     * Convert Flink table to Avro schema and register
     */
    public int registerTableSchema(String subject, RowType rowType) throws IOException {
        Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
        return registryClient.register(subject, avroSchema);
    }
    
    /**
     * Get Flink RowType from registered schema
     */
    public RowType getRowTypeFromRegistry(String subject) throws IOException {
        Schema schema = registryClient.getLatestSchemaMetadata(subject).getSchema();
        return AvroSchemaConverter.convertToRowType(schema);
    }
    
    /**
     * Validate schema evolution
     */
    public boolean validateEvolution(String subject, RowType newRowType) throws IOException {
        Schema newSchema = AvroSchemaConverter.convertToSchema(newRowType);
        return registryClient.testCompatibility(subject, newSchema);
    }
}

// Usage
AvroSchemaManager schemaManager = new AvroSchemaManager("http://localhost:8081");

// Register table schema for the Kafka "users" topic's value
// (subject naming follows the <topic>-value convention).
RowType tableSchema = RowType.of(
    new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
    new String[] {"id", "name"}
);

int schemaId = schemaManager.registerTableSchema("users-value", tableSchema);

// Later, retrieve schema for processing
RowType retrievedSchema = schemaManager.getRowTypeFromRegistry("users-value");

Performance Optimization with Type Information

// Optimized processing with proper type information
public class OptimizedAvroProcessor {
    
    public static <T> DataStream<T> processAvroStream(
            DataStream<byte[]> rawStream,
            AvroTypeInfo<T> typeInfo) {
        
        // Create efficient deserializer
        AvroDeserializationSchema<T> deserializer = 
            AvroDeserializationSchema.forGeneric(typeInfo.getAvroSchema());
        
        return rawStream
            .map(new MapFunction<byte[], T>() {
                @Override
                public T map(byte[] value) throws Exception {
                    return deserializer.deserialize(value);
                }
            })
            .returns(typeInfo); // Explicit type information for optimization
    }
    
    public static DataStream<GenericRecord> optimizeGenericRecordProcessing(
            DataStream<GenericRecord> stream,
            Schema schema) {
        
        // Use optimized type info
        GenericRecordAvroTypeInfo typeInfo = new GenericRecordAvroTypeInfo(schema);
        
        return stream
            .rebalance() // Distribute load evenly
            .map(new RichMapFunction<GenericRecord, GenericRecord>() {
                private transient GenericRecord reusableRecord;
                
                @Override
                public void open(Configuration parameters) {
                    // Create reusable record for better performance
                    reusableRecord = new GenericData.Record(schema);
                }
                
                @Override
                public GenericRecord map(GenericRecord record) throws Exception {
                    // Reuse record object to reduce GC pressure
                    for (Schema.Field field : schema.getFields()) {
                        reusableRecord.put(field.name(), record.get(field.name()));
                    }
                    return reusableRecord;
                }
            })
            .returns(typeInfo);
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-avro

docs

configuration.md

filesystem.md

index.md

registry.md

rowdata.md

schemas.md

utilities.md

tile.json