# Schema Registry Integration

Extended serialization and deserialization schemas with Confluent Schema Registry support for centralized schema management, evolution, and compatibility checking in distributed Avro data processing.

## RegistryAvroSerializationSchema

Extended serialization schema that integrates with Confluent Schema Registry for centralized schema management.

```java { .api }
public class RegistryAvroSerializationSchema<T> extends AvroSerializationSchema<T> {
    // Inherits all methods from AvroSerializationSchema
    // Additional schema registry specific functionality
}
```

### Key Features

- **Schema Registration**: Automatically registers schemas with the registry
- **Schema Evolution**: Supports forward, backward, and full compatibility
- **Schema Caching**: Caches schemas locally for performance
- **Subject Management**: Manages schema subjects and versions
- **Compatibility Checking**: Validates schema changes against compatibility rules
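
The schema-ID framing that makes these features possible can be sketched with plain JDK code. The `WireFormatEncoder` class below is a hypothetical illustration (not part of the Flink or Confluent APIs) of the Confluent wire format: a zero magic byte, a 4-byte big-endian schema ID, then the raw Avro payload.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the Confluent wire format used by
// registry-aware serializers: [magic byte 0x0][4-byte schema ID][Avro payload].
public class WireFormatEncoder {
    private static final byte MAGIC_BYTE = 0x0;

    // Frame an already Avro-encoded payload with the registry schema ID.
    public static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buffer.put(MAGIC_BYTE);
        buffer.putInt(schemaId); // ByteBuffer is big-endian by default
        buffer.put(avroPayload);
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println(framed.length); // 1 + 4 + 3 = 8 bytes
    }
}
```

Consumers later use the embedded ID to fetch the matching writer schema from the registry before decoding the payload.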

### Usage Examples

**Basic Registry Integration:**

```java
import org.apache.flink.formats.avro.RegistryAvroSerializationSchema;

// Create registry-aware serializer
// Configuration typically done through Flink configuration or environment
RegistryAvroSerializationSchema<User> registrySerializer =
    new RegistryAvroSerializationSchema<>(User.class, registryConfig);

// Use in streaming pipeline
DataStream<User> userStream = ...;
DataStream<byte[]> serializedStream = userStream.map(registrySerializer::serialize);
```

**With Schema Evolution:**

```java
// Register new schema version
String subject = "user-value";
Schema newUserSchema = parseNewUserSchema();

// Serializer automatically handles schema versioning
RegistryAvroSerializationSchema<User> evolvingSerializer =
    createRegistrySerializer(User.class, subject, newUserSchema);

// Messages include schema ID for proper deserialization
DataStream<byte[]> versionedStream = userStream.map(evolvingSerializer::serialize);
```

## RegistryAvroDeserializationSchema

Extended deserialization schema that uses Confluent Schema Registry for schema resolution and evolution handling.

```java { .api }
public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
    // Inherits all methods from AvroDeserializationSchema
    // Additional schema registry specific functionality
}
```

### Key Features

- **Schema Resolution**: Automatically resolves schemas by ID from the registry
- **Multi-Version Support**: Handles messages with different schema versions
- **Reader Schema Evolution**: Supports schema evolution patterns
- **Lazy Loading**: Loads schemas on demand for better performance
- **Error Recovery**: Graceful handling of registry connectivity issues
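
Schema resolution starts by reading the framing described above. The `WireFormatDecoder` class below is a hypothetical sketch (not a real Flink or Confluent class) of how a registry-aware deserializer extracts the schema ID and payload from an incoming message.

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing how a registry-aware deserializer splits an
// incoming message into its schema ID and Avro payload.
public class WireFormatDecoder {
    private static final byte MAGIC_BYTE = 0x0;

    // Read the 4-byte big-endian schema ID that follows the magic byte.
    public static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    // Everything after the 5-byte header is the Avro-encoded payload.
    public static byte[] payload(byte[] message) {
        byte[] out = new byte[message.length - 5];
        System.arraycopy(message, 5, out, 0, out.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] msg = {0, 0, 0, 0, 42, 9, 9};
        System.out.println(schemaId(msg)); // 42
    }
}
```

With the ID in hand, the deserializer looks up the writer schema in the registry (or its local cache) and decodes the payload against it.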

### Usage Examples

**Registry-based Deserialization:**

```java
import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;

// Create registry-aware deserializer
RegistryAvroDeserializationSchema<User> registryDeserializer =
    new RegistryAvroDeserializationSchema<>(User.class, registryConfig);

// Use in Kafka source
FlinkKafkaConsumer<User> consumer = new FlinkKafkaConsumer<>(
    "user-topic",
    registryDeserializer,
    kafkaProperties
);

DataStream<User> userStream = env.addSource(consumer);
```

**Multi-Version Message Processing:**

```java
// Deserializer handles multiple schema versions automatically
RegistryAvroDeserializationSchema<User> multiVersionDeserializer =
    createRegistryDeserializer(User.class);

// Messages with different schema versions are properly deserialized
DataStream<User> unifiedStream = rawMessageStream
    .map(bytes -> {
        try {
            return multiVersionDeserializer.deserialize(bytes);
        } catch (IOException e) {
            // Handle deserialization errors
            return null;
        }
    })
    .filter(Objects::nonNull);
```

## Schema Registry Configuration

### Connection Configuration

```java
// Schema Registry client configuration
Map<String, Object> registryConfig = new HashMap<>();
registryConfig.put("schema.registry.url", "http://schema-registry:8081");
registryConfig.put("basic.auth.credentials.source", "USER_INFO");
registryConfig.put("basic.auth.user.info", "username:password");

// SSL configuration for secure connections
registryConfig.put("schema.registry.ssl.truststore.location", "/path/to/truststore.jks");
registryConfig.put("schema.registry.ssl.truststore.password", "truststore-password");
registryConfig.put("schema.registry.ssl.keystore.location", "/path/to/keystore.jks");
registryConfig.put("schema.registry.ssl.keystore.password", "keystore-password");
```

### Subject and Compatibility Configuration

```java
// Subject naming strategy (use "key.subject.name.strategy" for message keys)
registryConfig.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.RecordNameStrategy");

// Compatibility level
registryConfig.put("schema.registry.compatibility.level", "BACKWARD");

// Schema caching
registryConfig.put("schema.registry.cache.capacity", "1000");
registryConfig.put("schema.registry.cache.expiry.secs", "300");
```
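
The naming strategies differ only in how they derive the subject string. The `SubjectNames` class below is a hypothetical illustration of the documented Confluent behavior, not a real registry class.

```java
// Hypothetical illustration of the Confluent subject naming strategies.
public class SubjectNames {
    // TopicNameStrategy (the default): subject is derived from the topic.
    public static String topicNameStrategy(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: subject is the record's fully qualified name,
    // allowing multiple record types on the same topic.
    public static String recordNameStrategy(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordNameStrategy: combines both for per-topic, per-type subjects.
    public static String topicRecordNameStrategy(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        System.out.println(topicNameStrategy("user-topic", false));      // user-topic-value
        System.out.println(recordNameStrategy("com.example.avro.User")); // com.example.avro.User
    }
}
```

RecordNameStrategy, as configured above, ties compatibility checking to the record type rather than the topic, which is why it suits topics carrying multiple event types.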

## Schema Evolution Patterns

### Backward Compatibility

**Adding Optional Fields:**

```avro
// Original schema
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}

// Evolved schema (backward compatible)
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
```

**Usage with Evolution:**

```java
// Producers still using the old schema keep writing as before
RegistryAvroSerializationSchema<UserV1> oldSerializer =
    createRegistrySerializer(UserV1.class, "user-value");

// Consumers on the new schema can read messages written with the old one
RegistryAvroDeserializationSchema<UserV2> newDeserializer =
    createRegistryDeserializer(UserV2.class, "user-value");
```

### Forward Compatibility

**Removing Optional Fields:**

```java
// New producers write simplified schema
RegistryAvroSerializationSchema<UserV2> simplifiedSerializer =
    createRegistrySerializer(UserV2.class, "user-value");

// Old consumers can still process messages
RegistryAvroDeserializationSchema<UserV1> oldDeserializer =
    createRegistryDeserializer(UserV1.class, "user-value");
```

### Full Compatibility

```java
// Schema evolution that supports both directions
public class SchemaEvolutionHandler {
    public void handleEvolution(Schema writerSchema, Schema readerSchema) {
        // Validate compatibility (Avro returns a result rather than throwing)
        SchemaCompatibility.SchemaPairCompatibility result =
            SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);
        if (result.getType() != SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
            throw new IllegalStateException("Incompatible schemas: " + result.getDescription());
        }

        // Apply evolution rules
        applyEvolutionRules(writerSchema, readerSchema);
    }
}
```

## Error Handling and Monitoring

### Registry Connectivity

```java
// Robust registry client with retry logic
public class ResilientRegistryClient {
    private final SchemaRegistryClient client;
    private final RetryPolicy retryPolicy;

    public Schema getSchemaById(int id) throws IOException {
        return retryPolicy.execute(() -> {
            try {
                return client.getSchemaById(id);
            } catch (RestClientException e) {
                if (e.getStatus() == 404) {
                    throw new SchemaNotFoundException("Schema not found: " + id);
                }
                throw new RegistryException("Registry error", e);
            }
        });
    }
}
```

### Schema Validation Errors

```java
// Handle schema validation failures
try {
    byte[] serialized = registrySerializer.serialize(userRecord);
} catch (SerializationException e) {
    if (e.getCause() instanceof RestClientException) {
        RestClientException rce = (RestClientException) e.getCause();
        if (rce.getStatus() == 409) {
            logger.error("Schema compatibility violation", e);
            // Handle compatibility error
        }
    }
    throw new ProcessingException("Serialization failed", e);
}
```

### Monitoring and Metrics

```java
// Registry metrics collection (Prometheus simpleclient)
public class RegistryMetrics {
    private final Counter schemaLookups = Counter.build()
        .name("schema_registry_lookups_total")
        .help("Total schema lookups")
        .register();

    private final Histogram lookupLatency = Histogram.build()
        .name("schema_registry_lookup_duration_seconds")
        .help("Schema lookup latency")
        .register();

    public Schema getSchemaWithMetrics(int id) throws IOException {
        Histogram.Timer timer = lookupLatency.startTimer();
        try {
            schemaLookups.inc();
            return client.getSchemaById(id);
        } finally {
            timer.observeDuration();
        }
    }
}
```

## Best Practices

### Schema Design

**Evolving Schemas:**

```avro
{
  "type": "record",
  "name": "User",
  "namespace": "com.example.avro",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    // Use unions for optional fields
    {"name": "email", "type": ["null", "string"], "default": null},
    // Use logical types for better semantics
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    // Provide defaults for new fields
    {"name": "version", "type": "int", "default": 1}
  ]
}
```

### Performance Optimization

**Schema Caching:**

```java
// Configure appropriate cache sizes
Map<String, Object> config = new HashMap<>();
config.put("schema.registry.cache.capacity", "10000");
config.put("schema.registry.cache.expiry.secs", "3600");

// Use schema caching in high-throughput scenarios
CachedSchemaRegistryClient cachedClient = new CachedSchemaRegistryClient(
    "http://schema-registry:8081",
    10000, // cache capacity
    config
);
```
328
329
**Connection Pooling:**
330
331
```java
332
// Reuse registry clients across serializers
333
public class RegistryClientFactory {
334
private static final SchemaRegistryClient SHARED_CLIENT =
335
new CachedSchemaRegistryClient(registryUrl, cacheCapacity);
336
337
public static SchemaRegistryClient getClient() {
338
return SHARED_CLIENT;
339
}
340
}
341
```
342
343
### Security Configuration
344
345
**Authentication:**
346
347
```java
348
// SASL authentication
349
registryConfig.put("basic.auth.credentials.source", "SASL_INHERIT");
350
registryConfig.put("sasl.mechanism", "PLAIN");
351
registryConfig.put("sasl.jaas.config",
352
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
353
"username=\"schema-registry-user\" password=\"password\";");
354
```
355
356
**SSL/TLS:**
357
358
```java
359
// SSL configuration
360
registryConfig.put("schema.registry.ssl.endpoint.identification.algorithm", "https");
361
registryConfig.put("schema.registry.ssl.protocol", "TLSv1.2");
362
registryConfig.put("schema.registry.ssl.enabled.protocols", "TLSv1.2");
363
```

## Deployment Considerations

### High Availability

```java
// Multiple registry URLs for failover
String registryUrls = "http://registry1:8081,http://registry2:8081,http://registry3:8081";
registryConfig.put("schema.registry.url", registryUrls);

// Connection timeout configuration
registryConfig.put("schema.registry.request.timeout.ms", "30000");
registryConfig.put("schema.registry.connect.timeout.ms", "10000");
```
378
379
### Environment-specific Configuration
380
381
```java
382
// Environment-based configuration
383
public class RegistryConfigFactory {
384
public static Map<String, Object> createConfig(Environment env) {
385
Map<String, Object> config = new HashMap<>();
386
387
switch (env) {
388
case DEVELOPMENT:
389
config.put("schema.registry.url", "http://localhost:8081");
390
config.put("schema.registry.compatibility.level", "NONE");
391
break;
392
case PRODUCTION:
393
config.put("schema.registry.url", "https://prod-registry:8081");
394
config.put("schema.registry.compatibility.level", "BACKWARD");
395
// Add authentication and SSL configuration
396
break;
397
}
398
399
return config;
400
}
401
}
402
```