CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-avro

Apache Flink SQL Avro format library that provides bundled and shaded Apache Avro dependencies for SQL usage in Flink applications.

Pending
Overview
Eval results
Files

docs/rowdata.md

Row Data Integration

Capabilities

RowData Deserialization

Integration with Flink's internal RowData format for table API and SQL processing.

/**
 * Deserialization schema that converts serialized Avro records into Flink's
 * internal {@link RowData} representation for Table API and SQL processing.
 * Marked as @PublicEvolving API.
 */
public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData> {
    
    /**
     * Creates an AvroRowDataDeserializationSchema with default settings.
     * NOTE(review): presumably defaults to BINARY encoding and legacy timestamp
     * mapping disabled — confirm against the fuller constructors below.
     * @param rowType The Flink RowType describing the target schema
     * @param typeInfo Type information for the RowData
     */
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> typeInfo);
    
    /**
     * Creates an AvroRowDataDeserializationSchema with custom encoding.
     * @param rowType The Flink RowType describing the target schema
     * @param typeInfo Type information for the RowData
     * @param encoding Avro encoding type (BINARY or JSON)
     */
    public AvroRowDataDeserializationSchema(
        RowType rowType, 
        TypeInformation<RowData> typeInfo, 
        AvroEncoding encoding);
    
    /**
     * Creates an AvroRowDataDeserializationSchema with full configuration.
     * @param rowType The Flink RowType describing the target schema
     * @param typeInfo Type information for the RowData
     * @param encoding Avro encoding type (BINARY or JSON)
     * @param legacyTimestampMapping Whether to use legacy timestamp mapping
     *        (pass {@code false} for the corrected mapping)
     */
    public AvroRowDataDeserializationSchema(
        RowType rowType, 
        TypeInformation<RowData> typeInfo, 
        AvroEncoding encoding, 
        boolean legacyTimestampMapping);
    
    /**
     * Creates an AvroRowDataDeserializationSchema with a custom nested schema,
     * delegating byte-level decoding to {@code nestedSchema} and record-level
     * conversion to {@code runtimeConverter}.
     * @param nestedSchema The nested Avro deserialization schema
     * @param runtimeConverter Runtime converter for Avro to RowData conversion
     * @param typeInfo Type information for the RowData
     */
    public AvroRowDataDeserializationSchema(
        DeserializationSchema<GenericRecord> nestedSchema,
        AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter,
        TypeInformation<RowData> typeInfo);
    
    /**
     * Deserializes a byte array into a RowData instance.
     * @param message Serialized Avro message bytes
     * @return RowData instance, or {@code null} if {@code message} is null
     * @throws IOException If deserialization fails (e.g. malformed input)
     */
    public RowData deserialize(byte[] message) throws IOException;
    
    /**
     * Checks whether an element signals end of stream. Avro streams are
     * unbounded from the schema's point of view, so this never terminates.
     * @param nextElement The RowData element to check  
     * @return Always false for Avro records
     */
    public boolean isEndOfStream(RowData nextElement);
    
    /**
     * Gets the type information for produced RowData.
     * NOTE(review): presumably returns the {@code typeInfo} supplied at
     * construction — confirm in the implementation.
     * @return TypeInformation for RowData
     */
    public TypeInformation<RowData> getProducedType();
}

RowData Serialization

/**
 * Serialization schema that converts Flink's internal {@link RowData}
 * representation into serialized Avro bytes.
 */
