# Serialization Framework

Comprehensive serialization framework providing pluggable serializers and deserializers, built-in support for common data types, and high-level producer/consumer APIs with automatic serialization.

## Capabilities

### Base Serializer/Deserializer Classes

Abstract base classes for implementing custom serialization logic.

```python { .api }
class Serializer:
    def __call__(self, obj, ctx=None):
        """
        Serialize an object.

        Args:
            obj: Object to serialize
            ctx (SerializationContext, optional): Serialization context

        Returns:
            bytes: Serialized data

        Raises:
            SerializationError: If serialization fails
        """
        raise NotImplementedError


class Deserializer:
    def __call__(self, value, ctx=None):
        """
        Deserialize data.

        Args:
            value (bytes): Data to deserialize
            ctx (SerializationContext, optional): Serialization context

        Returns:
            object: Deserialized object

        Raises:
            SerializationError: If deserialization fails
        """
        raise NotImplementedError
```

### Built-in Serializers

Ready-to-use serializers for common data types.

#### StringSerializer

```python { .api }
class StringSerializer(Serializer):
    def __init__(self, codec='utf_8'):
        """
        Create StringSerializer.

        Args:
            codec (str): Character encoding (default: 'utf_8')
        """

    def __call__(self, obj, ctx=None):
        """
        Serialize string to bytes.

        Args:
            obj (str): String to serialize
            ctx (SerializationContext, optional): Serialization context

        Returns:
            bytes: String encoded with the configured codec, or None if obj is None

        Raises:
            SerializationError: If obj is not a string
        """
```

#### IntegerSerializer

```python { .api }
class IntegerSerializer(Serializer):
    def __call__(self, obj, ctx=None):
        """
        Serialize integer to int32 bytes.

        Args:
            obj (int): Integer to serialize
            ctx (SerializationContext, optional): Serialization context

        Returns:
            bytes: 4-byte big-endian int32 or None if obj is None

        Raises:
            SerializationError: If obj is not an integer or out of int32 range
        """
```

#### DoubleSerializer

```python { .api }
class DoubleSerializer(Serializer):
    def __call__(self, obj, ctx=None):
        """
        Serialize float to IEEE 754 binary64 bytes.

        Args:
            obj (float): Float to serialize
            ctx (SerializationContext, optional): Serialization context

        Returns:
            bytes: 8-byte IEEE 754 binary64 or None if obj is None

        Raises:
            SerializationError: If obj is not a float
        """
```

### Built-in Deserializers

Ready-to-use deserializers for common data types.

#### StringDeserializer

```python { .api }
class StringDeserializer(Deserializer):
    def __init__(self, codec='utf_8'):
        """
        Create StringDeserializer.

        Args:
            codec (str): Character encoding (default: 'utf_8')
        """

    def __call__(self, value, ctx=None):
        """
        Deserialize bytes to string.

        Args:
            value (bytes): Bytes to deserialize
            ctx (SerializationContext, optional): Serialization context

        Returns:
            str: Decoded string or None if value is None

        Raises:
            SerializationError: If decoding fails
        """
```

#### IntegerDeserializer

```python { .api }
class IntegerDeserializer(Deserializer):
    def __call__(self, value, ctx=None):
        """
        Deserialize int32 bytes to integer.

        Args:
            value (bytes): 4-byte big-endian int32
            ctx (SerializationContext, optional): Serialization context

        Returns:
            int: Deserialized integer or None if value is None

        Raises:
            SerializationError: If value is not 4 bytes
        """
```

#### DoubleDeserializer

```python { .api }
class DoubleDeserializer(Deserializer):
    def __call__(self, value, ctx=None):
        """
        Deserialize IEEE 754 binary64 bytes to float.

        Args:
            value (bytes): 8-byte IEEE 754 binary64
            ctx (SerializationContext, optional): Serialization context

        Returns:
            float: Deserialized float or None if value is None

        Raises:
            SerializationError: If value is not 8 bytes
        """
```

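
To make the wire formats above concrete, here is a short local round-trip sketch (no broker involved). It relies on `ctx` being optional, as in the signatures documented above; the sample values are arbitrary.

```python
from confluent_kafka.serialization import (
    StringSerializer, StringDeserializer,
    IntegerSerializer, IntegerDeserializer,
    DoubleSerializer, DoubleDeserializer,
)

# 4-byte big-endian int32 round trip
int_bytes = IntegerSerializer()(12345)
assert len(int_bytes) == 4
assert IntegerDeserializer()(int_bytes) == 12345

# 8-byte IEEE 754 binary64 round trip
dbl_bytes = DoubleSerializer()(3.14)
assert len(dbl_bytes) == 8
assert DoubleDeserializer()(dbl_bytes) == 3.14

# Codec-encoded string round trip
str_bytes = StringSerializer('utf_8')('hello')
assert StringDeserializer('utf_8')(str_bytes) == 'hello'
```
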
### Serialization Context

Provides contextual information for serialization operations.

```python { .api }
class SerializationContext:
    def __init__(self, topic, field, headers=None):
        """
        Create SerializationContext.

        Args:
            topic (str): Topic name
            field (MessageField): Message field being serialized
            headers (dict, optional): Message headers
        """

    @property
    def topic(self):
        """
        Topic name.

        Returns:
            str: Topic name
        """

    @property
    def field(self):
        """
        Message field being serialized.

        Returns:
            MessageField: NONE, KEY, or VALUE
        """

    @property
    def headers(self):
        """
        Message headers.

        Returns:
            dict: Message headers or None
        """
```

#### MessageField

```python { .api }
class MessageField:
    NONE = "none"
    KEY = "key"
    VALUE = "value"
```

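
Constructing a context by hand is useful for exercising a serializer outside a producer, for example in unit tests. A minimal sketch; the topic name and header are illustrative values:

```python
from confluent_kafka.serialization import (
    SerializationContext, MessageField, StringSerializer
)

# Build a context manually to test a serializer in isolation.
ctx = SerializationContext('orders', MessageField.VALUE, headers={'source': b'unit-test'})

data = StringSerializer('utf_8')('hello', ctx)
print(ctx.topic, ctx.field, data)
```
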
### High-Level Producer/Consumer APIs

#### SerializingProducer

High-level producer with pluggable serialization.

```python { .api }
class SerializingProducer:
    def __init__(self, conf):
        """
        Create SerializingProducer.

        Args:
            conf (dict): Configuration including 'key.serializer' and 'value.serializer'
        """

    def produce(self, topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None):
        """
        Produce message with automatic serialization.

        Args:
            topic (str): Topic to produce to
            key: Key object (will be serialized using key.serializer)
            value: Value object (will be serialized using value.serializer)
            partition (int, optional): Specific partition (-1 for automatic)
            on_delivery (callable, optional): Delivery report callback
            timestamp (int, optional): Message timestamp (0 for current time)
            headers (dict, optional): Message headers

        Raises:
            SerializationError: If serialization fails
            BufferError: If local producer queue is full
            KafkaException: For other produce errors
        """

    def poll(self, timeout=-1):
        """
        Poll for events and call registered callbacks.

        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)

        Returns:
            int: Number of events processed
        """

    def flush(self, timeout=-1):
        """
        Wait for all messages to be delivered.

        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)

        Returns:
            int: Number of messages still in queue (0 on success)
        """

    def purge(self, in_queue=True, in_flight=True, blocking=True):
        """
        Purge messages from internal queues.

        Args:
            in_queue (bool): Purge messages in the local queue
            in_flight (bool): Purge messages in flight to the broker
            blocking (bool): Block until purging is complete

        Note:
            Purged messages have their delivery callbacks invoked with a purge error.
        """

    def abort_transaction(self, timeout=-1):
        """Abort the ongoing transaction."""

    def begin_transaction(self):
        """Begin a new transaction."""

    def commit_transaction(self, timeout=-1):
        """Commit the current transaction."""

    def init_transactions(self, timeout=-1):
        """Initialize transactions for this producer."""
```

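
The transaction methods above follow the usual init / begin / commit-or-abort flow. A minimal sketch, assuming a broker that supports transactions; the `transactional.id`, topic, and payload are illustrative:

```python
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'orders-tx-1',            # illustrative id
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StringSerializer('utf_8'),
})

producer.init_transactions()        # register the transactional.id with the broker
producer.begin_transaction()
try:
    producer.produce('orders', key='order-1', value='created')
    producer.commit_transaction()   # commits everything produced since begin_transaction()
except Exception:
    producer.abort_transaction()    # roll back the open transaction on failure
    raise
```
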
#### DeserializingConsumer

High-level consumer with pluggable deserialization.

```python { .api }
class DeserializingConsumer:
    def __init__(self, conf):
        """
        Create DeserializingConsumer.

        Args:
            conf (dict): Configuration including 'key.deserializer' and 'value.deserializer'
        """

    def subscribe(self, topics, listener=None):
        """
        Subscribe to list of topics for automatic partition assignment.

        Args:
            topics (list): List of topic names to subscribe to
            listener (RebalanceCallback, optional): Rebalance callback
        """

    def poll(self, timeout=-1):
        """
        Poll for messages with automatic deserialization.

        Args:
            timeout (float): Maximum time to wait in seconds (-1 for infinite)

        Returns:
            Message: Message with deserialized key/value or None if timeout

        Note:
            If key or value deserialization fails, a ConsumeError
            (KeyDeserializationError or ValueDeserializationError) is raised.
        """

    def consume(self, num_messages=1, timeout=-1):
        """
        Consume multiple messages (not implemented).

        Raises:
            NotImplementedError: This method is not supported
        """

    def assign(self, partitions):
        """
        Manually assign partitions to consume from.

        Args:
            partitions (list): List of TopicPartition objects
        """

    def assignment(self):
        """
        Get current partition assignment.

        Returns:
            list: List of assigned TopicPartition objects
        """

    def unassign(self):
        """Remove current partition assignment."""

    def commit(self, message=None, offsets=None, asynchronous=True):
        """
        Commit message offset or specified offsets.

        Args:
            message (Message, optional): Commit offset for this message
            offsets (list, optional): List of TopicPartition objects with offsets
            asynchronous (bool): Commit asynchronously if True

        Returns:
            list: Committed offsets if synchronous, None if asynchronous
        """

    def committed(self, partitions, timeout=-1):
        """
        Get committed offsets for partitions.

        Args:
            partitions (list): List of TopicPartition objects
            timeout (float): Maximum time to wait in seconds

        Returns:
            list: List of TopicPartition objects with committed offsets
        """

    def position(self, partitions):
        """
        Get current position (next fetch offset) for partitions.

        Args:
            partitions (list): List of TopicPartition objects

        Returns:
            list: List of TopicPartition objects with current positions
        """

    def seek(self, partition):
        """
        Seek to offset for partition.

        Args:
            partition (TopicPartition): Partition with offset to seek to
        """

    def pause(self, partitions):
        """
        Pause consumption for partitions.

        Args:
            partitions (list): List of TopicPartition objects to pause
        """

    def resume(self, partitions):
        """
        Resume consumption for partitions.

        Args:
            partitions (list): List of TopicPartition objects to resume
        """

    def close(self):
        """Close the consumer and leave the consumer group."""

    def store_offsets(self, message=None, offsets=None):
        """
        Store offset for message or specified offsets.

        Args:
            message (Message, optional): Store offset for this message
            offsets (list, optional): List of TopicPartition objects with offsets
        """
```

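
Where automatic group assignment via `subscribe()` is not wanted, `assign()` can pin the consumer to explicit partitions and offsets. A minimal sketch; the topic, partition number, and starting offset are illustrative:

```python
from confluent_kafka import DeserializingConsumer, TopicPartition
from confluent_kafka.serialization import StringDeserializer

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'replay-group',                       # illustrative group id
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': StringDeserializer('utf_8'),
})

# Read partition 0 of 'orders' starting at offset 100, bypassing group subscription.
consumer.assign([TopicPartition('orders', 0, 100)])
print(consumer.assignment())

msg = consumer.poll(5.0)
if msg is not None:
    print(msg.key(), msg.value(), msg.offset())

consumer.close()
```
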
### Error Classes

```python { .api }
class SerializationError(Exception):
    """Base class for serialization errors."""

    def __init__(self, message, inner_exception=None):
        """
        Create SerializationError.

        Args:
            message (str): Error message
            inner_exception (Exception, optional): Underlying exception
        """

    @property
    def inner_exception(self):
        """Underlying exception that caused the serialization error."""
```

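
A minimal sketch of raising and inspecting a SerializationError with a wrapped cause, following the constructor and `inner_exception` property documented above; `TimestampSerializer` is a hypothetical example class, not part of the library:

```python
from confluent_kafka.serialization import Serializer, SerializationError

class TimestampSerializer(Serializer):
    """Hypothetical serializer that encodes datetimes as ISO-8601 strings."""

    def __call__(self, obj, ctx=None):
        if obj is None:
            return None
        try:
            return obj.isoformat().encode('utf-8')
        except AttributeError as e:
            # Wrap the original failure so callers can inspect it later.
            raise SerializationError(f"Expected datetime, got {type(obj)}", e)

try:
    TimestampSerializer()("2024-01-01")  # a str, not a datetime
except SerializationError as e:
    print(f"Error: {e}")
    print(f"Caused by: {e.inner_exception}")
```
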
### Usage Examples

#### Custom Serializer/Deserializer

```python
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import (
    Serializer, Deserializer, SerializationError,
    StringSerializer, StringDeserializer
)
import json

class JSONSerializer(Serializer):
    """Custom JSON serializer."""

    def __call__(self, obj, ctx=None):
        if obj is None:
            return None
        try:
            return json.dumps(obj).encode('utf-8')
        except Exception as e:
            raise SerializationError(f"Failed to serialize to JSON: {e}")

class JSONDeserializer(Deserializer):
    """Custom JSON deserializer."""

    def __call__(self, value, ctx=None):
        if value is None:
            return None
        try:
            return json.loads(value.decode('utf-8'))
        except Exception as e:
            raise SerializationError(f"Failed to deserialize JSON: {e}")

# Use the custom serializers alongside the built-in string serializers
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': JSONSerializer()
}

consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': JSONDeserializer(),
    'group.id': 'json-group',
    'auto.offset.reset': 'earliest'
}

producer = SerializingProducer(producer_conf)
consumer = DeserializingConsumer(consumer_conf)
```

#### Using Built-in Serializers

```python
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import (
    StringSerializer, StringDeserializer,
    IntegerSerializer, IntegerDeserializer,
    DoubleSerializer, DoubleDeserializer
)

# Producer with different serializers for key and value
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': IntegerSerializer()
}

producer = SerializingProducer(producer_conf)

# Produce messages with automatic serialization
for i in range(10):
    producer.produce(
        'numbers-topic',
        key=f'key-{i}',   # String key
        value=i * 100     # Integer value
    )

producer.flush()

# Consumer with corresponding deserializers
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': IntegerDeserializer(),
    'group.id': 'numbers-group',
    'auto.offset.reset': 'earliest'
}

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(['numbers-topic'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue

        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        # Key and value are automatically deserialized
        print(f"Key: {msg.key()} (type: {type(msg.key())})")
        print(f"Value: {msg.value()} (type: {type(msg.value())})")

finally:
    consumer.close()
```

#### Serialization Context Usage

```python
import json

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import Serializer, SerializationContext, MessageField

class ContextAwareSerializer(Serializer):
    """Serializer that uses the serialization context."""

    def __call__(self, obj, ctx=None):
        if obj is None:
            return None

        # Use context information
        if ctx is not None:
            print(f"Serializing for topic: {ctx.topic}")
            print(f"Field: {ctx.field}")
            if ctx.headers:
                print(f"Headers: {ctx.headers}")

        # Different serialization based on field
        if ctx and ctx.field == MessageField.KEY:
            # Keys serialized as uppercase strings
            return str(obj).upper().encode('utf-8')
        else:
            # Values serialized as JSON
            return json.dumps(obj).encode('utf-8')

# The SerializingProducer automatically creates a SerializationContext
# and passes it to the configured serializers
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': ContextAwareSerializer(),
    'value.serializer': ContextAwareSerializer()
}

producer = SerializingProducer(producer_conf)
producer.produce('my-topic', key='mykey', value={'data': 'value'})
```

#### Error Handling in Serialization

```python
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import Serializer, SerializationError, StringSerializer
from confluent_kafka.error import KeySerializationError, ValueSerializationError

class StrictIntegerSerializer(Serializer):
    """Integer serializer that raises errors for non-integers."""

    def __call__(self, obj, ctx=None):
        if obj is None:
            return None
        if not isinstance(obj, int):
            raise SerializationError(f"Expected int, got {type(obj)}")
        return obj.to_bytes(4, 'big', signed=True)

producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StrictIntegerSerializer()
}

producer = SerializingProducer(producer_conf)

def delivery_callback(err, msg):
    if err is not None:
        if isinstance(err, ValueSerializationError):
            print(f"Value serialization failed: {err}")
        elif isinstance(err, KeySerializationError):
            print(f"Key serialization failed: {err}")
        else:
            print(f"Other error: {err}")
    else:
        print(f"Message delivered: {msg.topic()} [{msg.partition()}]")

try:
    # This will succeed
    producer.produce('numbers', key='valid', value=42, on_delivery=delivery_callback)

    # This fails with a serialization error raised directly by produce()
    # and is caught below
    producer.produce('numbers', key='invalid', value='not-a-number', on_delivery=delivery_callback)

except Exception as e:
    print(f"Immediate error: {e}")

producer.poll(0)  # Process delivery callbacks
producer.flush()
```

#### Advanced Configuration

```python
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer

# Producer with serializer-specific configuration
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': StringSerializer('ascii'),  # Different encoding

    # Standard producer settings
    'acks': 'all',
    'retries': 3,
    'batch.size': 16384,
    'linger.ms': 5
}

# Consumer with error handling configuration
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': StringDeserializer('utf_8'),

    # Standard consumer settings
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000
}

producer = SerializingProducer(producer_conf)
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(['my-topic'])

# Error handling in consumer
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue

        if msg.error():
            print(f"Message error: {msg.error()}")
            continue

        # Process message
        try:
            key = msg.key()
            value = msg.value()

            # Process deserialized data
            print(f"Processed: key={key}, value={value}")

            # Manual commit after successful processing
            consumer.commit(message=msg)

        except Exception as e:
            print(f"Processing error: {e}")
            # Could skip this message or handle the error differently

finally:
    consumer.close()
```