or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compression.md · connection.md · entities.md · exceptions.md · index.md · messaging.md · mixins.md · pools.md · serialization.md · simple.md

docs/serialization.md

0

# Serialization

1

2

Pluggable serialization system with security controls for encoding and decoding message payloads across different formats. Kombu's serialization registry provides a flexible way to handle different data formats while maintaining security.

3

4

## Capabilities

5

6

### Core Serialization Functions

7

8

Primary functions for encoding and decoding message data with automatic serializer selection and content type handling.

9

10

```python { .api }

11

def dumps(data, serializer=None):

12

"""

13

Encode data using specified or default serializer.

14

15

Parameters:

16

- data: Data to serialize

17

- serializer (str): Serializer name ('json', 'pickle', 'yaml', 'msgpack')

18

19

Returns:

20

tuple: (content_type, content_encoding, serialized_data)

21

"""

22

23

def loads(data, content_type, content_encoding='utf-8', accept=None, force=False, on_unknown_serializer=None, **kwargs):

24

"""

25

Decode data using content type information.

26

27

Parameters:

28

- data (bytes): Serialized data to decode

29

- content_type (str): Content type of the data

30

- content_encoding (str): Content encoding (default 'utf-8')

31

- accept (list): List of accepted content types

32

- force (bool): Decode even if the content type's serializer is disabled (only consulted when accept is None)

33

- on_unknown_serializer (callable): Handler for unknown serializer

34

35

Returns:

36

Deserialized data

37

38

Raises:

39

ContentDisallowed: If content type is not in the accept list, or its serializer is disabled and force is False

40

SerializationError: If deserialization fails

41

"""

42

```

43

44

### Serializer Registry Management

45

46

Functions for managing the global serializer registry, including registration of custom serializers and security controls.

47

48

```python { .api }

49

def register(name, encoder, decoder, content_type, content_encoding='utf-8'):

50

"""

51

Register new serializer in the global registry.

52

53

Parameters:

54

- name (str): Serializer name

55

- encoder (callable): Function to encode data

56

- decoder (callable): Function to decode data

57

- content_type (str): MIME content type

58

- content_encoding (str): Content encoding

59

60

Example:

61

def my_encoder(obj):

62

return json.dumps(obj).encode('utf-8')

63

64

def my_decoder(data):

65

return json.loads(data.decode('utf-8'))

66

67

register('myjson', my_encoder, my_decoder, 'application/x-myjson')

68

"""

69

70

def unregister(name):

71

"""

72

Unregister serializer from the global registry.

73

74

Parameters:

75

- name (str): Serializer name to remove

76

77

Raises:

78

SerializerNotInstalled: If no serializer with that name is registered

79

"""

80

81

def prepare_accept_content(content_types, name_to_type=None):

82

"""

83

Replace serializer name aliases with full content type names.

84

85

Parameters:

86

- content_types (list): List of content types or serializer names

87

- name_to_type (dict): Mapping of names to content types

88

89

Returns:

90

set: Set of full content type names

91

"""

92

```

93

94

### Security Controls

95

96

Functions for controlling which serializers are enabled to prevent security vulnerabilities from unsafe deserialization.

97

98

```python { .api }

99

def enable_insecure_serializers(choices=None):

100

"""

101

Enable serializers considered unsafe (pickle, yaml, msgpack).

102

103

Parameters:

104

- choices (list): Specific serializers to enable (default: all insecure)

105

106

Warning:

107

Only enable insecure serializers in trusted environments.

108

Pickle and yaml can execute arbitrary code during deserialization.

109

"""

110

111

def disable_insecure_serializers(allowed=None):

112

"""

113

Disable untrusted serializers, keeping only safe ones.

114

115

Parameters:

116

- allowed (list): Serializers to keep enabled (default: ['json'])

117

118

This is the default security stance - only JSON is enabled by default.

119

"""

120

```

121

122

### Registry Access

123

124

Access to the global serialization registry and related constants.

125

126

```python { .api }

127

# Global serializer registry

128

registry: SerializerRegistry # Global serializer registry instance

129

130

# Security constants

131

TRUSTED_CONTENT: set # Set of trusted content types

132

SKIP_DECODE: set # Content encodings to skip decoding

133

```

134

135

## Usage Examples

136

137

### Basic Serialization

138

139