public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {
    
    /**
     * Creates an AvroRowDataSerializationSchema with default settings.
     * NOTE(review): presumably defaults to BINARY encoding — confirm against
     * the two-argument constructor.
     * @param rowType The Flink RowType describing the source schema
     */
    public AvroRowDataSerializationSchema(RowType rowType);
    
    /**
     * Creates an AvroRowDataSerializationSchema with custom encoding.
     * @param rowType The Flink RowType describing the source schema
     * @param encoding Avro encoding type (BINARY or JSON)
     */
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding);
    
    /**
     * Creates an AvroRowDataSerializationSchema with full configuration.
     * @param rowType The Flink RowType describing the source schema
     * @param encoding Avro encoding type (BINARY or JSON)
     * @param legacyTimestampMapping Whether to use legacy timestamp mapping
     *        (pass {@code false} for the corrected mapping)
     */
    public AvroRowDataSerializationSchema(
        RowType rowType, 
        AvroEncoding encoding, 
        boolean legacyTimestampMapping);
    
    /**
     * Creates an AvroRowDataSerializationSchema with a custom nested schema,
     * delegating byte-level encoding to {@code nestedSchema} and record-level
     * conversion to {@code runtimeConverter}.
     * @param rowType The Flink RowType describing the source schema
     * @param nestedSchema The nested Avro serialization schema
     * @param runtimeConverter Runtime converter for RowData to Avro conversion
     */
    public AvroRowDataSerializationSchema(
        RowType rowType,
        SerializationSchema<GenericRecord> nestedSchema,
        RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter);
    
    /**
     * Serializes a RowData instance into a byte array.
     * NOTE(review): {@link #open(InitializationContext)} is presumably required
     * before the first call — confirm in the implementation.
     * @param rowData The RowData to serialize
     * @return Serialized byte array
     */
    public byte[] serialize(RowData rowData);
    
    /**
     * Opens the serializer for initialization (e.g. building internal
     * writers/encoders) before the first {@link #serialize(RowData)} call.
     * @param context Initialization context
     * @throws Exception If initialization fails
     */
    public void open(InitializationContext context) throws Exception;
}

Type Conversion Utilities

/**
 * Utilities for converting Avro records to Flink RowData.
 */
public class AvroToRowDataConverters {
    
    /**
     * Functional interface for converting Avro values to Flink internal values.
     */
    public interface AvroToRowDataConverter {
        /**
         * Converts an Avro value to its Flink internal representation.
         * The return type is {@code Object}; for a converter built from a
         * {@link RowType}, callers typically cast the result to RowData.
         * @param avroObject The Avro value to convert
         * @return Converted Flink internal value
         */
        Object convert(Object avroObject);
    }
    
    /**
     * Creates a converter for the given RowType.
     * @param rowType The target Flink RowType
     * @return Converter instance whose {@code convert} accepts a GenericRecord
     */
    public static AvroToRowDataConverter createRowConverter(RowType rowType);
}

/**
 * Utilities for converting Flink RowData to Avro records.
 */
public class RowDataToAvroConverters {
    
    /**
     * Functional interface for converting Flink internal values to Avro values.
     */
    public interface RowDataToAvroConverter {
        /**
         * Converts a Flink internal value to an Avro value against the given
         * target schema. The return type is {@code Object}; for a converter
         * built from a {@link RowType}, callers typically cast the result to
         * GenericRecord.
         * @param schema Target Avro schema
         * @param rowData The RowData to convert
         * @return Converted Avro value
         */
        Object convert(Schema schema, RowData rowData);
    }
    
    /**
     * Creates a converter for the given RowType.
     * @param rowType The source Flink RowType
     * @return Converter instance
     */
    public static RowDataToAvroConverter createConverter(RowType rowType);
}

Usage Examples

Table API Integration

// Define the Flink table schema.
// NOTE(review): RowType.of expects LogicalType instances, while DataTypes.*
// return DataType — the original example would not compile. Build the row via
// DataTypes.ROW and unwrap the logical type.
RowType rowType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("user_id", DataTypes.BIGINT()),
        DataTypes.FIELD("username", DataTypes.STRING()),
        DataTypes.FIELD("email", DataTypes.STRING()),
        DataTypes.FIELD("created_at", DataTypes.TIMESTAMP(3))
    ).getLogicalType();

// Create type information for Flink's internal row representation
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);

// Create deserializer for table processing
AvroRowDataDeserializationSchema deserializer = 
    new AvroRowDataDeserializationSchema(
        rowType, 
        typeInfo, 
        AvroEncoding.BINARY, 
        false // use the corrected (non-legacy) timestamp mapping
    );

// Create serializer for output
AvroRowDataSerializationSchema serializer = 
    new AvroRowDataSerializationSchema(
        rowType, 
        AvroEncoding.BINARY, 
        false
    );

DataStream to Table Conversion

// Convert byte stream to RowData
DataStream<byte[]> avroStream = // ... your Avro byte stream

DataStream<RowData> rowDataStream = avroStream
    .map(new MapFunction<byte[], RowData>() {
        private AvroRowDataDeserializationSchema deserializer;
        
        @Override
        public void open(Configuration parameters) throws Exception {
            RowType rowType = RowType.of(
                new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
                new String[] {"id", "name"}
            );
            TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);
            
            deserializer = new AvroRowDataDeserializationSchema(
                rowType, typeInfo, AvroEncoding.BINARY, false
            );
        }
        
        @Override
        public RowData map(byte[] value) throws Exception {
            return deserializer.deserialize(value);
        }
    });

