Apache Flink SQL table format support for Avro records with Confluent Schema Registry integration
—
Core Avro serialization and deserialization functionality with Confluent Schema Registry integration. Supports both generic records and generated specific record classes with comprehensive configuration options.
Creates deserializers for generic Avro records using reader schema and writer schema lookup from Confluent Schema Registry.
/**
* Creates deserializer for GenericRecord using provided reader schema
* @param schema Reader schema for produced records
* @param url Schema Registry URL
* @return Deserializer instance
*/
static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
Schema schema,
String url
);
/**
* Creates deserializer with custom cache capacity
* @param schema Reader schema for produced records
* @param url Schema Registry URL
* @param identityMapCapacity Maximum cached schema versions (default: 1000)
* @return Deserializer instance
*/
static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
Schema schema,
String url,
int identityMapCapacity
);
/**
* Creates deserializer with additional registry configurations
* @param schema Reader schema for produced records
* @param url Schema Registry URL
* @param registryConfigs Additional Schema Registry configs (SSL, auth)
* @return Deserializer instance
*/
static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
Schema schema,
String url,
@Nullable Map<String, ?> registryConfigs
);
/**
* Creates deserializer with full configuration options
* @param schema Reader schema for produced records
* @param url Schema Registry URL
* @param identityMapCapacity Maximum cached schema versions
* @param registryConfigs Additional Schema Registry configs
* @return Deserializer instance
*/
static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
Schema schema,
String url,
int identityMapCapacity,
@Nullable Map<String, ?> registryConfigs
);

Usage Example:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
// Parse reader schema
String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}";
Schema readerSchema = new Schema.Parser().parse(schemaString);
// Create deserializer with SSL configuration
Map<String, String> registryConfigs = new HashMap<>();
registryConfigs.put("schema.registry.ssl.keystore.location", "/path/to/keystore.jks");
registryConfigs.put("schema.registry.ssl.keystore.password", "password");
ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer =
ConfluentRegistryAvroDeserializationSchema.forGeneric(
readerSchema,
"https://schema-registry.example.com",
1000,
registryConfigs
);

Creates deserializers for Avro-generated specific record classes.
/**
* Creates deserializer for specific record class
* @param tClass Generated Avro record class
* @param url Schema Registry URL
* @return Deserializer instance
*/
static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
Class<T> tClass,
String url
);
/**
* Creates deserializer with custom cache capacity
* @param tClass Generated Avro record class
* @param url Schema Registry URL
* @param identityMapCapacity Maximum cached schema versions
* @return Deserializer instance
*/
static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
Class<T> tClass,
String url,
int identityMapCapacity
);
/**
* Creates deserializer with additional registry configurations
* @param tClass Generated Avro record class
* @param url Schema Registry URL
* @param registryConfigs Additional Schema Registry configs
* @return Deserializer instance
*/
static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
Class<T> tClass,
String url,
@Nullable Map<String, ?> registryConfigs
);
/**
* Creates deserializer with full configuration options
* @param tClass Generated Avro record class
* @param url Schema Registry URL
* @param identityMapCapacity Maximum cached schema versions
* @param registryConfigs Additional Schema Registry configs
* @return Deserializer instance
*/
static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
Class<T> tClass,
String url,
int identityMapCapacity,
@Nullable Map<String, ?> registryConfigs
);

Usage Example:
import com.example.avro.User; // Generated Avro class
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
// Create deserializer for specific record class
ConfluentRegistryAvroDeserializationSchema<User> deserializer =
ConfluentRegistryAvroDeserializationSchema.forSpecific(
User.class,
"https://schema-registry.example.com"
);

Creates serializers for generic Avro records with schema registration to Confluent Schema Registry.
/**
* Creates serializer for GenericRecord
* @param subject Schema Registry subject name
* @param schema Writer schema for serialization
* @param schemaRegistryUrl Schema Registry URL
* @return Serializer instance
*/
static ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
String subject,
Schema schema,
String schemaRegistryUrl
);
/**
* Creates serializer with additional registry configurations
* @param subject Schema Registry subject name
* @param schema Writer schema for serialization
* @param schemaRegistryUrl Schema Registry URL
* @param registryConfigs Additional Schema Registry configs
* @return Serializer instance
*/
static ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
String subject,
Schema schema,
String schemaRegistryUrl,
@Nullable Map<String, ?> registryConfigs
);

Usage Example:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
// Parse writer schema
String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}";
Schema writerSchema = new Schema.Parser().parse(schemaString);
// Create serializer
ConfluentRegistryAvroSerializationSchema<GenericRecord> serializer =
ConfluentRegistryAvroSerializationSchema.forGeneric(
"user-topic-value",
writerSchema,
"https://schema-registry.example.com"
);

Creates serializers for Avro-generated specific record classes.
/**
* Creates serializer for specific record class
* @param tClass Generated Avro record class
* @param subject Schema Registry subject name
* @param schemaRegistryUrl Schema Registry URL
* @return Serializer instance
*/
static <T extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<T> forSpecific(
Class<T> tClass,
String subject,
String schemaRegistryUrl
);
/**
* Creates serializer with additional registry configurations
* @param tClass Generated Avro record class
* @param subject Schema Registry subject name
* @param schemaRegistryUrl Schema Registry URL
* @param registryConfigs Additional Schema Registry configs
* @return Serializer instance
*/
static <T extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<T> forSpecific(
Class<T> tClass,
String subject,
String schemaRegistryUrl,
@Nullable Map<String, ?> registryConfigs
);

Usage Example:
import com.example.avro.User; // Generated Avro class
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
// Create serializer for specific record class
ConfluentRegistryAvroSerializationSchema<User> serializer =
ConfluentRegistryAvroSerializationSchema.forSpecific(
User.class,
"user-topic-value",
"https://schema-registry.example.com"
);

Factory for creating runtime format instances in Flink's table API.
/**
* Format factory identifier for SQL DDL
*/
String IDENTIFIER = "avro-confluent";
/**
* Creates decoding format for table sources
* @param context Table factory context
* @param formatOptions Configuration options
* @return Decoding format instance
*/
DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
ReadableConfig formatOptions
);
/**
* Creates encoding format for table sinks
* @param context Table factory context
* @param formatOptions Configuration options
* @return Encoding format instance
*/
EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
DynamicTableFactory.Context context,
ReadableConfig formatOptions
);
/**
* Returns set of required configuration options
* @return Required options (URL)
*/
Set<ConfigOption<?>> requiredOptions();
/**
* Returns set of optional configuration options
* @return Optional options (SUBJECT, SCHEMA, SSL, auth options)
*/
Set<ConfigOption<?>> optionalOptions();

The format can be used directly in Flink SQL DDL:
-- Source table with explicit schema
CREATE TABLE user_events (
user_id BIGINT,
event_name STRING,
event_time TIMESTAMP(3),
properties MAP<STRING, STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://localhost:8081',
'avro-confluent.subject' = 'user-events-value',
'avro-confluent.schema' = '{"type":"record","name":"UserEvent","fields":[{"name":"user_id","type":"long"},{"name":"event_name","type":"string"},{"name":"event_time","type":"long"},{"name":"properties","type":{"type":"map","values":"string"}}]}'
);
-- Sink table requiring subject for schema registration
CREATE TABLE processed_events (
user_id BIGINT,
event_count BIGINT,
last_event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'processed-events',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://localhost:8081',
'avro-confluent.subject' = 'processed-events-value'
);

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-avro-confluent-registry