Apache Flink Avro format support library providing serialization and deserialization for the Apache Avro data format in Flink streaming and batch processing applications.
Extended serialization and deserialization schemas with Confluent Schema Registry support for centralized schema management, evolution, and compatibility checking in distributed Avro data processing.
RegistryAvroSerializationSchema is an extended serialization schema that integrates with Confluent Schema Registry for centralized schema management.
public class RegistryAvroSerializationSchema<T> extends AvroSerializationSchema<T> {
    // Inherits all methods from AvroSerializationSchema
    // Adds schema-registry-specific functionality
}

Basic Registry Integration:
import org.apache.flink.formats.avro.RegistryAvroSerializationSchema;

// Create a registry-aware serializer; configuration is typically supplied
// through the Flink configuration or the environment (the two-argument
// constructor shown here is illustrative; concrete subclasses wire in a
// registry-backed SchemaCoder)
RegistryAvroSerializationSchema<User> registrySerializer =
    new RegistryAvroSerializationSchema<>(User.class, registryConfig);

// Use in a streaming pipeline
DataStream<User> userStream = ...;
DataStream<byte[]> serializedStream = userStream.map(registrySerializer::serialize);
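In practice, the concrete subclass shipped in the companion flink-avro-confluent-registry artifact is the usual entry point. A minimal sketch, assuming User is an Avro-generated SpecificRecord class and that the subject name and registry URL are illustrative values:

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

// forSpecific(...) registers/looks up the schema under the "user-value"
// subject and prefixes each serialized record with the registry schema ID
SerializationSchema<User> serializer =
    ConfluentRegistryAvroSerializationSchema.forSpecific(
        User.class, "user-value", "http://schema-registry:8081");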
With Schema Evolution:

// Register a new schema version
String subject = "user-value";
Schema newUserSchema = parseNewUserSchema();

// The serializer handles schema versioning automatically
RegistryAvroSerializationSchema<User> evolvingSerializer =
    createRegistrySerializer(User.class, subject, newUserSchema);

// Messages include the schema ID for proper deserialization
DataStream<byte[]> versionedStream = userStream.map(evolvingSerializer::serialize);

RegistryAvroDeserializationSchema is an extended deserialization schema that uses Confluent Schema Registry for schema resolution and evolution handling.
public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
    // Inherits all methods from AvroDeserializationSchema
    // Adds schema-registry-specific functionality
}

Registry-based Deserialization:
import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;

// Create a registry-aware deserializer (the two-argument constructor shown
// here is illustrative; concrete subclasses wire in a registry-backed SchemaCoder)
RegistryAvroDeserializationSchema<User> registryDeserializer =
    new RegistryAvroDeserializationSchema<>(User.class, registryConfig);

// Use as the value deserializer of a Kafka source
FlinkKafkaConsumer<User> consumer = new FlinkKafkaConsumer<>(
    "user-topic",
    registryDeserializer,
    kafkaProperties);

DataStream<User> userStream = env.addSource(consumer);
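As on the serialization side, the Confluent subclass provides a concrete factory method. A minimal sketch, where the registry URL is an illustrative value:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

// forSpecific(...) reads the schema ID prefix of each message and fetches
// the matching writer schema from the registry
DeserializationSchema<User> deserializer =
    ConfluentRegistryAvroDeserializationSchema.forSpecific(
        User.class, "http://schema-registry:8081");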
Multi-Version Message Processing:

// The deserializer handles multiple schema versions automatically
RegistryAvroDeserializationSchema<User> multiVersionDeserializer =
    createRegistryDeserializer(User.class);

// Messages with different schema versions are properly deserialized
DataStream<User> unifiedStream = rawMessageStream
    .map(bytes -> {
        try {
            return multiVersionDeserializer.deserialize(bytes);
        } catch (IOException e) {
            // Handle deserialization errors
            return null;
        }
    })
    .filter(Objects::nonNull);
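Mapping failures to null and filtering works, but a flatMap that simply skips undecodable records avoids the null sentinel. A sketch of this alternative pattern, reusing the multiVersionDeserializer from above:

import org.apache.flink.util.Collector;

// Drop corrupt records instead of mapping them to null
DataStream<User> unifiedStream = rawMessageStream
    .flatMap((byte[] bytes, Collector<User> out) -> {
        try {
            out.collect(multiVersionDeserializer.deserialize(bytes));
        } catch (IOException e) {
            // Skip the record (optionally count it via a metric)
        }
    })
    .returns(User.class); // needed because lambda type information is erased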
// Schema Registry client configuration
Map<String, Object> registryConfig = new HashMap<>();
registryConfig.put("schema.registry.url", "http://schema-registry:8081");
registryConfig.put("basic.auth.credentials.source", "USER_INFO");
registryConfig.put("basic.auth.user.info", "username:password");

// SSL configuration for secure connections
registryConfig.put("schema.registry.ssl.truststore.location", "/path/to/truststore.jks");
registryConfig.put("schema.registry.ssl.truststore.password", "truststore-password");
registryConfig.put("schema.registry.ssl.keystore.location", "/path/to/keystore.jks");
registryConfig.put("schema.registry.ssl.keystore.password", "keystore-password");
registryConfig.put("subject.name.strategy", "io.confluent.kafka.serializers.subject.RecordNameStrategy");
// Compatibility level
registryConfig.put("schema.registry.compatibility.level", "BACKWARD");
// Schema caching
registryConfig.put("schema.registry.cache.capacity", "1000");
registryConfig.put("schema.registry.cache.expiry.secs", "300");Adding Optional Fields:
Adding Optional Fields:

// Original schema
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}

// Evolved schema (backward compatible)
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
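Under the hood, Avro's standard schema resolution fills added fields from their defaults. A self-contained sketch, where oldSchema and newSchema are assumed to be the two schemas above, parsed with new Schema.Parser().parse(...):

import java.io.ByteArrayOutputStream;
import org.apache.avro.generic.*;
import org.apache.avro.io.*;

// Write a record with the original (writer) schema ...
GenericRecord oldUser = new GenericData.Record(oldSchema);
oldUser.put("id", 1L);
oldUser.put("name", "alice");
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
new GenericDatumWriter<GenericRecord>(oldSchema).write(oldUser, encoder);
encoder.flush();

// ... and read it back with the evolved (reader) schema: the missing
// "email" field is filled from its declared default (null)
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord evolved =
    new GenericDatumReader<GenericRecord>(oldSchema, newSchema).read(null, decoder);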
Usage with Evolution:

// Producers on the old schema version can keep writing
RegistryAvroSerializationSchema<UserV1> oldSerializer =
    createRegistrySerializer(UserV1.class, "user-value");

// Consumers on the new schema version can read the old messages
RegistryAvroDeserializationSchema<UserV2> newDeserializer =
    createRegistryDeserializer(UserV2.class, "user-value");

Removing Optional Fields:

// New producers write the simplified schema
RegistryAvroSerializationSchema<UserV2> simplifiedSerializer =
    createRegistrySerializer(UserV2.class, "user-value");

// Old consumers can still process the messages
RegistryAvroDeserializationSchema<UserV1> oldDeserializer =
    createRegistryDeserializer(UserV1.class, "user-value");
// Schema evolution handler that validates compatibility in both directions
public class SchemaEvolutionHandler {

    public void handleEvolution(Schema writerSchema, Schema readerSchema) {
        // Validate compatibility; the Avro API returns a result rather than throwing
        SchemaCompatibility.SchemaPairCompatibility compatibility =
            SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);
        if (compatibility.getType() != SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
            throw new IllegalStateException(
                "Incompatible schemas: " + compatibility.getDescription());
        }

        // Apply evolution rules
        applyEvolutionRules(writerSchema, readerSchema);
    }
}
// Robust registry client with retry logic (RetryPolicy, SchemaNotFoundException,
// and RegistryException are application-defined helper types)
public class ResilientRegistryClient {

    private final SchemaRegistryClient client;
    private final RetryPolicy retryPolicy;

    public Schema getSchemaById(int id) throws IOException {
        return retryPolicy.execute(() -> {
            try {
                return client.getSchemaById(id);
            } catch (RestClientException e) {
                if (e.getStatus() == 404) {
                    throw new SchemaNotFoundException("Schema not found: " + id);
                }
                throw new RegistryException("Registry error", e);
            }
        });
    }
}
// Handle schema validation failures
try {
    byte[] serialized = registrySerializer.serialize(userRecord);
} catch (SerializationException e) {
    if (e.getCause() instanceof RestClientException) {
        RestClientException rce = (RestClientException) e.getCause();
        if (rce.getStatus() == 409) {
            // HTTP 409: the schema violates the subject's compatibility level
            logger.error("Schema compatibility violation", e);
        }
    }
    throw new ProcessingException("Serialization failed", e);
}
// Registry metrics collection (Prometheus simpleclient shown)
public class RegistryMetrics {

    private final SchemaRegistryClient client; // injected by the application

    private final Counter schemaLookups = Counter.build()
        .name("schema_registry_lookups_total")
        .help("Total schema lookups")
        .register();

    private final Histogram lookupLatency = Histogram.build()
        .name("schema_registry_lookup_duration_seconds")
        .help("Schema lookup latency")
        .register();

    public Schema getSchemaWithMetrics(int id) throws IOException {
        // Time the lookup with the histogram's own timer
        Histogram.Timer timer = lookupLatency.startTimer();
        try {
            schemaLookups.inc();
            return client.getSchemaById(id);
        } finally {
            timer.observeDuration();
        }
    }
}
Evolving Schemas:

{
  "type": "record",
  "name": "User",
  "namespace": "com.example.avro",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},

    // Use unions for optional fields
    {"name": "email", "type": ["null", "string"], "default": null},

    // Use logical types for better semantics
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},

    // Provide defaults for new fields
    {"name": "version", "type": "int", "default": 1}
  ]
}
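Before registering an evolved schema, compatibility can also be checked locally with Avro's SchemaValidator. A minimal sketch, where existingSchema and proposedSchema are assumed to be the parsed old and new versions:

import java.util.Collections;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;

// canReadStrategy() checks BACKWARD compatibility: the proposed (reader)
// schema must be able to read data written with the existing schema
SchemaValidator validator =
    new SchemaValidatorBuilder().canReadStrategy().validateLatest();
try {
    validator.validate(proposedSchema, Collections.singletonList(existingSchema));
} catch (SchemaValidationException e) {
    // The proposed schema would break existing consumers
}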
Schema Caching:

// Configure appropriate cache sizes
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.cache.capacity", "10000");
config.put("schema.registry.cache.expiry.secs", "3600");

// Use a caching client in high-throughput scenarios
CachedSchemaRegistryClient cachedClient = new CachedSchemaRegistryClient(
    "http://schema-registry:8081",
    10000, // identity-map cache capacity
    config);
Connection Pooling:

// Reuse a single registry client across serializers
public class RegistryClientFactory {

    private static final SchemaRegistryClient SHARED_CLIENT =
        new CachedSchemaRegistryClient(registryUrl, cacheCapacity);

    public static SchemaRegistryClient getClient() {
        return SHARED_CLIENT;
    }
}
Authentication:

// SASL authentication
registryConfig.put("basic.auth.credentials.source", "SASL_INHERIT");
registryConfig.put("sasl.mechanism", "PLAIN");
registryConfig.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"schema-registry-user\" password=\"password\";");
SSL/TLS:

// SSL configuration
registryConfig.put("schema.registry.ssl.endpoint.identification.algorithm", "https");
registryConfig.put("schema.registry.ssl.protocol", "TLSv1.2");
registryConfig.put("schema.registry.ssl.enabled.protocols", "TLSv1.2");
// Multiple registry URLs for failover
String registryUrls = "http://registry1:8081,http://registry2:8081,http://registry3:8081";
registryConfig.put("schema.registry.url", registryUrls);

// Connection timeout configuration
registryConfig.put("schema.registry.request.timeout.ms", "30000");
registryConfig.put("schema.registry.connect.timeout.ms", "10000");
// Environment-based configuration
public class RegistryConfigFactory {

    public static Map<String, Object> createConfig(Environment env) {
        Map<String, Object> config = new HashMap<>();
        switch (env) {
            case DEVELOPMENT:
                config.put("schema.registry.url", "http://localhost:8081");
                config.put("schema.registry.compatibility.level", "NONE");
                break;
            case PRODUCTION:
                config.put("schema.registry.url", "https://prod-registry:8081");
                config.put("schema.registry.compatibility.level", "BACKWARD");
                // Add authentication and SSL configuration here
                break;
        }
        return config;
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-avro