# Schema-Encoded Message Support

## Capabilities

### Schema-Encoded Deserialization

Support for deserializing Avro data where schema information is embedded within the message using a configurable schema coder.

```java { .api }
/**
 * Deserialization schema that reads schema from input stream using SchemaCoder
 * Extends AvroDeserializationSchema with schema-encoded message support
 * @param <T> Type of record it produces
 */
public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {

    /**
     * Creates schema-encoded deserialization schema
     * @param recordClazz Class to deserialize (SpecificRecord or GenericRecord)
     * @param reader Reader's Avro schema (required for GenericRecord)
     * @param schemaCoderProvider Provider for SchemaCoder that reads schema from stream
     */
    public RegistryAvroDeserializationSchema(
            Class<T> recordClazz,
            Schema reader,
            SchemaCoder.SchemaCoderProvider schemaCoderProvider);

    /**
     * Creates schema-encoded deserialization schema with custom encoding
     * @param recordClazz Class to deserialize (SpecificRecord or GenericRecord)
     * @param reader Reader's Avro schema (required for GenericRecord)
     * @param schemaCoderProvider Provider for SchemaCoder that reads schema from stream
     * @param encoding Avro encoding type (BINARY or JSON)
     */
    public RegistryAvroDeserializationSchema(
            Class<T> recordClazz,
            Schema reader,
            SchemaCoder.SchemaCoderProvider schemaCoderProvider,
            AvroEncoding encoding);

    /**
     * Deserializes message with embedded schema
     * @param message Serialized message bytes with embedded schema
     * @return Deserialized object
     * @throws IOException If deserialization or schema reading fails
     */
    public T deserialize(byte[] message) throws IOException;

    /**
     * Checks if element signals end of stream
     * @param nextElement The element to check
     * @return Always false for Avro records
     */
    public boolean isEndOfStream(T nextElement);

    /**
     * Gets the type information for produced type
     * @return TypeInformation for the produced type
     */
    public TypeInformation<T> getProducedType();
}
```

### Schema-Encoded Serialization

Support for serializing Avro data with embedded schema information.

```java { .api }
/**
 * Serialization schema that embeds schema information in messages using SchemaCoder
 * Extends AvroSerializationSchema with schema-encoded message support
 * @param <T> Type of record it consumes
 */
public class RegistryAvroSerializationSchema<T> extends AvroSerializationSchema<T> {

    /**
     * Creates schema-encoded serialization schema
     * @param subject Subject name for schema identification
     * @param recordClazz Class to serialize (SpecificRecord or GenericRecord)
     * @param writer Writer's Avro schema (required for GenericRecord)
     * @param schemaCoderProvider Provider for SchemaCoder that writes schema to stream
     */
    public RegistryAvroSerializationSchema(
            String subject,
            Class<T> recordClazz,
            Schema writer,
            SchemaCoder.SchemaCoderProvider schemaCoderProvider);

    /**
     * Creates schema-encoded serialization schema with custom encoding
     * @param subject Subject name for schema identification
     * @param recordClazz Class to serialize (SpecificRecord or GenericRecord)
     * @param writer Writer's Avro schema (required for GenericRecord)
     * @param schemaCoderProvider Provider for SchemaCoder that writes schema to stream
     * @param encoding Avro encoding type (BINARY or JSON)
     */
    public RegistryAvroSerializationSchema(
            String subject,
            Class<T> recordClazz,
            Schema writer,
            SchemaCoder.SchemaCoderProvider schemaCoderProvider,
            AvroEncoding encoding);

    /**
     * Serializes object with embedded schema
     * Message includes schema information followed by Avro data
     * @param object Object to serialize
     * @return Serialized byte array with embedded schema
     */
    public byte[] serialize(T object);
}
```

### Schema Coder Interface

Interface for reading and writing schema information in message streams.

```java { .api }
/**
 * Schema coder that allows reading schema embedded in serialized records
 * Used by RegistryAvroDeserializationSchema and RegistryAvroSerializationSchema
 */
public interface SchemaCoder {

    /**
     * Reads schema from input stream
     * @param in Input stream containing schema information
     * @return Parsed Avro schema
     * @throws IOException If schema reading fails
     */
    Schema readSchema(InputStream in) throws IOException;

    /**
     * Writes schema to output stream
     * @param schema Avro schema to write
     * @param out Output stream to write schema to
     * @throws IOException If schema writing fails
     */
    void writeSchema(Schema schema, OutputStream out) throws IOException;

    /**
     * Provider interface for creating SchemaCoder instances
     * Allows creating multiple instances in parallel operators without serialization issues
     */
    interface SchemaCoderProvider extends Serializable {

        /**
         * Creates a new instance of SchemaCoder
         * Each call should create a new instance for use in different nodes
         * @return New SchemaCoder instance
         */
        SchemaCoder get();
    }
}
```

