CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kombu

Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.

Pending
Overview
Eval results
Files

serialization.mddocs/

Serialization

Pluggable serialization system with security controls for encoding and decoding message payloads across different formats. Kombu's serialization registry provides a flexible way to handle different data formats while maintaining security.

Capabilities

Core Serialization Functions

Primary functions for encoding and decoding message data with automatic serializer selection and content type handling.

def dumps(data, serializer=None):
    """
    Encode data using specified or default serializer.

    Parameters:
    - data: Data to serialize
    - serializer (str): Serializer name ('json', 'pickle', 'yaml', 'msgpack')

    Returns:
    tuple: (serialized_data, content_type, content_encoding)
    """

def loads(data, content_type, content_encoding='utf-8', accept=None, force=False, on_unknown_serializer=None, **kwargs):
    """
    Decode data using content type information.

    Parameters:
    - data (bytes): Serialized data to decode
    - content_type (str): Content type of the data
    - content_encoding (str): Content encoding (default 'utf-8')
    - accept (list): List of accepted content types
    - force (bool): Force decoding even if content type not accepted
    - on_unknown_serializer (callable): Handler for unknown serializer

    Returns:
    Deserialized data

    Raises:
    ContentDisallowed: If content type not in accept list
    SerializationError: If deserialization fails
    """

Serializer Registry Management

Functions for managing the global serializer registry, including registration of custom serializers and security controls.

def register(name, encoder, decoder, content_type, content_encoding='utf-8'):
    """
    Register new serializer in the global registry.

    Parameters:
    - name (str): Serializer name
    - encoder (callable): Function to encode data
    - decoder (callable): Function to decode data
    - content_type (str): MIME content type
    - content_encoding (str): Content encoding

    Example:
    def my_encoder(obj):
        return json.dumps(obj).encode('utf-8')
    
    def my_decoder(data):
        return json.loads(data.decode('utf-8'))
    
    register('myjson', my_encoder, my_decoder, 'application/x-myjson')
    """

def unregister(name):
    """
    Unregister serializer from the global registry.

    Parameters:
    - name (str): Serializer name to remove

    Returns:
    bool: True if serializer was removed
    """

def prepare_accept_content(content_types, name_to_type=None):
    """
    Replace serializer name aliases with full content type names.

    Parameters:
    - content_types (list): List of content types or serializer names
    - name_to_type (dict): Mapping of names to content types

    Returns:
    set: Set of full content type names
    """

Security Controls

Functions for controlling which serializers are enabled to prevent security vulnerabilities from unsafe deserialization.

def enable_insecure_serializers(choices=None):
    """
    Enable serializers considered unsafe (pickle, yaml, msgpack).

    Parameters:
    - choices (list): Specific serializers to enable (default: all insecure)

    Warning:
    Only enable insecure serializers in trusted environments.
    Pickle and yaml can execute arbitrary code during deserialization.
    """

def disable_insecure_serializers(allowed=None):
    """
    Disable untrusted serializers, keeping only safe ones.

    Parameters:
    - allowed (list): Serializers to keep enabled (default: ['json'])

    This is the default security stance - only JSON is enabled by default.
    """

Registry Access

Access to the global serialization registry and related constants.

# Global serializer registry
registry: SerializerRegistry  # Global serializer registry instance

# Security constants
TRUSTED_CONTENT: set  # Set of trusted content types
SKIP_DECODE: set      # Content encodings to skip decoding

Usage Examples

Basic Serialization

from kombu.serialization import dumps, loads

# Serialize data with JSON (default)
data = {'message': 'hello', 'numbers': [1, 2, 3]}
serialized, content_type, encoding = dumps(data)

print(f"Content type: {content_type}")  # application/json
print(f"Encoding: {encoding}")          # utf-8
print(f"Serialized: {serialized}")      # b'{"message": "hello", "numbers": [1, 2, 3]}'

# Deserialize data
deserialized = loads(serialized, content_type, encoding)
print(f"Deserialized: {deserialized}")  # {'message': 'hello', 'numbers': [1, 2, 3]}

Using Different Serializers

from kombu.serialization import dumps, loads, enable_insecure_serializers
import pickle

# Enable pickle serializer (insecure!)
enable_insecure_serializers(['pickle'])

# Serialize with pickle
data = {'complex_object': object(), 'lambda': lambda x: x * 2}
serialized, content_type, encoding = dumps(data, serializer='pickle')

print(f"Content type: {content_type}")  # application/x-python-serialize

# Deserialize pickle data
deserialized = loads(serialized, content_type, encoding)
print(f"Deserialized: {deserialized}")

# Serialize with msgpack (if available)
try:
    enable_insecure_serializers(['msgpack'])
    serialized, content_type, encoding = dumps(data, serializer='msgpack')
    deserialized = loads(serialized, content_type, encoding)
except ImportError:
    print("msgpack not available")

Content Type Filtering

from kombu.serialization import loads
from kombu.exceptions import ContentDisallowed

# Sample serialized data
json_data = b'{"message": "hello"}'
pickle_data = b'...'  # pickle data

# Only accept JSON
accept_list = ['application/json']

try:
    # This will work
    result = loads(json_data, 'application/json', accept=accept_list)
    print(f"JSON data: {result}")
    
    # This will raise ContentDisallowed
    result = loads(pickle_data, 'application/x-python-serialize', accept=accept_list)
except ContentDisallowed as e:
    print(f"Content not allowed: {e}")

Custom Serializer Registration

from kombu.serialization import register, unregister, dumps, loads
import base64
import json

