Apache Flink SQL table format support for Avro records with Confluent Schema Registry integration
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-avro-confluent-registry@2.1.0
# Apache Flink SQL Avro Confluent Registry

Apache Flink SQL table format support for Avro records with Confluent Schema Registry integration. This library enables serialization and deserialization of Kafka messages with centralized schema management, providing both the standard Avro format and Debezium change data capture support.

## Package Information

- **Package Name**: org.apache.flink:flink-sql-avro-confluent-registry
- **Package Type**: Maven JAR
- **Language**: Java
- **Version**: 2.1.0
- **Installation**: Add the Maven dependency to `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-avro-confluent-registry</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
```

For Debezium support:

```java
import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroFormatFactory;
```

## Basic Usage

### SQL Table Definition with Confluent Schema Registry

```sql
CREATE TABLE kafka_source (
    user_id BIGINT,
    user_name STRING,
    email STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://localhost:8081',
    'avro-confluent.subject' = 'user-topic-value'
);
```
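
The same format works on the write path. As a sketch (the sink table, topic name, and subject are illustrative, not part of this package), a sink table can reuse the `avro-confluent` format so that `INSERT INTO` statements serialize rows against the registry:

```sql
CREATE TABLE kafka_sink (
    user_id BIGINT,
    user_name STRING,
    email STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-topic-out',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://localhost:8081',
    'avro-confluent.subject' = 'user-topic-out-value'
);

INSERT INTO kafka_sink
SELECT user_id, user_name, email, ts FROM kafka_source;
```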

### Programmatic Deserialization

```java
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Parse the reader schema for the records on the topic
String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}";
Schema schema = new Schema.Parser().parse(schemaString);

// Create a deserializer that resolves writer schemas from the registry
ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer =
    ConfluentRegistryAvroDeserializationSchema.forGeneric(
        schema,
        "http://localhost:8081"
    );
```

### Programmatic Serialization

```java
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.avro.generic.GenericRecord;

// Create a serializer that registers the schema under the given subject
ConfluentRegistryAvroSerializationSchema<GenericRecord> serializer =
    ConfluentRegistryAvroSerializationSchema.forGeneric(
        "user-topic-value",
        schema,
        "http://localhost:8081"
    );
```

## Architecture

The library is built around several key components:

- **Format Factories**: SPI-based factories (`RegistryAvroFormatFactory`, `DebeziumAvroFormatFactory`) that integrate with Flink's table API
- **Serialization Schemas**: Type-safe serialization and deserialization classes supporting both generic and specific Avro records
- **Schema Registry Integration**: Confluent Schema Registry client integration with authentication and SSL support
- **Shaded Dependencies**: All external dependencies (Kafka client, Confluent client, Avro) are shaded to prevent conflicts

## Capabilities

### Standard Avro Format

Core Avro serialization and deserialization with Confluent Schema Registry integration. Supports both generic records and generated specific record classes.

```java { .api }
// Format identifier for SQL DDL
String IDENTIFIER = "avro-confluent";

// Generic record deserialization
ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
    Schema schema,
    String url
);

// Specific record deserialization
<T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
    Class<T> tClass,
    String url
);

// Generic record serialization
ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
    String subject,
    Schema schema,
    String schemaRegistryUrl
);

// Specific record serialization
<T extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<T> forSpecific(
    Class<T> tClass,
    String subject,
    String schemaRegistryUrl
);
```

[Standard Avro Format](./avro-format.md)

### Debezium Change Data Capture

Debezium Avro format support for change data capture scenarios, handling INSERT, UPDATE, and DELETE operations with before/after record states.

```java { .api }
// Format identifier for SQL DDL
String IDENTIFIER = "debezium-avro-confluent";

// Format factory for Debezium CDC support
DebeziumAvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory;
```
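
As an illustrative sketch (the table, topic, and server addresses are assumptions), a table backed by Debezium-produced Avro changelog records can be declared with this format; Flink then interprets the Debezium envelope as INSERT/UPDATE/DELETE changelog rows:

```sql
CREATE TABLE customers (
    id BIGINT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver1.inventory.customers',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-avro-confluent',
    'debezium-avro-confluent.url' = 'http://localhost:8081'
);
```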

[Debezium Format](./debezium-format.md)

### Configuration Options

Comprehensive configuration options for Schema Registry connection, SSL, authentication, and schema management.

```java { .api }
ConfigOption<String> URL;                     // Required: Schema Registry URL
ConfigOption<String> SUBJECT;                 // Schema Registry subject name
ConfigOption<String> SCHEMA;                  // Optional: explicit schema string
ConfigOption<Map<String, String>> PROPERTIES; // Additional properties
```
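
For example, assuming the standard `avro-confluent.` option-key spellings (the URL, paths, and credential values below are placeholders), SSL and basic auth can be set in the table's WITH clause:

```sql
'format' = 'avro-confluent',
'avro-confluent.url' = 'https://registry.example.com:8081',
'avro-confluent.ssl.truststore.location' = '/etc/ssl/truststore.jks',
'avro-confluent.ssl.truststore.password' = 'changeit',
'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
'avro-confluent.basic-auth.user-info' = 'user:secret'
```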

[Configuration](./configuration.md)

## Types

```java { .api }
// Core configuration options class
@PublicEvolving
class AvroConfluentFormatOptions {
    ConfigOption<String> URL;
    ConfigOption<String> SUBJECT;
    ConfigOption<String> SCHEMA;
    ConfigOption<String> SSL_KEYSTORE_LOCATION;
    ConfigOption<String> SSL_KEYSTORE_PASSWORD;
    ConfigOption<String> SSL_TRUSTSTORE_LOCATION;
    ConfigOption<String> SSL_TRUSTSTORE_PASSWORD;
    ConfigOption<String> BASIC_AUTH_CREDENTIALS_SOURCE;
    ConfigOption<String> BASIC_AUTH_USER_INFO;
    ConfigOption<String> BEARER_AUTH_CREDENTIALS_SOURCE;
    ConfigOption<String> BEARER_AUTH_TOKEN;
    ConfigOption<Map<String, String>> PROPERTIES;
}

// Schema coder provider with caching support
class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {
    CachedSchemaCoderProvider(String subject, String url, int identityMapCapacity, Map<String, ?> configs);
    SchemaCoder get();
}

// Schema coder for the Confluent wire protocol
class ConfluentSchemaRegistryCoder implements SchemaCoder {
    ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient client);
    ConfluentSchemaRegistryCoder(SchemaRegistryClient client);
    Schema readSchema(InputStream in) throws IOException;
    void writeSchema(Schema schema, OutputStream out) throws IOException;
}
```
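
To illustrate what `ConfluentSchemaRegistryCoder` reads and writes: the Confluent wire format prefixes every Avro payload with a magic byte `0x0` followed by the schema ID as four big-endian bytes. A minimal stdlib-only sketch of that framing (this class is illustrative, not part of the library):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WireFormatSketch {

    // Write the Confluent framing: magic byte 0x0, then the schema ID as 4 big-endian bytes.
    static byte[] frame(int schemaId, byte[] avroPayload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeByte(0);       // magic byte
        out.writeInt(schemaId); // big-endian schema ID
        out.write(avroPayload); // Avro-encoded record body follows
        return bos.toByteArray();
    }

    // Read the framing back: validate the magic byte and return the schema ID.
    static int readSchemaId(byte[] message) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));
        if (in.readByte() != 0) {
            throw new IOException("Unknown magic byte");
        }
        return in.readInt();
    }

    public static void main(String[] args) throws IOException {
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println(framed.length);        // 5-byte header + 3-byte payload = 8
        System.out.println(readSchemaId(framed)); // 42
    }
}
```

The 5-byte header is why messages written by plain Avro serializers cannot be read by this format (and vice versa): the deserializer uses the embedded schema ID to fetch the writer schema from the registry before decoding.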