tessl/maven-org-apache-flink--flink-avro

Apache Flink Avro format support library providing serialization and deserialization capabilities for Apache Avro data format within Flink streaming and batch processing applications

docs/table-api-integration.md

Table API Integration

Row-based serialization and deserialization for seamless integration with Flink's Table API and SQL layer. Converts between Flink's internal RowData representation and Avro format, enabling SQL operations on Avro data.

AvroRowDataSerializationSchema

Serializes Flink RowData objects to Avro binary or JSON format for Table API integration.

public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {
    // Constructors
    public AvroRowDataSerializationSchema(RowType rowType);
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding);
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding, boolean legacyTimestampMapping);
    public AvroRowDataSerializationSchema(RowType rowType, SerializationSchema<GenericRecord> nestedSchema, RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter);
    
    // Instance methods
    public byte[] serialize(RowData row);
    public void open(InitializationContext context) throws Exception;
}

Usage Examples

Basic Table Integration:

import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

// Define table schema
RowType rowType = RowType.of(
    new LogicalType[] {
        DataTypes.STRING().getLogicalType(),
        DataTypes.INT().getLogicalType(),
        DataTypes.BOOLEAN().getLogicalType()
    },
    new String[] {"name", "age", "active"}
);

// Create serializer
AvroRowDataSerializationSchema serializer = new AvroRowDataSerializationSchema(rowType);

// Pass to a sink that accepts a SerializationSchema
// (CustomTableSink is a placeholder for your own sink implementation)
TableSink<?> sink = new CustomTableSink(serializer);

With Custom Encoding:

// Use JSON encoding for debugging
AvroRowDataSerializationSchema jsonSerializer = new AvroRowDataSerializationSchema(
    rowType, 
    AvroEncoding.JSON
);

// Control timestamp mapping behavior
AvroRowDataSerializationSchema legacySerializer = new AvroRowDataSerializationSchema(
    rowType,
    AvroEncoding.BINARY,
    true  // Use legacy timestamp mapping for compatibility
);

AvroRowDataDeserializationSchema

Deserializes Avro data to Flink RowData objects for Table API operations.

public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData> {
    // Constructors
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo);
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo, AvroEncoding encoding);
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo, AvroEncoding encoding, boolean legacyTimestampMapping);
    
    // Instance methods
    public RowData deserialize(byte[] message) throws IOException;
    public TypeInformation<RowData> getProducedType();
    public boolean isEndOfStream(RowData nextElement);
}

Usage Examples

Table Source Integration:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

// Create type information for RowData
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);

// Create deserializer
AvroRowDataDeserializationSchema deserializer = new AvroRowDataDeserializationSchema(
    rowType, 
    typeInfo
);

// Pass to a source that accepts a DeserializationSchema
// (CustomTableSource is a placeholder for your own source implementation)
TableSource<?> source = new CustomTableSource(deserializer);

Kafka Table Integration:

// Create deserializer for Kafka table source
AvroRowDataDeserializationSchema kafkaDeserializer = new AvroRowDataDeserializationSchema(
    rowType,
    typeInfo,
    AvroEncoding.BINARY
);

// Table options equivalent to the SQL DDL 'WITH' clause below
Map<String, String> properties = new HashMap<>();
properties.put("connector", "kafka");
properties.put("topic", "avro-topic");
properties.put("format", "avro");

Format Factory Integration

The library provides automatic format discovery for Table API through the format factory.

public class AvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
    public static final String IDENTIFIER = "avro";
    
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public String factoryIdentifier();
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
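
Flink discovers such factories at runtime through Java's `ServiceLoader` mechanism: a format jar advertises its factory class in a service file on the classpath. The path below follows Flink's modern factory discovery convention (older Flink versions used `org.apache.flink.table.factories.TableFactory` instead):

```
# META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.formats.avro.AvroFormatFactory
```

With this entry present, 'format' = 'avro' in a DDL statement resolves to AvroFormatFactory via its IDENTIFIER constant.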

SQL DDL Usage

Create Table with Avro Format:

CREATE TABLE avro_table (
    name STRING,
    age INT,
    active BOOLEAN
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro-topic',
    'format' = 'avro',
    'avro.encoding' = 'binary'
);

Configuration Options:

CREATE TABLE avro_table_json (
    name STRING,
    age INT,
    created_at TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro-json-topic',
    'format' = 'avro',
    'avro.encoding' = 'json',
    'avro.timestamp_mapping.legacy' = 'false'
);
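
Once a table is declared with the Avro format, it can be read and written with ordinary SQL. The statements below are illustrative and assume the avro_table definition above:

```sql
-- Read: the Avro deserializer converts Kafka records to rows
SELECT name, age FROM avro_table WHERE active = TRUE;

-- Write: the Avro serializer converts rows back to Avro bytes
INSERT INTO avro_table VALUES ('alice', 30, TRUE);
```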

Schema Conversion

The library automatically converts between Flink and Avro type systems:

public class AvroSchemaConverter {
    public static Schema convertToSchema(RowType rowType);
    public static Schema convertToSchema(RowType rowType, boolean legacyTimestampMapping);
    public static LogicalType convertToLogicalType(Schema schema);
}
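
As an illustration, converting the three-field rowType from the earlier examples yields an Avro record schema along these lines. The exact record name, namespace, and null unions depend on the converter's defaults and on field nullability, so treat this as a sketch rather than the exact output:

```json
{
  "type": "record",
  "name": "record",
  "namespace": "org.apache.flink.avro.generated",
  "fields": [
    { "name": "name",   "type": ["null", "string"],  "default": null },
    { "name": "age",    "type": ["null", "int"],     "default": null },
    { "name": "active", "type": ["null", "boolean"], "default": null }
  ]
}
```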

Type Mapping

Flink to Avro Types:

| Flink Type       | Avro Type        | Notes                              |
|------------------|------------------|------------------------------------|
| BOOLEAN          | boolean          | Direct mapping                     |
| INT              | int              | 32-bit signed integer              |
| BIGINT           | long             | 64-bit signed integer              |
| FLOAT            | float            | 32-bit floating point              |
| DOUBLE           | double           | 64-bit floating point              |
| STRING           | string           | UTF-8 string                       |
| BINARY/VARBINARY | bytes            | Byte array                         |
| TIMESTAMP        | timestamp-millis | Logical type (legacy mapping)      |
| TIMESTAMP_LTZ    | timestamp-millis | Logical type (correct mapping)     |
| DATE             | date             | Logical type                       |
| TIME             | time-millis      | Logical type                       |
| DECIMAL          | decimal          | Logical type with precision/scale  |
| ARRAY            | array            | Repeated field                     |
| ROW              | record           | Nested record                      |
| MAP              | map              | Key-value mapping                  |

Timestamp Mapping Options

The library supports two timestamp mapping modes:

Legacy Mapping (Default)

  • Both TIMESTAMP and TIMESTAMP_LTZ map to Avro timestamp-millis
  • Maintains backward compatibility with Flink versions < 1.19
  • Enable with avro.timestamp_mapping.legacy = true

Correct Mapping

  • TIMESTAMP maps to Avro local-timestamp-millis
  • TIMESTAMP_LTZ maps to Avro timestamp-millis
  • More semantically correct but may break existing pipelines
  • Enable with avro.timestamp_mapping.legacy = false
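
The practical difference shows up in the Avro schema generated for a TIMESTAMP(3) column. These are illustrative fragments, not complete schemas:

```
'avro.timestamp_mapping.legacy' = 'true'   →  {"type": "long", "logicalType": "timestamp-millis"}
'avro.timestamp_mapping.legacy' = 'false'  →  {"type": "long", "logicalType": "local-timestamp-millis"}
```

TIMESTAMP_LTZ maps to timestamp-millis in both modes.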

Error Handling

Schema Conversion Errors:

  • Throws IllegalArgumentException for unsupported type conversions
  • Logs warnings for lossy type conversions

Runtime Errors:

  • Serialization failures throw RuntimeException
  • Deserialization failures throw IOException
  • Schema mismatches result in detailed error messages

Performance Optimization

  • Schema Caching: Converted schemas are cached to avoid repeated conversion
  • Converter Reuse: Row-to-Avro converters are created once and reused
  • Memory Efficiency: Direct field access without object allocation where possible
  • Lazy Initialization: Schema parsing occurs only when needed

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-avro
