Apache Flink SQL Avro format library that provides bundled and shaded Apache Avro dependencies for SQL usage in Flink applications.

It integrates with Flink's internal RowData representation, enabling Avro data to be consumed and produced through the Table API and SQL.
/**
 * Deserialization schema that converts serialized Avro records into Flink's
 * internal RowData representation for Table API / SQL processing.
 *
 * <p>Marked as a {@code @PublicEvolving} API.
 */
public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData> {
/**
 * Creates a deserialization schema with default settings.
 * @param rowType The Flink RowType describing the target schema
 * @param typeInfo Type information for the produced RowData
 */
public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> typeInfo);
/**
 * Creates a deserialization schema with a custom wire encoding.
 * @param rowType The Flink RowType describing the target schema
 * @param typeInfo Type information for the produced RowData
 * @param encoding Avro encoding type (BINARY or JSON)
 */
public AvroRowDataDeserializationSchema(
RowType rowType,
TypeInformation<RowData> typeInfo,
AvroEncoding encoding);
/**
 * Creates a deserialization schema with full configuration.
 * @param rowType The Flink RowType describing the target schema
 * @param typeInfo Type information for the produced RowData
 * @param encoding Avro encoding type (BINARY or JSON)
 * @param legacyTimestampMapping Whether to use the legacy mapping between Avro
 *        timestamp logical types and Flink timestamp types
 */
public AvroRowDataDeserializationSchema(
RowType rowType,
TypeInformation<RowData> typeInfo,
AvroEncoding encoding,
boolean legacyTimestampMapping);
/**
 * Creates a deserialization schema that delegates decoding to a nested schema
 * producing {@code GenericRecord}, then applies a caller-supplied converter.
 * @param nestedSchema The nested Avro deserialization schema
 * @param runtimeConverter Runtime converter from Avro records to RowData
 * @param typeInfo Type information for the produced RowData
 */
public AvroRowDataDeserializationSchema(
DeserializationSchema<GenericRecord> nestedSchema,
AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter,
TypeInformation<RowData> typeInfo);
/**
 * Deserializes a byte array into RowData.
 * @param message Serialized Avro message bytes
 * @return RowData instance, or null if message is null
 * @throws IOException If deserialization fails
 */
public RowData deserialize(byte[] message) throws IOException;
/**
 * Checks whether an element signals end of stream.
 * @param nextElement The RowData element to check
 * @return Always false for Avro records (the stream is treated as unbounded)
 */
public boolean isEndOfStream(RowData nextElement);
/**
 * Gets the type information for the produced RowData.
 * @return TypeInformation for RowData
 */
public TypeInformation<RowData> getProducedType();
}
/**
* Serialization schema that converts Flink's internal RowData into serialized
* Avro records.
*/
public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {
/**
 * Creates a serialization schema with default settings.
 * @param rowType The Flink RowType describing the source schema
 */
public AvroRowDataSerializationSchema(RowType rowType);
/**
 * Creates a serialization schema with a custom wire encoding.
 * @param rowType The Flink RowType describing the source schema
 * @param encoding Avro encoding type (BINARY or JSON)
 */
public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding);
/**
 * Creates a serialization schema with full configuration.
 * @param rowType The Flink RowType describing the source schema
 * @param encoding Avro encoding type (BINARY or JSON)
 * @param legacyTimestampMapping Whether to use the legacy mapping between Flink
 *        timestamp types and Avro timestamp logical types
 */
public AvroRowDataSerializationSchema(
RowType rowType,
AvroEncoding encoding,
boolean legacyTimestampMapping);
/**
 * Creates a serialization schema that applies a caller-supplied converter and
 * delegates encoding of the resulting {@code GenericRecord} to a nested schema.
 * @param rowType The Flink RowType describing the source schema
 * @param nestedSchema The nested Avro serialization schema
 * @param runtimeConverter Runtime converter from RowData to Avro records
 */
public AvroRowDataSerializationSchema(
RowType rowType,
SerializationSchema<GenericRecord> nestedSchema,
RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter);
/**
 * Serializes RowData to a byte array.
 * @param rowData The RowData to serialize
 * @return Serialized byte array
 */
public byte[] serialize(RowData rowData);
/**
 * Opens the serializer for initialization; must be called before the first
 * serialize() invocation.
 * @param context Initialization context
 * @throws Exception If initialization fails
 */
public void open(InitializationContext context) throws Exception;
}
/**
* Utilities for converting Avro records to Flink RowData.
*/
public class AvroToRowDataConverters {
/**
 * Functional interface for converting a deserialized Avro value into Flink's
 * internal representation.
 */
public interface AvroToRowDataConverter {
/**
 * Converts an Avro value to its Flink internal equivalent.
 * @param avroObject The Avro object to convert
 * @return Converted internal value (RowData for record types)
 */
Object convert(Object avroObject);
}
/**
 * Creates a converter for the given RowType.
 * @param rowType The target Flink RowType
 * @return Converter instance for rows of that type
 */
public static AvroToRowDataConverter createRowConverter(RowType rowType);
}
/**
 * Utilities for converting Flink RowData to Avro records.
 */