## Usage Examples

### Basic Schema Registry Setup

```java
// Configuration for schema registry
Map<String, String> registryConfig = new HashMap<>();
registryConfig.put("schema.registry.url", "http://localhost:8081");
registryConfig.put("schema.registry.subject.strategy", "topic-value");

// For authentication (if required)
registryConfig.put("schema.registry.username", "registry-user");
registryConfig.put("schema.registry.password", "registry-password");

// Create registry-aware deserializer
RegistryAvroDeserializationSchema<GenericRecord> deserializer =
        RegistryAvroDeserializationSchema.<GenericRecord>builder()
                .setRegistryConfig(registryConfig)
                .setSubject("user-events-value")
                .build();

// Create registry-aware serializer
RegistryAvroSerializationSchema<GenericRecord> serializer =
        RegistryAvroSerializationSchema.<GenericRecord>builder()
                .setRegistryConfig(registryConfig)
                .setSubject("processed-events-value")
                .setSchema(outputSchema)
                .build();
```

### Kafka Integration with Schema Registry

```java
// Kafka properties with schema registry configuration
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "avro-consumer-group");

// Schema registry properties
Properties schemaRegistryProps = new Properties();
schemaRegistryProps.setProperty("schema.registry.url", "http://localhost:8081");

// Consumer with registry-aware deserialization
KafkaSource<GenericRecord> kafkaSource = KafkaSource.<GenericRecord>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("user-events")
        .setGroupId("avro-consumer-group")
        .setValueDeserializer(KafkaRecordDeserializationSchema.valueOnly(
                RegistryAvroDeserializationSchema.<GenericRecord>builder()
                        .setRegistryConfig(schemaRegistryProps)
                        .setSubject("user-events-value")
                        .build()
        ))
        .build();

DataStream<GenericRecord> stream = env.fromSource(
        kafkaSource,
        WatermarkStrategy.noWatermarks(),
        "Kafka Avro Source"
);

// Producer with registry-aware serialization
KafkaSink<GenericRecord> kafkaSink = KafkaSink.<GenericRecord>builder()
        .setBootstrapServers("localhost:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<GenericRecord>builder()
                .setTopic("processed-events")
                .setValueSerializer(
                        RegistryAvroSerializationSchema.<GenericRecord>builder()
                                .setRegistryConfig(schemaRegistryProps)
                                .setSubject("processed-events-value")
                                .build()
                )
                .build())
        .build();

processedStream.sinkTo(kafkaSink);
```

### Schema Evolution Handling

```java
// Consumer that handles schema evolution automatically
RegistryAvroDeserializationSchema<GenericRecord> evolutionDeserializer =
        RegistryAvroDeserializationSchema.<GenericRecord>builder()
                .setRegistryConfig(registryConfig)
                .setSubject("user-events-value")
                .setReaderSchema(readerSchema) // Optional: specify expected schema
                .setSchemaEvolutionEnabled(true)
                .build();

DataStream<GenericRecord> stream = env
        .addSource(new FlinkKafkaConsumer<>("user-events", evolutionDeserializer, kafkaProps));

// Handle records that may have different schemas
DataStream<ProcessedEvent> processed = stream
        .map(new MapFunction<GenericRecord, ProcessedEvent>() {
            @Override
            public ProcessedEvent map(GenericRecord record) throws Exception {
                ProcessedEvent event = new ProcessedEvent();

                // Required fields (present in all schema versions)
                event.setUserId((Long) record.get("user_id"));
                event.setEventType(record.get("event_type").toString());

                // Optional fields (may not exist in older schema versions)
                Object sessionId = record.get("session_id");
                if (sessionId != null) {
                    event.setSessionId(sessionId.toString());
                }

                // New fields (may not exist in older records)
                Object deviceInfo = record.get("device_info");
                if (deviceInfo != null) {
                    event.setDeviceInfo(deviceInfo.toString());
                }

                return event;
            }
        });
```

### Multi-Subject Registry Usage

