0
# Serialization
1
2
Abstract base classes for implementing custom key and value serializers/deserializers for converting between Python objects and bytes. Provides the foundation for data transformation in Kafka producers and consumers.
3
4
## Capabilities
5
6
### Abstract Base Classes
7
8
Foundation classes for implementing custom serialization logic.
9
10
```python { .api }
11
class Serializer:
12
"""
13
Abstract base class for key and value serializers.
14
15
Serializers convert Python objects to bytes for transmission to Kafka.
16
"""
17
18
def serialize(self, topic: str, value):
19
"""
20
Serialize a value to bytes.
21
22
Args:
23
topic (str): Topic name (can be used for topic-specific serialization)
24
value: Python object to serialize
25
26
Returns:
27
bytes: Serialized bytes, or None if value is None
28
29
Raises:
30
SerializationError: If serialization fails
31
"""
32
33
def close(self):
34
"""
35
Close the serializer and clean up resources.
36
37
Called when the producer/consumer is closed.
38
"""
39
40
class Deserializer:
41
"""
42
Abstract base class for key and value deserializers.
43
44
Deserializers convert bytes received from Kafka back to Python objects.
45
"""
46
47
def deserialize(self, topic: str, bytes_: bytes):
48
"""
49
Deserialize bytes to a Python object.
50
51
Args:
52
topic (str): Topic name (can be used for topic-specific deserialization)
53
bytes_ (bytes): Bytes to deserialize
54
55
Returns:
56
object: Deserialized Python object, or None if bytes_ is None
57
58
Raises:
59
SerializationError: If deserialization fails
60
"""
61
62
def close(self):
63
"""
64
Close the deserializer and clean up resources.
65
66
Called when the consumer is closed.
67
"""
68
```
69
70
### Serialization Errors
71
72
Exception class for serialization-related errors.
73
74
```python { .api }
75
class SerializationError(KafkaError):
76
"""Error occurred during serialization or deserialization."""
77
```
78
79
## Usage Examples
80
81
### String Serialization
82
83
```python
84
from kafka.serializer import Serializer, Deserializer
85
86
class StringSerializer(Serializer):
87
def __init__(self, encoding='utf-8'):
88
self.encoding = encoding
89
90
def serialize(self, topic, value):
91
if value is None:
92
return None
93
if isinstance(value, str):
94
return value.encode(self.encoding)
95
elif isinstance(value, bytes):
96
return value
97
else:
98
return str(value).encode(self.encoding)
99
100
def close(self):
101
pass
102
103
class StringDeserializer(Deserializer):
104
def __init__(self, encoding='utf-8'):
105
self.encoding = encoding
106
107
def deserialize(self, topic, bytes_):
108
if bytes_ is None:
109
return None
110
return bytes_.decode(self.encoding)
111
112
def close(self):
113
pass
114
115
# Usage with producer/consumer
116
from kafka import KafkaProducer, KafkaConsumer
117
118
producer = KafkaProducer(
119
bootstrap_servers=['localhost:9092'],
120
key_serializer=StringSerializer(),
121
value_serializer=StringSerializer()
122
)
123
124
consumer = KafkaConsumer(
125
'my-topic',
126
bootstrap_servers=['localhost:9092'],
127
key_deserializer=StringDeserializer(),
128
value_deserializer=StringDeserializer()
129
)
130
```
131
132
### JSON Serialization
133
134
```python
135
import json
136
from kafka.serializer import Serializer, Deserializer
137
from kafka.errors import SerializationError
from kafka import KafkaProducer, KafkaConsumer
138
139
class JSONSerializer(Serializer):
140
def __init__(self, encoding='utf-8'):
141
self.encoding = encoding
142
143
def serialize(self, topic, value):
144
if value is None:
145
return None
146
try:
147
return json.dumps(value).encode(self.encoding)
148
except (TypeError, ValueError) as e:
149
raise SerializationError(f"JSON serialization failed: {e}")
150
151
def close(self):
152
pass
153
154
class JSONDeserializer(Deserializer):
155
def __init__(self, encoding='utf-8'):
156
self.encoding = encoding
157
158
def deserialize(self, topic, bytes_):
159
if bytes_ is None:
160
return None
161
try:
162
return json.loads(bytes_.decode(self.encoding))
163
except (ValueError, UnicodeDecodeError) as e:
164
raise SerializationError(f"JSON deserialization failed: {e}")
165
166
def close(self):
167
pass
168
169
# Usage
170
producer = KafkaProducer(
171
bootstrap_servers=['localhost:9092'],
172
value_serializer=JSONSerializer()
173
)
174
175
# Send Python objects as JSON
176
producer.send('events', {
177
'user_id': 123,
178
'action': 'login',
179
'timestamp': '2024-01-01T12:00:00Z'
180
})
181
182
consumer = KafkaConsumer(
183
'events',
184
bootstrap_servers=['localhost:9092'],
185
value_deserializer=JSONDeserializer()
186
)
187
188
for message in consumer:
189
event = message.value # Already deserialized to Python dict
190
print(f"User {event['user_id']} performed {event['action']}")
191
```
192
193
### Avro Serialization
194
195
```python
196
import avro.schema
197
import avro.io
198
import io
199
from kafka.serializer import Serializer, Deserializer
200
from kafka.errors import SerializationError
from kafka import KafkaProducer
201
202
class AvroSerializer(Serializer):
203
def __init__(self, schema_str):
204
self.schema = avro.schema.parse(schema_str)
205
206
def serialize(self, topic, value):
207
if value is None:
208
return None
209
try:
210
writer = avro.io.DatumWriter(self.schema)
211
bytes_writer = io.BytesIO()
212
encoder = avro.io.BinaryEncoder(bytes_writer)
213
writer.write(value, encoder)
214
return bytes_writer.getvalue()
215
except Exception as e:
216
raise SerializationError(f"Avro serialization failed: {e}")
217
218
def close(self):
219
pass
220
221
class AvroDeserializer(Deserializer):
222
def __init__(self, schema_str):
223
self.schema = avro.schema.parse(schema_str)
224
225
def deserialize(self, topic, bytes_):
226
if bytes_ is None:
227
return None
228
try:
229
reader = avro.io.DatumReader(self.schema)
230
bytes_reader = io.BytesIO(bytes_)
231
decoder = avro.io.BinaryDecoder(bytes_reader)
232
return reader.read(decoder)
233
except Exception as e:
234
raise SerializationError(f"Avro deserialization failed: {e}")
235
236
def close(self):
237
pass
238
239
# Schema definition
240
schema_str = """
241
{
242
"type": "record",
243
"name": "User",
244
"fields": [
245
{"name": "id", "type": "int"},
246
{"name": "name", "type": "string"},
247
{"name": "email", "type": "string"}
248
]
249
}
250
"""
251
252
producer = KafkaProducer(
253
bootstrap_servers=['localhost:9092'],
254
value_serializer=AvroSerializer(schema_str)
255
)
256
257
# Send Avro record
258
producer.send('users', {
259
'id': 123,
260
'name': 'Alice',
261
'email': 'alice@example.com'
262
})
263
```
264
265
### Protobuf Serialization
266
267
```python
268
from kafka.serializer import Serializer, Deserializer
269
from kafka.errors import SerializationError
270
# Assuming you have generated protobuf classes
271
272
class ProtobufSerializer(Serializer):
273
def __init__(self, protobuf_class):
274
self.protobuf_class = protobuf_class
275
276
def serialize(self, topic, value):
277
if value is None:
278
return None
279
try:
280
if isinstance(value, self.protobuf_class):
281
return value.SerializeToString()
282
else:
283
# Convert dict to protobuf object
284
pb_obj = self.protobuf_class()
285
for field, val in value.items():
286
setattr(pb_obj, field, val)
287
return pb_obj.SerializeToString()
288
except Exception as e:
289
raise SerializationError(f"Protobuf serialization failed: {e}")
290
291
def close(self):
292
pass
293
294
class ProtobufDeserializer(Deserializer):
295
def __init__(self, protobuf_class):
296
self.protobuf_class = protobuf_class
297
298
def deserialize(self, topic, bytes_):
299
if bytes_ is None:
300
return None
301
try:
302
pb_obj = self.protobuf_class()
303
pb_obj.ParseFromString(bytes_)
304
return pb_obj
305
except Exception as e:
306
raise SerializationError(f"Protobuf deserialization failed: {e}")
307
308
def close(self):
309
pass
310
```
311
312
### Topic-Specific Serialization
313
314
```python
315
from kafka.serializer import Serializer, Deserializer
from kafka.errors import SerializationError
from kafka import KafkaProducer
316
import json
317
318
class TopicAwareJSONSerializer(Serializer):
319
def __init__(self):
320
self.topic_schemas = {
321
'user-events': ['user_id', 'action', 'timestamp'],
322
'order-events': ['order_id', 'status', 'amount'],
323
}
324
325
def serialize(self, topic, value):
326
if value is None:
327
return None
328
329
# Validate required fields for topic
330
if topic in self.topic_schemas:
331
required_fields = self.topic_schemas[topic]
332
for field in required_fields:
333
if field not in value:
334
raise SerializationError(f"Missing required field '{field}' for topic '{topic}'")
335
336
return json.dumps(value).encode('utf-8')
337
338
def close(self):
339
pass
340
341
producer = KafkaProducer(
342
bootstrap_servers=['localhost:9092'],
343
value_serializer=TopicAwareJSONSerializer()
344
)
345
346
# This will validate that required fields are present
347
producer.send('user-events', {
348
'user_id': 123,
349
'action': 'login',
350
'timestamp': '2024-01-01T12:00:00Z'
351
})
352
```
353
354
### Compression-Aware Serialization
355
356
```python
357
import gzip
358
import json
359
from kafka.serializer import Serializer, Deserializer
360
361
class CompressedJSONSerializer(Serializer):
362
def __init__(self, compression_threshold=1024):
363
self.compression_threshold = compression_threshold
364
365
def serialize(self, topic, value):
366
if value is None:
367
return None
368
369
json_bytes = json.dumps(value).encode('utf-8')
370
371
# Compress if data is large enough
372
if len(json_bytes) > self.compression_threshold:
373
# Add compression marker
374
return b'\x01' + gzip.compress(json_bytes)
375
else:
376
# No compression marker
377
return b'\x00' + json_bytes
378
379
def close(self):
380
pass
381
382
class CompressedJSONDeserializer(Deserializer):
383
def deserialize(self, topic, bytes_):
384
if bytes_ is None or len(bytes_) == 0:
385
return None
386
387
# Check compression marker
388
if bytes_[0] == 1: # Compressed
389
json_bytes = gzip.decompress(bytes_[1:])
390
else: # Uncompressed
391
json_bytes = bytes_[1:]
392
393
return json.loads(json_bytes.decode('utf-8'))
394
395
def close(self):
396
pass
397
```
398
399
### Error Handling in Serializers
400
401
```python
402
from kafka.serializer import Serializer, Deserializer
403
from kafka.errors import SerializationError
404
import json
405
import logging
406
407
logger = logging.getLogger(__name__)
408
409
class RobustJSONSerializer(Serializer):
410
def __init__(self, encoding='utf-8', strict=True):
411
self.encoding = encoding
412
self.strict = strict
413
414
def serialize(self, topic, value):
415
if value is None:
416
return None
417
418
try:
419
return json.dumps(value, ensure_ascii=False).encode(self.encoding)
420
except (TypeError, ValueError) as e:
421
if self.strict:
422
raise SerializationError(f"JSON serialization failed for topic '{topic}': {e}")
423
else:
424
# Fallback: serialize as string
425
logger.warning(f"JSON serialization failed for topic '{topic}', falling back to string: {e}")
426
return str(value).encode(self.encoding)
427
428
def close(self):
429
pass
430
431
class RobustJSONDeserializer(Deserializer):
432
def __init__(self, encoding='utf-8', strict=True):
433
self.encoding = encoding
434
self.strict = strict
435
436
def deserialize(self, topic, bytes_):
437
if bytes_ is None:
438
return None
439
440
try:
441
return json.loads(bytes_.decode(self.encoding))
442
except (ValueError, UnicodeDecodeError) as e:
443
if self.strict:
444
raise SerializationError(f"JSON deserialization failed for topic '{topic}': {e}")
445
else:
446
# Fallback: return raw string
447
logger.warning(f"JSON deserialization failed for topic '{topic}', returning raw string: {e}")
448
return bytes_.decode(self.encoding, errors='replace')
449
450
def close(self):
451
pass
452
```
453
454
### Lambda Function Serializers
455
456
```python
457
from kafka import KafkaProducer, KafkaConsumer
458
import json
459
import pickle
460
461
# Simple lambda serializers
462
producer = KafkaProducer(
463
bootstrap_servers=['localhost:9092'],
464
key_serializer=lambda k: str(k).encode('utf-8') if k is not None else None,
465
value_serializer=lambda v: json.dumps(v).encode('utf-8') if v is not None else None
466
)
467
468
consumer = KafkaConsumer(
469
'my-topic',
470
bootstrap_servers=['localhost:9092'],
471
key_deserializer=lambda k: k.decode('utf-8') if k is not None else None,
472
value_deserializer=lambda v: json.loads(v.decode('utf-8')) if v is not None else None
473
)
474
475
# Pickle serialization for Python objects (use with caution)
476
pickle_producer = KafkaProducer(
477
bootstrap_servers=['localhost:9092'],
478
value_serializer=lambda v: pickle.dumps(v) if v is not None else None
479
)
480
481
pickle_consumer = KafkaConsumer(
482
'pickle-topic',
483
bootstrap_servers=['localhost:9092'],
484
value_deserializer=lambda v: pickle.loads(v) if v is not None else None
485
)
486
```