0
# Serialization
1
2
Pluggable serialization system with security controls for encoding and decoding message payloads across different formats. Kombu's serialization registry provides a flexible way to handle different data formats while maintaining security.
3
4
## Capabilities
5
6
### Core Serialization Functions
7
8
Primary functions for encoding and decoding message data with automatic serializer selection and content type handling.
9
10
```python { .api }
11
def dumps(data, serializer=None):
12
"""
13
Encode data using specified or default serializer.
14
15
Parameters:
16
- data: Data to serialize
17
- serializer (str): Serializer name ('json', 'pickle', 'yaml', 'msgpack')
18
19
Returns:
20
tuple: (content_type, content_encoding, serialized_data)
21
"""
22
23
def loads(data, content_type, content_encoding, accept=None, force=False):
    """
    Decode data using content type information.

    Parameters:
    - data (bytes): Serialized data to decode
    - content_type (str): Content type of the data
    - content_encoding (str): Content encoding, e.g. 'utf-8' or 'binary' (required positional argument; a falsy value is treated as 'utf-8')
    - accept (list): List of accepted content types
    - force (bool): Decode even if the content type is disabled (only consulted when accept is None)

    Returns:
    Deserialized data

    Raises:
    ContentDisallowed: If the content type is not in the accept list, or is disabled and force is False
    SerializationError: If deserialization fails (raised as its DecodeError subclass)
    """
42
```
43
44
### Serializer Registry Management
45
46
Functions for managing the global serializer registry, including registration of custom serializers and security controls.
47
48
```python { .api }
49
def register(name, encoder, decoder, content_type, content_encoding='utf-8'):
50
"""
51
Register new serializer in the global registry.
52
53
Parameters:
54
- name (str): Serializer name
55
- encoder (callable): Function to encode data
56
- decoder (callable): Function to decode data
57
- content_type (str): MIME content type
58
- content_encoding (str): Content encoding
59
60
Example:
61
def my_encoder(obj):
62
return json.dumps(obj).encode('utf-8')
63
64
def my_decoder(data):
65
return json.loads(data.decode('utf-8'))
66
67
register('myjson', my_encoder, my_decoder, 'application/x-myjson')
68
"""
69
70
def unregister(name):
71
"""
72
Unregister serializer from the global registry.
73
74
Parameters:
75
- name (str): Serializer name to remove
76
77
Returns:
78
None. Raises SerializerNotInstalled if no serializer is registered under that name.
79
"""
80
81
def prepare_accept_content(content_types, name_to_type=None):
82
"""
83
Replace serializer name aliases with full content type names.
84
85
Parameters:
86
- content_types (list): List of content types or serializer names
87
- name_to_type (dict): Mapping of names to content types
88
89
Returns:
90
set: Set of full content type names
91
"""
92
```
93
94
### Security Controls
95
96
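Functions for controlling which serializers are enabled to prevent security vulnerabilities from unsafe deserialization. These controls govern decoding only: a disabled content type is rejected by `loads` (and therefore by consumers), while `dumps` can still encode data with any registered serializer.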
97
98
```python { .api }
99
def enable_insecure_serializers(choices=None):
100
"""
101
Enable serializers considered unsafe (pickle, yaml, msgpack).
102
103
Parameters:
104
- choices (list): Specific serializers to enable (default: all insecure)
105
106
Warning:
107
Only enable insecure serializers in trusted environments.
108
Pickle and yaml can execute arbitrary code during deserialization.
109
"""
110
111
def disable_insecure_serializers(allowed=None):
112
"""
113
Disable untrusted serializers, keeping only safe ones.
114
115
Parameters:
116
- allowed (list): Serializers to keep enabled (default: ['json'])
117
118
This is the default security stance - only JSON is enabled by default.
119
"""
120
```
121
122
### Registry Access
123
124
Access to the global serialization registry and related constants.
125
126
```python { .api }
127
# Global serializer registry
128
registry: SerializerRegistry # Global serializer registry instance
129
130
# Security constants
131
TRUSTED_CONTENT: set # Set of trusted content types
132
SKIP_DECODE: set # Content encodings to skip decoding
133
```
134
135
## Usage Examples
136
137
### Basic Serialization
138
139
```python
140
from kombu.serialization import dumps, loads
141
142
# Serialize data with JSON (default)
143
data = {'message': 'hello', 'numbers': [1, 2, 3]}
144
content_type, encoding, serialized = dumps(data)

print(f"Content type: {content_type}") # application/json
print(f"Encoding: {encoding}") # utf-8
print(f"Serialized: {serialized}") # {"message": "hello", "numbers": [1, 2, 3]}
149
150
# Deserialize data
151
deserialized = loads(serialized, content_type, encoding)
152
print(f"Deserialized: {deserialized}") # {'message': 'hello', 'numbers': [1, 2, 3]}
153
```
154
155
### Using Different Serializers
156
157
```python
from datetime import datetime

from kombu.exceptions import SerializerNotInstalled
from kombu.serialization import dumps, loads, enable_insecure_serializers
import pickle

# Enable pickle serializer (insecure!)
enable_insecure_serializers(['pickle'])

# Serialize with pickle. Pickle handles Python types that JSON cannot
# (datetime, set, ...), but lambdas and other unpicklable objects still fail.
data = {'created': datetime(2024, 1, 1), 'tags': {'a', 'b'}}
content_type, encoding, serialized = dumps(data, serializer='pickle')

print(f"Content type: {content_type}") # application/x-python-serialize

# Deserialize pickle data
deserialized = loads(serialized, content_type, encoding)
print(f"Deserialized: {deserialized}")

# Serialize with msgpack (if available). msgpack only supports plain data
# types, so use a simple payload here.
try:
    enable_insecure_serializers(['msgpack'])
    simple_data = {'message': 'hello', 'numbers': [1, 2, 3]}
    content_type, encoding, serialized = dumps(simple_data, serializer='msgpack')
    deserialized = loads(serialized, content_type, encoding)
except SerializerNotInstalled:
    # Raised by dumps() when the msgpack package is not installed
    print("msgpack not available")
```
182
183
### Content Type Filtering
184
185
```python
186
from kombu.serialization import loads
187
from kombu.exceptions import ContentDisallowed
188
189
# Sample serialized data
190
json_data = b'{"message": "hello"}'
191
pickle_data = b'...' # pickle data
192
193
# Only accept JSON
194
accept_list = ['application/json']
195
196
try:
197
# This will work
198
result = loads(json_data, 'application/json', 'utf-8', accept=accept_list)
199
print(f"JSON data: {result}")
200
201
# This will raise ContentDisallowed
202
result = loads(pickle_data, 'application/x-python-serialize', 'binary', accept=accept_list)
203
except ContentDisallowed as e:
204
print(f"Content not allowed: {e}")
205
```
206
207
### Custom Serializer Registration
208
209
```python
210
from kombu.serialization import register, unregister, dumps, loads
211
import base64
212
import json
213
214
def base64_json_encoder(obj):
215
"""Encode as JSON then base64"""
216
json_data = json.dumps(obj).encode('utf-8')
217
return base64.b64encode(json_data)
218
219
def base64_json_decoder(data):
220
"""Decode base64 then JSON"""
221
json_data = base64.b64decode(data)
222
return json.loads(json_data.decode('utf-8'))
223
224
# Register custom serializer
225
register(
226
name='base64json',
227
encoder=base64_json_encoder,
228
decoder=base64_json_decoder,
229
content_type='application/x-base64-json'
230
)
231
232
# Use custom serializer
233
data = {'encoded': 'data', 'numbers': [1, 2, 3]}
234
content_type, encoding, serialized = dumps(data, serializer='base64json')
235
236
print(f"Content type: {content_type}") # application/x-base64-json
237
print(f"Serialized (base64): {serialized}")
238
239
# Deserialize
240
deserialized = loads(serialized, content_type, encoding)
241
print(f"Deserialized: {deserialized}")
242
243
# Clean up
244
unregister('base64json')
245
```
246
247
### Security Configuration
248
249
```python
250
from kombu.serialization import enable_insecure_serializers, disable_insecure_serializers
251
252
# Default: only JSON is enabled (secure)
253
print("Default secure configuration")
254
255
# Enable specific insecure serializers
256
enable_insecure_serializers(['pickle'])
257
print("Enabled pickle serializer")
258
259
# Enable all insecure serializers (dangerous!)
260
enable_insecure_serializers()
261
print("Enabled all insecure serializers")
262
263
# Disable all insecure serializers, keep only JSON
264
disable_insecure_serializers()
265
print("Back to secure configuration")
266
267
# Allow JSON and msgpack only
268
disable_insecure_serializers(['json', 'msgpack'])
269
print("JSON and msgpack only")
270
```
271
272
### Error Handling
273
274
```python
275
from kombu.serialization import loads
276
from kombu.exceptions import SerializationError, ContentDisallowed
277
278
def safe_deserialize(data, content_type, encoding='utf-8'):
279
"""Safely deserialize data with error handling"""
280
try:
281
return loads(
282
data,
283
content_type,
284
encoding,
285
accept=['application/json'], # Only accept JSON
286
force=False
287
)
288
except ContentDisallowed as e:
289
print(f"Content type not allowed: {e}")
290
return None
291
except SerializationError as e:
292
print(f"Failed to deserialize: {e}")
293
return None
294
except Exception as e:
295
print(f"Unexpected error: {e}")
296
return None
297
298
# Test with good data
299
good_data = b'{"status": "ok"}'
300
result = safe_deserialize(good_data, 'application/json')
301
print(f"Good data result: {result}")
302
303
# Test with disallowed content type
304
pickle_data = b'some pickle data'
305
result = safe_deserialize(pickle_data, 'application/x-python-serialize')
306
print(f"Pickle data result: {result}") # None
307
308
# Test with malformed data
309
bad_data = b'{"invalid": json}'
310
result = safe_deserialize(bad_data, 'application/json')
311
print(f"Bad data result: {result}") # None
312
```
313
314
### Registry Inspection
315
316
```python
317
from kombu.serialization import registry
318
319
# List available serializers
320
print("Available serializers:")
321
for name in registry._encoders.keys():
322
encoder_info = registry._encoders[name]
323
print(f" {name}: {encoder_info.content_type}")
324
325
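# Check if a serializer is registered
if 'json' in registry._encoders:
    print("JSON serializer is available")

# Being registered is not the same as being enabled: pickle is always
# registered, so check whether its content type is disabled for decoding.
pickle_type = registry.name_to_type['pickle']
if pickle_type in registry._disabled_content_types:
    print("Pickle decoding is disabled (good for security)")
else:
    print("Pickle decoding is enabled")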
333
```
334
335
### Producer/Consumer Serialization Integration
336
337
```python
338
from kombu import Connection, Producer, Consumer, Queue
339
from kombu.serialization import enable_insecure_serializers
340
341
# Enable msgpack for better performance
342
enable_insecure_serializers(['msgpack'])
343
344
with Connection('redis://localhost:6379/0') as conn:
345
# Producer with specific serializer
346
producer = Producer(
347
conn.channel(),
348
serializer='msgpack',
349
compression='gzip'
350
)
351
352
# Publish with different serializers
353
producer.publish(
354
{'data': list(range(1000))},
355
routing_key='large_data',
356
serializer='msgpack' # Override default
357
)
358
359
producer.publish(
360
{'simple': 'message'},
361
routing_key='simple_data',
362
serializer='json' # Use JSON for simple data
363
)
364
365
# Consumer accepting multiple formats
366
def process_message(body, message):
367
print(f"Received: {type(body)} - {len(str(body))} chars")
368
print(f"Content type: {message.content_type}")
369
message.ack()
370
371
queue = Queue('mixed_format_queue')
372
consumer = Consumer(
373
conn.channel(),
374
[queue],
375
callbacks=[process_message],
376
accept=['application/json', 'application/x-msgpack']
377
)
378
379
consumer.consume()
380
# Process messages...
381
```
382
383
### Performance Comparison
384
385
```python
386
from kombu.serialization import dumps, loads, enable_insecure_serializers
387
import time
388
import sys
389
390
# Enable all serializers for testing
391
enable_insecure_serializers()
392
393
# Test data
394
test_data = {
395
'users': [{'id': i, 'name': f'user_{i}', 'active': i % 2 == 0} for i in range(1000)],
396
'metadata': {'timestamp': time.time(), 'version': '1.0'}
397
}
398
399
serializers = ['json', 'pickle', 'msgpack']
400
401
for serializer in serializers:
402
try:
403
# Serialize
404
start = time.time()
405
content_type, encoding, serialized = dumps(test_data, serializer=serializer)
406
serialize_time = time.time() - start
407
408
# Deserialize
409
start = time.time()
410
deserialized = loads(serialized, content_type, encoding)
411
deserialize_time = time.time() - start
412
413
print(f"{serializer.upper()}:")
414
print(f" Size: {len(serialized)} bytes")
415
print(f" Serialize: {serialize_time:.4f}s")
416
print(f" Deserialize: {deserialize_time:.4f}s")
417
print(f" Total: {serialize_time + deserialize_time:.4f}s")
418
print()
419
420
except ImportError:
421
print(f"{serializer.upper()}: Not available")
422
except Exception as e:
423
print(f"{serializer.upper()}: Error - {e}")
424
```