// Convert to Table for SQL processing
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv.fromDataStream(rowDataStream);

// Register as temporary table
tableEnv.createTemporaryView("my_table", table);

// Process with SQL
Table result = tableEnv.sqlQuery("SELECT id, UPPER(name) as name FROM my_table WHERE id > 100");

Custom Type Conversions

// Create custom converter for complex types
RowType complexRowType = RowType.of(
    new DataType[] {
        DataTypes.ROW(
            DataTypes.FIELD("street", DataTypes.STRING()),
            DataTypes.FIELD("city", DataTypes.STRING()),
            DataTypes.FIELD("zipcode", DataTypes.STRING())
        ),
        DataTypes.ARRAY(DataTypes.STRING()),
        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())
    },
    new String[] {"address", "tags", "metadata"}
);

// Create converter
AvroToRowDataConverters.AvroToRowDataConverter converter = 
    AvroToRowDataConverters.createRowConverter(complexRowType);

// Use converter manually
GenericRecord avroRecord = // ... your Avro record
RowData rowData = (RowData) converter.convert(avroRecord);

// For serialization direction
RowDataToAvroConverters.RowDataToAvroConverter toAvroConverter = 
    RowDataToAvroConverters.createConverter(complexRowType);

Schema avroSchema = AvroSchemaConverter.convertToSchema(complexRowType);
GenericRecord converted = (GenericRecord) toAvroConverter.convert(avroSchema, rowData);

SQL Table with Custom Schema

// Create a Kafka-backed table using the Avro format.
// 'avro.timestamp_mapping.legacy' = 'false' selects the corrected (non-legacy)
// timestamp mapping, matching the `false` flag used in the Java constructors.
String createTableSQL = """
    CREATE TABLE user_events (
        user_id BIGINT,
        event_type STRING,
        event_data ROW<
            action STRING,
            target STRING,
            metadata MAP<STRING, STRING>
        >,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro',
        'avro.encoding' = 'binary',
        'avro.timestamp_mapping.legacy' = 'false'
    )
    """;

tableEnv.executeSql(createTableSQL);

// Query the table: count 'click' events per user/type over 1-minute tumbling
// windows (legacy TUMBLE group-window syntax; the grouped TUMBLE(...) must
// match the TUMBLE_END(...) in the SELECT list).
Table result = tableEnv.sqlQuery("""
    SELECT 
        user_id,
        event_type,
        event_data.action,
        COUNT(*) as event_count,
        TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end
    FROM user_events
    WHERE event_data.action = 'click'
    GROUP BY 
        user_id, 
        event_type, 
        event_data.action,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
    """);

Handling Nullable Fields

// Schema with nullable fields.
// NOTE(review): RowType.of expects LogicalType[], not DataType[] — the
// original example would not compile. Build the row with DataTypes.ROW and
// unwrap the logical type. DataTypes fields are nullable by default;
// .notNull() marks a field required.
RowType nullableRowType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),   // required field
        DataTypes.FIELD("name", DataTypes.STRING()),           // nullable string
        DataTypes.FIELD("age", DataTypes.INT()),               // nullable int
        DataTypes.FIELD("last_login", DataTypes.TIMESTAMP(3))  // nullable timestamp
    ).getLogicalType();

// The deserializer automatically handles null values according to the Avro schema
AvroRowDataDeserializationSchema deserializer = 
    new AvroRowDataDeserializationSchema(
        nullableRowType,
        InternalTypeInfo.of(nullableRowType),
        AvroEncoding.BINARY,
        false
    );

// Access nullable fields safely — always test isNullAt before reading
DataStream<RowData> processed = rowDataStream
    .map(new MapFunction<RowData, RowData>() {
        @Override
        public RowData map(RowData row) throws Exception {
            // Check whether the name field is null (field index 1)
            if (!row.isNullAt(1)) {
                String name = row.getString(1).toString();
                // Process non-null name
            }
            return row;
        }
    });

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-avro

docs

configuration.md

filesystem.md

index.md

registry.md

rowdata.md

schemas.md

utilities.md

tile.json