The Apache Flink SQL Avro format library provides bundled and shaded Apache Avro dependencies for using the Avro format in Flink SQL applications.
Support for deserializing Avro data where schema information is embedded within the message using a configurable schema coder.
/**
* Deserialization schema that reads schema from input stream using SchemaCoder
* Extends AvroDeserializationSchema with schema-encoded message support
* @param <T> Type of record it produces
*/
public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
/**
* Creates schema-encoded deserialization schema
* @param recordClazz Class to deserialize (SpecificRecord or GenericRecord)
* @param reader Reader's Avro schema (required for GenericRecord)
* @param schemaCoderProvider Provider for SchemaCoder that reads schema from stream
*/
public RegistryAvroDeserializationSchema(
Class<T> recordClazz,
Schema reader,
SchemaCoder.SchemaCoderProvider schemaCoderProvider);
/**
* Creates schema-encoded deserialization schema with custom encoding
* @param recordClazz Class to deserialize (SpecificRecord or GenericRecord)
* @param reader Reader's Avro schema (required for GenericRecord)
* @param schemaCoderProvider Provider for SchemaCoder that reads schema from stream
* @param encoding Avro encoding type (BINARY or JSON)
*/
public RegistryAvroDeserializationSchema(
Class<T> recordClazz,
Schema reader,
SchemaCoder.SchemaCoderProvider schemaCoderProvider,
AvroEncoding encoding);
/**
* Deserializes message with embedded schema
* @param message Serialized message bytes with embedded schema
* @return Deserialized object
* @throws IOException If deserialization or schema reading fails
*/
public T deserialize(byte[] message) throws IOException;
/**
* Checks if element signals end of stream
* @param nextElement The element to check
* @return Always false for Avro records
*/
public boolean isEndOfStream(T nextElement);
/**
* Gets the type information for produced type
* @return TypeInformation for the produced type
*/
public TypeInformation<T> getProducedType();
}
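
A minimal construction sketch based on the constructor above. JsonPrefixSchemaCoder is a hypothetical coder (a concrete sketch follows the SchemaCoder interface below); the lambda works because SchemaCoderProvider is a serializable single-method interface, and readerSchemaJson is assumed to hold the reader schema definition.
// Sketch: constructing the deserialization schema with a custom SchemaCoder
Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);
RegistryAvroDeserializationSchema<GenericRecord> deserializationSchema =
    new RegistryAvroDeserializationSchema<>(
        GenericRecord.class,
        readerSchema, // reader schema is required for GenericRecord
        () -> new JsonPrefixSchemaCoder());

Support for serializing Avro data with embedded schema information.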
/**
* Serialization schema that embeds schema information in messages using SchemaCoder
* Extends AvroSerializationSchema with schema-encoded message support
* @param <T> Type of record it consumes
*/
public class RegistryAvroSerializationSchema<T> extends AvroSerializationSchema<T> {
/**
* Creates schema-encoded serialization schema
* @param subject Subject name for schema identification
* @param recordClazz Class to serialize (SpecificRecord or GenericRecord)
* @param writer Writer's Avro schema (required for GenericRecord)
* @param schemaCoderProvider Provider for SchemaCoder that writes schema to stream
*/
public RegistryAvroSerializationSchema(
String subject,
Class<T> recordClazz,
Schema writer,
SchemaCoder.SchemaCoderProvider schemaCoderProvider);
/**
* Creates schema-encoded serialization schema with custom encoding
* @param subject Subject name for schema identification
* @param recordClazz Class to serialize (SpecificRecord or GenericRecord)
* @param writer Writer's Avro schema (required for GenericRecord)
* @param schemaCoderProvider Provider for SchemaCoder that writes schema to stream
* @param encoding Avro encoding type (BINARY or JSON)
*/
public RegistryAvroSerializationSchema(
String subject,
Class<T> recordClazz,
Schema writer,
SchemaCoder.SchemaCoderProvider schemaCoderProvider,
AvroEncoding encoding);
/**
* Serializes object with embedded schema
* Message includes schema information followed by Avro data
* @param object Object to serialize
* @return Serialized byte array with embedded schema
*/
public byte[] serialize(T object);
}
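
The serializer mirrors this construction. A sketch under the same assumptions (illustrative subject name, writerSchema parsed elsewhere, JsonPrefixSchemaCoder defined below):
// Sketch: constructing the serialization schema with the same custom coder
RegistryAvroSerializationSchema<GenericRecord> serializationSchema =
    new RegistryAvroSerializationSchema<>(
        "user-events-value", // illustrative subject name
        GenericRecord.class,
        writerSchema, // writer schema is required for GenericRecord
        () -> new JsonPrefixSchemaCoder());

Interface for reading and writing schema information in message streams.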
/**
* Schema coder that allows reading schema embedded in serialized records
* Used by RegistryAvroDeserializationSchema and RegistryAvroSerializationSchema
*/
public interface SchemaCoder {
/**
* Reads schema from input stream
* @param in Input stream containing schema information
* @return Parsed Avro schema
* @throws IOException If schema reading fails
*/
Schema readSchema(InputStream in) throws IOException;
/**
* Writes schema to output stream
* @param schema Avro schema to write
* @param out Output stream to write schema to
* @throws IOException If schema writing fails
*/
void writeSchema(Schema schema, OutputStream out) throws IOException;
/**
* Provider interface for creating SchemaCoder instances
* Allows creating multiple instances in parallel operators without serialization issues
*/
interface SchemaCoderProvider extends Serializable {
/**
* Creates a new instance of SchemaCoder
* Each call should create a new instance for use in different nodes
* @return New SchemaCoder instance
*/
SchemaCoder get();
}
}
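
The interface is small enough that a complete coder fits in a few lines. The sketch below is an illustrative framing, not the wire format of any particular registry: it length-prefixes the schema JSON ahead of the Avro payload (uses java.io.DataInputStream/DataOutputStream and java.nio.charset.StandardCharsets).
// Sketch: a self-contained SchemaCoder that length-prefixes the schema JSON.
// Real registries typically embed a compact schema ID instead of full JSON.
public class JsonPrefixSchemaCoder implements SchemaCoder {
    @Override
    public Schema readSchema(InputStream in) throws IOException {
        DataInputStream dataIn = new DataInputStream(in);
        byte[] json = new byte[dataIn.readInt()];
        dataIn.readFully(json);
        return new Schema.Parser().parse(new String(json, StandardCharsets.UTF_8));
    }
    @Override
    public void writeSchema(Schema schema, OutputStream out) throws IOException {
        byte[] json = schema.toString().getBytes(StandardCharsets.UTF_8);
        DataOutputStream dataOut = new DataOutputStream(out);
        dataOut.writeInt(json.length);
        dataOut.write(json);
    }
}
A provider for this coder is then simply () -> new JsonPrefixSchemaCoder(), as used in the construction sketches above.

// Configuration for schema registry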
Map<String, String> registryConfig = new HashMap<>();
registryConfig.put("schema.registry.url", "http://localhost:8081");
registryConfig.put("schema.registry.subject.strategy", "topic-value");
// For authentication (if required)
registryConfig.put("schema.registry.username", "registry-user");
registryConfig.put("schema.registry.password", "registry-password");
// Create registry-aware deserializer
RegistryAvroDeserializationSchema<GenericRecord> deserializer =
RegistryAvroDeserializationSchema.<GenericRecord>builder()
.setRegistryConfig(registryConfig)
.setSubject("user-events-value")
.build();
// Create registry-aware serializer
RegistryAvroSerializationSchema<GenericRecord> serializer =
RegistryAvroSerializationSchema.<GenericRecord>builder()
.setRegistryConfig(registryConfig)
.setSubject("processed-events-value")
.setSchema(outputSchema)
        .build();

// Kafka properties with schema registry configuration
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "avro-consumer-group");
// Schema registry properties
Properties schemaRegistryProps = new Properties();
schemaRegistryProps.setProperty("schema.registry.url", "http://localhost:8081");
// Consumer with registry-aware deserialization
KafkaSource<GenericRecord> kafkaSource = KafkaSource.<GenericRecord>builder()
.setBootstrapServers("localhost:9092")
.setTopics("user-events")
.setGroupId("avro-consumer-group")
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(
RegistryAvroDeserializationSchema.<GenericRecord>builder()
.setRegistryConfig(schemaRegistryProps)
.setSubject("user-events-value")
.build()
))
.build();
DataStream<GenericRecord> stream = env.fromSource(
kafkaSource,
WatermarkStrategy.noWatermarks(),
"Kafka Avro Source"
);
// Producer with registry-aware serialization
KafkaSink<GenericRecord> kafkaSink = KafkaSink.<GenericRecord>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.<GenericRecord>builder()
.setTopic("processed-events")
        .setValueSerializationSchema(
RegistryAvroSerializationSchema.<GenericRecord>builder()
.setRegistryConfig(schemaRegistryProps)
.setSubject("processed-events-value")
.build()
)
.build())
.build();
processedStream.sinkTo(kafkaSink);

// Consumer that handles schema evolution automatically
RegistryAvroDeserializationSchema<GenericRecord> evolutionDeserializer =
RegistryAvroDeserializationSchema.<GenericRecord>builder()
.setRegistryConfig(registryConfig)
.setSubject("user-events-value")
.setReaderSchema(readerSchema) // Optional: specify expected schema
.setSchemaEvolutionEnabled(true)
.build();
DataStream<GenericRecord> stream = env
.addSource(new FlinkKafkaConsumer<>("user-events", evolutionDeserializer, kafkaProps));
// Handle records that may have different schemas
DataStream<ProcessedEvent> processed = stream
.map(new MapFunction<GenericRecord, ProcessedEvent>() {
@Override
public ProcessedEvent map(GenericRecord record) throws Exception {
ProcessedEvent event = new ProcessedEvent();
// Required fields (present in all schema versions)
event.setUserId((Long) record.get("user_id"));
event.setEventType(record.get("event_type").toString());
            // Optional fields (may be absent from older schema versions;
            // checking the schema first avoids relying on GenericRecord.get
            // behavior for unknown fields, which varies across Avro versions)
            Object sessionId = record.getSchema().getField("session_id") != null
                ? record.get("session_id") : null;
            if (sessionId != null) {
                event.setSessionId(sessionId.toString());
            }
            // New fields (may not exist in older records)
            Object deviceInfo = record.getSchema().getField("device_info") != null
                ? record.get("device_info") : null;
            if (deviceInfo != null) {
                event.setDeviceInfo(deviceInfo.toString());
            }
return event;
}
    });
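
Schema evolution works because the reader schema declares newer fields as nullable unions with null defaults, so records written by older producers still decode. A sketch of such a reader schema; the record and field names mirror the map function above and are otherwise illustrative:
// Sketch: a reader schema that tolerates records from older producer versions
String readerSchemaJson = """
    {
      "type": "record",
      "name": "UserEvent",
      "fields": [
        {"name": "user_id", "type": "long"},
        {"name": "event_type", "type": "string"},
        {"name": "session_id", "type": ["null", "string"], "default": null},
        {"name": "device_info", "type": ["null", "string"], "default": null}
      ]
    }
    """;
Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);

// Different schemas for different types of events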
Map<String, RegistryAvroDeserializationSchema<GenericRecord>> deserializers = new HashMap<>();
// User events deserializer
deserializers.put("user-events",
RegistryAvroDeserializationSchema.<GenericRecord>builder()
.setRegistryConfig(registryConfig)
.setSubject("user-events-value")
.build());
// System events deserializer
deserializers.put("system-events",
RegistryAvroDeserializationSchema.<GenericRecord>builder()
.setRegistryConfig(registryConfig)
.setSubject("system-events-value")
.build());
// Transaction events deserializer
deserializers.put("transaction-events",
RegistryAvroDeserializationSchema.<GenericRecord>builder()
.setRegistryConfig(registryConfig)
.setSubject("transaction-events-value")
.build());
// Process different event types
DataStream<String> allEvents = env
.addSource(new FlinkKafkaConsumer<>(
Arrays.asList("user-events", "system-events", "transaction-events"),
new KafkaDeserializationSchema<String>() {
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
String topic = record.topic();
RegistryAvroDeserializationSchema<GenericRecord> deserializer =
deserializers.get(topic);
if (deserializer != null) {
GenericRecord avroRecord = deserializer.deserialize(record.value());
return processEventByType(topic, avroRecord);
}
return null;
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
},
        kafkaProps));

// Registry deserializer with validation and error handling
RegistryAvroDeserializationSchema<GenericRecord> validatingDeserializer =
RegistryAvroDeserializationSchema.<GenericRecord>builder()
.setRegistryConfig(registryConfig)
.setSubject("user-events-value")
.setValidationEnabled(true)
.setErrorMode(ErrorMode.IGNORE) // or FAIL, LOG_AND_CONTINUE
.build();
// rawByteStream is assumed to be a DataStream<byte[]> carrying the raw message payloads
DataStream<GenericRecord> validatedStream = rawByteStream
    .map(new MapFunction<byte[], GenericRecord>() {
@Override
public GenericRecord map(byte[] value) throws Exception {
try {
return validatingDeserializer.deserialize(value);
} catch (SchemaNotFoundException e) {
// Handle unknown schema ID
logger.warn("Unknown schema ID in message: " + e.getSchemaId());
return null;
} catch (IncompatibleSchemaException e) {
// Handle schema compatibility issues
logger.error("Schema compatibility error: " + e.getMessage());
return null;
} catch (IOException e) {
// Handle general deserialization errors
logger.error("Deserialization error: " + e.getMessage());
return null;
}
}
})
    .filter(Objects::nonNull); // Remove failed deserializations

// Create table with schema registry integration
String createTableSQL = """
CREATE TABLE user_events (
user_id BIGINT,
event_type STRING,
event_data ROW<
action STRING,
target STRING
>,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro-confluent',
        'avro-confluent.url' = 'http://localhost:8081',
        'avro-confluent.subject' = 'user-events-value'
)
""";
tableEnv.executeSql(createTableSQL);
// Output table with schema registry
String createOutputTableSQL = """
CREATE TABLE processed_events (
user_id BIGINT,
event_count BIGINT,
last_event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'processed-events',
'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro-confluent',
        'avro-confluent.url' = 'http://localhost:8081',
        'avro-confluent.subject' = 'processed-events-value'
)
""";
tableEnv.executeSql(createOutputTableSQL);
// Process with SQL
// A windowed aggregation keeps the result append-only, which the plain Kafka
// sink requires; a global GROUP BY would emit updates the sink cannot accept
String processingSQL = """
    INSERT INTO processed_events
    SELECT
        user_id,
        COUNT(*) AS event_count,
        MAX(event_time) AS last_event_time
    FROM TABLE(
        TUMBLE(TABLE user_events, DESCRIPTOR(event_time), INTERVAL '1' HOUR))
    GROUP BY user_id, window_start, window_end
    """;
tableEnv.executeSql(processingSQL);

// Utility methods for schema registry management
public class SchemaRegistryUtils {
    // These helpers use the Confluent schema registry client
    // (io.confluent.kafka.schemaregistry.client); registry calls throw
    // RestClientException in addition to IOException
    /**
     * Register a new schema version and return its registry-assigned ID
     */
    public static int registerSchema(String registryUrl, String subject, Schema schema)
            throws IOException, RestClientException {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        return client.register(subject, schema);
    }
    /**
     * Get the latest schema registered under a subject
     */
    public static Schema getLatestSchema(String registryUrl, String subject)
            throws IOException, RestClientException {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        // getSchema() returns the schema as a JSON string, so parse it into an Avro Schema
        return new Schema.Parser().parse(
                client.getLatestSchemaMetadata(subject).getSchema());
    }
    /**
     * Check whether a new schema is compatible with the subject's latest version
     */
    public static boolean isCompatible(String registryUrl, String subject, Schema newSchema)
            throws IOException, RestClientException {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        return client.testCompatibility(subject, newSchema);
    }
}
// Example usage
Schema newSchema = // ... your new schema
boolean compatible = SchemaRegistryUtils.isCompatible(
"http://localhost:8081",
"user-events-value",
newSchema
);
if (compatible) {
int schemaId = SchemaRegistryUtils.registerSchema(
"http://localhost:8081",
"user-events-value",
newSchema
);
System.out.println("Registered new schema with ID: " + schemaId);
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-avro