```python

140

from kombu.serialization import dumps, loads

141

142

# Serialize data with JSON (default)

143

data = {'message': 'hello', 'numbers': [1, 2, 3]}

144

content_type, encoding, serialized = dumps(data)

145

146

print(f"Content type: {content_type}") # application/json

147

print(f"Encoding: {encoding}") # utf-8

148

print(f"Serialized: {serialized}") # b'{"message": "hello", "numbers": [1, 2, 3]}'

149

150

# Deserialize data

151

deserialized = loads(serialized, content_type, encoding)

152

print(f"Deserialized: {deserialized}") # {'message': 'hello', 'numbers': [1, 2, 3]}

153

```

154

155

### Using Different Serializers

156

157

```python

158

from kombu.serialization import dumps, loads, enable_insecure_serializers

159

import pickle

160

161

# Enable pickle serializer (insecure!)

162

enable_insecure_serializers(['pickle'])

163

164

# Serialize with pickle

165

data = {'raw_bytes': b'\x00\x01\x02', 'numbers': (1, 2, 3)}  # picklable, but not plain JSON (lambdas cannot be pickled)

166

content_type, encoding, serialized = dumps(data, serializer='pickle')

167

168

print(f"Content type: {content_type}") # application/x-python-serialize

169

170

# Deserialize pickle data

171

deserialized = loads(serialized, content_type, encoding)

172

print(f"Deserialized: {deserialized}")

173

174

# Serialize with msgpack (if available)

175

try:

176

enable_insecure_serializers(['msgpack'])

177

content_type, encoding, serialized = dumps(data, serializer='msgpack')

178

deserialized = loads(serialized, content_type, encoding)

179

except ImportError:

180

print("msgpack not available")

181

```

182

183

### Content Type Filtering

184

185

```python

186

from kombu.serialization import loads

187

from kombu.exceptions import ContentDisallowed

188

189

# Sample serialized data

190

json_data = b'{"message": "hello"}'

191

pickle_data = b'...' # pickle data

192

193

# Only accept JSON

194

accept_list = ['application/json']

195

196

try:

197

# This will work

198

result = loads(json_data, 'application/json', accept=accept_list)

199

print(f"JSON data: {result}")

200

201

# This will raise ContentDisallowed

202

result = loads(pickle_data, 'application/x-python-serialize', accept=accept_list)

203

except ContentDisallowed as e:

204

print(f"Content not allowed: {e}")

205

```

206

207

### Custom Serializer Registration

208

209

```python

210

from kombu.serialization import register, unregister, dumps, loads

211

import base64

212

import json

213

214

def base64_json_encoder(obj):

215

"""Encode as JSON then base64"""

216

json_data = json.dumps(obj).encode('utf-8')

217

return base64.b64encode(json_data)

218

219

def base64_json_decoder(data):

220

"""Decode base64 then JSON"""

221

json_data = base64.b64decode(data)

222

return json.loads(json_data.decode('utf-8'))

223

224

# Register custom serializer

225

register(

226

name='base64json',

227

encoder=base64_json_encoder,

228

decoder=base64_json_decoder,

229

content_type='application/x-base64-json'

230

)

231

232

# Use custom serializer

233

data = {'encoded': 'data', 'numbers': [1, 2, 3]}

234

content_type, encoding, serialized = dumps(data, serializer='base64json')

235

236

print(f"Content type: {content_type}") # application/x-base64-json

237

print(f"Serialized (base64): {serialized}")

238

239

# Deserialize

240

deserialized = loads(serialized, content_type, encoding)

241

print(f"Deserialized: {deserialized}")

242

243

# Clean up

244

unregister('base64json')

245

```

246

247

### Security Configuration

248

249

```python

250

from kombu.serialization import enable_insecure_serializers, disable_insecure_serializers

251

252

# Default: only JSON is enabled (secure)

253

print("Default secure configuration")

254

255

# Enable specific insecure serializers

256

enable_insecure_serializers(['pickle'])

257

print("Enabled pickle serializer")

258

259

# Enable all insecure serializers (dangerous!)

260

enable_insecure_serializers()

261

print("Enabled all insecure serializers")

262

263

# Disable all insecure serializers, keep only JSON

264

disable_insecure_serializers()

265

print("Back to secure configuration")

266

267

# Allow JSON and msgpack only

268

disable_insecure_serializers(['json', 'msgpack'])

269

print("JSON and msgpack only")

270

```

271

272

### Error Handling

273

274

