Apache Flink Avro format support library providing serialization and deserialization capabilities for Apache Avro data format within Flink streaming and batch processing applications
---
Row-based serialization and deserialization for seamless integration with Flink's Table API and SQL layer. Converts between Flink's internal RowData representation and Avro format, enabling SQL operations on Avro data.
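To make the round trip concrete, here is a hedged sketch pairing the serialization and deserialization schemas described below. It assumes a Flink job context (the framework calls `open()` on both schemas before first use) and the flink-avro and flink-table dependencies on the classpath:

```java
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

// Schema: (name STRING, age INT)
RowType rowType = RowType.of(
    new LogicalType[] {DataTypes.STRING().getLogicalType(), DataTypes.INT().getLogicalType()},
    new String[] {"name", "age"});

AvroRowDataSerializationSchema serializer = new AvroRowDataSerializationSchema(rowType);
AvroRowDataDeserializationSchema deserializer =
    new AvroRowDataDeserializationSchema(rowType, InternalTypeInfo.of(rowType));

// Inside a Flink job the framework calls open() on both schemas before these calls.
RowData row = GenericRowData.of(StringData.fromString("alice"), 42);
byte[] avroBytes = serializer.serialize(row);          // RowData -> Avro binary
RowData decoded = deserializer.deserialize(avroBytes); // Avro binary -> RowData
```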
Serializes Flink RowData objects to Avro binary or JSON format for Table API integration.

```java
public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {
    // Constructors
    public AvroRowDataSerializationSchema(RowType rowType);
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding);
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding, boolean legacyTimestampMapping);
    public AvroRowDataSerializationSchema(RowType rowType, SerializationSchema<GenericRecord> nestedSchema, RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter);

    // Instance methods
    public byte[] serialize(RowData row);
    public void open(InitializationContext context) throws Exception;
}
```

Basic Table Integration:
```java
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

// Define table schema
RowType rowType = RowType.of(
    new LogicalType[] {
        DataTypes.STRING().getLogicalType(),
        DataTypes.INT().getLogicalType(),
        DataTypes.BOOLEAN().getLogicalType()
    },
    new String[] {"name", "age", "active"}
);

// Create serializer
AvroRowDataSerializationSchema serializer = new AvroRowDataSerializationSchema(rowType);

// Use in table sink
TableSink<?> sink = new CustomTableSink(serializer);
```

With Custom Encoding:
```java
// Use JSON encoding for debugging
AvroRowDataSerializationSchema jsonSerializer = new AvroRowDataSerializationSchema(
    rowType,
    AvroEncoding.JSON
);

// Control timestamp mapping behavior
AvroRowDataSerializationSchema legacySerializer = new AvroRowDataSerializationSchema(
    rowType,
    AvroEncoding.BINARY,
    true // Use legacy timestamp mapping for compatibility
);
```

Deserializes Avro data to Flink RowData objects for Table API operations.
```java
public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData> {
    // Constructors
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo);
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo, AvroEncoding encoding);
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo, AvroEncoding encoding, boolean legacyTimestampMapping);

    // Instance methods
    public RowData deserialize(byte[] message) throws IOException;
    public TypeInformation<RowData> getProducedType();
    public boolean isEndOfStream(RowData nextElement);
}
```

Table Source Integration:
```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

// Create type information for RowData
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);

// Create deserializer
AvroRowDataDeserializationSchema deserializer = new AvroRowDataDeserializationSchema(
    rowType,
    typeInfo
);

// Use in table source
TableSource<?> source = new CustomTableSource(deserializer);
```

Kafka Table Integration:
```java
// Create deserializer for Kafka table source
AvroRowDataDeserializationSchema kafkaDeserializer = new AvroRowDataDeserializationSchema(
    rowType,
    typeInfo,
    AvroEncoding.BINARY
);

// Configure Kafka table
Map<String, String> properties = new HashMap<>();
properties.put("connector", "kafka");
properties.put("topic", "avro-topic");
properties.put("format", "avro");
```

The library provides automatic format discovery for the Table API through the format factory.
```java
public class AvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
    public static final String IDENTIFIER = "avro";

    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public String factoryIdentifier();
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
```

Create Table with Avro Format:
```sql
CREATE TABLE avro_table (
    name STRING,
    age INT,
    active BOOLEAN
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro-topic',
    'format' = 'avro',
    'avro.encoding' = 'binary'
);
```

Configuration Options:
```sql
CREATE TABLE avro_table_json (
    name STRING,
    age INT,
    created_at TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro-json-topic',
    'format' = 'avro',
    'avro.encoding' = 'json',
    'avro.timestamp_mapping.legacy' = 'false'
);
```

The library automatically converts between the Flink and Avro type systems:
```java
public class AvroSchemaConverter {
    public static Schema convertToSchema(RowType rowType);
    public static Schema convertToSchema(RowType rowType, boolean legacyTimestampMapping);
    public static LogicalType convertToLogicalType(Schema schema);
}
```

Flink to Avro Types:
| Flink Type | Avro Type | Notes |
|---|---|---|
| BOOLEAN | boolean | Direct mapping |
| INT | int | 32-bit signed integer |
| BIGINT | long | 64-bit signed integer |
| FLOAT | float | 32-bit floating point |
| DOUBLE | double | 64-bit floating point |
| STRING | string | UTF-8 string |
| BINARY/VARBINARY | bytes | Byte array |
| TIMESTAMP | timestamp-millis | Logical type (legacy mapping) |
| TIMESTAMP_LTZ | timestamp-millis | Logical type (correct mapping) |
| DATE | date | Logical type |
| TIME | time-millis | Logical type |
| DECIMAL | decimal | Logical type with precision/scale |
| ARRAY | array | Repeated field |
| ROW | record | Nested record |
| MAP | map | Key-value mapping |
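The mappings in the table above can be inspected programmatically via `AvroSchemaConverter`. A sketch, assuming the flink-avro and flink-table dependencies are on the classpath:

```java
import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

RowType rowType = RowType.of(
    new LogicalType[] {DataTypes.STRING().getLogicalType(), DataTypes.INT().getLogicalType()},
    new String[] {"name", "age"});

// Flink RowType -> Avro record schema; field types follow the mapping table above
Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
System.out.println(avroSchema.toString(true)); // pretty-printed Avro schema JSON
```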
The library supports two timestamp mapping modes:

- Legacy mapping (`'avro.timestamp_mapping.legacy' = 'true'`): both TIMESTAMP and TIMESTAMP_LTZ map to Avro `timestamp-millis`.
- Corrected mapping (`'avro.timestamp_mapping.legacy' = 'false'`): TIMESTAMP maps to Avro `local-timestamp-millis`, while TIMESTAMP_LTZ maps to Avro `timestamp-millis`.

Schema Conversion Errors:
- `IllegalArgumentException` for unsupported type conversions

Runtime Errors:

- `RuntimeException` on serialization failures
- `IOException` on deserialization failures (per the `deserialize` signature above)

Install with Tessl CLI:
```shell
npx tessl i tessl/maven-org-apache-flink--flink-avro
```
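As a usage note on the error behavior above, a consumer can wrap deserialization defensively. The helper below is a hypothetical sketch, not part of the library:

```java
import java.io.IOException;
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.table.data.RowData;

// Hypothetical helper: returns null instead of failing the job on a bad record
static RowData safeDeserialize(AvroRowDataDeserializationSchema deserializer, byte[] message) {
    try {
        return deserializer.deserialize(message);
    } catch (IOException e) {
        // Malformed or schema-incompatible record: skip it (or route to a dead-letter topic)
        return null;
    }
}
```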