
tessl/maven-org-apache-flink--flink-sql-avro-confluent-registry

Apache Flink SQL table format support for Avro records with Confluent Schema Registry integration


Debezium Change Data Capture Format

Debezium Avro format support for change data capture scenarios with Confluent Schema Registry integration. Handles INSERT, UPDATE, and DELETE operations with before/after record states for real-time data synchronization.

Capabilities

Debezium Format Factory

Factory for creating Debezium Avro format instances with full change data capture support.

/**
 * Format identifier for SQL DDL
 */
String IDENTIFIER = "debezium-avro-confluent";

/**
 * Creates decoding format for change data capture sources
 * @param context Table factory context
 * @param formatOptions Configuration options
 * @return Decoding format supporting CDC operations
 */
DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
    DynamicTableFactory.Context context, 
    ReadableConfig formatOptions
);

/**
 * Creates encoding format for change data capture sinks
 * @param context Table factory context  
 * @param formatOptions Configuration options
 * @return Encoding format supporting CDC operations
 */
EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
    DynamicTableFactory.Context context, 
    ReadableConfig formatOptions
);

/**
 * Returns changelog mode supporting all CDC operations
 * @return ChangelogMode with INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
 */
ChangelogMode getChangelogMode();

Debezium Deserialization Schema

Deserializes Debezium change event records from Avro format, extracting row kind and data changes.

/**
 * Deserialization schema for Debezium Avro change events
 * Handles before/after states and operation types
 */
@Internal
class DebeziumAvroDeserializationSchema implements DeserializationSchema<RowData> {
    
    /**
     * Constructor for Debezium deserializer
     * @param rowType Expected row type for output
     * @param producedTypeInfo Type information for output
     * @param schemaRegistryURL Schema Registry URL
     * @param schema Optional explicit schema string
     * @param registryConfigs Additional registry configurations
     */
    DebeziumAvroDeserializationSchema(
        RowType rowType,
        TypeInformation<RowData> producedTypeInfo,
        String schemaRegistryURL,
        @Nullable String schema,
        @Nullable Map<String, ?> registryConfigs
    );
}

Debezium Serialization Schema

Serializes Flink RowData to Debezium Avro format with proper change event structure.

/**
 * Serialization schema for Debezium Avro change events
 * Creates proper before/after/op structure
 */
@Internal  
class DebeziumAvroSerializationSchema implements SerializationSchema<RowData> {
    
    /**
     * Constructor for Debezium serializer
     * @param rowType Input row type
     * @param schemaRegistryURL Schema Registry URL
     * @param subject Schema Registry subject
     * @param schema Optional explicit schema string
     * @param registryConfigs Additional registry configurations
     */
    DebeziumAvroSerializationSchema(
        RowType rowType,
        String schemaRegistryURL,
        String subject,
        @Nullable String schema,
        @Nullable Map<String, ?> registryConfigs
    );
}

SQL Table Integration

Source Table for Change Data Capture

CREATE TABLE debezium_source (
  id BIGINT,
  name STRING,
  email STRING,
  updated_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql.inventory.users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081'
);

Sink Table for Change Data Capture

CREATE TABLE debezium_sink (
  id BIGINT,
  name STRING,
  email STRING,
  updated_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'processed.users',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.url' = 'http://localhost:8081',
  'debezium-avro-confluent.subject' = 'processed.users-value'
);

Debezium Schema Structure

The Debezium format expects or produces Avro records with the following structure:

{
  "type": "record",
  "name": "Envelope",
  "fields": [
    {
      "name": "before",
      "type": ["null", {
        "type": "record",
        "name": "User",
        "fields": [
          {"name": "id", "type": "long"},
          {"name": "name", "type": ["null", "string"]},
          {"name": "email", "type": ["null", "string"]}
        ]
      }],
      "default": null
    },
    {
      "name": "after", 
      "type": ["null", "User"],
      "default": null
    },
    {
      "name": "op",
      "type": "string"
    }
  ]
}
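For illustration only, the envelope above can be modeled as plain Java types. The names mirror the Avro schema; nothing here is part of the Flink or Debezium APIs:

```java
// Plain-Java model of the Debezium envelope shown above; for
// illustration only, not an actual Flink or Debezium class.
public class EnvelopeModel {

    /** Mirrors the nested 'User' record in the Avro schema. */
    public record User(long id, String name, String email) {}

    /** Mirrors the Envelope: nullable before/after plus an op code. */
    public record Envelope(User before, User after, String op) {}

    /** Convenience factory for an insert event (op = "c"). */
    public static Envelope insertOf(User newRow) {
        return new Envelope(null, newRow, "c");
    }
}
```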

Change Event Operations

The format handles the following Debezium operation types and their Flink RowKind mappings:

  • "c" (CREATE/INSERT): after contains new record, before is null → Produces RowKind.INSERT
  • "u" (UPDATE): Both before and after contain record states → Produces two records: RowKind.UPDATE_BEFORE (from before) and RowKind.UPDATE_AFTER (from after)
  • "d" (DELETE): before contains deleted record, after is null → Produces RowKind.DELETE (from before)
  • "r" (READ): Initial snapshot record, after contains data → Produces RowKind.INSERT

Important: For UPDATE and DELETE operations, the before field must not be null. If it is null, the deserializer throws an IllegalStateException with a message about PostgreSQL REPLICA IDENTITY settings.
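The operation mapping and the before-null check above can be sketched in plain Java. This is a hedged illustration of the logic, not the actual Flink deserializer; all names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the op -> RowKind mapping applied during
// deserialization; names are hypothetical, not Flink internals.
public class DebeziumOpMapping {

    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Returns the RowKinds emitted for one change event. */
    public static List<RowKind> rowKindsFor(String op, boolean beforePresent) {
        List<RowKind> out = new ArrayList<>();
        switch (op) {
            case "c": // CREATE/INSERT: new record in 'after'
            case "r": // READ: initial snapshot record in 'after'
                out.add(RowKind.INSERT);
                break;
            case "u": // UPDATE: emit old state, then new state
                requireBefore(beforePresent, op);
                out.add(RowKind.UPDATE_BEFORE);
                out.add(RowKind.UPDATE_AFTER);
                break;
            case "d": // DELETE: emit the old state from 'before'
                requireBefore(beforePresent, op);
                out.add(RowKind.DELETE);
                break;
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
        return out;
    }

    private static void requireBefore(boolean beforePresent, String op) {
        if (!beforePresent) {
            // Mirrors the deserializer's check: 'before' must carry the
            // old row state for updates and deletes (cf. REPLICA IDENTITY).
            throw new IllegalStateException(
                "'before' is null for op '" + op + "'");
        }
    }
}
```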

Serialization RowKind Mappings

When serializing from Flink RowData to Debezium format, the following RowKind mappings apply:

  • RowKind.INSERT → before = null, after = rowData, op = "c"
  • RowKind.UPDATE_AFTER → before = null, after = rowData, op = "c"
  • RowKind.UPDATE_BEFORE → before = rowData, after = null, op = "d"
  • RowKind.DELETE → before = rowData, after = null, op = "d"
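The serialization-side mapping can likewise be sketched in plain Java. Again, this is an illustrative summary of the table above, not the actual Flink serializer:

```java
// Illustrative sketch of the RowKind -> Debezium envelope mapping used
// when serializing; class and method names here are hypothetical.
public class DebeziumSerMapping {

    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Returns the Debezium op code written for a given RowKind. */
    public static String opFor(RowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return "c"; // the row value goes into 'after'
            case UPDATE_BEFORE:
            case DELETE:
                return "d"; // the row value goes into 'before'
            default:
                throw new IllegalArgumentException("Unknown kind: " + kind);
        }
    }

    /** True when the row is written into 'after' (otherwise 'before'). */
    public static boolean writesAfter(RowKind kind) {
        return kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER;
    }
}
```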

Usage Example

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create source table for Debezium CDC
tableEnv.executeSql(
    "CREATE TABLE user_changes (" +
    "  id BIGINT," +
    "  name STRING," + 
    "  email STRING," +
    "  updated_at TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'mysql.inventory.users'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'debezium-avro-confluent'," +
    "  'debezium-avro-confluent.url' = 'http://localhost:8081'" +
    ")"
);

// Process change events
tableEnv.executeSql(
    "INSERT INTO processed_users " +
    "SELECT id, UPPER(name) as name, email, updated_at " +
    "FROM user_changes " +
    "WHERE name IS NOT NULL"
);

Configuration Notes

  • The subject parameter is required for serialization to register schemas
  • Schema Registry authentication and SSL options are inherited from AvroConfluentFormatOptions
  • The format automatically handles schema evolution through the Schema Registry
  • All Debezium metadata fields (source, ts_ms, etc.) are available if included in the schema
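The registryConfigs map accepted by the constructors above holds plain key/value properties passed through to the Confluent Schema Registry client. A minimal sketch for basic auth, assuming the standard Confluent client property names (the helper class itself is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper building a registryConfigs map; the property keys
// follow the Confluent Schema Registry client's conventions.
public class RegistryConfigExample {

    /** Builds configs for user/password auth against the registry. */
    public static Map<String, String> basicAuthConfigs(String userInfo) {
        Map<String, String> configs = new HashMap<>();
        // Credentials are supplied directly rather than embedded in the URL
        configs.put("basic.auth.credentials.source", "USER_INFO");
        configs.put("basic.auth.user.info", userInfo); // "user:password"
        return configs;
    }
}
```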

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-avro-confluent-registry
