CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kombu

Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.

Pending
Overview
Eval results
Files

compression.mddocs/

Compression

Message payload compression utilities supporting multiple compression algorithms for reducing message size. Kombu provides built-in support for various compression methods including gzip, bzip2, lzma, brotli, and zstandard, with the ability to register custom compression methods.

Core Imports

from kombu import compression
from kombu.compression import compress, decompress, register, encoders
from kombu.compression import get_encoder, get_decoder

Capabilities

Compression Functions

Core functions for compressing and decompressing message payloads.

def compress(body, content_type):
    """
    Compress text using specified compression method.
    
    Parameters:
    - body (AnyStr): The text to compress
    - content_type (str): MIME type of compression method to use
    
    Returns:
    Tuple of (compressed_body, content_type)
    """

def decompress(body, content_type):
    """
    Decompress previously compressed text.
    
    Parameters:
    - body (AnyStr): Previously compressed text to uncompress
    - content_type (str): MIME type of compression method used
    
    Returns:
    Decompressed data
    """

Compression Registration

Functions for registering and managing compression methods.

def register(encoder, decoder, content_type, aliases=None):
    """
    Register new compression method.
    
    Parameters:
    - encoder (Callable): Function used to compress text
    - decoder (Callable): Function used to decompress text
    - content_type (str): MIME type this compression method identifies as
    - aliases (Sequence[str]): List of names to associate with this method
    """

def encoders() -> list:
    """Return a list of available compression methods."""

def get_encoder(content_type):
    """
    Get encoder function by content type or alias.
    
    Parameters:
    - content_type (str): Content type or alias name
    
    Returns:
    Tuple of (encoder_function, resolved_content_type)
    """

def get_decoder(content_type):
    """
    Get decoder function by content type or alias.
    
    Parameters:
    - content_type (str): Content type or alias name
    
    Returns:
    Decoder function
    """

Built-in Compression Methods

Kombu comes with several compression methods pre-registered:

Gzip/Zlib Compression

# Available as 'application/x-gzip', 'gzip', or 'zlib'
compressed, content_type = compression.compress(b"Hello World", 'gzip')
decompressed = compression.decompress(compressed, content_type)

Bzip2 Compression

# Available as 'application/x-bz2', 'bzip2', or 'bzip'
compressed, content_type = compression.compress(b"Hello World", 'bzip2')
decompressed = compression.decompress(compressed, content_type)

LZMA/XZ Compression

# Available as 'application/x-lzma', 'lzma', or 'xz'
compressed, content_type = compression.compress(b"Hello World", 'xz')
decompressed = compression.decompress(compressed, content_type)

Brotli Compression

# Available as 'application/x-brotli' or 'brotli'
# Requires: pip install brotli
compressed, content_type = compression.compress(b"Hello World", 'brotli')
decompressed = compression.decompress(compressed, content_type)

Zstandard Compression

# Available as 'application/zstd', 'zstd', or 'zstandard'  
# Requires: pip install zstandard
compressed, content_type = compression.compress(b"Hello World", 'zstd')
decompressed = compression.decompress(compressed, content_type)

Usage Examples

Basic Compression

from kombu.compression import compress, decompress

# Compress data
data = b"This is a long message that will benefit from compression"
compressed_data, content_type = compress(data, 'gzip')

print(f"Original size: {len(data)} bytes")
print(f"Compressed size: {len(compressed_data)} bytes")
print(f"Content type: {content_type}")

# Decompress data
decompressed_data = decompress(compressed_data, content_type)
assert data == decompressed_data

With Message Publishing

from kombu import Connection, Exchange, Queue
from kombu.compression import compress

exchange = Exchange('compressed', type='direct')
queue = Queue('compressed_queue', exchange, routing_key='compress')

with Connection('redis://localhost:6379/0') as conn:
    with conn.Producer() as producer:
        # Manual compression
        message_data = "Large message data that should be compressed"
        compressed_body, content_type = compress(message_data.encode(), 'gzip')
        
        producer.publish(
            compressed_body,
            exchange=exchange,
            routing_key='compress',
            headers={'compression': content_type},
            declare=[queue]
        )

Listing Available Methods

from kombu.compression import encoders

# List all available compression methods
available = encoders()
print("Available compression methods:", available)
# Output: ['application/x-gzip', 'application/x-bz2', 'application/x-lzma', ...]

Custom Compression Method

from kombu.compression import register
import base64

def base64_encode(data):
    """Simple base64 encoding (not real compression)."""
    return base64.b64encode(data)

def base64_decode(data):
    """Simple base64 decoding."""
    return base64.b64decode(data)

# Register custom compression method
register(
    encoder=base64_encode,
    decoder=base64_decode,
    content_type='application/base64',
    aliases=['base64', 'b64']
)

# Use custom compression
from kombu.compression import compress, decompress

data = b"Hello World"
compressed, content_type = compress(data, 'base64')
decompressed = decompress(compressed, content_type)

assert data == decompressed

Error Handling

Compression operations can raise various exceptions:

from kombu.compression import compress, decompress

try:
    # Try to use non-existent compression method
    compress(b"data", 'nonexistent')
except KeyError as e:
    print(f"Compression method not found: {e}")

try:
    # Try to decompress with wrong method
    decompress(b"invalid data", 'gzip')
except Exception as e:
    print(f"Decompression failed: {e}")

Integration with Serialization

Compression works seamlessly with Kombu's serialization system:

from kombu import Connection, Exchange, Queue

exchange = Exchange('test', type='direct')
queue = Queue('test_queue', exchange, routing_key='test')

with Connection('redis://localhost:6379/0') as conn:
    with conn.Producer() as producer:
        # Kombu can automatically handle compression when configured
        producer.publish(
            {'large': 'data' * 1000},
            exchange=exchange,
            routing_key='test',
            serializer='json',
            compression='gzip',  # Automatic compression
            declare=[queue]
        )

Install with Tessl CLI

npx tessl i tessl/pypi-kombu

docs

compression.md

connection.md

entities.md

exceptions.md

index.md

messaging.md

mixins.md

pools.md

serialization.md

simple.md

tile.json