Apache Flink SQL table format support for Avro records with Confluent Schema Registry integration
```shell
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-avro-confluent-registry@2.1.0
```

This library enables seamless serialization and deserialization of Kafka messages with centralized schema management, providing both the standard Avro format and Debezium change data capture (CDC) support.
Add the dependency to `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-avro-confluent-registry</artifactId>
    <version>2.1.0</version>
</dependency>
```

```java
import org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
```

For Debezium support:
```java
import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroFormatFactory;
```

```sql
CREATE TABLE kafka_source (
  user_id BIGINT,
  user_name STRING,
  email STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081',
  'avro-confluent.subject' = 'user-topic-value'
);
```
```java
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Parse the Avro schema from its JSON definition
String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}";
Schema schema = new Schema.Parser().parse(schemaString);

// Create a deserializer that looks up the writer schema in the registry
// and reads records with the reader schema given here
ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer =
    ConfluentRegistryAvroDeserializationSchema.forGeneric(
        schema,
        "http://localhost:8081"
    );
```
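As a sketch of how such a deserializer is typically wired into a job: the schema plugs in wherever a `DeserializationSchema<GenericRecord>` is accepted, for example as the value deserializer of a Kafka source. This assumes the separate flink-connector-kafka dependency; the topic and group names are illustrative, not part of this library.

```java
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.avro.generic.GenericRecord;

// Hypothetical wiring (requires flink-connector-kafka):
// the registry-aware schema acts as the value-only deserializer.
KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("user-topic")
    .setGroupId("user-consumer")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(deserializer)   // the schema created above
    .build();
```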
```java
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.avro.generic.GenericRecord;

// Create a serializer that registers the schema under the given subject
// and writes records in the Confluent wire format (reuses the schema parsed above)
ConfluentRegistryAvroSerializationSchema<GenericRecord> serializer =
    ConfluentRegistryAvroSerializationSchema.forGeneric(
        "user-topic-value",
        schema,
        "http://localhost:8081"
    );
```
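On the producing side, a sketch under the same assumptions (flink-connector-kafka provides `KafkaSink` and `KafkaRecordSerializationSchema`; names are illustrative):

```java
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.avro.generic.GenericRecord;

// Hypothetical wiring: the registry-aware schema serves as the
// value serializer of a Kafka sink's record schema.
KafkaSink<GenericRecord> sink = KafkaSink.<GenericRecord>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.<GenericRecord>builder()
            .setTopic("user-topic")
            .setValueSerializationSchema(serializer)   // the schema created above
            .build())
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();
```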
The library is built around several key components: format factories (RegistryAvroFormatFactory, DebeziumAvroFormatFactory) that integrate with Flink's Table API, the serialization and deserialization schemas shown above, and the configuration options and schema coders described below.

Core Avro serialization and deserialization with Confluent Schema Registry integration supports both generic records and generated specific record classes:
```java
// Format identifier for SQL DDL
String IDENTIFIER = "avro-confluent";

// Generic record deserialization
ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
    Schema schema,
    String url
);

// Specific record deserialization
<T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
    Class<T> tClass,
    String url
);

// Generic record serialization
ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
    String subject,
    Schema schema,
    String schemaRegistryUrl
);

// Specific record serialization
<T extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<T> forSpecific(
    Class<T> tClass,
    String subject,
    String schemaRegistryUrl
);
```
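For generated record classes, the forSpecific variants take the class instead of a schema. A minimal sketch, assuming a `User` class generated by the Avro compiler (the class name is illustrative):

```java
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

// User is assumed to be generated from the Avro schema (e.g. via avro-maven-plugin)
ConfluentRegistryAvroDeserializationSchema<User> specificDeserializer =
    ConfluentRegistryAvroDeserializationSchema.forSpecific(
        User.class,
        "http://localhost:8081"
    );

ConfluentRegistryAvroSerializationSchema<User> specificSerializer =
    ConfluentRegistryAvroSerializationSchema.forSpecific(
        User.class,
        "user-topic-value",
        "http://localhost:8081"
    );
```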
Debezium Avro format support covers change data capture scenarios, handling INSERT, UPDATE, and DELETE operations with before/after record states.

```java
// Format identifier for SQL DDL
String IDENTIFIER = "debezium-avro-confluent";

// Format factory for Debezium CDC support
DebeziumAvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory;
```
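A table consuming Debezium CDC records uses this identifier as its format. A sketch, with topic and registry URL as placeholders (option keys follow the documented `debezium-avro-confluent.*` pattern and may vary by Flink version):

```sql
CREATE TABLE user_changes (
  user_id BIGINT,
  user_name STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver.inventory.users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);
```

Flink interprets the Debezium before/after envelope and emits the corresponding insert, update, and delete changelog rows.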
Comprehensive configuration options for Schema Registry connection, SSL, authentication, and schema management:

```java
// Core configuration options class
@PublicEvolving
class AvroConfluentFormatOptions {
    ConfigOption<String> URL;        // Required: Schema Registry URL
    ConfigOption<String> SUBJECT;    // Schema Registry subject name
    ConfigOption<String> SCHEMA;     // Optional: explicit schema string
    ConfigOption<String> SSL_KEYSTORE_LOCATION;
    ConfigOption<String> SSL_KEYSTORE_PASSWORD;
    ConfigOption<String> SSL_TRUSTSTORE_LOCATION;
    ConfigOption<String> SSL_TRUSTSTORE_PASSWORD;
    ConfigOption<String> BASIC_AUTH_CREDENTIALS_SOURCE;
    ConfigOption<String> BASIC_AUTH_USER_INFO;
    ConfigOption<String> BEARER_AUTH_CREDENTIALS_SOURCE;
    ConfigOption<String> BEARER_AUTH_TOKEN;
    ConfigOption<Map<String, String>> PROPERTIES;   // Additional properties passed to the registry client
}
```
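In SQL DDL these options are set with the format name as prefix. A sketch for a registry behind basic auth and TLS (the option keys here follow the pattern documented for the avro-confluent format; exact keys may vary across Flink versions, and credentials and paths are placeholders):

```sql
CREATE TABLE secured_source (
  user_id BIGINT,
  user_name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'https://registry.example.com:8081',
  'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'avro-confluent.basic-auth.user-info' = 'user:secret',
  'avro-confluent.ssl.truststore.location' = '/path/to/truststore.jks',
  'avro-confluent.ssl.truststore.password' = 'changeit'
);
```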
Internally, schema lookup and the Confluent wire format are handled by schema coders:

```java
// Schema coder provider with caching support
class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {
    CachedSchemaCoderProvider(String subject, String url, int identityMapCapacity, Map<String, ?> configs);
    SchemaCoder get();
}

// Schema coder for the Confluent wire protocol
class ConfluentSchemaRegistryCoder implements SchemaCoder {
    ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient client);
    ConfluentSchemaRegistryCoder(SchemaRegistryClient client);
    Schema readSchema(InputStream in) throws IOException;
    void writeSchema(Schema schema, OutputStream out) throws IOException;
}
```
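The wire protocol these coders implement is the standard Confluent framing: a zero magic byte, a 4-byte big-endian schema ID, then the Avro-encoded payload. A minimal sketch of reading that header in plain Java, independent of this library's API (the helper name is hypothetical):

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Reads the Confluent wire-format header and returns the schema ID,
// leaving the stream positioned at the start of the Avro payload.
static int readSchemaId(InputStream in) throws IOException {
    DataInputStream din = new DataInputStream(in);
    byte magic = din.readByte();
    if (magic != 0) {
        throw new IOException("Unknown magic byte: " + magic);
    }
    return din.readInt();   // 4-byte big-endian schema ID
}
```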