public class RowDataToAvroConverters {
/**
 * Functional interface for converting Flink internal values into Avro values.
 */
public interface RowDataToAvroConverter {
/**
 * Converts a Flink internal value to an Avro value for the given schema.
 * @param schema Target Avro schema
 * @param rowData The RowData to convert
 * @return Converted Avro record
 */
Object convert(Schema schema, RowData rowData);
}
/**
 * Creates a converter for the given RowType.
 * @param rowType The source Flink RowType
 * @return Converter instance for rows of that type
 */
public static RowDataToAvroConverter createConverter(RowType rowType);
}
// Define Flink table schema
// Define the Flink table schema. NOTE(review): RowType.of(...) takes
// LogicalType[], not DataType[] — DataTypes.BIGINT() etc. return DataType.
// The idiomatic way to obtain a RowType is to build a ROW DataType and
// extract its logical type.
RowType rowType = (RowType) DataTypes.ROW(
    DataTypes.FIELD("user_id", DataTypes.BIGINT()),
    DataTypes.FIELD("username", DataTypes.STRING()),
    DataTypes.FIELD("email", DataTypes.STRING()),
    DataTypes.FIELD("created_at", DataTypes.TIMESTAMP(3))
).getLogicalType();
// Type information for Flink's internal row representation
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);
// Create deserializer for table processing (Avro bytes -> RowData)
AvroRowDataDeserializationSchema deserializer =
    new AvroRowDataDeserializationSchema(
        rowType,
        typeInfo,
        AvroEncoding.BINARY,
        false // legacyTimestampMapping = false: use correct timestamp mapping
    );
// Create serializer for output (RowData -> Avro bytes)
AvroRowDataSerializationSchema serializer =
    new AvroRowDataSerializationSchema(
        rowType,
        AvroEncoding.BINARY,
        false
    );
// Convert byte stream to RowData
DataStream<byte[]> avroStream = // ... your Avro byte stream
DataStream<RowData> rowDataStream = avroStream
.map(new MapFunction<byte[], RowData>() {
private AvroRowDataDeserializationSchema deserializer;
@Override
public void open(Configuration parameters) throws Exception {
RowType rowType = RowType.of(
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
new String[] {"id", "name"}
);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);
deserializer = new AvroRowDataDeserializationSchema(
rowType, typeInfo, AvroEncoding.BINARY, false
);
}
@Override
public RowData map(byte[] value) throws Exception {
return deserializer.deserialize(value);
}
});
// Convert to Table for SQL processing
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv.fromDataStream(rowDataStream);
// Register as temporary table
tableEnv.createTemporaryView("my_table", table);
// Process with SQL
Table result = tableEnv.sqlQuery("SELECT id, UPPER(name) as name FROM my_table WHERE id > 100");// Create custom converter for complex types
RowType complexRowType = RowType.of(
new DataType[] {
DataTypes.ROW(
DataTypes.FIELD("street", DataTypes.STRING()),
DataTypes.FIELD("city", DataTypes.STRING()),
DataTypes.FIELD("zipcode", DataTypes.STRING())
),
DataTypes.ARRAY(DataTypes.STRING()),
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())
},
new String[] {"address", "tags", "metadata"}
);
// Create converter
AvroToRowDataConverters.AvroToRowDataConverter converter =
AvroToRowDataConverters.createRowConverter(complexRowType);
// Use converter manually
GenericRecord avroRecord = // ... your Avro record
RowData rowData = (RowData) converter.convert(avroRecord);
// For serialization direction
RowDataToAvroConverters.RowDataToAvroConverter toAvroConverter =
RowDataToAvroConverters.createConverter(complexRowType);
Schema avroSchema = AvroSchemaConverter.convertToSchema(complexRowType);
GenericRecord converted = (GenericRecord) toAvroConverter.convert(avroSchema, rowData);// Create table with Avro format and specific configuration
// DDL for a Kafka-backed table using the Avro format. The format options
// select binary encoding and the corrected (non-legacy) timestamp mapping.
String createTableSQL = """
CREATE TABLE user_events (
user_id BIGINT,
event_type STRING,
event_data ROW<
action STRING,
target STRING,
metadata MAP<STRING, STRING>
>,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro',
'avro.encoding' = 'binary',
'avro.timestamp_mapping.legacy' = 'false'
)
""";
tableEnv.executeSql(createTableSQL);
// Windowed aggregation: count 'click' events per user/type in 1-minute
// tumbling windows over the event-time attribute declared above.
Table result = tableEnv.sqlQuery("""
SELECT
user_id,
event_type,
event_data.action,
COUNT(*) as event_count,
TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end
FROM user_events
WHERE event_data.action = 'click'
GROUP BY
user_id,
event_type,
event_data.action,
TUMBLE(event_time, INTERVAL '1' MINUTE)
""");
// Schema with nullable fields
// Schema with nullable fields. NOTE(review): RowType.of(...) expects
// LogicalType[]; build a ROW DataType and extract its logical type instead
// of passing DataType[].
RowType nullableRowType = (RowType) DataTypes.ROW(
    DataTypes.FIELD("id", DataTypes.BIGINT().notNull()), // required field
    DataTypes.FIELD("name", DataTypes.STRING()),         // nullable string
    DataTypes.FIELD("age", DataTypes.INT()),             // nullable int
    DataTypes.FIELD("last_login", DataTypes.TIMESTAMP(3)) // nullable timestamp
).getLogicalType();
// The deserializer automatically handles null values according to the Avro schema
AvroRowDataDeserializationSchema deserializer =
    new AvroRowDataDeserializationSchema(
        nullableRowType,
        InternalTypeInfo.of(nullableRowType),
        AvroEncoding.BINARY,
        false
    );
// Access nullable fields safely: always check isNullAt before reading,
// since RowData accessors on a null field are undefined.
DataStream<RowData> processed = rowDataStream
    .map(new MapFunction<RowData, RowData>() {
        @Override
        public RowData map(RowData row) throws Exception {
            // Check if name field is null (field index 1)
            if (!row.isNullAt(1)) {
                String name = row.getString(1).toString();
                // Process non-null name
            }
            return row;
        }
    });
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-avro