```python

275

from kombu.serialization import loads

276

from kombu.exceptions import SerializationError, ContentDisallowed

277

278

def safe_deserialize(data, content_type, encoding='utf-8'):

279

"""Safely deserialize data with error handling"""

280

try:

281

return loads(

282

data,

283

content_type,

284

encoding,

285

accept=['application/json'], # Only accept JSON

286

force=False

287

)

288

except ContentDisallowed as e:

289

print(f"Content type not allowed: {e}")

290

return None

291

except SerializationError as e:

292

print(f"Failed to deserialize: {e}")

293

return None

294

except Exception as e:

295

print(f"Unexpected error: {e}")

296

return None

297

298

# Test with good data

299

good_data = b'{"status": "ok"}'

300

result = safe_deserialize(good_data, 'application/json')

301

print(f"Good data result: {result}")

302

303

# Test with disallowed content type

304

pickle_data = b'some pickle data'

305

result = safe_deserialize(pickle_data, 'application/x-python-serialize')

306

print(f"Pickle data result: {result}") # None

307

308

# Test with malformed data

309

bad_data = b'{"invalid": json}'

310

result = safe_deserialize(bad_data, 'application/json')

311

print(f"Bad data result: {result}") # None

312

```

313

314

### Registry Inspection

315

316

```python

317

from kombu.serialization import registry

318

319

# List available serializers

320

print("Available serializers:")

321

for name in registry._encoders.keys():

322

encoder_info = registry._encoders[name]

323

print(f" {name}: {encoder_info.content_type}")

324

325

# Check if a serializer is available

326

if 'json' in registry._encoders:

327

print("JSON serializer is available")

328

329

if 'pickle' in registry._encoders:

330

print("Pickle serializer is available")

331

else:

332

print("Pickle serializer not available (good for security)")

333

```

334

335

### Producer/Consumer Serialization Integration

336

337

```python

338

from kombu import Connection, Producer, Consumer, Queue

339

from kombu.serialization import enable_insecure_serializers

340

341

# Enable msgpack for better performance

342

enable_insecure_serializers(['msgpack'])

343

344

with Connection('redis://localhost:6379/0') as conn:

345

# Producer with specific serializer

346

producer = Producer(

347

conn.channel(),

348

serializer='msgpack',

349

compression='gzip'

350

)

351

352

# Publish with different serializers

353

producer.publish(

354

{'data': list(range(1000))},

355

routing_key='large_data',

356

serializer='msgpack' # Override default

357

)

358

359

producer.publish(

360

{'simple': 'message'},

361

routing_key='simple_data',

362

serializer='json' # Use JSON for simple data

363

)

364

365

# Consumer accepting multiple formats

366

def process_message(body, message):

367

print(f"Received: {type(body)} - {len(str(body))} chars")

368

print(f"Content type: {message.content_type}")

369

message.ack()

370

371

queue = Queue('mixed_format_queue')

372

consumer = Consumer(

373

conn.channel(),

374

[queue],

375

callbacks=[process_message],

376

accept=['application/json', 'application/x-msgpack']

377

)

378

379

consumer.consume()

380

# Process messages...

381

```

382

383

### Performance Comparison

384

385

```python

386

from kombu.serialization import dumps, loads, enable_insecure_serializers

387

import time

388

import sys

389

390

# Enable all serializers for testing

391

enable_insecure_serializers()

392

393

# Test data

394

test_data = {

395

'users': [{'id': i, 'name': f'user_{i}', 'active': i % 2 == 0} for i in range(1000)],

396

'metadata': {'timestamp': time.time(), 'version': '1.0'}

397

}

398

399

serializers = ['json', 'pickle', 'msgpack']

400

401

for serializer in serializers:

402

try:

403

# Serialize

404

start = time.time()

405

content_type, encoding, serialized = dumps(test_data, serializer=serializer)

406

serialize_time = time.time() - start

407

408

# Deserialize

409

start = time.time()

410

deserialized = loads(serialized, content_type, encoding)

411

deserialize_time = time.time() - start

412

413

print(f"{serializer.upper()}:")

414

print(f" Size: {len(serialized)} bytes")

415

print(f" Serialize: {serialize_time:.4f}s")

416

print(f" Deserialize: {deserialize_time:.4f}s")

417

print(f" Total: {serialize_time + deserialize_time:.4f}s")

418

print()

419

420

except ImportError:

421

print(f"{serializer.upper()}: Not available")

422

except Exception as e:

423

print(f"{serializer.upper()}: Error - {e}")

424

```