
# Debezium Change Data Capture Format

Debezium Avro format support for change data capture (CDC) scenarios with Confluent Schema Registry integration. It handles INSERT, UPDATE, and DELETE operations with before/after record states for real-time data synchronization.

## Capabilities

### Debezium Format Factory

Factory for creating Debezium Avro format instances with full change data capture support.

```java { .api }
/**
 * Format identifier for SQL DDL
 */
String IDENTIFIER = "debezium-avro-confluent";

/**
 * Creates a decoding format for change data capture sources
 * @param context Table factory context
 * @param formatOptions Configuration options
 * @return Decoding format supporting CDC operations
 */
DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
    DynamicTableFactory.Context context,
    ReadableConfig formatOptions
);

/**
 * Creates an encoding format for change data capture sinks
 * @param context Table factory context
 * @param formatOptions Configuration options
 * @return Encoding format supporting CDC operations
 */
EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
    DynamicTableFactory.Context context,
    ReadableConfig formatOptions
);

/**
 * Returns the changelog mode supporting all CDC operations
 * @return ChangelogMode with INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
 */
ChangelogMode getChangelogMode();
```

### Debezium Deserialization Schema

Deserializes Debezium change event records from Avro format, extracting the row kind and data changes.

```java { .api }
/**
 * Deserialization schema for Debezium Avro change events.
 * Handles before/after states and operation types.
 */
@Internal
class DebeziumAvroDeserializationSchema implements DeserializationSchema<RowData> {

    /**
     * Constructor for the Debezium deserializer
     * @param rowType Expected row type for output
     * @param producedTypeInfo Type information for output
     * @param schemaRegistryURL Schema Registry URL
     * @param schema Optional explicit schema string
     * @param registryConfigs Additional registry configurations
     */
    DebeziumAvroDeserializationSchema(
        RowType rowType,
        TypeInformation<RowData> producedTypeInfo,
        String schemaRegistryURL,
        @Nullable String schema,
        @Nullable Map<String, ?> registryConfigs
    );
}
```

### Debezium Serialization Schema

Serializes Flink RowData to Debezium Avro format with the proper change event structure.

```java { .api }
/**
 * Serialization schema for Debezium Avro change events.
 * Creates the proper before/after/op structure.
 */
@Internal
class DebeziumAvroSerializationSchema implements SerializationSchema<RowData> {

    /**
     * Constructor for the Debezium serializer
     * @param rowType Input row type
     * @param schemaRegistryURL Schema Registry URL
     * @param subject Schema Registry subject
     * @param schema Optional explicit schema string
     * @param registryConfigs Additional registry configurations
     */
    DebeziumAvroSerializationSchema(
        RowType rowType,
        String schemaRegistryURL,
        String subject,
        @Nullable String schema,
        @Nullable Map<String, ?> registryConfigs
    );
}
```

## SQL Table Integration

### Source Table for Change Data Capture

```sql
CREATE TABLE debezium_source (
    id BIGINT,
    name STRING,
    email STRING,
    updated_at TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'mysql.inventory.users',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-avro-confluent',
    'debezium-avro-confluent.url' = 'http://localhost:8081'
);
```

### Sink Table for Change Data Capture

```sql
CREATE TABLE debezium_sink (
    id BIGINT,
    name STRING,
    email STRING,
    updated_at TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'processed.users',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-avro-confluent',
    'debezium-avro-confluent.url' = 'http://localhost:8081',
    'debezium-avro-confluent.subject' = 'processed.users-value'
);
```

## Debezium Schema Structure

The Debezium format expects or produces Avro records with the following structure:

```json
{
  "type": "record",
  "name": "Envelope",
  "fields": [
    {
      "name": "before",
      "type": ["null", {
        "type": "record",
        "name": "User",
        "fields": [
          {"name": "id", "type": "long"},
          {"name": "name", "type": ["null", "string"]},
          {"name": "email", "type": ["null", "string"]}
        ]
      }],
      "default": null
    },
    {
      "name": "after",
      "type": ["null", "User"],
      "default": null
    },
    {
      "name": "op",
      "type": "string"
    }
  ]
}
```

## Change Event Operations

The format handles the following Debezium operation types and their Flink RowKind mappings:

- **"c" (CREATE/INSERT)**: `after` contains the new record, `before` is null → produces `RowKind.INSERT`
- **"u" (UPDATE)**: both `before` and `after` contain record states → produces two records: `RowKind.UPDATE_BEFORE` (from `before`) and `RowKind.UPDATE_AFTER` (from `after`)
- **"d" (DELETE)**: `before` contains the deleted record, `after` is null → produces `RowKind.DELETE` (from `before`)
- **"r" (READ)**: initial snapshot record, `after` contains the data → produces `RowKind.INSERT`

**Important:** For UPDATE and DELETE operations, the `before` field must not be null. If it is null, the deserializer throws an `IllegalStateException` with a message about PostgreSQL REPLICA IDENTITY settings.
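The decode-side mapping above can be sketched as plain Java. This is an illustrative stand-in, not Flink's actual implementation: `RowKind` here is a local enum mirroring `org.apache.flink.types.RowKind`, and the null-`before` check mirrors the `IllegalStateException` described above.

```java
import java.util.ArrayList;
import java.util.List;

public class DebeziumOpMapping {
    // Local stand-in for org.apache.flink.types.RowKind
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Returns the row kinds emitted for one change event. */
    static List<RowKind> rowKindsFor(String op, boolean beforePresent) {
        List<RowKind> out = new ArrayList<>();
        switch (op) {
            case "c": // insert
            case "r": // initial snapshot read
                out.add(RowKind.INSERT);
                break;
            case "u": // an update emits two records
                requireBefore(beforePresent, op);
                out.add(RowKind.UPDATE_BEFORE);
                out.add(RowKind.UPDATE_AFTER);
                break;
            case "d": // delete
                requireBefore(beforePresent, op);
                out.add(RowKind.DELETE);
                break;
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
        return out;
    }

    static void requireBefore(boolean present, String op) {
        if (!present) {
            // Mirrors the deserializer's complaint about null 'before' states
            // (e.g. PostgreSQL REPLICA IDENTITY not capturing full rows).
            throw new IllegalStateException(
                "'before' must not be null for op '" + op + "'");
        }
    }

    public static void main(String[] args) {
        System.out.println(rowKindsFor("u", true)); // [UPDATE_BEFORE, UPDATE_AFTER]
    }
}
```

Note that a single Debezium "u" event fans out into two Flink records, which is why the changelog mode must advertise both `UPDATE_BEFORE` and `UPDATE_AFTER`.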

### Serialization RowKind Mappings

When serializing from Flink RowData to Debezium format, the following RowKind mappings apply:

- **`RowKind.INSERT`**: `before` = null, `after` = rowData, `op` = "c"
- **`RowKind.UPDATE_AFTER`**: `before` = null, `after` = rowData, `op` = "c"
- **`RowKind.UPDATE_BEFORE`**: `before` = rowData, `after` = null, `op` = "d"
- **`RowKind.DELETE`**: `before` = rowData, `after` = null, `op` = "d"
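These mappings can likewise be sketched as a small stand-alone Java example. It is illustrative only: `Envelope` and `RowKind` are local stand-ins for the Avro envelope record and `org.apache.flink.types.RowKind`; the real schema writes an Avro record through the Schema Registry.

```java
public class DebeziumEnvelopeSketch {
    // Local stand-in for org.apache.flink.types.RowKind
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical envelope holder; the real format writes an Avro record.
    record Envelope(Object before, Object after, String op) {}

    /** Builds the before/after/op envelope fields for one row. */
    static Envelope toEnvelope(RowKind kind, Object rowData) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                // New state goes into 'after'; emitted as a create event.
                return new Envelope(null, rowData, "c");
            case UPDATE_BEFORE:
            case DELETE:
                // Old state goes into 'before'; emitted as a delete event.
                return new Envelope(rowData, null, "d");
            default:
                throw new IllegalArgumentException("Unsupported kind: " + kind);
        }
    }

    public static void main(String[] args) {
        System.out.println(toEnvelope(RowKind.DELETE, "row").op()); // prints "d"
    }
}
```

An update pair is thus written as two separate events — a "d" for the old state and a "c" for the new one — rather than a single "u" envelope.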

## Usage Example

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create source table for Debezium CDC
tableEnv.executeSql(
    "CREATE TABLE user_changes (" +
    "  id BIGINT," +
    "  name STRING," +
    "  email STRING," +
    "  updated_at TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'mysql.inventory.users'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'debezium-avro-confluent'," +
    "  'debezium-avro-confluent.url' = 'http://localhost:8081'" +
    ")"
);

// Process change events
tableEnv.executeSql(
    "INSERT INTO processed_users " +
    "SELECT id, UPPER(name) AS name, email, updated_at " +
    "FROM user_changes " +
    "WHERE name IS NOT NULL"
);
```

## Configuration Notes

- The **`subject`** option is required for serialization so that schemas can be registered
- Schema Registry authentication and SSL options are inherited from `AvroConfluentFormatOptions`
- The format automatically handles schema evolution through the Schema Registry
- All Debezium metadata fields (`source`, `ts_ms`, etc.) are available if included in the schema