CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-avro

Apache Flink Avro format support library providing serialization and deserialization capabilities for Apache Avro data format within Flink streaming and batch processing applications

Pending
Overview
Eval results
Files

docs/type-system-integration.md

Type System Integration

Type information classes and utilities for seamless integration with Flink's type system. Provides efficient serialization, type safety, and schema conversion between Flink and Avro type systems.

AvroTypeInfo

Type information class for Avro specific record types that extend SpecificRecordBase.

public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
    // Constructor
    public AvroTypeInfo(Class<T> typeClass);
    
    // Serializer creation
    public TypeSerializer<T> createSerializer(SerializerConfig config);
}

Usage Examples

Creating Type Information:

import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

// Create type information for specific record
AvroTypeInfo<User> userTypeInfo = new AvroTypeInfo<>(User.class);

// Use in DataStream operations
DataStream<User> userStream = env.fromCollection(users, userTypeInfo);

// Explicit type hints
DataStream<User> transformedStream = rawStream
    .map(data -> parseUser(data))
    .returns(userTypeInfo);

With DataSet API:

// Create DataSet with explicit type information
DataSet<User> userDataSet = env.fromCollection(userList, userTypeInfo);

// Type-safe operations
DataSet<String> names = userDataSet
    .map(user -> user.getName().toString())
    .returns(Types.STRING);

GenericRecordAvroTypeInfo

Type information class for Avro generic records with schema information.

public class GenericRecordAvroTypeInfo extends TypeInformation<GenericRecord> {
    // Constructor
    public GenericRecordAvroTypeInfo(Schema schema);
    
    // Type system integration
    public TypeSerializer<GenericRecord> createSerializer(SerializerConfig config);
    public boolean isBasicType();
    public boolean isTupleType();
    public int getArity();
    public int getTotalFields();
}

Usage Examples

Generic Record Processing:

import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Parse schema
Schema schema = new Schema.Parser().parse(schemaString);

// Create type information
GenericRecordAvroTypeInfo typeInfo = new GenericRecordAvroTypeInfo(schema);

// Use in streaming
DataStream<GenericRecord> recordStream = env
    .fromCollection(genericRecords, typeInfo);

// Type-safe transformations
DataStream<String> nameStream = recordStream
    .map(record -> record.get("name").toString())
    .name("extract-names");

Schema Evolution Support:

// Handle multiple schema versions
Schema readerSchema = getReaderSchema();
Schema writerSchema = getWriterSchema();

GenericRecordAvroTypeInfo readerTypeInfo = new GenericRecordAvroTypeInfo(readerSchema);

// Process with schema evolution
DataStream<GenericRecord> evolvedStream = rawStream
    .map(new SchemaEvolutionMapper(writerSchema, readerSchema))
    .returns(readerTypeInfo);

AvroSerializer

High-performance serializer for Avro types with schema evolution support.

public class AvroSerializer<T> extends TypeSerializer<T> {
    // Constructor
    public AvroSerializer(Class<T> type);
    
    // Serialization operations
    public boolean isImmutableType();
    public TypeSerializer<T> duplicate();
    public T createInstance();
    public T copy(T from);
    public T copy(T from, T reuse);
    public int getLength();
    public void serialize(T record, DataOutputView target) throws IOException;
    public T deserialize(DataInputView source) throws IOException;
    public T deserialize(T reuse, DataInputView source) throws IOException;
    public void copy(DataInputView source, DataOutputView target) throws IOException;
}

Performance Characteristics

Object Reuse:

// Serializer supports object reuse for better performance
AvroSerializer<User> serializer = new AvroSerializer<>(User.class);

// Reuse instance across deserialization calls
User reusableUser = serializer.createInstance();
User deserializedUser = serializer.deserialize(reusableUser, dataInput);

Memory Efficiency:

// Efficient copying without full deserialization
serializer.copy(inputView, outputView); // Direct byte copying when possible

AvroSchemaConverter

Utility class for converting between Flink and Avro type systems.

public class AvroSchemaConverter {
    // Schema conversion methods
    public static Schema convertToSchema(RowType rowType);
    public static Schema convertToSchema(RowType rowType, boolean legacyTimestampMapping);
    public static LogicalType convertToLogicalType(Schema schema);
    public static RowType convertToRowType(Schema schema);
    public static RowType convertToRowType(Schema schema, boolean legacyTimestampMapping);
}

Type Conversion Examples

Flink RowType to Avro Schema:

import org.apache.flink.table.types.logical.*;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;

// Define Flink row type
RowType rowType = RowType.of(
    new LogicalType[] {
        new VarCharType(50),        // name
        new IntType(),              // age  
        new BooleanType(),          // active
        new TimestampType(3),       // created_at
        new DecimalType(10, 2),     // salary
        new ArrayType(new VarCharType(20)) // tags
    },
    new String[] {"name", "age", "active", "created_at", "salary", "tags"}
);

// Convert to Avro schema
Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);

// With legacy timestamp mapping
Schema legacySchema = AvroSchemaConverter.convertToSchema(rowType, true);

Avro Schema to Flink RowType:

// Parse Avro schema
String schemaJson = "{ \"type\": \"record\", \"name\": \"User\", ... }";
Schema schema = new Schema.Parser().parse(schemaJson);

