
tessl/maven-org-apache-flink--flink-avro

Apache Flink Avro format support library providing serialization and deserialization capabilities for Apache Avro data format within Flink streaming and batch processing applications


Schema Registry Integration

Extended serialization and deserialization schemas with Confluent Schema Registry support for centralized schema management, evolution, and compatibility checking in distributed Avro data processing.

RegistryAvroSerializationSchema

Extended serialization schema that integrates with Confluent Schema Registry for centralized schema management.

public class RegistryAvroSerializationSchema<T> extends AvroSerializationSchema<T> {
    // Inherits all methods from AvroSerializationSchema
    // Additional schema registry specific functionality
}

Key Features

  • Schema Registration: Automatically registers schemas with the registry
  • Schema Evolution: Supports forward, backward, and full compatibility
  • Schema Caching: Caches schemas locally for performance
  • Subject Management: Manages schema subjects and versions
  • Compatibility Checking: Validates schema changes against compatibility rules

Usage Examples

Basic Registry Integration:

import org.apache.flink.formats.avro.RegistryAvroSerializationSchema;

// Create a registry-aware serializer (constructor shown for illustration;
// the Confluent-specific factory methods such as
// ConfluentRegistryAvroSerializationSchema.forSpecific(...) from the
// flink-avro-confluent-registry module are the usual entry point)
RegistryAvroSerializationSchema<User> registrySerializer = 
    new RegistryAvroSerializationSchema<>(User.class, registryConfig);

// Use in streaming pipeline
DataStream<User> userStream = ...;
DataStream<byte[]> serializedStream = userStream.map(registrySerializer::serialize);

With Schema Evolution:

// Register new schema version
String subject = "user-value";
Schema newUserSchema = parseNewUserSchema();

// Serializer automatically handles schema versioning
RegistryAvroSerializationSchema<User> evolvingSerializer = 
    createRegistrySerializer(User.class, subject, newUserSchema);

// Messages include schema ID for proper deserialization
DataStream<byte[]> versionedStream = userStream.map(evolvingSerializer::serialize);
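The schema ID travels with each message in the Confluent wire format: a zero magic byte, a 4-byte big-endian schema ID, then the Avro binary payload. A minimal sketch of that framing (an illustrative helper, not part of the Flink or Confluent API):

```java
import java.nio.ByteBuffer;

// Illustrative helper for the Confluent wire format:
// [magic byte 0x00][4-byte big-endian schema id][Avro binary payload]
class WireFormat {
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(5 + avroPayload.length);
        buf.put((byte) 0);      // magic byte
        buf.putInt(schemaId);   // schema id, big-endian
        buf.put(avroPayload);   // Avro-encoded record
        return buf.array();
    }

    static int schemaIdOf(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }
}
```

On the consuming side, the deserializer reads the ID from this header, fetches the matching writer schema from the registry, and only then decodes the payload.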

RegistryAvroDeserializationSchema

Extended deserialization schema that uses Confluent Schema Registry for schema resolution and evolution handling.

public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
    // Inherits all methods from AvroDeserializationSchema  
    // Additional schema registry specific functionality
}

Key Features

  • Schema Resolution: Automatically resolves schemas by ID from registry
  • Multi-Version Support: Handles messages with different schema versions
  • Reader Schema Evolution: Supports schema evolution patterns
  • Lazy Loading: Loads schemas on-demand for better performance
  • Error Recovery: Graceful handling of registry connectivity issues

Usage Examples

Registry-based Deserialization:

import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;

// Create a registry-aware deserializer (constructor shown for illustration;
// ConfluentRegistryAvroDeserializationSchema.forSpecific(...) from the
// flink-avro-confluent-registry module is the usual entry point)
RegistryAvroDeserializationSchema<User> registryDeserializer = 
    new RegistryAvroDeserializationSchema<>(User.class, registryConfig);

// Use in Kafka source
FlinkKafkaConsumer<User> consumer = new FlinkKafkaConsumer<>(
    "user-topic",
    registryDeserializer,
    kafkaProperties
);

DataStream<User> userStream = env.addSource(consumer);

Multi-Version Message Processing:

// Deserializer handles multiple schema versions automatically
RegistryAvroDeserializationSchema<User> multiVersionDeserializer = 
    createRegistryDeserializer(User.class);

// Messages with different schema versions are properly deserialized
DataStream<User> unifiedStream = rawMessageStream
    .map(bytes -> {
        try {
            return multiVersionDeserializer.deserialize(bytes);
        } catch (IOException e) {
            // Handle deserialization errors
            return null;
        }
    })
    .filter(Objects::nonNull);

Schema Registry Configuration

Connection Configuration

// Schema Registry client configuration
Map<String, Object> registryConfig = new HashMap<>();
registryConfig.put("schema.registry.url", "http://schema-registry:8081");
registryConfig.put("basic.auth.credentials.source", "USER_INFO");
registryConfig.put("basic.auth.user.info", "username:password");

// SSL configuration for secure connections
registryConfig.put("schema.registry.ssl.truststore.location", "/path/to/truststore.jks");
registryConfig.put("schema.registry.ssl.truststore.password", "truststore-password");
registryConfig.put("schema.registry.ssl.keystore.location", "/path/to/keystore.jks");
registryConfig.put("schema.registry.ssl.keystore.password", "keystore-password");

Subject and Compatibility Configuration

// Subject naming strategy
registryConfig.put("subject.name.strategy", "io.confluent.kafka.serializers.subject.RecordNameStrategy");

// Compatibility level
registryConfig.put("schema.registry.compatibility.level", "BACKWARD");

// Schema caching
registryConfig.put("schema.registry.cache.capacity", "1000");
registryConfig.put("schema.registry.cache.expiry.secs", "300");
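To make the naming strategies above concrete, here is a sketch of the subjects the two most common strategies produce (hypothetical helper functions, not the Confluent strategy classes themselves):

```java
// Illustrative subject naming: TopicNameStrategy derives the subject from
// the Kafka topic name; RecordNameStrategy uses the fully qualified
// Avro record name, so multiple record types can share one topic.
class SubjectNames {
    static String topicNameStrategy(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    static String recordNameStrategy(String namespace, String recordName) {
        return namespace + "." + recordName;
    }
}
```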

Schema Evolution Patterns

Backward Compatibility

Adding Optional Fields:

// Original schema
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}

// Evolved schema (backward compatible)
{
  "type": "record", 
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}

Usage with Evolution:

// Old producers keep writing with the previous schema version
RegistryAvroSerializationSchema<UserV1> oldSerializer = 
    createRegistrySerializer(UserV1.class, "user-value");

// New consumers can read old messages
RegistryAvroDeserializationSchema<UserV2> newDeserializer = 
    createRegistryDeserializer(UserV2.class, "user-value");
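This works because every field the new reader schema adds carries a default, so records written with the old schema can still be decoded. Simplified, the backward-compatibility rule can be sketched as follows (real Avro resolution, via org.apache.avro.SchemaCompatibility, additionally checks field types, union branches, and aliases):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified backward-compatibility check: every field the reader schema
// adds relative to the writer schema must carry a default value.
class BackwardCompat {
    static class Field {
        final String name;
        final boolean hasDefault;
        Field(String name, boolean hasDefault) {
            this.name = name;
            this.hasDefault = hasDefault;
        }
    }

    static boolean isBackwardCompatible(List<Field> writerFields, List<Field> readerFields) {
        Set<String> writerNames = new HashSet<>();
        for (Field f : writerFields) writerNames.add(f.name);
        for (Field f : readerFields) {
            // A field the writer never produced can only be filled from a default
            if (!writerNames.contains(f.name) && !f.hasDefault) {
                return false;
            }
        }
        return true;
    }
}
```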

Forward Compatibility

Removing Optional Fields:

// New producers write simplified schema
RegistryAvroSerializationSchema<UserV2> simplifiedSerializer = 
    createRegistrySerializer(UserV2.class, "user-value");

// Old consumers can still process messages
RegistryAvroDeserializationSchema<UserV1> oldDeserializer = 
    createRegistryDeserializer(UserV1.class, "user-value");

Full Compatibility

// Schema evolution that supports both directions
public class SchemaEvolutionHandler {
    public void handleEvolution(Schema writerSchema, Schema readerSchema) {
        // Validate compatibility (the call does not throw; its result must be inspected)
        SchemaCompatibility.SchemaPairCompatibility compatibility =
            SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);
        if (compatibility.getType() != SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
            throw new IllegalStateException("Incompatible schemas: " + compatibility.getDescription());
        }
        
        // Apply evolution rules
        applyEvolutionRules(writerSchema, readerSchema);
    }
}

Error Handling and Monitoring

Registry Connectivity

// Robust registry client with retry logic
// (RetryPolicy, SchemaNotFoundException, and RegistryException are
// application-defined helper types, not part of the registry client API)
public class ResilientRegistryClient {
    private final SchemaRegistryClient client;
    private final RetryPolicy retryPolicy;
    
    public Schema getSchemaById(int id) throws IOException {
        return retryPolicy.execute(() -> {
            try {
                return client.getSchemaById(id);
            } catch (RestClientException e) {
                if (e.getStatus() == 404) {
                    throw new SchemaNotFoundException("Schema not found: " + id);
                }
                throw new RegistryException("Registry error", e);
            }
        });
    }
}
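The RetryPolicy used above is an application-defined helper; a minimal version with exponential backoff, using only the standard library, might look like this (illustrative only; production code would add jitter, a retry budget, and logging):

```java
import java.util.concurrent.Callable;

// Minimal retry helper: retries the call up to maxAttempts times,
// doubling the delay after each failure.
class Retry {
    static <T> T withBackoff(Callable<T> call, int maxAttempts, long initialDelayMs) throws Exception {
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt == maxAttempts) break;
                Thread.sleep(delay);
                delay *= 2; // exponential backoff
            }
        }
        throw last;
    }
}
```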

Schema Validation Errors

// Handle schema validation failures
try {
    byte[] serialized = registrySerializer.serialize(userRecord);
} catch (SerializationException e) {
    if (e.getCause() instanceof RestClientException) {
        RestClientException rce = (RestClientException) e.getCause();
        if (rce.getStatus() == 409) {
            logger.error("Schema compatibility violation", e);
            // Handle compatibility error
        }
    }
    throw new ProcessingException("Serialization failed", e);
}

Monitoring and Metrics

// Registry metrics collection
public class RegistryMetrics {
    private final SchemaRegistryClient client; // injected by the application

    private final Counter schemaLookups = Counter.build()
        .name("schema_registry_lookups_total")
        .help("Total schema lookups")
        .register();
        
    private final Histogram lookupLatency = Histogram.build()
        .name("schema_registry_lookup_duration_seconds")
        .help("Schema lookup latency")
        .register();

    public RegistryMetrics(SchemaRegistryClient client) {
        this.client = client;
    }

    public Schema getSchemaWithMetrics(int id) throws IOException {
        Histogram.Timer timer = lookupLatency.startTimer();
        try {
            schemaLookups.inc();
            return client.getSchemaById(id);
        } finally {
            timer.observeDuration(); // record lookup latency in the histogram
        }
    }
}

Best Practices

Schema Design

Evolving Schemas:

{
  "type": "record",
  "name": "User",
  "namespace": "com.example.avro",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    // Use unions for optional fields
    {"name": "email", "type": ["null", "string"], "default": null},
    // Use logical types for better semantics
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    // Provide defaults for new fields
    {"name": "version", "type": "int", "default": 1}
  ]
}

Performance Optimization

Schema Caching:

// Configure appropriate cache sizes
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.cache.capacity", "10000");
config.put("schema.registry.cache.expiry.secs", "3600");

// Use schema caching in high-throughput scenarios
CachedSchemaRegistryClient cachedClient = new CachedSchemaRegistryClient(
    "http://schema-registry:8081", 
    10000,  // cache capacity
    config
);
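To make the cache settings above concrete, here is a minimal time-based cache in the spirit of what CachedSchemaRegistryClient does internally (a sketch, not the actual implementation):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Minimal TTL cache, illustrating what schema.registry.cache.expiry.secs
// controls: entries older than ttlMillis are reloaded on the next access.
class TtlCache<K, V> {
    private static class Entry<V> {
        final V value;
        final long loadedAt;
        Entry(V value, long loadedAt) { this.value = value; this.loadedAt = loadedAt; }
    }

    private final ConcurrentHashMap<K, Entry<V>> cache = new ConcurrentHashMap<>();
    private final long ttlMillis;

    TtlCache(long ttlMillis) { this.ttlMillis = ttlMillis; }

    V get(K key, Function<K, V> loader) {
        long now = System.currentTimeMillis();
        Entry<V> e = cache.get(key);
        if (e == null || now - e.loadedAt > ttlMillis) {
            e = new Entry<>(loader.apply(key), now); // expired or missing: reload
            cache.put(key, e);
        }
        return e.value;
    }
}
```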

Connection Pooling:

// Reuse registry clients across serializers
public class RegistryClientFactory {
    private static final SchemaRegistryClient SHARED_CLIENT = 
        new CachedSchemaRegistryClient(registryUrl, cacheCapacity);
        
    public static SchemaRegistryClient getClient() {
        return SHARED_CLIENT;
    }
}

Security Configuration

Authentication:

// SASL authentication
registryConfig.put("basic.auth.credentials.source", "SASL_INHERIT");
registryConfig.put("sasl.mechanism", "PLAIN");
registryConfig.put("sasl.jaas.config", 
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"schema-registry-user\" password=\"password\";");

SSL/TLS:

// SSL configuration
registryConfig.put("schema.registry.ssl.endpoint.identification.algorithm", "https");
registryConfig.put("schema.registry.ssl.protocol", "TLSv1.2");
registryConfig.put("schema.registry.ssl.enabled.protocols", "TLSv1.2");

Deployment Considerations

High Availability

// Multiple registry URLs for failover
String registryUrls = "http://registry1:8081,http://registry2:8081,http://registry3:8081";
registryConfig.put("schema.registry.url", registryUrls);

// Connection timeout configuration
registryConfig.put("schema.registry.request.timeout.ms", "30000");
registryConfig.put("schema.registry.connect.timeout.ms", "10000");
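Failover over a comma-separated URL list amounts to trying each endpoint in order until one responds; a simplified sketch of that behavior (the real client handles this internally when given the list via schema.registry.url):

```java
import java.util.function.Function;

// Illustrative failover: try each registry endpoint in order and return
// the first successful result, rethrowing the last failure if all fail.
class Failover {
    static <T> T firstSuccessful(String commaSeparatedUrls, Function<String, T> request) {
        RuntimeException last = null;
        for (String url : commaSeparatedUrls.split(",")) {
            try {
                return request.apply(url.trim());
            } catch (RuntimeException e) {
                last = e; // endpoint unavailable: try the next one
            }
        }
        throw last != null ? last : new IllegalArgumentException("No registry URLs given");
    }
}
```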

Environment-specific Configuration

// Environment-based configuration
public class RegistryConfigFactory {
    public static Map<String, Object> createConfig(Environment env) {
        Map<String, Object> config = new HashMap<>();
        
        switch (env) {
            case DEVELOPMENT:
                config.put("schema.registry.url", "http://localhost:8081");
                config.put("schema.registry.compatibility.level", "NONE");
                break;
            case PRODUCTION:
                config.put("schema.registry.url", "https://prod-registry:8081");
                config.put("schema.registry.compatibility.level", "BACKWARD");
                // Add authentication and SSL configuration
                break;
        }
        
        return config;
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-avro
