# Compression

The `kombu.compression` module provides utilities for compressing message payloads to reduce message size. Kombu has built-in support for gzip (zlib), bzip2, lzma, brotli, and zstandard, and lets you register custom compression methods.

## Core Imports

```python
from kombu import compression
from kombu.compression import compress, decompress, register, encoders
from kombu.compression import get_encoder, get_decoder
```

## Capabilities

### Compression Functions

Core functions for compressing and decompressing message payloads.

```python { .api }
def compress(body, content_type):
    """
    Compress text using specified compression method.

    Parameters:
    - body (AnyStr): The text to compress
    - content_type (str): MIME type or alias of the compression method to use

    Returns:
    Tuple of (compressed_body, content_type)
    """

def decompress(body, content_type):
    """
    Decompress previously compressed text.

    Parameters:
    - body (AnyStr): Previously compressed text to uncompress
    - content_type (str): MIME type or alias of the compression method used

    Returns:
    Decompressed data
    """
```

### Compression Registration

Functions for registering and managing compression methods.

```python { .api }
def register(encoder, decoder, content_type, aliases=None):
    """
    Register new compression method.

    Parameters:
    - encoder (Callable): Function used to compress text
    - decoder (Callable): Function used to decompress text
    - content_type (str): MIME type this compression method identifies as
    - aliases (Sequence[str]): List of names to associate with this method
    """

def encoders() -> list:
    """Return a list of available compression methods."""

def get_encoder(content_type):
    """
    Get encoder function by content type or alias.

    Parameters:
    - content_type (str): Content type or alias name

    Returns:
    Tuple of (encoder_function, resolved_content_type)
    """

def get_decoder(content_type):
    """
    Get decoder function by content type or alias.

    Parameters:
    - content_type (str): Content type or alias name

    Returns:
    Decoder function
    """
```
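
The registration functions behave like a small lookup table keyed by MIME type, with aliases pointing at those keys. The following is a simplified, hypothetical model of that idea using only the standard library. `CodecRegistry` and its attributes are illustrative names and should not be read as Kombu's actual implementation.

```python
import zlib


class CodecRegistry:
    """Toy registry: MIME type -> codec, plus alias -> MIME type."""

    def __init__(self):
        self._encoders = {}
        self._decoders = {}
        self._aliases = {}

    def register(self, encoder, decoder, content_type, aliases=None):
        self._encoders[content_type] = encoder
        self._decoders[content_type] = decoder
        for alias in aliases or ():
            self._aliases[alias] = content_type

    def get_encoder(self, name):
        # Resolve an alias to its MIME type; unknown names raise KeyError.
        content_type = self._aliases.get(name, name)
        return self._encoders[content_type], content_type

    def get_decoder(self, name):
        return self._decoders[self._aliases.get(name, name)]

    def encoders(self):
        return list(self._encoders)


registry = CodecRegistry()
registry.register(zlib.compress, zlib.decompress,
                  "application/x-gzip", aliases=["gzip", "zlib"])

encoder, resolved = registry.get_encoder("gzip")
decoder = registry.get_decoder(resolved)
round_tripped = decoder(encoder(b"payload"))
print(resolved, round_tripped)
```

In this model an alias and its MIME type are interchangeable on lookup, which mirrors how the documented functions accept a "content type or alias".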

## Built-in Compression Methods

Kombu pre-registers several compression methods. Brotli and Zstandard are registered only when their optional packages are installed.

### Gzip/Zlib Compression

```python
from kombu import compression

# Available as 'application/x-gzip', 'gzip', or 'zlib'
compressed, content_type = compression.compress(b"Hello World", 'gzip')
decompressed = compression.decompress(compressed, content_type)
```
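
In Kombu's registry the `gzip` and `zlib` aliases both map to the standard library's `zlib.compress` and `zlib.decompress`, so the bytes on the wire are a zlib stream rather than a gzip file. The standard-library sketch below shows the difference, which matters if a non-Kombu consumer has to decode the payload.

```python
import gzip
import zlib

data = b"Hello World" * 20

zlib_stream = zlib.compress(data)  # the format behind the 'gzip'/'zlib' aliases
gzip_file = gzip.compress(data)    # gzip container, shown only for contrast

# The two formats start with different header bytes.
print(zlib_stream[:1].hex())  # 78
print(gzip_file[:2].hex())    # 1f8b

# A zlib stream must be decoded with zlib, not with the gzip module.
assert zlib.decompress(zlib_stream) == data
try:
    gzip.decompress(zlib_stream)
except OSError as exc:  # gzip.BadGzipFile is a subclass of OSError
    print(f"gzip cannot read a zlib stream: {exc}")
```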

### Bzip2 Compression

```python
from kombu import compression

# Available as 'application/x-bz2', 'bzip2', or 'bzip'
compressed, content_type = compression.compress(b"Hello World", 'bzip2')
decompressed = compression.decompress(compressed, content_type)
```

### LZMA/XZ Compression

```python
from kombu import compression

# Available as 'application/x-lzma', 'lzma', or 'xz'
compressed, content_type = compression.compress(b"Hello World", 'xz')
decompressed = compression.decompress(compressed, content_type)
```

### Brotli Compression

```python
from kombu import compression

# Available as 'application/x-brotli' or 'brotli'
# Requires: pip install brotli
compressed, content_type = compression.compress(b"Hello World", 'brotli')
decompressed = compression.decompress(compressed, content_type)
```

### Zstandard Compression

```python
from kombu import compression

# Available as 'application/zstd', 'zstd', or 'zstandard'
# Requires: pip install zstandard
compressed, content_type = compression.compress(b"Hello World", 'zstd')
decompressed = compression.decompress(compressed, content_type)
```
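
Which of the built-in methods to pick depends on the payload. As a rough, standard-library-only sketch, the snippet below compares the codecs behind the gzip/zlib, bzip2 and lzma methods on a made-up repetitive JSON payload. The payload is hypothetical and the resulting sizes are illustrative only; they say nothing about real workloads or about CPU cost.

```python
import bz2
import json
import lzma
import zlib

# Hypothetical payload: 200 similar records serialized as JSON.
payload = json.dumps(
    [{"id": i, "status": "ok", "tags": ["a", "b"]} for i in range(200)]
).encode()

codecs = {
    "zlib": (zlib.compress, zlib.decompress),
    "bzip2": (bz2.compress, bz2.decompress),
    "lzma": (lzma.compress, lzma.decompress),
}

sizes = {}
for name, (encode, decode) in codecs.items():
    blob = encode(payload)
    assert decode(blob) == payload  # every codec must round-trip
    sizes[name] = len(blob)

print(f"original: {len(payload)} bytes")
for name, size in sorted(sizes.items(), key=lambda item: item[1]):
    print(f"{name}: {size} bytes")
```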

## Usage Examples

### Basic Compression

```python
from kombu.compression import compress, decompress

# Compress data
data = b"This is a long message that will benefit from compression"
compressed_data, content_type = compress(data, 'gzip')

print(f"Original size: {len(data)} bytes")
print(f"Compressed size: {len(compressed_data)} bytes")
print(f"Content type: {content_type}")

# Decompress data
decompressed_data = decompress(compressed_data, content_type)
assert data == decompressed_data
```
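
Compression has a fixed overhead, so very small bodies can come out larger than they went in. One possible approach, sketched here with the standard library's `zlib` standing in for whichever method is used, is to compress only above a size threshold. `maybe_compress` and `min_size` are hypothetical names and are not part of Kombu.

```python
import zlib


def maybe_compress(body: bytes, min_size: int = 128):
    """Return (body, compressed_flag), skipping compression when it does not help."""
    if len(body) < min_size:
        return body, False
    candidate = zlib.compress(body)
    if len(candidate) >= len(body):
        return body, False
    return candidate, True


tiny = b"ok"
large = b"status=ok;" * 100

# For a two-byte body the zlib header and checksum dominate the output.
print(len(zlib.compress(tiny)) > len(tiny))

print(maybe_compress(tiny))
body, compressed = maybe_compress(large)
print(compressed, len(large), "->", len(body))
```

The threshold of 128 bytes is an arbitrary starting point; a suitable value depends on the payloads and the method in use.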

### With Message Publishing

```python
from kombu import Connection, Exchange, Queue
from kombu.compression import compress

exchange = Exchange('compressed', type='direct')
queue = Queue('compressed_queue', exchange, routing_key='compress')

with Connection('redis://localhost:6379/0') as conn:
    with conn.Producer() as producer:
        # Manual compression
        message_data = "Large message data that should be compressed"
        compressed_body, content_type = compress(message_data.encode(), 'gzip')

        producer.publish(
            compressed_body,
            exchange=exchange,
            routing_key='compress',
            headers={'compression': content_type},
            declare=[queue]
        )
```

### Listing Available Methods

```python
from kombu.compression import encoders

# List all available compression methods (returned as MIME types)
available = encoders()
print("Available compression methods:", available)
# Example output (contents depend on which optional packages are installed):
# ['application/x-gzip', 'application/x-bz2', 'application/x-lzma', ...]
```

### Custom Compression Method

```python
import base64

from kombu.compression import compress, decompress, register

def base64_encode(data):
    """Simple base64 encoding (not real compression)."""
    return base64.b64encode(data)

def base64_decode(data):
    """Simple base64 decoding."""
    return base64.b64decode(data)

# Register custom compression method
register(
    encoder=base64_encode,
    decoder=base64_decode,
    content_type='application/base64',
    aliases=['base64', 'b64']
)

# Use custom compression
data = b"Hello World"
compressed, content_type = compress(data, 'base64')
decompressed = decompress(compressed, content_type)

assert data == decompressed
```

## Error Handling

Compression operations raise `KeyError` for an unregistered method, and decompression failures surface as exceptions from the underlying codec:

```python
import zlib

from kombu.compression import compress, decompress

try:
    # Try to use non-existent compression method
    compress(b"data", 'nonexistent')
except KeyError as e:
    print(f"Compression method not found: {e}")

try:
    # Try to decompress data that is not a valid zlib stream
    decompress(b"invalid data", 'gzip')
except zlib.error as e:
    print(f"Decompression failed: {e}")
```
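
The exception raised on corrupt input comes from the underlying codec rather than from Kombu itself. The sketch below uses the standard-library codecs behind the gzip/zlib, bzip2 and lzma methods to show which exception type each one raises. `describe_failure` is a hypothetical helper used only for illustration.

```python
import bz2
import lzma
import zlib


def describe_failure(decode, data):
    """Return the name of the exception a decoder raises, or None on success."""
    try:
        decode(data)
    except Exception as exc:  # broad on purpose: we only report the type
        return type(exc).__name__
    return None


garbage = b"invalid data"
for name, decode in [("zlib", zlib.decompress),
                     ("bz2", bz2.decompress),
                     ("lzma", lzma.decompress)]:
    print(name, "->", describe_failure(decode, garbage))
```

Since the exception types differ per codec, a consumer that accepts several methods may prefer to catch a broader exception and log the method name alongside it.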

## Integration with Serialization

Compression is applied after serialization: pass `compression` to `publish()` and Kombu compresses the serialized body and records the method in the message headers:

```python
from kombu import Connection, Exchange, Queue

exchange = Exchange('test', type='direct')
queue = Queue('test_queue', exchange, routing_key='test')

with Connection('redis://localhost:6379/0') as conn:
    with conn.Producer() as producer:
        # Kombu can automatically handle compression when configured
        producer.publish(
            {'large': 'data' * 1000},
            exchange=exchange,
            routing_key='test',
            serializer='json',
            compression='gzip',  # Automatic compression
            declare=[queue]
        )
```