0
# Compression
1
2
Message payload compression utilities supporting multiple compression algorithms for reducing message size. Kombu provides built-in support for various compression methods including gzip, bzip2, lzma, brotli, and zstandard, with the ability to register custom compression methods.
3
4
## Core Imports
5
6
```python
7
from kombu import compression
8
from kombu.compression import compress, decompress, register, encoders
9
from kombu.compression import get_encoder, get_decoder
10
```
11
12
## Capabilities
13
14
### Compression Functions
15
16
Core functions for compressing and decompressing message payloads.
17
18
```python { .api }
19
def compress(body, content_type):
20
"""
21
Compress text using specified compression method.
22
23
Parameters:
24
- body (AnyStr): The text to compress
25
- content_type (str): MIME type of compression method to use
26
27
Returns:
28
Tuple of (compressed_body, content_type)
29
"""
30
31
def decompress(body, content_type):
32
"""
33
Decompress previously compressed text.
34
35
Parameters:
36
- body (AnyStr): Previously compressed text to uncompress
37
- content_type (str): MIME type of compression method used
38
39
Returns:
40
Decompressed data
41
"""
42
```
43
44
### Compression Registration
45
46
Functions for registering and managing compression methods.
47
48
```python { .api }
49
def register(encoder, decoder, content_type, aliases=None):
50
"""
51
Register new compression method.
52
53
Parameters:
54
- encoder (Callable): Function used to compress text
55
- decoder (Callable): Function used to decompress text
56
- content_type (str): MIME type this compression method identifies as
57
- aliases (Sequence[str]): List of names to associate with this method
58
"""
59
60
def encoders() -> list:
61
"""Return a list of available compression methods."""
62
63
def get_encoder(content_type):
64
"""
65
Get encoder function by content type or alias.
66
67
Parameters:
68
- content_type (str): Content type or alias name
69
70
Returns:
71
Tuple of (encoder_function, resolved_content_type)
72
"""
73
74
def get_decoder(content_type):
75
"""
76
Get decoder function by content type or alias.
77
78
Parameters:
79
- content_type (str): Content type or alias name
80
81
Returns:
82
Decoder function
83
"""
84
```
85
86
## Built-in Compression Methods
87
88
Kombu comes with several compression methods pre-registered:
89
90
### Gzip/Zlib Compression
91
92
```python
93
# Available as 'application/x-gzip', 'gzip', or 'zlib'
94
compressed, content_type = compression.compress(b"Hello World", 'gzip')
95
decompressed = compression.decompress(compressed, content_type)
96
```
97
98
### Bzip2 Compression
99
100
```python
101
# Available as 'application/x-bz2', 'bzip2', or 'bzip'
102
compressed, content_type = compression.compress(b"Hello World", 'bzip2')
103
decompressed = compression.decompress(compressed, content_type)
104
```
105
106
### LZMA/XZ Compression
107
108
```python
109
# Available as 'application/x-lzma', 'lzma', or 'xz'
110
compressed, content_type = compression.compress(b"Hello World", 'xz')
111
decompressed = compression.decompress(compressed, content_type)
112
```
113
114
### Brotli Compression
115
116
```python
117
# Available as 'application/x-brotli' or 'brotli'
118
# Requires: pip install brotli
119
compressed, content_type = compression.compress(b"Hello World", 'brotli')
120
decompressed = compression.decompress(compressed, content_type)
121
```
122
123
### Zstandard Compression
124
125
```python
126
# Available as 'application/zstd', 'zstd', or 'zstandard'
127
# Requires: pip install zstandard
128
compressed, content_type = compression.compress(b"Hello World", 'zstd')
129
decompressed = compression.decompress(compressed, content_type)
130
```
131
132
## Usage Examples
133
134
### Basic Compression
135
136
```python
137
from kombu.compression import compress, decompress
138
139
# Compress data
140
data = b"This is a long message that will benefit from compression"
141
compressed_data, content_type = compress(data, 'gzip')
142
143
print(f"Original size: {len(data)} bytes")
144
print(f"Compressed size: {len(compressed_data)} bytes")
145
print(f"Content type: {content_type}")
146
147
# Decompress data
148
decompressed_data = decompress(compressed_data, content_type)
149
assert data == decompressed_data
150
```
151
152
### With Message Publishing
153
154
```python
155
from kombu import Connection, Exchange, Queue
156
from kombu.compression import compress
157
158
exchange = Exchange('compressed', type='direct')
159
queue = Queue('compressed_queue', exchange, routing_key='compress')
160
161
with Connection('redis://localhost:6379/0') as conn:
162
with conn.Producer() as producer:
163
# Manual compression
164
message_data = "Large message data that should be compressed"
165
compressed_body, content_type = compress(message_data.encode(), 'gzip')
166
167
producer.publish(
168
compressed_body,
169
exchange=exchange,
170
routing_key='compress',
171
headers={'compression': content_type},
172
declare=[queue]
173
)
174
```
175
176
### Listing Available Methods
177
178
```python
179
from kombu.compression import encoders
180
181
# List all available compression methods
182
available = encoders()
183
print("Available compression methods:", available)
184
# Output: ['application/x-gzip', 'application/x-bz2', 'application/x-lzma', ...]
185
```
186
187
### Custom Compression Method
188
189
```python
190
from kombu.compression import register
191
import base64
192
193
def base64_encode(data):
194
"""Simple base64 encoding (not real compression)."""
195
return base64.b64encode(data)
196
197
def base64_decode(data):
198
"""Simple base64 decoding."""
199
return base64.b64decode(data)
200
201
# Register custom compression method
202
register(
203
encoder=base64_encode,
204
decoder=base64_decode,
205
content_type='application/base64',
206
aliases=['base64', 'b64']
207
)
208
209
# Use custom compression
210
from kombu.compression import compress, decompress
211
212
data = b"Hello World"
213
compressed, content_type = compress(data, 'base64')
214
decompressed = decompress(compressed, content_type)
215
216
assert data == decompressed
217
```
218
219
## Error Handling
220
221
Compression operations can raise various exceptions:
222
223
```python
224
from kombu.compression import compress, decompress
225
226
try:
227
# Try to use non-existent compression method
228
compress(b"data", 'nonexistent')
229
except KeyError as e:
230
print(f"Compression method not found: {e}")
231
232
try:
233
# Try to decompress with wrong method
234
decompress(b"invalid data", 'gzip')
235
except Exception as e:
236
print(f"Decompression failed: {e}")
237
```
238
239
## Integration with Serialization
240
241
Compression works seamlessly with Kombu's serialization system:
242
243
```python
244
from kombu import Connection, Exchange, Queue
245
246
exchange = Exchange('test', type='direct')
247
queue = Queue('test_queue', exchange, routing_key='test')
248
249
with Connection('redis://localhost:6379/0') as conn:
250
with conn.Producer() as producer:
251
# Kombu can automatically handle compression when configured
252
producer.publish(
253
{'large': 'data' * 1000},
254
exchange=exchange,
255
routing_key='test',
256
serializer='json',
257
compression='gzip', # Automatic compression
258
declare=[queue]
259
)
260
```