# Standard Avro Format

Core Avro serialization and deserialization functionality with Confluent Schema Registry integration. Supports both generic records and generated specific record classes, with comprehensive configuration options.

## Capabilities

### Generic Record Deserialization

Creates deserializers for generic Avro records using the provided reader schema, with writer schemas looked up from Confluent Schema Registry.

```java { .api }
/**
 * Creates deserializer for GenericRecord using provided reader schema
 * @param schema Reader schema for produced records
 * @param url Schema Registry URL
 * @return Deserializer instance
 */
static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
    Schema schema,
    String url
);

/**
 * Creates deserializer with custom cache capacity
 * @param schema Reader schema for produced records
 * @param url Schema Registry URL
 * @param identityMapCapacity Maximum cached schema versions (default: 1000)
 * @return Deserializer instance
 */
static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
    Schema schema,
    String url,
    int identityMapCapacity
);

/**
 * Creates deserializer with additional registry configurations
 * @param schema Reader schema for produced records
 * @param url Schema Registry URL
 * @param registryConfigs Additional Schema Registry configs (SSL, auth)
 * @return Deserializer instance
 */
static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
    Schema schema,
    String url,
    @Nullable Map<String, ?> registryConfigs
);

/**
 * Creates deserializer with full configuration options
 * @param schema Reader schema for produced records
 * @param url Schema Registry URL
 * @param identityMapCapacity Maximum cached schema versions
 * @param registryConfigs Additional Schema Registry configs
 * @return Deserializer instance
 */
static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
    Schema schema,
    String url,
    int identityMapCapacity,
    @Nullable Map<String, ?> registryConfigs
);
```

**Usage Example:**

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

// Parse reader schema
String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}";
Schema readerSchema = new Schema.Parser().parse(schemaString);

// Create deserializer with SSL configuration
Map<String, String> registryConfigs = new HashMap<>();
registryConfigs.put("schema.registry.ssl.keystore.location", "/path/to/keystore.jks");
registryConfigs.put("schema.registry.ssl.keystore.password", "password");

ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer =
    ConfluentRegistryAvroDeserializationSchema.forGeneric(
        readerSchema,
        "https://schema-registry.example.com",
        1000,
        registryConfigs
    );
```

### Specific Record Deserialization

Creates deserializers for Avro-generated specific record classes.

```java { .api }
/**
 * Creates deserializer for specific record class
 * @param tClass Generated Avro record class
 * @param url Schema Registry URL
 * @return Deserializer instance
 */
static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
    Class<T> tClass,
    String url
);

/**
 * Creates deserializer with custom cache capacity
 * @param tClass Generated Avro record class
 * @param url Schema Registry URL
 * @param identityMapCapacity Maximum cached schema versions
 * @return Deserializer instance
 */
static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
    Class<T> tClass,
    String url,
    int identityMapCapacity
);

/**
 * Creates deserializer with additional registry configurations
 * @param tClass Generated Avro record class
 * @param url Schema Registry URL
 * @param registryConfigs Additional Schema Registry configs
 * @return Deserializer instance
 */
static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
    Class<T> tClass,
    String url,
    @Nullable Map<String, ?> registryConfigs
);

/**
 * Creates deserializer with full configuration options
 * @param tClass Generated Avro record class
 * @param url Schema Registry URL
 * @param identityMapCapacity Maximum cached schema versions
 * @param registryConfigs Additional Schema Registry configs
 * @return Deserializer instance
 */
static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
    Class<T> tClass,
    String url,
    int identityMapCapacity,
    @Nullable Map<String, ?> registryConfigs
);
```

**Usage Example:**

```java
import com.example.avro.User; // Generated Avro class
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

// Create deserializer for specific record class
ConfluentRegistryAvroDeserializationSchema<User> deserializer =
    ConfluentRegistryAvroDeserializationSchema.forSpecific(
        User.class,
        "https://schema-registry.example.com"
    );
```
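
In a DataStream job, a deserializer like this is typically wired into a Kafka source as the value deserializer. The following is a minimal sketch, assuming the `flink-connector-kafka` `KafkaSource` builder API and the hypothetical `User` class and `user-topic` topic from the example above:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.example.avro.User; // Generated Avro class (hypothetical)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ConfluentRegistryAvroDeserializationSchema<User> deserializer =
    ConfluentRegistryAvroDeserializationSchema.forSpecific(
        User.class,
        "https://schema-registry.example.com");

// Wire the deserializer into a Kafka source; setValueOnlyDeserializer
// deserializes record values and ignores keys
KafkaSource<User> source = KafkaSource.<User>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("user-topic")
    .setGroupId("user-consumer")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(deserializer)
    .build();

DataStream<User> users =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "user-source");
```

The topic, group id, and bootstrap servers are placeholders; only the deserializer wiring is the point here.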

### Generic Record Serialization

Creates serializers for generic Avro records with schema registration to Confluent Schema Registry.

```java { .api }
/**
 * Creates serializer for GenericRecord
 * @param subject Schema Registry subject name
 * @param schema Writer schema for serialization
 * @param schemaRegistryUrl Schema Registry URL
 * @return Serializer instance
 */
static ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
    String subject,
    Schema schema,
    String schemaRegistryUrl
);

/**
 * Creates serializer with additional registry configurations
 * @param subject Schema Registry subject name
 * @param schema Writer schema for serialization
 * @param schemaRegistryUrl Schema Registry URL
 * @param registryConfigs Additional Schema Registry configs
 * @return Serializer instance
 */
static ConfluentRegistryAvroSerializationSchema<GenericRecord> forGeneric(
    String subject,
    Schema schema,
    String schemaRegistryUrl,
    @Nullable Map<String, ?> registryConfigs
);
```

**Usage Example:**

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

// Parse writer schema
String schemaString = "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}";
Schema writerSchema = new Schema.Parser().parse(schemaString);

// Create serializer
ConfluentRegistryAvroSerializationSchema<GenericRecord> serializer =
    ConfluentRegistryAvroSerializationSchema.forGeneric(
        "user-topic-value",
        writerSchema,
        "https://schema-registry.example.com"
    );
```
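
The serializer expects records that conform to the writer schema. As a sketch (pure Avro, no Flink dependency), a `GenericRecord` for the `User` schema above can be built with Avro's `GenericRecordBuilder`:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

// Writer schema matching the serialization example above
Schema writerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"name\",\"type\":\"string\"}]}");

// Build a record that conforms to the writer schema; build() fails fast
// if a field without a default value is left unset
GenericRecord user = new GenericRecordBuilder(writerSchema)
    .set("id", 42L)
    .set("name", "alice")
    .build();

// Optional sanity check before handing the record to the serializer
boolean conforms = GenericData.get().validate(writerSchema, user);
```

Building through `GenericRecordBuilder` rather than raw `GenericData.Record.put` calls catches missing fields at build time instead of at serialization time.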

### Specific Record Serialization

Creates serializers for Avro-generated specific record classes.

```java { .api }
/**
 * Creates serializer for specific record class
 * @param tClass Generated Avro record class
 * @param subject Schema Registry subject name
 * @param schemaRegistryUrl Schema Registry URL
 * @return Serializer instance
 */
static <T extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<T> forSpecific(
    Class<T> tClass,
    String subject,
    String schemaRegistryUrl
);

/**
 * Creates serializer with additional registry configurations
 * @param tClass Generated Avro record class
 * @param subject Schema Registry subject name
 * @param schemaRegistryUrl Schema Registry URL
 * @param registryConfigs Additional Schema Registry configs
 * @return Serializer instance
 */
static <T extends SpecificRecord> ConfluentRegistryAvroSerializationSchema<T> forSpecific(
    Class<T> tClass,
    String subject,
    String schemaRegistryUrl,
    @Nullable Map<String, ?> registryConfigs
);
```

**Usage Example:**

```java
import com.example.avro.User; // Generated Avro class
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

// Create serializer for specific record class
ConfluentRegistryAvroSerializationSchema<User> serializer =
    ConfluentRegistryAvroSerializationSchema.forSpecific(
        User.class,
        "user-topic-value",
        "https://schema-registry.example.com"
    );
```
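
On the producing side, the serializer is typically attached to a Kafka sink. A minimal sketch, assuming the `flink-connector-kafka` `KafkaSink` / `KafkaRecordSerializationSchema` builder API; the subject `user-topic-value` follows the registry's default TopicNameStrategy (topic name plus `-value`):

```java
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

import com.example.avro.User; // Generated Avro class (hypothetical)

ConfluentRegistryAvroSerializationSchema<User> serializer =
    ConfluentRegistryAvroSerializationSchema.forSpecific(
        User.class,
        "user-topic-value",
        "https://schema-registry.example.com");

// Use the schema as the value serializer; record keys are left unset here
KafkaSink<User> sink = KafkaSink.<User>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.<User>builder()
            .setTopic("user-topic")
            .setValueSerializationSchema(serializer)
            .build())
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();
```

On first use the serializer registers (or looks up) the schema under the given subject, so the registry URL must be reachable from the task managers.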

### Table Format Factory

Factory for creating runtime format instances in Flink's table API.

```java { .api }
/**
 * Format factory identifier for SQL DDL
 */
String IDENTIFIER = "avro-confluent";

/**
 * Creates decoding format for table sources
 * @param context Table factory context
 * @param formatOptions Configuration options
 * @return Decoding format instance
 */
DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
    DynamicTableFactory.Context context,
    ReadableConfig formatOptions
);

/**
 * Creates encoding format for table sinks
 * @param context Table factory context
 * @param formatOptions Configuration options
 * @return Encoding format instance
 */
EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
    DynamicTableFactory.Context context,
    ReadableConfig formatOptions
);

/**
 * Returns set of required configuration options
 * @return Required options (URL)
 */
Set<ConfigOption<?>> requiredOptions();

/**
 * Returns set of optional configuration options
 * @return Optional options (SUBJECT, SCHEMA, SSL, auth options)
 */
Set<ConfigOption<?>> optionalOptions();
```

## SQL Table Integration

The format can be used directly in Flink SQL DDL:

```sql
-- Source table with explicit schema
CREATE TABLE user_events (
    user_id BIGINT,
    event_name STRING,
    event_time TIMESTAMP(3),
    properties MAP<STRING, STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://localhost:8081',
    'avro-confluent.subject' = 'user-events-value',
    'avro-confluent.schema' = '{"type":"record","name":"UserEvent","fields":[{"name":"user_id","type":"long"},{"name":"event_name","type":"string"},{"name":"event_time","type":"long"},{"name":"properties","type":{"type":"map","values":"string"}}]}'
);

-- Sink table requiring subject for schema registration
CREATE TABLE processed_events (
    user_id BIGINT,
    event_count BIGINT,
    last_event_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'processed-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://localhost:8081',
    'avro-confluent.subject' = 'processed-events-value'
);
```