// Convert to Flink row type
RowType flinkRowType = AvroSchemaConverter.convertToRowType(schema);

// Use in Table API
Table table = tableEnv.fromDataStream(stream, Schema.of(flinkRowType));

Type Mapping Reference

Primitive Types:

| Flink Type | Avro Type | Notes |
|---|---|---|
| BooleanType | boolean | Direct mapping |
| TinyIntType | int | Promoted to int |
| SmallIntType | int | Promoted to int |
| IntType | int | Direct mapping |
| BigIntType | long | Direct mapping |
| FloatType | float | Direct mapping |
| DoubleType | double | Direct mapping |
| VarCharType / CharType | string | UTF-8 encoding |
| VarBinaryType / BinaryType | bytes | Direct mapping |

Complex Types:

| Flink Type | Avro Type | Notes |
|---|---|---|
| ArrayType | array | Element type converted recursively |
| MapType | map | Key must be string, value converted |
| RowType | record | Fields converted recursively |
| MultisetType | map | Special map with int values |

Temporal Types:

| Flink Type | Avro Type | Logical Type |
|---|---|---|
| DateType | int | date |
| TimeType | int | time-millis |
| TimestampType | long | timestamp-millis (legacy mapping) |
| TimestampType | long | local-timestamp-millis (correct, non-legacy mapping) |
| LocalZonedTimestampType | long | timestamp-millis |

Decimal Types:

| Flink Type | Avro Type | Logical Type |
|---|---|---|
| DecimalType | bytes | decimal with precision/scale |
| DecimalType | fixed | decimal with precision/scale |

SerializableAvroSchema

Wrapper class for making Avro Schema objects serializable in Flink operations.

public class SerializableAvroSchema implements Serializable {
    // Constructor
    public SerializableAvroSchema(Schema schema);
    
    // Schema access
    public Schema getAvroSchema();
    
    // Serialization support
    private void writeObject(ObjectOutputStream out) throws IOException;
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException;
}

Usage in Distributed Operations

// Wrap schema for serialization
Schema schema = new Schema.Parser().parse(schemaString);
SerializableAvroSchema serializableSchema = new SerializableAvroSchema(schema);

// Use in map function
DataStream<GenericRecord> processed = recordStream.map(new RichMapFunction<GenericRecord, GenericRecord>() {
    private transient Schema schema;
    
    @Override
    public void open(Configuration parameters) {
        this.schema = serializableSchema.getAvroSchema();
    }
    
    @Override
    public GenericRecord map(GenericRecord record) throws Exception {
        // Use schema in processing
        return processRecord(record, schema);
    }
});

AvroFactory

Factory utilities for creating Avro-specific data structures and serializers.

public class AvroFactory {
    // Schema extraction
    public static Schema extractAvroSpecificSchema(Class<?> specificRecordClass, SpecificData specificData);
    
    // SpecificData creation
    public static SpecificData getSpecificDataForClass(Class<? extends SpecificData> specificDataClass, ClassLoader classLoader);
}

Advanced Usage

// Extract schema from specific record class
Class<User> userClass = User.class;
SpecificData specificData = SpecificData.get();
Schema userSchema = AvroFactory.extractAvroSpecificSchema(userClass, specificData);

// Custom SpecificData with class loader
ClassLoader customClassLoader = Thread.currentThread().getContextClassLoader();
SpecificData customSpecificData = AvroFactory.getSpecificDataForClass(
    SpecificData.class, 
    customClassLoader
);

Performance Optimization

Serializer Configuration

Enable Object Reuse:

// Configure execution environment for object reuse
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse(); // Better performance with Avro serializers

Type Hint Usage:

// Provide explicit type information to avoid reflection
DataStream<User> typedStream = rawStream
    .map(data -> parseUser(data))
    .returns(new AvroTypeInfo<>(User.class)); // Avoid type erasure

Schema Caching

// Cache converted schemas to avoid repeated conversion
private static final ConcurrentHashMap<String, Schema> SCHEMA_CACHE = new ConcurrentHashMap<>();

public Schema getCachedSchema(RowType rowType) {
    String key = rowType.toString();
    return SCHEMA_CACHE.computeIfAbsent(key, 
        k -> AvroSchemaConverter.convertToSchema(rowType));
}

Error Handling

Type Mismatch Errors:

try {
    Schema convertedSchema = AvroSchemaConverter.convertToSchema(rowType);
} catch (IllegalArgumentException e) {
    logger.error("Unsupported type conversion: " + e.getMessage());
    // Handle unsupported type
}

Serialization Errors:

try {
    serializer.serialize(record, output);
} catch (IOException e) {
    logger.error("Serialization failed", e);
    // Implement error recovery
}

Schema Evolution Errors:

// Handle schema evolution gracefully
try {
    GenericRecord evolved = SchemaEvolution.evolve(record, oldSchema, newSchema);
} catch (AvroRuntimeException e) {
    logger.warn("Schema evolution failed, using default values", e);
    // Apply default values or skip record
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-avro

docs

bulk-writers.md

file-io-operations.md

index.md

schema-registry-integration.md

serialization-deserialization.md

table-api-integration.md

type-system-integration.md

tile.json