tessl/maven-org-apache-flink--flink-sql-avro

Apache Flink SQL Avro format library that bundles shaded Apache Avro dependencies for use in Flink SQL applications.

Schema-Encoded Message Support

Capabilities

Schema-Encoded Deserialization

Support for deserializing Avro data where the schema is embedded within the message itself and is read using a configurable schema coder.

/**
 * Deserialization schema that reads schema from input stream using SchemaCoder
 * Extends AvroDeserializationSchema with schema-encoded message support
 * @param <T> Type of record it produces
 */
public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
    
    /**
     * Creates schema-encoded deserialization schema
     * @param recordClazz Class to deserialize (SpecificRecord or GenericRecord)
     * @param reader Reader's Avro schema (required for GenericRecord)
     * @param schemaCoderProvider Provider for SchemaCoder that reads schema from stream
     */
    public RegistryAvroDeserializationSchema(
        Class<T> recordClazz,
        Schema reader,
        SchemaCoder.SchemaCoderProvider schemaCoderProvider);
    
    /**
     * Creates schema-encoded deserialization schema with custom encoding
     * @param recordClazz Class to deserialize (SpecificRecord or GenericRecord)
     * @param reader Reader's Avro schema (required for GenericRecord)
     * @param schemaCoderProvider Provider for SchemaCoder that reads schema from stream
     * @param encoding Avro encoding type (BINARY or JSON)
     */
    public RegistryAvroDeserializationSchema(
        Class<T> recordClazz,
        Schema reader,
        SchemaCoder.SchemaCoderProvider schemaCoderProvider,
        AvroEncoding encoding);
    
    /**
     * Deserializes message with embedded schema
     * @param message Serialized message bytes with embedded schema
     * @return Deserialized object
     * @throws IOException If deserialization or schema reading fails
     */
    public T deserialize(byte[] message) throws IOException;
    
    /**
     * Checks if element signals end of stream
     * @param nextElement The element to check
     * @return Always false for Avro records
     */
    public boolean isEndOfStream(T nextElement);
    
    /**
     * Gets the type information for produced type
     * @return TypeInformation for the produced type
     */
    public TypeInformation<T> getProducedType();
}
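
A minimal sketch of constructing and using the deserialization schema above. The reader schema and field names are illustrative, and InlineJsonSchemaCoder.Provider is a hypothetical SchemaCoder implementation sketched under Schema Coder Interface below; it is not part of the library.

// Reader schema for the records we expect to consume (illustrative)
Schema readerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"UserEvent\",\"fields\":["
        + "{\"name\":\"user_id\",\"type\":\"long\"},"
        + "{\"name\":\"event_type\",\"type\":\"string\"}]}");

// InlineJsonSchemaCoder.Provider is hypothetical (sketched below)
RegistryAvroDeserializationSchema<GenericRecord> deserializer =
    new RegistryAvroDeserializationSchema<>(
        GenericRecord.class,
        readerSchema,
        new InlineJsonSchemaCoder.Provider());

// messageBytes carries the embedded schema followed by the Avro payload;
// the coder reads the schema first, then the record is decoded against it
GenericRecord record = deserializer.deserialize(messageBytes);
Long userId = (Long) record.get("user_id");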

Schema-Encoded Serialization

Support for serializing Avro data with embedded schema information.

/**
 * Serialization schema that embeds schema information in messages using SchemaCoder
 * Extends AvroSerializationSchema with schema-encoded message support
 * @param <T> Type of record it consumes
 */
public class RegistryAvroSerializationSchema<T> extends AvroSerializationSchema<T> {
    
    /**
     * Creates schema-encoded serialization schema
     * @param subject Subject name for schema identification
     * @param recordClazz Class to serialize (SpecificRecord or GenericRecord)
     * @param writer Writer's Avro schema (required for GenericRecord)
     * @param schemaCoderProvider Provider for SchemaCoder that writes schema to stream
     */
    public RegistryAvroSerializationSchema(
        String subject,
        Class<T> recordClazz,
        Schema writer,
        SchemaCoder.SchemaCoderProvider schemaCoderProvider);
    
    /**
     * Creates schema-encoded serialization schema with custom encoding
     * @param subject Subject name for schema identification
     * @param recordClazz Class to serialize (SpecificRecord or GenericRecord)
     * @param writer Writer's Avro schema (required for GenericRecord)
     * @param schemaCoderProvider Provider for SchemaCoder that writes schema to stream
     * @param encoding Avro encoding type (BINARY or JSON)
     */
    public RegistryAvroSerializationSchema(
        String subject,
        Class<T> recordClazz,
        Schema writer,
        SchemaCoder.SchemaCoderProvider schemaCoderProvider,
        AvroEncoding encoding);
    
    /**
     * Serializes object with embedded schema
     * Message includes schema information followed by Avro data
     * @param object Object to serialize
     * @return Serialized byte array with embedded schema
     */
    public byte[] serialize(T object);
}
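
A matching serialization sketch using the constructor documented above. The subject, writer schema, and InlineJsonSchemaCoder.Provider are the same illustrative assumptions as in the deserialization example.

// Writer schema describing the records we produce (illustrative)
Schema writerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"UserEvent\",\"fields\":["
        + "{\"name\":\"user_id\",\"type\":\"long\"},"
        + "{\"name\":\"event_type\",\"type\":\"string\"}]}");

RegistryAvroSerializationSchema<GenericRecord> serializer =
    new RegistryAvroSerializationSchema<>(
        "user-events-value",                   // subject
        GenericRecord.class,
        writerSchema,
        new InlineJsonSchemaCoder.Provider()); // hypothetical coder (below)

GenericRecord record = new GenericData.Record(writerSchema);
record.put("user_id", 42L);
record.put("event_type", "login");

// The result contains the embedded schema followed by the Avro-encoded data
byte[] payload = serializer.serialize(record);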

Schema Coder Interface

Interface for reading and writing schema information in message streams.

/**
 * Schema coder that allows reading schema embedded in serialized records
 * Used by RegistryAvroDeserializationSchema and RegistryAvroSerializationSchema
 */
public interface SchemaCoder {
    
    /**
     * Reads schema from input stream
     * @param in Input stream containing schema information
     * @return Parsed Avro schema
     * @throws IOException If schema reading fails
     */
    Schema readSchema(InputStream in) throws IOException;
    
    /**
     * Writes schema to output stream
     * @param schema Avro schema to write
     * @param out Output stream to write schema to
     * @throws IOException If schema writing fails
     */
    void writeSchema(Schema schema, OutputStream out) throws IOException;
    
    /**
     * Provider interface for creating SchemaCoder instances
     * Allows creating multiple instances in parallel operators without serialization issues
     */
    interface SchemaCoderProvider extends Serializable {
        
        /**
         * Creates a new instance of SchemaCoder
     * Each call should return a new instance so that parallel operator instances get their own coder
         * @return New SchemaCoder instance
         */
        SchemaCoder get();
    }
}
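
A minimal sketch of a custom SchemaCoder and its provider. The length-prefixed schema-JSON wire format here is invented purely for illustration (it is not the Confluent wire format), and the class name InlineJsonSchemaCoder is hypothetical.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.flink.formats.avro.SchemaCoder;

public class InlineJsonSchemaCoder implements SchemaCoder {

    @Override
    public Schema readSchema(InputStream in) throws IOException {
        // Read the length-prefixed schema JSON written by writeSchema
        DataInputStream dataIn = new DataInputStream(in);
        byte[] schemaBytes = new byte[dataIn.readInt()];
        dataIn.readFully(schemaBytes);
        return new Schema.Parser().parse(
            new String(schemaBytes, StandardCharsets.UTF_8));
    }

    @Override
    public void writeSchema(Schema schema, OutputStream out) throws IOException {
        // Prefix the schema JSON with its byte length so readSchema knows
        // where the schema ends and the Avro payload begins
        byte[] schemaBytes = schema.toString().getBytes(StandardCharsets.UTF_8);
        DataOutputStream dataOut = new DataOutputStream(out);
        dataOut.writeInt(schemaBytes.length);
        dataOut.write(schemaBytes);
    }

    /** Provider that creates a fresh, independent coder on each call */
    public static class Provider implements SchemaCoder.SchemaCoderProvider {
        @Override
        public SchemaCoder get() {
            return new InlineJsonSchemaCoder();
        }
    }
}

Because the provider is Serializable and stateless, Flink can ship it to every parallel subtask, where get() produces an independent coder instance.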

Usage Examples

Basic Schema Registry Setup

// Configuration for schema registry
Map<String, String> registryConfig = new HashMap<>();
registryConfig.put("schema.registry.url", "http://localhost:8081");
registryConfig.put("schema.registry.subject.strategy", "topic-value");

// For authentication (if required)
registryConfig.put("schema.registry.username", "registry-user");
registryConfig.put("schema.registry.password", "registry-password");

// Create registry-aware deserializer
RegistryAvroDeserializationSchema<GenericRecord> deserializer = 
    RegistryAvroDeserializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("user-events-value")
        .build();

// Create registry-aware serializer
RegistryAvroSerializationSchema<GenericRecord> serializer = 
    RegistryAvroSerializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("processed-events-value")
        .setSchema(outputSchema)
        .build();

Kafka Integration with Schema Registry

// Kafka properties with schema registry configuration
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "avro-consumer-group");

// Schema registry properties
Properties schemaRegistryProps = new Properties();
schemaRegistryProps.setProperty("schema.registry.url", "http://localhost:8081");

// Consumer with registry-aware deserialization
KafkaSource<GenericRecord> kafkaSource = KafkaSource.<GenericRecord>builder()
    .setBootstrapServers("localhost:9092")  
    .setTopics("user-events")
    .setGroupId("avro-consumer-group")
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(
        RegistryAvroDeserializationSchema.<GenericRecord>builder()
            .setRegistryConfig(schemaRegistryProps)
            .setSubject("user-events-value")
            .build()
    ))
    .build();

DataStream<GenericRecord> stream = env.fromSource(
    kafkaSource, 
    WatermarkStrategy.noWatermarks(), 
    "Kafka Avro Source"
);

// Producer with registry-aware serialization
KafkaSink<GenericRecord> kafkaSink = KafkaSink.<GenericRecord>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.<GenericRecord>builder()
        .setTopic("processed-events")
        .setValueSerializationSchema(
            RegistryAvroSerializationSchema.<GenericRecord>builder()
                .setRegistryConfig(schemaRegistryProps)
                .setSubject("processed-events-value")
                .build()
        )
        .build())
    .build();

processedStream.sinkTo(kafkaSink);

Schema Evolution Handling

// Consumer that handles schema evolution automatically
RegistryAvroDeserializationSchema<GenericRecord> evolutionDeserializer = 
    RegistryAvroDeserializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("user-events-value")
        .setReaderSchema(readerSchema) // Optional: specify expected schema
        .setSchemaEvolutionEnabled(true)
        .build();

DataStream<GenericRecord> stream = env
    .addSource(new FlinkKafkaConsumer<>("user-events", evolutionDeserializer, kafkaProps));

// Handle records that may have different schemas
DataStream<ProcessedEvent> processed = stream
    .map(new MapFunction<GenericRecord, ProcessedEvent>() {
        @Override
        public ProcessedEvent map(GenericRecord record) throws Exception {
            ProcessedEvent event = new ProcessedEvent();
            
            // Required fields (present in all schema versions)
            event.setUserId((Long) record.get("user_id"));
            event.setEventType(record.get("event_type").toString());
            
            // Optional fields (may not exist in older schema versions)
            Object sessionId = record.get("session_id");
            if (sessionId != null) {
                event.setSessionId(sessionId.toString());
            }
            
            // New fields (may not exist in older records)
            Object deviceInfo = record.get("device_info");
            if (deviceInfo != null) {
                event.setDeviceInfo(deviceInfo.toString());
            }
            
            return event;
        }
    });

Multi-Subject Registry Usage

// Different schemas for different types of events
Map<String, RegistryAvroDeserializationSchema<GenericRecord>> deserializers = new HashMap<>();

// User events deserializer
deserializers.put("user-events", 
    RegistryAvroDeserializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("user-events-value")
        .build());

// System events deserializer  
deserializers.put("system-events",
    RegistryAvroDeserializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("system-events-value")
        .build());

// Transaction events deserializer
deserializers.put("transaction-events",
    RegistryAvroDeserializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("transaction-events-value")
        .build());

// Process different event types
DataStream<String> allEvents = env
    .addSource(new FlinkKafkaConsumer<>(
        Arrays.asList("user-events", "system-events", "transaction-events"),
        new KafkaDeserializationSchema<String>() {
            @Override
            public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                String topic = record.topic();
                RegistryAvroDeserializationSchema<GenericRecord> deserializer = 
                    deserializers.get(topic);
                
                if (deserializer != null) {
                    GenericRecord avroRecord = deserializer.deserialize(record.value());
                    return processEventByType(topic, avroRecord);
                }
                return null;
            }
            
            @Override
            public boolean isEndOfStream(String nextElement) {
                return false;
            }
            
            @Override
            public TypeInformation<String> getProducedType() {
                return BasicTypeInfo.STRING_TYPE_INFO;
            }
        },
        kafkaProps));

Schema Validation and Error Handling

// Registry deserializer with validation and error handling
RegistryAvroDeserializationSchema<GenericRecord> validatingDeserializer = 
    RegistryAvroDeserializationSchema.<GenericRecord>builder()
        .setRegistryConfig(registryConfig)
        .setSubject("user-events-value")
        .setValidationEnabled(true)
        .setErrorMode(ErrorMode.IGNORE) // or FAIL, LOG_AND_CONTINUE
        .build();

// rawStream is a DataStream<byte[]> of serialized messages
DataStream<GenericRecord> validatedStream = rawStream
    .map(new MapFunction<byte[], GenericRecord>() {
        @Override
        public GenericRecord map(byte[] value) throws Exception {
            try {
                return validatingDeserializer.deserialize(value);
            } catch (SchemaNotFoundException e) {
                // Handle unknown schema ID
                logger.warn("Unknown schema ID in message: " + e.getSchemaId());
                return null;
            } catch (IncompatibleSchemaException e) {
                // Handle schema compatibility issues
                logger.error("Schema compatibility error: " + e.getMessage());
                return null;
            } catch (IOException e) {
                // Handle general deserialization errors
                logger.error("Deserialization error: " + e.getMessage());
                return null;
            }
        }
    })
    .filter(Objects::nonNull); // Remove failed deserializations

Table API with Schema Registry

// Create table with schema registry integration
String createTableSQL = """
    CREATE TABLE user_events (
        user_id BIGINT,
        event_type STRING,
        event_data ROW<
            action STRING,
            target STRING
        >,
        event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro-confluent',
        'avro-confluent.url' = 'http://localhost:8081',
        'avro-confluent.subject' = 'user-events-value'
    )
    """;

tableEnv.executeSql(createTableSQL);

// Output table with schema registry
String createOutputTableSQL = """
    CREATE TABLE processed_events (
        user_id BIGINT,
        event_count BIGINT,
        last_event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'processed-events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro-confluent',
        'avro-confluent.url' = 'http://localhost:8081',
        'avro-confluent.subject' = 'processed-events-value'
    )
    """;

tableEnv.executeSql(createOutputTableSQL);

// Process with SQL
String processingSQL = """
    INSERT INTO processed_events
    SELECT 
        user_id,
        COUNT(*) as event_count,
        MAX(event_time) as last_event_time
    FROM user_events
    WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
    GROUP BY user_id
    """;

tableEnv.executeSql(processingSQL);

Schema Registry Administration

// Utility methods for schema registry management
public class SchemaRegistryUtils {
    
    /**
     * Register a new schema version under a subject
     * (pre-5.5 Confluent client API; newer clients register a ParsedSchema)
     */
    public static int registerSchema(String registryUrl, String subject, Schema schema) 
            throws IOException, RestClientException {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        return client.register(subject, schema);
    }
    
    /**
     * Get the latest schema registered for a subject
     */
    public static Schema getLatestSchema(String registryUrl, String subject) 
            throws IOException, RestClientException {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        // getLatestSchemaMetadata returns the schema as a JSON string
        return new Schema.Parser().parse(
            client.getLatestSchemaMetadata(subject).getSchema());
    }
    
    /**
     * Check whether a new schema is compatible with the subject's latest version
     */
    public static boolean isCompatible(String registryUrl, String subject, Schema newSchema) 
            throws IOException, RestClientException {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        return client.testCompatibility(subject, newSchema);
    }
}

// Example usage
Schema newSchema = // ... your new schema
boolean compatible = SchemaRegistryUtils.isCompatible(
    "http://localhost:8081", 
    "user-events-value", 
    newSchema
);

if (compatible) {
    int schemaId = SchemaRegistryUtils.registerSchema(
        "http://localhost:8081",
        "user-events-value", 
        newSchema
    );
    System.out.println("Registered new schema with ID: " + schemaId);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-avro
