tessl/maven-org-apache-flink--flink-sql-avro-confluent-registry

Apache Flink SQL table format support for Avro records with Confluent Schema Registry integration

Describes: pkg:maven/org.apache.flink/flink-sql-avro-confluent-registry@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-avro-confluent-registry@2.1.0


Apache Flink SQL Avro Confluent Registry

Apache Flink SQL table format support for Avro records with Confluent Schema Registry integration. This library enables serialization and deserialization of Kafka messages with centralized schema management, providing both the standard Avro format and Debezium change data capture support.

Package Information

  • Package Name: org.apache.flink:flink-sql-avro-confluent-registry
  • Package Type: Maven JAR
  • Language: Java
  • Version: 2.1.0
  • Installation: Add Maven dependency in pom.xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-avro-confluent-registry</artifactId>
  <version>2.1.0</version>
</dependency>

Core Imports

import org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

For Debezium support:

import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroFormatFactory;

Basic Usage

SQL Table Definition with Confluent Schema Registry

CREATE TABLE kafka_source (
  user_id BIGINT,
  user_name STRING,
  email STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081',
  'avro-confluent.subject' = 'user-topic-value'
);
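On the sink side, the writer registers the record schema under a subject, so `avro-confluent.subject` is required when writing. A sketch of a matching sink table (topic and subject names are illustrative):

```sql
-- Hypothetical sink table; 'avro-confluent.subject' names the
-- Schema Registry subject the schema is registered under on write
CREATE TABLE kafka_sink (
  user_id BIGINT,
  user_name STRING,
  email STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-topic-out',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081',
  'avro-confluent.subject' = 'user-topic-out-value'
);
```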

Programmatic Deserialization

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Create schema
String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}";
Schema schema = new Schema.Parser().parse(schemaString);

// Create deserializer
ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer = 
    ConfluentRegistryAvroDeserializationSchema.forGeneric(
        schema, 
        "http://localhost:8081"
    );

Programmatic Serialization

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Create serializer, reusing the Avro schema parsed in the
// deserialization example above
ConfluentRegistryAvroSerializationSchema<GenericRecord> serializer =
    ConfluentRegistryAvroSerializationSchema.forGeneric(
        "user-topic-value",
        schema,
        "http://localhost:8081"
    );

Architecture

The library is built around several key components:

  • Format Factories: SPI-based factories (RegistryAvroFormatFactory, DebeziumAvroFormatFactory) that integrate with Flink's table API
  • Serialization Schemas: Type-safe serialization and deserialization classes supporting both generic and specific Avro records
  • Schema Registry Integration: Confluent Schema Registry client integration with authentication and SSL support
  • Shaded Dependencies: All external dependencies (Kafka client, Confluent client, Avro) are shaded to prevent conflicts

Capabilities

Standard Avro Format

Core Avro serialization and deserialization with Confluent Schema Registry integration. Supports both generic records and generated specific record classes.

// Format identifier for SQL DDL
String IDENTIFIER = "avro-confluent";

// Generic record deserialization (static factory method)
static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
    Schema schema,
    String url
);

// Specific record deserialization (static factory method)
static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
    Class<T> tClass,
    String url
);

// Generic record serialization (static factory method)
static ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
    String subject,
    Schema schema,
    String schemaRegistryUrl
);

// Specific record serialization (static factory method)
static <T extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<T> forSpecific(
    Class<T> tClass,
    String subject,
    String schemaRegistryUrl
);


Debezium Change Data Capture

Debezium Avro format support for change data capture scenarios, handling INSERT, UPDATE, and DELETE operations with before/after record states.

// Format identifier for SQL DDL
String IDENTIFIER = "debezium-avro-confluent";

// Format factory for Debezium CDC support
DebeziumAvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory;
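
As a sketch, a CDC source table using this format could be declared as follows (table, topic, and server names are assumptions; the `debezium-avro-confluent` option prefix follows the format identifier above):

```sql
-- Hypothetical Debezium CDC source; Flink interprets the Debezium
-- envelope as INSERT/UPDATE/DELETE changelog rows
CREATE TABLE user_changes (
  user_id BIGINT,
  user_name STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver.inventory.users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);
```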


Configuration Options

Comprehensive configuration options for Schema Registry connection, SSL, authentication, and schema management.

ConfigOption<String> URL; // Required: Schema Registry URL
ConfigOption<String> SUBJECT; // Schema Registry subject name  
ConfigOption<String> SCHEMA; // Optional: Explicit schema string
ConfigOption<Map<String, String>> PROPERTIES; // Additional properties


Types

// Core configuration options class
@PublicEvolving
class AvroConfluentFormatOptions {
    ConfigOption<String> URL;
    ConfigOption<String> SUBJECT;
    ConfigOption<String> SCHEMA;
    ConfigOption<String> SSL_KEYSTORE_LOCATION;
    ConfigOption<String> SSL_KEYSTORE_PASSWORD;
    ConfigOption<String> SSL_TRUSTSTORE_LOCATION;
    ConfigOption<String> SSL_TRUSTSTORE_PASSWORD;
    ConfigOption<String> BASIC_AUTH_CREDENTIALS_SOURCE;
    ConfigOption<String> BASIC_AUTH_USER_INFO;
    ConfigOption<String> BEARER_AUTH_CREDENTIALS_SOURCE;
    ConfigOption<String> BEARER_AUTH_TOKEN;
    ConfigOption<Map<String, String>> PROPERTIES;
}
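
In SQL DDL, these options surface as format-prefixed keys. A sketch of a secured source table using basic auth and a truststore (the credentials, hostnames, and paths are placeholders):

```sql
-- Hypothetical secured source; auth and SSL values are placeholders
CREATE TABLE secured_source (
  user_id BIGINT,
  user_name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-topic',
  'properties.bootstrap.servers' = 'broker:9093',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'https://registry:8081',
  'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'avro-confluent.basic-auth.user-info' = 'user:secret',
  'avro-confluent.ssl.truststore.location' = '/path/to/truststore.jks',
  'avro-confluent.ssl.truststore.password' = 'changeit'
);
```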

// Schema coder provider with caching support
class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {
    CachedSchemaCoderProvider(String subject, String url, int identityMapCapacity, Map<String, ?> configs);
    SchemaCoder get();
}

// Schema coder for Confluent wire protocol
class ConfluentSchemaRegistryCoder implements SchemaCoder {
    ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient client);
    ConfluentSchemaRegistryCoder(SchemaRegistryClient client);
    Schema readSchema(InputStream in) throws IOException;
    void writeSchema(Schema schema, OutputStream out) throws IOException;
}