Apache Flink SQL table format support for Avro records with Confluent Schema Registry integration
Debezium Avro format support for change data capture scenarios with Confluent Schema Registry integration. Handles INSERT, UPDATE, and DELETE operations with before/after record states for real-time data synchronization.
Factory for creating Debezium Avro format instances with full change data capture support.
/**
* Format identifier for SQL DDL
*/
String IDENTIFIER = "debezium-avro-confluent";
/**
* Creates decoding format for change data capture sources
* @param context Table factory context
* @param formatOptions Configuration options
* @return Decoding format supporting CDC operations
*/
DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
ReadableConfig formatOptions
);
/**
* Creates encoding format for change data capture sinks
* @param context Table factory context
* @param formatOptions Configuration options
* @return Encoding format supporting CDC operations
*/
EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
DynamicTableFactory.Context context,
ReadableConfig formatOptions
);
/**
* Returns changelog mode supporting all CDC operations
* @return ChangelogMode with INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
*/
ChangelogMode getChangelogMode();

Deserializes Debezium change event records from Avro format, extracting row kind and data changes.
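As an illustration of the full changelog mode, here is a minimal plain-Java model; the `RowKind` enum and `allChangeKinds` helper are simplified stand-ins for this sketch, not Flink's actual `ChangelogMode` API:

```java
import java.util.EnumSet;
import java.util.Set;

public class ChangelogModeSketch {
    // Stand-in for Flink's RowKind enum
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // The Debezium format supports every change kind, so its changelog mode
    // reports all four RowKinds to the planner.
    static Set<RowKind> allChangeKinds() {
        return EnumSet.allOf(RowKind.class);
    }

    public static void main(String[] args) {
        System.out.println(allChangeKinds().size()); // prints 4
    }
}
```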
/**
* Deserialization schema for Debezium Avro change events
* Handles before/after states and operation types
*/
@Internal
class DebeziumAvroDeserializationSchema implements DeserializationSchema<RowData> {
/**
* Constructor for Debezium deserializer
* @param rowType Expected row type for output
* @param producedTypeInfo Type information for output
* @param schemaRegistryURL Schema Registry URL
* @param schema Optional explicit schema string
* @param registryConfigs Additional registry configurations
*/
DebeziumAvroDeserializationSchema(
RowType rowType,
TypeInformation<RowData> producedTypeInfo,
String schemaRegistryURL,
@Nullable String schema,
@Nullable Map<String, ?> registryConfigs
);
}

Serializes Flink RowData to Debezium Avro format with proper change event structure.
/**
* Serialization schema for Debezium Avro change events
* Creates proper before/after/op structure
*/
@Internal
class DebeziumAvroSerializationSchema implements SerializationSchema<RowData> {
/**
* Constructor for Debezium serializer
* @param rowType Input row type
* @param schemaRegistryURL Schema Registry URL
* @param subject Schema Registry subject
* @param schema Optional explicit schema string
* @param registryConfigs Additional registry configurations
*/
DebeziumAvroSerializationSchema(
RowType rowType,
String schemaRegistryURL,
String subject,
@Nullable String schema,
@Nullable Map<String, ?> registryConfigs
);
}

CREATE TABLE debezium_source (
id BIGINT,
name STRING,
email STRING,
updated_at TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'mysql.inventory.users',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.url' = 'http://localhost:8081'
);

CREATE TABLE debezium_sink (
id BIGINT,
name STRING,
email STRING,
updated_at TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'processed.users',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.url' = 'http://localhost:8081',
'debezium-avro-confluent.subject' = 'processed.users-value'
);

The Debezium format expects or produces Avro records with the following structure:
{
"type": "record",
"name": "Envelope",
"fields": [
{
"name": "before",
"type": ["null", {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": ["null", "string"]},
{"name": "email", "type": ["null", "string"]}
]
}],
"default": null
},
{
"name": "after",
"type": ["null", "User"],
"default": null
},
{
"name": "op",
"type": "string"
}
]
}

The format handles the following Debezium operation types and their Flink RowKind mappings:
- Create (op = "c"): after contains the new record, before is null → produces RowKind.INSERT
- Update (op = "u"): before and after contain the record states → produces two records: RowKind.UPDATE_BEFORE (from before) and RowKind.UPDATE_AFTER (from after)
- Delete (op = "d"): before contains the deleted record, after is null → produces RowKind.DELETE (from before)
- Read/snapshot (op = "r"): after contains the record → produces RowKind.INSERT

Important: For UPDATE and DELETE operations, the before field must not be null. If it is null, the deserializer throws an IllegalStateException with a message about PostgreSQL REPLICA IDENTITY settings.
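The deserialization mapping above can be sketched in plain Java, independent of Flink; the `RowKind` enum, `Envelope` record, and `extractRowKinds` helper below are simplified stand-ins for illustration, not the actual Flink or Debezium classes:

```java
import java.util.ArrayList;
import java.util.List;

public class OpMappingSketch {
    // Stand-in for Flink's RowKind enum
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Simplified Debezium envelope: op code plus before/after payloads
    record Envelope(String op, Object before, Object after) {}

    // Maps one Debezium change event to the RowKind(s) it produces
    static List<RowKind> extractRowKinds(Envelope e) {
        List<RowKind> kinds = new ArrayList<>();
        switch (e.op()) {
            case "c", "r" -> kinds.add(RowKind.INSERT);  // create / snapshot read
            case "u" -> {                                // update emits two records
                if (e.before() == null) {
                    throw new IllegalStateException(
                        "before is null for UPDATE; check REPLICA IDENTITY");
                }
                kinds.add(RowKind.UPDATE_BEFORE);
                kinds.add(RowKind.UPDATE_AFTER);
            }
            case "d" -> {                                // delete reads the before state
                if (e.before() == null) {
                    throw new IllegalStateException(
                        "before is null for DELETE; check REPLICA IDENTITY");
                }
                kinds.add(RowKind.DELETE);
            }
            default -> throw new IllegalArgumentException("Unknown op: " + e.op());
        }
        return kinds;
    }

    public static void main(String[] args) {
        System.out.println(extractRowKinds(new Envelope("u", "old", "new")));
        // prints [UPDATE_BEFORE, UPDATE_AFTER]
    }
}
```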
When serializing from Flink RowData to Debezium format, the following RowKind mappings apply:
- RowKind.INSERT → before = null, after = rowData, op = "c"
- RowKind.UPDATE_AFTER → before = null, after = rowData, op = "c"
- RowKind.UPDATE_BEFORE → before = rowData, after = null, op = "d"
- RowKind.DELETE → before = rowData, after = null, op = "d"

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create source table for Debezium CDC
tableEnv.executeSql(
"CREATE TABLE user_changes (" +
" id BIGINT," +
" name STRING," +
" email STRING," +
" updated_at TIMESTAMP(3)" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'mysql.inventory.users'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'debezium-avro-confluent'," +
" 'debezium-avro-confluent.url' = 'http://localhost:8081'" +
")"
);
// Process change events
tableEnv.executeSql(
"INSERT INTO processed_users " +
"SELECT id, UPPER(name) as name, email, updated_at " +
"FROM user_changes " +
"WHERE name IS NOT NULL"
);

The subject parameter is required for serialization so that schemas can be registered. See AvroConfluentFormatOptions for the shared Avro/Confluent format options.

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-sql-avro-confluent-registry
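The serialization mappings listed earlier (RowKind → before/after/op) can be sketched in plain Java; the `RowKind` enum, `Envelope` record, and `toEnvelope` helper are simplified stand-ins for illustration, not the actual Flink or Debezium classes:

```java
public class SerializationMappingSketch {
    // Stand-in for Flink's RowKind enum
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Simplified Debezium envelope produced for each RowKind
    record Envelope(Object before, Object after, String op) {}

    static Envelope toEnvelope(RowKind kind, Object rowData) {
        return switch (kind) {
            // INSERT and UPDATE_AFTER are both written as creates
            case INSERT, UPDATE_AFTER -> new Envelope(null, rowData, "c");
            // UPDATE_BEFORE and DELETE are both written as deletes
            case UPDATE_BEFORE, DELETE -> new Envelope(rowData, null, "d");
        };
    }

    public static void main(String[] args) {
        Envelope e = toEnvelope(RowKind.DELETE, "row");
        System.out.println(e.op()); // prints "d"
    }
}
```

Note that an update is thus written out as a delete/create pair, which a downstream Debezium-aware consumer reassembles into a change.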