```java
// Different schemas for different types of events
Map<String, RegistryAvroDeserializationSchema<GenericRecord>> deserializers = new HashMap<>();

// User events deserializer
deserializers.put("user-events",
        RegistryAvroDeserializationSchema.<GenericRecord>builder()
                .setRegistryConfig(registryConfig)
                .setSubject("user-events-value")
                .build());

// System events deserializer
deserializers.put("system-events",
        RegistryAvroDeserializationSchema.<GenericRecord>builder()
                .setRegistryConfig(registryConfig)
                .setSubject("system-events-value")
                .build());

// Transaction events deserializer
deserializers.put("transaction-events",
        RegistryAvroDeserializationSchema.<GenericRecord>builder()
                .setRegistryConfig(registryConfig)
                .setSubject("transaction-events-value")
                .build());

// Process different event types
DataStream<String> allEvents = env
        .addSource(new FlinkKafkaConsumer<>(
                Arrays.asList("user-events", "system-events", "transaction-events"),
                new KafkaDeserializationSchema<String>() {
                    @Override
                    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        String topic = record.topic();
                        RegistryAvroDeserializationSchema<GenericRecord> deserializer =
                                deserializers.get(topic);

                        if (deserializer != null) {
                            GenericRecord avroRecord = deserializer.deserialize(record.value());
                            return processEventByType(topic, avroRecord);
                        }
                        return null;
                    }

                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        return false;
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                },
                kafkaProps));
```

### Schema Validation and Error Handling

```java
// Registry deserializer with validation and error handling
RegistryAvroDeserializationSchema<GenericRecord> validatingDeserializer =
        RegistryAvroDeserializationSchema.<GenericRecord>builder()
                .setRegistryConfig(registryConfig)
                .setSubject("user-events-value")
                .setValidationEnabled(true)
                .setErrorMode(ErrorMode.IGNORE) // or FAIL, LOG_AND_CONTINUE
                .build();

DataStream<GenericRecord> validatedStream = stream
        .map(new MapFunction<byte[], GenericRecord>() {
            @Override
            public GenericRecord map(byte[] value) throws Exception {
                try {
                    return validatingDeserializer.deserialize(value);
                } catch (SchemaNotFoundException e) {
                    // Handle unknown schema ID
                    logger.warn("Unknown schema ID in message: " + e.getSchemaId());
                    return null;
                } catch (IncompatibleSchemaException e) {
                    // Handle schema compatibility issues
                    logger.error("Schema compatibility error: " + e.getMessage());
                    return null;
                } catch (IOException e) {
                    // Handle general deserialization errors
                    logger.error("Deserialization error: " + e.getMessage());
                    return null;
                }
            }
        })
        .filter(Objects::nonNull); // Remove failed deserializations
```

### Table API with Schema Registry

```java
// Create table with schema registry integration
String createTableSQL = """
    CREATE TABLE user_events (
        user_id BIGINT,
        event_type STRING,
        event_data ROW<
            action STRING,
            target STRING
        >,
        event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro',
        'avro.schema-registry.url' = 'http://localhost:8081',
        'avro.schema-registry.subject' = 'user-events-value'
    )
    """;

tableEnv.executeSql(createTableSQL);

// Output table with schema registry
String createOutputTableSQL = """
    CREATE TABLE processed_events (
        user_id BIGINT,
        event_count BIGINT,
        last_event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'processed-events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'avro',
        'avro.schema-registry.url' = 'http://localhost:8081',
        'avro.schema-registry.subject' = 'processed-events-value'
    )
    """;

tableEnv.executeSql(createOutputTableSQL);

// Process with SQL
String processingSQL = """
    INSERT INTO processed_events
    SELECT
        user_id,
        COUNT(*) as event_count,
        MAX(event_time) as last_event_time
    FROM user_events
    WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
    GROUP BY user_id
    """;

tableEnv.executeSql(processingSQL);
```

### Schema Registry Administration

```java
// Utility methods for schema registry management
public class SchemaRegistryUtils {

    /**
     * Register a new schema version
     */
    public static int registerSchema(String registryUrl, String subject, Schema schema)
            throws IOException {
        // Implementation for registering schemas
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        return client.register(subject, schema);
    }

    /**
     * Get latest schema for subject
     */
    public static Schema getLatestSchema(String registryUrl, String subject)
            throws IOException {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        return client.getLatestSchemaMetadata(subject).getSchema();
    }

    /**
     * Check schema compatibility
     */
    public static boolean isCompatible(String registryUrl, String subject, Schema newSchema)
            throws IOException {
        SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
        return client.testCompatibility(subject, newSchema);
    }
}

// Example usage
Schema newSchema = // ... your new schema
boolean compatible = SchemaRegistryUtils.isCompatible(
        "http://localhost:8081",
        "user-events-value",
        newSchema
);

if (compatible) {
    int schemaId = SchemaRegistryUtils.registerSchema(
            "http://localhost:8081",
            "user-events-value",
            newSchema
    );
    System.out.println("Registered new schema with ID: " + schemaId);
}
```