Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.
—
Pluggable serialization system with security controls for encoding and decoding message payloads across different formats. Kombu's serialization registry provides a flexible way to handle different data formats while maintaining security.
Primary functions for encoding and decoding message data with automatic serializer selection and content type handling.
def dumps(data, serializer=None):
"""
Encode data using specified or default serializer.
Parameters:
- data: Data to serialize
- serializer (str): Serializer name ('json', 'pickle', 'yaml', 'msgpack')
Returns:
tuple: (serialized_data, content_type, content_encoding)
"""
def loads(data, content_type, content_encoding='utf-8', accept=None, force=False, on_unknown_serializer=None, **kwargs):
"""
Decode data using content type information.
Parameters:
- data (bytes): Serialized data to decode
- content_type (str): Content type of the data
- content_encoding (str): Content encoding (default 'utf-8')
- accept (list): List of accepted content types
- force (bool): Force decoding even if content type not accepted
- on_unknown_serializer (callable): Handler for unknown serializer
Returns:
Deserialized data
Raises:
ContentDisallowed: If content type not in accept list
SerializationError: If deserialization fails
"""Functions for managing the global serializer registry, including registration of custom serializers and security controls.
def register(name, encoder, decoder, content_type, content_encoding='utf-8'):
"""
Register new serializer in the global registry.
Parameters:
- name (str): Serializer name
- encoder (callable): Function to encode data
- decoder (callable): Function to decode data
- content_type (str): MIME content type
- content_encoding (str): Content encoding
Example:
def my_encoder(obj):
return json.dumps(obj).encode('utf-8')
def my_decoder(data):
return json.loads(data.decode('utf-8'))
register('myjson', my_encoder, my_decoder, 'application/x-myjson')
"""
def unregister(name):
"""
Unregister serializer from the global registry.
Parameters:
- name (str): Serializer name to remove
Returns:
bool: True if serializer was removed
"""
def prepare_accept_content(content_types, name_to_type=None):
"""
Replace serializer name aliases with full content type names.
Parameters:
- content_types (list): List of content types or serializer names
- name_to_type (dict): Mapping of names to content types
Returns:
set: Set of full content type names
"""Functions for controlling which serializers are enabled to prevent security vulnerabilities from unsafe deserialization.
def enable_insecure_serializers(choices=None):
"""
Enable serializers considered unsafe (pickle, yaml, msgpack).
Parameters:
- choices (list): Specific serializers to enable (default: all insecure)
Warning:
Only enable insecure serializers in trusted environments.
Pickle and yaml can execute arbitrary code during deserialization.
"""
def disable_insecure_serializers(allowed=None):
"""
Disable untrusted serializers, keeping only safe ones.
Parameters:
- allowed (list): Serializers to keep enabled (default: ['json'])
This is the default security stance - only JSON is enabled by default.
"""Access to the global serialization registry and related constants.
# Global serializer registry
registry: SerializerRegistry # Global serializer registry instance
# Security constants
TRUSTED_CONTENT: set # Set of trusted content types
SKIP_DECODE: set # Content encodings to skip decodingfrom kombu.serialization import dumps, loads
# Serialize data with JSON (default)
data = {'message': 'hello', 'numbers': [1, 2, 3]}
serialized, content_type, encoding = dumps(data)
print(f"Content type: {content_type}") # application/json
print(f"Encoding: {encoding}") # utf-8
print(f"Serialized: {serialized}") # b'{"message": "hello", "numbers": [1, 2, 3]}'
# Deserialize data
deserialized = loads(serialized, content_type, encoding)
print(f"Deserialized: {deserialized}") # {'message': 'hello', 'numbers': [1, 2, 3]}from kombu.serialization import dumps, loads, enable_insecure_serializers
import pickle
# Enable pickle serializer (insecure!)
enable_insecure_serializers(['pickle'])
# Serialize with pickle
data = {'complex_object': object(), 'lambda': lambda x: x * 2}
serialized, content_type, encoding = dumps(data, serializer='pickle')
print(f"Content type: {content_type}") # application/x-python-serialize
# Deserialize pickle data
deserialized = loads(serialized, content_type, encoding)
print(f"Deserialized: {deserialized}")
# Serialize with msgpack (if available)
try:
enable_insecure_serializers(['msgpack'])
serialized, content_type, encoding = dumps(data, serializer='msgpack')
deserialized = loads(serialized, content_type, encoding)
except ImportError:
print("msgpack not available")from kombu.serialization import loads
from kombu.exceptions import ContentDisallowed
# Sample serialized data
json_data = b'{"message": "hello"}'
pickle_data = b'...' # pickle data
# Only accept JSON
accept_list = ['application/json']
try:
# This will work
result = loads(json_data, 'application/json', accept=accept_list)
print(f"JSON data: {result}")
# This will raise ContentDisallowed
result = loads(pickle_data, 'application/x-python-serialize', accept=accept_list)
except ContentDisallowed as e:
print(f"Content not allowed: {e}")from kombu.serialization import register, unregister, dumps, loads
import base64
import json
def base64_json_encoder(obj):
"""Encode as JSON then base64"""
json_data = json.dumps(obj).encode('utf-8')
return base64.b64encode(json_data)
def base64_json_decoder(data):
"""Decode base64 then JSON"""
json_data = base64.b64decode(data)
return json.loads(json_data.decode('utf-8'))
# Register custom serializer
register(
name='base64json',
encoder=base64_json_encoder,
decoder=base64_json_decoder,
content_type='application/x-base64-json'
)
# Use custom serializer
data = {'encoded': 'data', 'numbers': [1, 2, 3]}
serialized, content_type, encoding = dumps(data, serializer='base64json')
print(f"Content type: {content_type}") # application/x-base64-json
print(f"Serialized (base64): {serialized}")
# Deserialize
deserialized = loads(serialized, content_type, encoding)
print(f"Deserialized: {deserialized}")
# Clean up
unregister('base64json')from kombu.serialization import enable_insecure_serializers, disable_insecure_serializers
# Default: only JSON is enabled (secure)
print("Default secure configuration")
# Enable specific insecure serializers
enable_insecure_serializers(['pickle'])
print("Enabled pickle serializer")
# Enable all insecure serializers (dangerous!)
enable_insecure_serializers()
print("Enabled all insecure serializers")
# Disable all insecure serializers, keep only JSON
disable_insecure_serializers()
print("Back to secure configuration")
# Allow JSON and msgpack only
disable_insecure_serializers(['json', 'msgpack'])
print("JSON and msgpack only")from kombu.serialization import loads
from kombu.exceptions import SerializationError, ContentDisallowed
def safe_deserialize(data, content_type, encoding='utf-8'):
"""Safely deserialize data with error handling"""
try:
return loads(
data,
content_type,
encoding,
accept=['application/json'], # Only accept JSON
force=False
)
except ContentDisallowed as e:
print(f"Content type not allowed: {e}")
return None
except SerializationError as e:
print(f"Failed to deserialize: {e}")
return None
except Exception as e:
print(f"Unexpected error: {e}")
return None
# Test with good data
good_data = b'{"status": "ok"}'
result = safe_deserialize(good_data, 'application/json')
print(f"Good data result: {result}")
# Test with disallowed content type
pickle_data = b'some pickle data'
result = safe_deserialize(pickle_data, 'application/x-python-serialize')
print(f"Pickle data result: {result}") # None
# Test with malformed data
bad_data = b'{"invalid": json}'
result = safe_deserialize(bad_data, 'application/json')
print(f"Bad data result: {result}") # Nonefrom kombu.serialization import registry
# List available serializers
print("Available serializers:")
for name in registry._encoders.keys():
encoder_info = registry._encoders[name]
print(f" {name}: {encoder_info.content_type}")
# Check if a serializer is available
if 'json' in registry._encoders:
print("JSON serializer is available")
if 'pickle' in registry._encoders:
print("Pickle serializer is available")
else:
print("Pickle serializer not available (good for security)")from kombu import Connection, Producer, Consumer, Queue
from kombu.serialization import enable_insecure_serializers
# Enable msgpack for better performance
enable_insecure_serializers(['msgpack'])
with Connection('redis://localhost:6379/0') as conn:
# Producer with specific serializer
producer = Producer(
conn.channel(),
serializer='msgpack',
compression='gzip'
)
# Publish with different serializers
producer.publish(
{'data': list(range(1000))},
routing_key='large_data',
serializer='msgpack' # Override default
)
producer.publish(
{'simple': 'message'},
routing_key='simple_data',
serializer='json' # Use JSON for simple data
)
# Consumer accepting multiple formats
def process_message(body, message):
print(f"Received: {type(body)} - {len(str(body))} chars")
print(f"Content type: {message.content_type}")
message.ack()
queue = Queue('mixed_format_queue')
consumer = Consumer(
conn.channel(),
[queue],
callbacks=[process_message],
accept=['application/json', 'application/x-msgpack']
)
consumer.consume()
# Process messages...from kombu.serialization import dumps, loads, enable_insecure_serializers
import time
import sys
# Enable all serializers for testing
enable_insecure_serializers()
# Test data
test_data = {
'users': [{'id': i, 'name': f'user_{i}', 'active': i % 2 == 0} for i in range(1000)],
'metadata': {'timestamp': time.time(), 'version': '1.0'}
}
serializers = ['json', 'pickle', 'msgpack']
for serializer in serializers:
try:
# Serialize
start = time.time()
serialized, content_type, encoding = dumps(test_data, serializer=serializer)
serialize_time = time.time() - start
# Deserialize
start = time.time()
deserialized = loads(serialized, content_type, encoding)
deserialize_time = time.time() - start
print(f"{serializer.upper()}:")
print(f" Size: {len(serialized)} bytes")
print(f" Serialize: {serialize_time:.4f}s")
print(f" Deserialize: {deserialize_time:.4f}s")
print(f" Total: {serialize_time + deserialize_time:.4f}s")
print()
except ImportError:
print(f"{serializer.upper()}: Not available")
except Exception as e:
print(f"{serializer.upper()}: Error - {e}")Install with Tessl CLI
npx tessl i tessl/pypi-kombu