0
# Schema Registry Integration
1
2
Complete integration with Confluent Schema Registry supporting schema evolution, compatibility checking, and automatic serialization/deserialization for Avro, JSON Schema, and Protobuf formats.
3
4
## Capabilities
5
6
### SchemaRegistryClient
7
8
Synchronous client for interacting with Confluent Schema Registry.
9
10
```python { .api }
11
class SchemaRegistryClient:
12
def __init__(self, conf):
13
"""
14
Create SchemaRegistryClient instance.
15
16
Args:
17
conf (dict): Configuration properties including 'url' and optional auth settings
18
"""
19
20
def register_schema(self, subject_name, schema, normalize_schemas=False):
21
"""
22
Register a schema for the specified subject.
23
24
Args:
25
subject_name (str): Subject name for the schema
26
schema (Schema): Schema object to register
27
normalize_schemas (bool): Whether to normalize the schema
28
29
Returns:
30
int: Schema ID assigned by registry
31
32
Raises:
33
SchemaRegistryError: If registration fails
34
"""
35
36
def get_latest_version(self, subject_name):
37
"""
38
Get the latest schema version for a subject.
39
40
Args:
41
subject_name (str): Subject name
42
43
Returns:
44
RegisteredSchema: Latest registered schema
45
46
Raises:
47
SchemaRegistryError: If subject not found
48
"""
49
50
def get_version(self, subject_name, version):
51
"""
52
Get a specific schema version for a subject.
53
54
Args:
55
subject_name (str): Subject name
56
version (int): Schema version number
57
58
Returns:
59
RegisteredSchema: Registered schema at specified version
60
61
Raises:
62
SchemaRegistryError: If schema version not found
63
"""
64
65
def get_schema(self, schema_id, fetch_max_id=True):
66
"""
67
Get schema by ID.
68
69
Args:
70
schema_id (int): Schema ID
71
fetch_max_id (bool): Whether to fetch maximum schema ID
72
73
Returns:
74
Schema: Schema object
75
76
Raises:
77
SchemaRegistryError: If schema not found
78
"""
79
80
def get_subjects(self):
81
"""
82
Get list of all subjects.
83
84
Returns:
85
list: List of subject names
86
87
Raises:
88
SchemaRegistryError: If request fails
89
"""
90
91
def delete_subject(self, subject_name, permanent=False):
92
"""
93
Delete a subject.
94
95
Args:
96
subject_name (str): Subject name to delete
97
permanent (bool): Whether to permanently delete
98
99
Returns:
100
list: List of deleted version numbers
101
102
Raises:
103
SchemaRegistryError: If deletion fails
104
"""
105
106
def delete_version(self, subject_name, version, permanent=False):
107
"""
108
Delete a specific version of a subject.
109
110
Args:
111
subject_name (str): Subject name
112
version (int): Version number to delete
113
permanent (bool): Whether to permanently delete
114
115
Returns:
116
int: Deleted version number
117
118
Raises:
119
SchemaRegistryError: If deletion fails
120
"""
121
122
def get_compatibility(self, subject_name=None):
123
"""
124
Get compatibility level for subject or global default.
125
126
Args:
127
subject_name (str, optional): Subject name (None for global)
128
129
Returns:
130
str: Compatibility level
131
132
Raises:
133
SchemaRegistryError: If request fails
134
"""
135
136
def set_compatibility(self, subject_name=None, level=None):
137
"""
138
Set compatibility level for subject or global default.
139
140
Args:
141
subject_name (str, optional): Subject name (None for global)
142
level (str): Compatibility level to set
143
144
Returns:
145
str: Updated compatibility level
146
147
Raises:
148
SchemaRegistryError: If update fails
149
"""
150
151
def test_compatibility(self, subject_name, schema, version='latest'):
152
"""
153
Test schema compatibility with subject.
154
155
Args:
156
subject_name (str): Subject name
157
schema (Schema): Schema to test
158
version (str|int): Version to test against
159
160
Returns:
161
bool: True if compatible, False otherwise
162
163
Raises:
164
SchemaRegistryError: If test fails
165
"""
166
```
167
168
### AsyncSchemaRegistryClient
169
170
Asynchronous client for Schema Registry operations.
171
172
```python { .api }
173
class AsyncSchemaRegistryClient:
174
def __init__(self, conf):
175
"""
176
Create AsyncSchemaRegistryClient instance.
177
178
Args:
179
conf (dict): Configuration properties
180
"""
181
182
async def register_schema(self, subject_name, schema, normalize_schemas=False):
183
"""
184
Async version of register_schema.
185
186
Returns:
187
int: Schema ID
188
"""
189
190
async def get_latest_version(self, subject_name):
191
"""
192
Async version of get_latest_version.
193
194
Returns:
195
RegisteredSchema: Latest registered schema
196
"""
197
198
async def get_schema(self, schema_id, fetch_max_id=True):
199
"""
200
Async version of get_schema.
201
202
Returns:
203
Schema: Schema object
204
"""
205
206
async def close(self):
207
"""Close the async client and cleanup resources."""
208
```
209
210
### Schema Classes
211
212
#### Schema
213
214
Represents a schema with its type and definition.
215
216
```python { .api }
217
class Schema:
218
def __init__(self, schema_str, schema_type, references=None):
219
"""
220
Create Schema object.
221
222
Args:
223
schema_str (str): Schema definition string
224
schema_type (str): Schema type ('AVRO', 'JSON', 'PROTOBUF')
225
references (list, optional): List of schema references
226
"""
227
228
@property
229
def schema_str(self):
230
"""Schema definition string."""
231
232
@property
233
def schema_type(self):
234
"""Schema type."""
235
236
@property
237
def references(self):
238
"""Schema references."""
239
240
def __eq__(self, other):
241
"""Equality comparison."""
242
243
def __hash__(self):
244
"""Hash for use in sets and dicts."""
245
```
246
247
#### RegisteredSchema
248
249
Schema with registry metadata.
250
251
```python { .api }
252
class RegisteredSchema:
253
def __init__(self, schema_id, schema, version, subject):
254
"""
255
Create RegisteredSchema object.
256
257
Args:
258
schema_id (int): Schema ID in registry
259
schema (Schema): Schema object
260
version (int): Schema version
261
subject (str): Subject name
262
"""
263
264
@property
265
def schema_id(self):
266
"""Schema ID."""
267
268
@property
269
def schema(self):
270
"""Schema object."""
271
272
@property
273
def version(self):
274
"""Schema version."""
275
276
@property
277
def subject(self):
278
"""Subject name."""
279
```
280
281
### Avro Serialization
282
283
#### AvroSerializer
284
285
Serializes Python objects to Avro binary format with Schema Registry integration.
286
287
```python { .api }
288
class AvroSerializer:
289
def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):
290
"""
291
Create AvroSerializer.
292
293
Args:
294
schema_registry_client (SchemaRegistryClient): Registry client
295
schema_str (str): Avro schema definition
296
to_dict (callable, optional): Function to convert object to dict
297
conf (dict, optional): Serializer configuration
298
"""
299
300
def __call__(self, obj, ctx):
301
"""
302
Serialize object to Avro bytes.
303
304
Args:
305
obj: Object to serialize
306
ctx (SerializationContext): Serialization context
307
308
Returns:
309
bytes: Serialized data with schema ID prefix
310
311
Raises:
312
SerializationError: If serialization fails
313
"""
314
```
315
316
#### AvroDeserializer
317
318
Deserializes Avro binary data to Python objects using Schema Registry.
319
320
```python { .api }
321
class AvroDeserializer:
322
def __init__(self, schema_registry_client, schema_str=None, from_dict=None, return_record_name=False):
323
"""
324
Create AvroDeserializer.
325
326
Args:
327
schema_registry_client (SchemaRegistryClient): Registry client
328
schema_str (str, optional): Reader schema definition
329
from_dict (callable, optional): Function to convert dict to object
330
return_record_name (bool): Whether to return record name
331
"""
332
333
def __call__(self, value, ctx):
334
"""
335
Deserialize Avro bytes to object.
336
337
Args:
338
value (bytes): Serialized data with schema ID prefix
339
ctx (SerializationContext): Serialization context
340
341
Returns:
342
object: Deserialized object
343
344
Raises:
345
SerializationError: If deserialization fails
346
"""
347
```
348
349
### JSON Schema Serialization
350
351
#### JSONSerializer
352
353
Serializes Python objects to JSON with Schema Registry integration.
354
355
```python { .api }
356
class JSONSerializer:
357
def __init__(self, schema_str, schema_registry_client, to_dict=None, conf=None):
358
"""
359
Create JSONSerializer.
360
361
Args:
362
schema_str (str): JSON schema definition
363
schema_registry_client (SchemaRegistryClient): Registry client
364
to_dict (callable, optional): Function to convert object to dict
365
conf (dict, optional): Serializer configuration
366
"""
367
368
def __call__(self, obj, ctx):
369
"""
370
Serialize object to JSON bytes.
371
372
Args:
373
obj: Object to serialize
374
ctx (SerializationContext): Serialization context
375
376
Returns:
377
bytes: Serialized JSON data with schema ID prefix
378
379
Raises:
380
SerializationError: If serialization fails
381
"""
382
```
383
384
#### JSONDeserializer
385
386
Deserializes JSON data to Python objects using Schema Registry.
387
388
```python { .api }
389
class JSONDeserializer:
390
def __init__(self, schema_str, from_dict=None, schema_registry_client=None):
391
"""
392
Create JSONDeserializer.
393
394
Args:
395
schema_str (str): JSON schema definition
396
from_dict (callable, optional): Function to convert dict to object
397
schema_registry_client (SchemaRegistryClient, optional): Registry client
398
"""
399
400
def __call__(self, value, ctx):
401
"""
402
Deserialize JSON bytes to object.
403
404
Args:
405
value (bytes): Serialized JSON data with schema ID prefix
406
ctx (SerializationContext): Serialization context
407
408
Returns:
409
object: Deserialized object
410
411
Raises:
412
SerializationError: If deserialization fails
413
"""
414
```
415
416
### Protobuf Serialization
417
418
#### ProtobufSerializer
419
420
Serializes Protobuf messages with Schema Registry integration.
421
422
```python { .api }
423
class ProtobufSerializer:
424
def __init__(self, msg_type, schema_registry_client, conf=None):
425
"""
426
Create ProtobufSerializer.
427
428
Args:
429
msg_type: Protobuf message class
430
schema_registry_client (SchemaRegistryClient): Registry client
431
conf (dict, optional): Serializer configuration
432
"""
433
434
def __call__(self, obj, ctx):
435
"""
436
Serialize Protobuf message to bytes.
437
438
Args:
439
obj: Protobuf message instance
440
ctx (SerializationContext): Serialization context
441
442
Returns:
443
bytes: Serialized data with schema ID prefix
444
445
Raises:
446
SerializationError: If serialization fails
447
"""
448
```
449
450
#### ProtobufDeserializer
451
452
Deserializes Protobuf binary data using Schema Registry.
453
454
```python { .api }
455
class ProtobufDeserializer:
456
def __init__(self, message_type, conf=None, schema_registry_client=None):
457
"""
458
Create ProtobufDeserializer.
459
460
Args:
461
message_type: Protobuf message class
462
conf (dict, optional): Deserializer configuration
463
schema_registry_client (SchemaRegistryClient, optional): Registry client
464
"""
465
466
def __call__(self, value, ctx):
467
"""
468
Deserialize Protobuf bytes to message.
469
470
Args:
471
value (bytes): Serialized data with schema ID prefix
472
ctx (SerializationContext): Serialization context
473
474
Returns:
475
object: Deserialized Protobuf message
476
477
Raises:
478
SerializationError: If deserialization fails
479
"""
480
```
481
482
### Subject Naming Strategies
483
484
Functions for generating subject names for schemas.
485
486
```python { .api }
487
def topic_subject_name_strategy(ctx, record_name):
488
"""
489
Create subject name as {topic}-{key|value}.
490
491
Args:
492
ctx (SerializationContext): Context with topic and field info
493
record_name (str): Record name (unused)
494
495
Returns:
496
str: Subject name in format 'topic-key' or 'topic-value'
497
"""
498
499
def topic_record_subject_name_strategy(ctx, record_name):
500
"""
501
Create subject name as {topic}-{record_name}.
502
503
Args:
504
ctx (SerializationContext): Context with topic info
505
record_name (str): Record name from schema
506
507
Returns:
508
str: Subject name in format 'topic-recordname'
509
"""
510
511
def record_subject_name_strategy(ctx, record_name):
512
"""
513
Create subject name as {record_name}.
514
515
Args:
516
ctx (SerializationContext): Context (unused)
517
record_name (str): Record name from schema
518
519
Returns:
520
str: Subject name as record name
521
"""
522
```
523
524
### Schema ID Serialization
525
526
Functions for handling schema ID serialization in messages.
527
528
```python { .api }
529
def prefix_schema_id_serializer(schema_id, data):
530
"""
531
Serialize schema ID into payload prefix.
532
533
Args:
534
schema_id (int): Schema ID to serialize
535
data (bytes): Message payload
536
537
Returns:
538
bytes: Data with 5-byte schema ID prefix
539
"""
540
541
def header_schema_id_serializer(schema_id, data):
542
"""
543
Serialize schema ID into message headers.
544
545
Args:
546
schema_id (int): Schema ID to serialize
547
data (bytes): Message payload
548
549
Returns:
550
tuple: (data, headers_dict)
551
"""
552
553
def prefix_schema_id_deserializer(data):
554
"""
555
Deserialize schema ID from payload prefix.
556
557
Args:
558
data (bytes): Data with schema ID prefix
559
560
Returns:
561
tuple: (schema_id, payload)
562
"""
563
564
def dual_schema_id_deserializer(data, headers=None):
565
"""
566
Deserialize schema ID from headers or payload prefix.
567
568
Args:
569
data (bytes): Message payload
570
headers (dict, optional): Message headers
571
572
Returns:
573
tuple: (schema_id, payload)
574
"""
575
```
576
577
### Configuration Classes
578
579
#### ServerConfig
580
581
Schema Registry server configuration.
582
583
```python { .api }
584
class ServerConfig:
585
@property
586
def compatibility_level(self):
587
"""Default compatibility level."""
588
589
@property
590
def mode(self):
591
"""Schema Registry mode."""
592
```
593
594
### Enumeration Classes
595
596
```python { .api }
597
class ConfigCompatibilityLevel:
598
BACKWARD = "BACKWARD"
599
BACKWARD_TRANSITIVE = "BACKWARD_TRANSITIVE"
600
FORWARD = "FORWARD"
601
FORWARD_TRANSITIVE = "FORWARD_TRANSITIVE"
602
FULL = "FULL"
603
FULL_TRANSITIVE = "FULL_TRANSITIVE"
604
NONE = "NONE"
605
606
class RuleKind:
607
CONDITION = "CONDITION"
608
TRANSFORM = "TRANSFORM"
609
610
class RuleMode:
611
UPGRADE = "UPGRADE"
612
DOWNGRADE = "DOWNGRADE"
613
UPDOWN = "UPDOWN"
614
WRITE = "WRITE"
615
READ = "READ"
616
WRITEREAD = "WRITEREAD"
617
```
618
619
### Usage Examples
620
621
#### Basic Avro Serialization
622
623
```python
624
from confluent_kafka import SerializingProducer, DeserializingConsumer
625
from confluent_kafka.schema_registry import SchemaRegistryClient
626
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
627
from confluent_kafka.serialization import StringSerializer, StringDeserializer, SerializationContext, MessageField
628
629
# Schema Registry client
630
schema_registry_conf = {'url': 'http://localhost:8081'}
631
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
632
633
# Avro schema
634
value_schema_str = """
635
{
636
"type": "record",
637
"name": "User",
638
"fields": [
639
{"name": "name", "type": "string"},
640
{"name": "age", "type": "int"}
641
]
642
}
643
"""
644
645
# Create serializer
646
avro_serializer = AvroSerializer(schema_registry_client, value_schema_str)
647
648
# Producer configuration
649
producer_conf = {
650
'bootstrap.servers': 'localhost:9092',
651
'key.serializer': StringSerializer('utf_8'),
652
'value.serializer': avro_serializer
653
}
654
655
producer = SerializingProducer(producer_conf)
656
657
# Produce message
658
user_record = {'name': 'Alice', 'age': 30}
659
producer.produce(topic='users', key='user1', value=user_record)
660
producer.flush()
661
662
# Consumer configuration
663
avro_deserializer = AvroDeserializer(schema_registry_client, value_schema_str)
664
consumer_conf = {
665
'bootstrap.servers': 'localhost:9092',
666
'key.deserializer': StringDeserializer('utf_8'),
667
'value.deserializer': avro_deserializer,
668
'group.id': 'user-group',
669
'auto.offset.reset': 'earliest'
670
}
671
672
consumer = DeserializingConsumer(consumer_conf)
673
consumer.subscribe(['users'])
674
675
msg = consumer.poll(1.0)
676
if msg is not None:
677
user_object = msg.value()
678
print(f"User: {user_object['name']}, Age: {user_object['age']}")
679
680
consumer.close()
681
```
682
683
#### Schema Evolution Example
684
685
```python
686
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
687
688
schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
689
690
# Register initial schema
691
initial_schema = Schema("""
692
{
693
"type": "record",
694
"name": "User",
695
"fields": [
696
{"name": "name", "type": "string"},
697
{"name": "age", "type": "int"}
698
]
699
}
700
""", schema_type='AVRO')
701
702
schema_id = schema_registry_client.register_schema('users-value', initial_schema)
703
print(f"Registered schema with ID: {schema_id}")
704
705
# Evolve schema (add optional field)
706
evolved_schema = Schema("""
707
{
708
"type": "record",
709
"name": "User",
710
"fields": [
711
{"name": "name", "type": "string"},
712
{"name": "age", "type": "int"},
713
{"name": "email", "type": ["null", "string"], "default": null}
714
]
715
}
716
""", schema_type='AVRO')
717
718
# Test compatibility
719
compatible = schema_registry_client.test_compatibility('users-value', evolved_schema)
720
print(f"Schema is compatible: {compatible}")
721
722
if compatible:
723
new_schema_id = schema_registry_client.register_schema('users-value', evolved_schema)
724
print(f"Registered evolved schema with ID: {new_schema_id}")
725
```
726
727
#### JSON Schema Example
728
729
```python
730
from confluent_kafka.schema_registry.json_schema import JSONSerializer, JSONDeserializer
731
732
# JSON schema
733
json_schema_str = """
734
{
735
"$schema": "http://json-schema.org/draft-07/schema#",
736
"type": "object",
737
"properties": {
738
"name": {"type": "string"},
739
"age": {"type": "integer", "minimum": 0}
740
},
741
"required": ["name", "age"]
742
}
743
"""
744
745
json_serializer = JSONSerializer(schema_str=json_schema_str, schema_registry_client=schema_registry_client)
745
json_deserializer = JSONDeserializer(schema_str=json_schema_str, schema_registry_client=schema_registry_client)
747
748
# Use with SerializingProducer/DeserializingConsumer
749
producer_conf = {
750
'bootstrap.servers': 'localhost:9092',
751
'value.serializer': json_serializer
752
}
753
754
consumer_conf = {
755
'bootstrap.servers': 'localhost:9092',
756
'value.deserializer': json_deserializer,
757
'group.id': 'json-group',
758
'auto.offset.reset': 'earliest'
759
}
760
```
761
762
#### Protobuf Example
763
764
```python
765
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer, ProtobufDeserializer
766
import user_pb2 # Generated from .proto file
767
768
# Protobuf serializer/deserializer
769
protobuf_serializer = ProtobufSerializer(user_pb2.User, schema_registry_client)
770
protobuf_deserializer = ProtobufDeserializer(user_pb2.User, schema_registry_client=schema_registry_client)
771
772
# Create protobuf message
773
user = user_pb2.User()
774
user.name = "Bob"
775
user.age = 25
776
777
# Use with producer/consumer
778
producer_conf = {
779
'bootstrap.servers': 'localhost:9092',
780
'value.serializer': protobuf_serializer
781
}
782
783
producer = SerializingProducer(producer_conf)
784
producer.produce('users-protobuf', value=user)
785
producer.flush()
786
```