def base64_json_encoder(obj):
    """Encode as JSON then base64"""
    json_data = json.dumps(obj).encode('utf-8')
    return base64.b64encode(json_data)

def base64_json_decoder(data):
    """Decode base64 then JSON"""
    json_data = base64.b64decode(data)
    return json.loads(json_data.decode('utf-8'))

# Register custom serializer
register(
    name='base64json',
    encoder=base64_json_encoder,
    decoder=base64_json_decoder,
    content_type='application/x-base64-json'
)

# Use custom serializer
data = {'encoded': 'data', 'numbers': [1, 2, 3]}
serialized, content_type, encoding = dumps(data, serializer='base64json')

print(f"Content type: {content_type}")  # application/x-base64-json
print(f"Serialized (base64): {serialized}")

# Deserialize
deserialized = loads(serialized, content_type, encoding)
print(f"Deserialized: {deserialized}")

# Clean up
unregister('base64json')

Security Configuration

from kombu.serialization import enable_insecure_serializers, disable_insecure_serializers

# Default: only JSON is enabled (secure)
print("Default secure configuration")

# Enable specific insecure serializers
enable_insecure_serializers(['pickle'])
print("Enabled pickle serializer")

# Enable all insecure serializers (dangerous!)
enable_insecure_serializers()
print("Enabled all insecure serializers")

# Disable all insecure serializers, keep only JSON
disable_insecure_serializers()
print("Back to secure configuration")

# Allow JSON and msgpack only
disable_insecure_serializers(['json', 'msgpack'])
print("JSON and msgpack only")

Error Handling

from kombu.serialization import loads
from kombu.exceptions import SerializationError, ContentDisallowed

def safe_deserialize(data, content_type, encoding='utf-8'):
    """Safely deserialize data with error handling"""
    try:
        return loads(
            data, 
            content_type, 
            encoding, 
            accept=['application/json'],  # Only accept JSON
            force=False
        )
    except ContentDisallowed as e:
        print(f"Content type not allowed: {e}")
        return None
    except SerializationError as e:
        print(f"Failed to deserialize: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

# Test with good data
good_data = b'{"status": "ok"}'
result = safe_deserialize(good_data, 'application/json')
print(f"Good data result: {result}")

# Test with disallowed content type
pickle_data = b'some pickle data'
result = safe_deserialize(pickle_data, 'application/x-python-serialize')
print(f"Pickle data result: {result}")  # None

# Test with malformed data
bad_data = b'{"invalid": json}'
result = safe_deserialize(bad_data, 'application/json')
print(f"Bad data result: {result}")  # None

Registry Inspection

from kombu.serialization import registry

# List available serializers
print("Available serializers:")
for name in registry._encoders.keys():
    encoder_info = registry._encoders[name]
    print(f"  {name}: {encoder_info.content_type}")

# Check if a serializer is available
if 'json' in registry._encoders:
    print("JSON serializer is available")

if 'pickle' in registry._encoders:
    print("Pickle serializer is available")
else:
    print("Pickle serializer not available (good for security)")

Producer/Consumer Serialization Integration

from kombu import Connection, Producer, Consumer, Queue
from kombu.serialization import enable_insecure_serializers

# Enable msgpack for better performance
enable_insecure_serializers(['msgpack'])

with Connection('redis://localhost:6379/0') as conn:
    # Producer with specific serializer
    producer = Producer(
        conn.channel(),
        serializer='msgpack',
        compression='gzip'
    )
    
    # Publish with different serializers
    producer.publish(
        {'data': list(range(1000))},
        routing_key='large_data',
        serializer='msgpack'  # Override default
    )
    
    producer.publish(
        {'simple': 'message'},
        routing_key='simple_data',
        serializer='json'  # Use JSON for simple data
    )
    
    # Consumer accepting multiple formats
    def process_message(body, message):
        print(f"Received: {type(body)} - {len(str(body))} chars")
        print(f"Content type: {message.content_type}")
        message.ack()
    
    queue = Queue('mixed_format_queue')
    consumer = Consumer(
        conn.channel(),
        [queue],
        callbacks=[process_message],
        accept=['application/json', 'application/x-msgpack']
    )
    
    consumer.consume()
    # Process messages...

Performance Comparison

from kombu.serialization import dumps, loads, enable_insecure_serializers
import time
import sys

# Enable all serializers for testing
enable_insecure_serializers()

# Test data
test_data = {
    'users': [{'id': i, 'name': f'user_{i}', 'active': i % 2 == 0} for i in range(1000)],
    'metadata': {'timestamp': time.time(), 'version': '1.0'}
}

serializers = ['json', 'pickle', 'msgpack']

for serializer in serializers:
    try:
        # Serialize
        start = time.time()
        serialized, content_type, encoding = dumps(test_data, serializer=serializer)
        serialize_time = time.time() - start
        
        # Deserialize
        start = time.time()
        deserialized = loads(serialized, content_type, encoding)
        deserialize_time = time.time() - start
        
        print(f"{serializer.upper()}:")
        print(f"  Size: {len(serialized)} bytes")
        print(f"  Serialize: {serialize_time:.4f}s")
        print(f"  Deserialize: {deserialize_time:.4f}s")
        print(f"  Total: {serialize_time + deserialize_time:.4f}s")
        print()
        
    except ImportError:
        print(f"{serializer.upper()}: Not available")
    except Exception as e:
        print(f"{serializer.upper()}: Error - {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-kombu

docs

compression.md

connection.md

entities.md

exceptions.md

index.md

messaging.md

mixins.md

pools.md

serialization.md

simple.md

tile.json