Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.
—
Message payload compression utilities supporting multiple compression algorithms for reducing message size. Kombu provides built-in support for various compression methods including gzip, bzip2, lzma, brotli, and zstandard, with the ability to register custom compression methods.
from kombu import compression
from kombu.compression import compress, decompress, register, encoders
from kombu.compression import get_encoder, get_decoder

Core functions for compressing and decompressing message payloads.
def compress(body, content_type):
"""
Compress text using specified compression method.
Parameters:
- body (AnyStr): The text to compress
- content_type (str): MIME type of compression method to use
Returns:
Tuple of (compressed_body, content_type)
"""
def decompress(body, content_type):
"""
Decompress previously compressed text.
Parameters:
- body (AnyStr): Previously compressed text to uncompress
- content_type (str): MIME type of compression method used
Returns:
Decompressed data
"""

Functions for registering and managing compression methods.
def register(encoder, decoder, content_type, aliases=None):
"""
Register new compression method.
Parameters:
- encoder (Callable): Function used to compress text
- decoder (Callable): Function used to decompress text
- content_type (str): MIME type this compression method identifies as
- aliases (Sequence[str]): List of names to associate with this method
"""
def encoders() -> list:
"""Return a list of available compression methods."""
def get_encoder(content_type):
"""
Get encoder function by content type or alias.
Parameters:
- content_type (str): Content type or alias name
Returns:
Tuple of (encoder_function, resolved_content_type)
"""
def get_decoder(content_type):
"""
Get decoder function by content type or alias.
Parameters:
- content_type (str): Content type or alias name
Returns:
Decoder function
"""

Kombu comes with several compression methods pre-registered:
# Available as 'application/x-gzip', 'gzip', or 'zlib'
compressed, content_type = compression.compress(b"Hello World", 'gzip')
decompressed = compression.decompress(compressed, content_type)

# Available as 'application/x-bz2', 'bzip2', or 'bzip'
compressed, content_type = compression.compress(b"Hello World", 'bzip2')
decompressed = compression.decompress(compressed, content_type)

# Available as 'application/x-lzma', 'lzma', or 'xz'
compressed, content_type = compression.compress(b"Hello World", 'xz')
decompressed = compression.decompress(compressed, content_type)

# Available as 'application/x-brotli' or 'brotli'
# Requires: pip install brotli
compressed, content_type = compression.compress(b"Hello World", 'brotli')
decompressed = compression.decompress(compressed, content_type)

# Available as 'application/zstd', 'zstd', or 'zstandard'
# Requires: pip install zstandard
compressed, content_type = compression.compress(b"Hello World", 'zstd')
decompressed = compression.decompress(compressed, content_type)

from kombu.compression import compress, decompress
# Compress data
data = b"This is a long message that will benefit from compression"
compressed_data, content_type = compress(data, 'gzip')
print(f"Original size: {len(data)} bytes")
print(f"Compressed size: {len(compressed_data)} bytes")
print(f"Content type: {content_type}")
# Decompress data
decompressed_data = decompress(compressed_data, content_type)
assert data == decompressed_data

from kombu import Connection, Exchange, Queue
from kombu.compression import compress
exchange = Exchange('compressed', type='direct')
queue = Queue('compressed_queue', exchange, routing_key='compress')
with Connection('redis://localhost:6379/0') as conn:
with conn.Producer() as producer:
# Manual compression
message_data = "Large message data that should be compressed"
compressed_body, content_type = compress(message_data.encode(), 'gzip')
producer.publish(
compressed_body,
exchange=exchange,
routing_key='compress',
headers={'compression': content_type},
declare=[queue]
)

from kombu.compression import encoders
# List all available compression methods
available = encoders()
print("Available compression methods:", available)
# Output: ['application/x-gzip', 'application/x-bz2', 'application/x-lzma', ...]

from kombu.compression import register
import base64
def base64_encode(data: bytes) -> bytes:
    """
    Encode a payload as base64 (a stand-in encoder, not real compression).

    Parameters:
    - data (bytes): Raw payload to encode

    Returns:
    Base64-encoded bytes
    """
    return base64.b64encode(data)
def base64_decode(data: bytes) -> bytes:
    """
    Decode a base64-encoded payload back to its original bytes.

    Parameters:
    - data (bytes): Base64-encoded payload to decode

    Returns:
    Decoded bytes
    """
    return base64.b64decode(data)
# Register custom compression method
register(
encoder=base64_encode,
decoder=base64_decode,
content_type='application/base64',
aliases=['base64', 'b64']
)
# Use custom compression
from kombu.compression import compress, decompress
data = b"Hello World"
compressed, content_type = compress(data, 'base64')
decompressed = decompress(compressed, content_type)
assert data == decompressed

Compression operations can raise various exceptions:
from kombu.compression import compress, decompress
try:
# Try to use non-existent compression method
compress(b"data", 'nonexistent')
except KeyError as e:
print(f"Compression method not found: {e}")
try:
# Try to decompress with wrong method
decompress(b"invalid data", 'gzip')
except Exception as e:
print(f"Decompression failed: {e}")

Compression works seamlessly with Kombu's serialization system:
from kombu import Connection, Exchange, Queue
exchange = Exchange('test', type='direct')
queue = Queue('test_queue', exchange, routing_key='test')
with Connection('redis://localhost:6379/0') as conn:
with conn.Producer() as producer:
# Kombu can automatically handle compression when configured
producer.publish(
{'large': 'data' * 1000},
exchange=exchange,
routing_key='test',
serializer='json',
compression='gzip', # Automatic compression
declare=[queue]
)

Install with Tessl CLI
npx tessl i tessl/pypi-kombu