# Serialization

Data serialization and schema management for type-safe message handling in Faust applications. Provides codecs for different data formats, schema definitions for structured data, and flexible serialization pipelines with support for custom serializers and data transformation.

## Capabilities

### Schema Management

Schema definitions for structured data serialization and deserialization. Schemas provide type-safe message handling with automatic key/value serialization, custom serializer selection, and metadata preservation.

```python { .api }
class Schema:
    def __init__(
        self,
        *,
        key_type: type = None,
        value_type: type = None,
        key_serializer: str = None,
        value_serializer: str = None,
        allow_empty: bool = False,
        **kwargs
    ):
        """
        Create a message schema definition.

        Args:
            key_type: Type for message keys
            value_type: Type for message values
            key_serializer: Serializer name for keys
            value_serializer: Serializer name for values
            allow_empty: Allow None/empty values
        """

    def loads_key(
        self,
        app: App,
        message: bytes,
        *,
        loads: Callable = None,
        serializer: str = None
    ) -> Any:
        """
        Deserialize message key.

        Args:
            app: Faust application instance
            message: Raw message data
            loads: Custom deserialization function
            serializer: Override serializer

        Returns:
            Deserialized key object
        """

    def loads_value(
        self,
        app: App,
        message: bytes,
        *,
        loads: Callable = None,
        serializer: str = None
    ) -> Any:
        """
        Deserialize message value.

        Args:
            app: Faust application instance
            message: Raw message data
            loads: Custom deserialization function
            serializer: Override serializer

        Returns:
            Deserialized value object
        """

    def dumps_key(
        self,
        app: App,
        key: Any,
        *,
        serializer: str = None
    ) -> bytes:
        """
        Serialize message key.

        Args:
            app: Faust application instance
            key: Key object to serialize
            serializer: Override serializer

        Returns:
            Serialized key bytes
        """

    def dumps_value(
        self,
        app: App,
        value: Any,
        *,
        serializer: str = None
    ) -> bytes:
        """
        Serialize message value.

        Args:
            app: Faust application instance
            value: Value object to serialize
            serializer: Override serializer

        Returns:
            Serialized value bytes
        """

    @property
    def key_type(self) -> type:
        """Type for message keys."""

    @property
    def value_type(self) -> type:
        """Type for message values."""

    @property
    def key_serializer(self) -> str:
        """Serializer name for keys."""

    @property
    def value_serializer(self) -> str:
        """Serializer name for values."""
```
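
The override behavior described above can be illustrated without Faust. The sketch below is a standalone stand-in, not the library's implementation: `MiniSchema` and the `SERIALIZERS` table are hypothetical names, and the rule that a per-call `serializer` wins over the schema default (with `raw` as the last resort) is an assumption drawn from the parameter descriptions.

```python
import json
from typing import Any, Callable, Dict, Optional, Tuple

# Hypothetical serializer table: name -> (dumps, loads). Not part of Faust.
SERIALIZERS: Dict[str, Tuple[Callable[[Any], bytes], Callable[[bytes], Any]]] = {
    "json": (
        lambda obj: json.dumps(obj).encode("utf-8"),
        lambda data: json.loads(data.decode("utf-8")),
    ),
    "raw": (lambda obj: obj, lambda data: data),
}


class MiniSchema:
    """Stand-in for a schema that stores default serializer names."""

    def __init__(
        self,
        *,
        key_serializer: Optional[str] = None,
        value_serializer: Optional[str] = None,
    ) -> None:
        self.key_serializer = key_serializer
        self.value_serializer = value_serializer

    def dumps_value(self, value: Any, *, serializer: Optional[str] = None) -> bytes:
        # A per-call override wins over the schema default (assumed rule).
        dumps, _ = SERIALIZERS[serializer or self.value_serializer or "raw"]
        return dumps(value)

    def loads_value(self, message: bytes, *, serializer: Optional[str] = None) -> Any:
        _, loads = SERIALIZERS[serializer or self.value_serializer or "raw"]
        return loads(message)


schema = MiniSchema(key_serializer="raw", value_serializer="json")
payload = schema.dumps_value({"type": "user_login"})      # uses the "json" default
restored = schema.loads_value(payload)
passthrough = schema.dumps_value(b"\x00\x01", serializer="raw")  # per-call override
```

Keeping the serializer as a name rather than a codec object is what lets a single call swap formats without building a new schema.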

### Codec Interface

Low-level serialization codec interface for implementing custom data formats and transformation pipelines. Codecs handle the actual byte-level encoding and decoding operations.

```python { .api }
class Codec:
    def __init__(self, **kwargs):
        """
        Create a serialization codec.

        Args:
            **kwargs: Codec-specific configuration
        """

    def encode(self, obj: Any) -> bytes:
        """
        Encode object to bytes.

        Args:
            obj: Object to encode

        Returns:
            Encoded bytes

        Raises:
            SerializationError: If encoding fails
        """

    def decode(self, data: bytes) -> Any:
        """
        Decode bytes to object.

        Args:
            data: Bytes to decode

        Returns:
            Decoded object

        Raises:
            SerializationError: If decoding fails
        """

    @property
    def mime_type(self) -> str:
        """MIME type for this codec."""
```
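
To make the `encode`/`decode` contract concrete, here is a minimal sketch of a class with the same shape. It does not import Faust: `Utf8TextCodec` is a hypothetical example, and `SerializationError` is redefined locally as a stand-in for the error type named in the interface.

```python
from typing import Any


class SerializationError(Exception):
    """Local stand-in for the error type named in the interface above."""


class Utf8TextCodec:
    """Hypothetical codec that maps str to UTF-8 bytes and back."""

    mime_type = "text/plain; charset=utf-8"

    def encode(self, obj: Any) -> bytes:
        if not isinstance(obj, str):
            raise SerializationError(f"expected str, got {type(obj).__name__}")
        return obj.encode("utf-8")

    def decode(self, data: bytes) -> Any:
        try:
            return data.decode("utf-8")
        except UnicodeDecodeError as exc:
            # Wrap the low-level error so callers handle a single error type.
            raise SerializationError("invalid UTF-8 payload") from exc


codec = Utf8TextCodec()
encoded = codec.encode("héllo")
decoded = codec.decode(encoded)
```

Wrapping low-level exceptions in one error type means callers only need a single `except` clause, whichever format is in use.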

### Built-in Codecs

Pre-implemented codecs for common data formats including JSON, pickle, raw bytes, and structured formats with optimized performance and error handling.

```python { .api }
class JSONCodec(Codec):
    def __init__(
        self,
        *,
        ensure_ascii: bool = False,
        sort_keys: bool = False,
        separators: tuple = None,
        **kwargs
    ):
        """
        JSON serialization codec.

        Args:
            ensure_ascii: Escape non-ASCII characters
            sort_keys: Sort dictionary keys
            separators: Custom separators (item, key)
        """

    def encode(self, obj: Any) -> bytes:
        """Encode object as JSON bytes."""

    def decode(self, data: bytes) -> Any:
        """Decode JSON bytes to object."""

class PickleCodec(Codec):
    def __init__(self, *, protocol: int = None, **kwargs):
        """
        Python pickle serialization codec.

        Args:
            protocol: Pickle protocol version
        """

    def encode(self, obj: Any) -> bytes:
        """Encode object using pickle."""

    def decode(self, data: bytes) -> Any:
        """Decode pickle bytes to object."""

class RawCodec(Codec):
    def encode(self, obj: bytes) -> bytes:
        """Pass-through for raw bytes."""

    def decode(self, data: bytes) -> bytes:
        """Pass-through for raw bytes."""

class BinaryCodec(Codec):
    def encode(self, obj: Any) -> bytes:
        """Encode as binary representation."""

    def decode(self, data: bytes) -> Any:
        """Decode from binary representation."""
```
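
The `JSONCodec` options share their names with parameters of the standard library's `json.dumps`. Assuming the codec forwards them unchanged (an assumption, not something the listing above confirms), their effect on the wire format can be seen with the standard library alone. The pickle lines show the equivalent round trip for `PickleCodec`'s `protocol` argument.

```python
import json
import pickle

record = {"name": "Zoë", "id": 7}

# ensure_ascii=False keeps non-ASCII characters as-is; True escapes them.
literal = json.dumps(record, ensure_ascii=False).encode("utf-8")
escaped = json.dumps(record, ensure_ascii=True).encode("utf-8")

# sort_keys plus compact separators gives a stable, whitespace-free payload.
compact = json.dumps(record, sort_keys=True, separators=(",", ":")).encode("utf-8")

# Pickle round-trips arbitrary Python objects; only unpickle trusted data.
pickled = pickle.dumps(record, protocol=pickle.HIGHEST_PROTOCOL)
unpickled = pickle.loads(pickled)
```

Sorted keys and fixed separators are useful when payloads are hashed or compared byte for byte, because the same dictionary then always serializes to the same bytes.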

### Registry Management

Codec registry for managing available serializers and their configuration. Provides dynamic codec selection, registration of custom codecs, and serializer name resolution.

```python { .api }
class Registry:
    def __init__(self):
        """Create codec registry with built-in codecs."""

    def register(self, name: str, codec: Codec) -> None:
        """
        Register a codec with given name.

        Args:
            name: Codec name for lookup
            codec: Codec instance or class
        """

    def get(self, name: str) -> Codec:
        """
        Get codec by name.

        Args:
            name: Codec name

        Returns:
            Codec instance

        Raises:
            KeyError: If codec not found
        """

    def list_codecs(self) -> list:
        """
        List all registered codec names.

        Returns:
            List of codec names
        """

# Default codec registry
codecs = Registry()

def register_codec(name: str, codec: Codec) -> None:
    """
    Register codec in default registry.

    Args:
        name: Codec name
        codec: Codec instance or class
    """

def get_codec(name: str) -> Codec:
    """
    Get codec from default registry.

    Args:
        name: Codec name

    Returns:
        Codec instance
    """
```
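
The registry's three operations amount to a name-to-codec mapping. The following sketch shows that idea in isolation. `MiniRegistry` and `UpperCodec` are hypothetical names used only for illustration, and returning the names in sorted order is a choice made here rather than documented behavior.

```python
from typing import Any, Dict, List


class MiniRegistry:
    """Hypothetical name -> codec mapping with the same three operations."""

    def __init__(self) -> None:
        self._codecs: Dict[str, Any] = {}

    def register(self, name: str, codec: Any) -> None:
        self._codecs[name] = codec

    def get(self, name: str) -> Any:
        # Unknown names raise KeyError, matching the documented contract.
        return self._codecs[name]

    def list_codecs(self) -> List[str]:
        return sorted(self._codecs)


class UpperCodec:
    """Toy codec used only to exercise the registry."""

    def encode(self, obj: str) -> bytes:
        return obj.upper().encode("utf-8")

    def decode(self, data: bytes) -> str:
        return data.decode("utf-8")


registry = MiniRegistry()
registry.register("upper", UpperCodec())
result = registry.get("upper").encode("ping")
names = registry.list_codecs()
```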

### Serializer Configuration

Configuration utilities for setting up serialization behavior at the application, topic, and message level with inheritance and override support.

```python { .api }
class SerializerSettings:
    def __init__(
        self,
        *,
        key: str = None,
        value: str = None,
        allow_empty: bool = True,
        **kwargs
    ):
        """
        Serializer configuration settings.

        Args:
            key: Default key serializer name
            value: Default value serializer name
            allow_empty: Allow empty/None values
        """

    @property
    def key_serializer(self) -> str:
        """Default key serializer."""

    @property
    def value_serializer(self) -> str:
        """Default value serializer."""

def configure_serializers(
    app: App,
    *,
    key: str = None,
    value: str = None,
    **kwargs
) -> None:
    """
    Configure default serializers for application.

    Args:
        app: Faust application
        key: Default key serializer
        value: Default value serializer
    """
```
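
The application, topic, and message levels described above suggest a most-specific-wins lookup. The sketch below assumes that precedence (message over topic over application). `resolve_serializer` and its `fallback` default of `raw` are hypothetical and not part of the documented API.

```python
from typing import Optional


def resolve_serializer(
    message_level: Optional[str],
    topic_level: Optional[str],
    app_level: Optional[str],
    fallback: str = "raw",
) -> str:
    """Return the most specific serializer name that is set."""
    for candidate in (message_level, topic_level, app_level):
        if candidate is not None:
            return candidate
    # Nothing configured at any level: use the assumed fallback.
    return fallback


app_default = resolve_serializer(None, None, "json")
topic_override = resolve_serializer(None, "raw", "json")
message_override = resolve_serializer("pickle", "raw", "json")
unset = resolve_serializer(None, None, None)
```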

### Custom Serializers

Framework for implementing custom serialization formats with proper error handling, type validation, and performance optimization.

```python { .api }
class CustomCodec(Codec):
    def __init__(self, **config):
        """
        Base class for custom codecs.

        Args:
            **config: Codec configuration
        """
        super().__init__(**config)

    def validate(self, obj: Any) -> bool:
        """
        Validate object before serialization.

        Args:
            obj: Object to validate

        Returns:
            True if valid for this codec
        """

    def transform_encode(self, obj: Any) -> Any:
        """
        Transform object before encoding.

        Args:
            obj: Object to transform

        Returns:
            Transformed object
        """

    def transform_decode(self, obj: Any) -> Any:
        """
        Transform object after decoding.

        Args:
            obj: Decoded object

        Returns:
            Transformed object
        """

class SerializationError(Exception):
    """Raised when serialization/deserialization fails."""
    pass

class SchemaError(Exception):
    """Raised when schema validation fails."""
    pass
```
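
One plausible way the `validate`, `transform_encode`, and `transform_decode` hooks fit together is shown below. This is a standalone sketch under assumptions: `IsoDateJSONCodec` is a hypothetical class that does not inherit from `CustomCodec`, the order of the hooks (validate, then transform, then encode) is inferred from their docstrings, and it raises `ValueError` where a real codec would presumably raise `SerializationError`.

```python
import json
from datetime import datetime, timezone
from typing import Any


class IsoDateJSONCodec:
    """Hypothetical codec: JSON in which the 'timestamp' field is a datetime."""

    def validate(self, obj: Any) -> bool:
        return isinstance(obj, dict) and isinstance(obj.get("timestamp"), datetime)

    def transform_encode(self, obj: dict) -> dict:
        # Replace the datetime with an ISO 8601 string so json can handle it.
        return {**obj, "timestamp": obj["timestamp"].isoformat()}

    def transform_decode(self, obj: dict) -> dict:
        # Reverse the transformation after the JSON has been parsed.
        return {**obj, "timestamp": datetime.fromisoformat(obj["timestamp"])}

    def encode(self, obj: Any) -> bytes:
        if not self.validate(obj):
            raise ValueError("expected a dict with a datetime 'timestamp'")
        return json.dumps(self.transform_encode(obj)).encode("utf-8")

    def decode(self, data: bytes) -> Any:
        return self.transform_decode(json.loads(data.decode("utf-8")))


codec = IsoDateJSONCodec()
event = {"type": "login", "timestamp": datetime(2024, 1, 1, tzinfo=timezone.utc)}
wire = codec.encode(event)
round_tripped = codec.decode(wire)
```

Keeping the transformation separate from the byte-level encoding means the same hooks could be reused with a different wire format.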

## Usage Examples

### Basic Serialization

```python
import faust

app = faust.App('serialization-app', broker='kafka://localhost:9092')

# Topic with JSON serialization
json_topic = app.topic(
    'json-events',
    value_type=dict,
    value_serializer='json'
)

# Topic with pickle serialization
pickle_topic = app.topic(
    'pickle-data',
    value_serializer='pickle'
)

@app.agent(json_topic)
async def handle_json_events(events):
    async for event in events:
        # Automatically deserialized from JSON
        print(f"Event type: {event['type']}, data: {event['data']}")

# Send JSON data; `await` is only valid inside a coroutine
async def send_login_event():
    await json_topic.send(value={
        'type': 'user_login',
        'data': {'user_id': 123, 'timestamp': '2024-01-01T00:00:00Z'}
    })
```

### Custom Schema Definition

```python
from faust import Schema

class EventSchema(Schema):
    def __init__(self):
        super().__init__(
            key_type=str,
            value_type=dict,
            key_serializer='raw',
            value_serializer='json'
        )

# Topic with custom schema
events_topic = app.topic(
    'events',
    schema=EventSchema()
)

@app.agent(events_topic)
async def process_events(events):
    # Iterating a stream yields values only; items() yields (key, value) pairs
    async for key, data in events.items():
        # Keys are strings, values are dicts (both already deserialized)
        print(f"Processing {key}: {data}")
```

### Custom Codec Implementation

```python
import json
import gzip

import faust
from faust import Codec

class CompressedJSONCodec(Codec):
    """JSON codec with gzip compression."""

    def encode(self, obj):
        # Serialize to JSON first, then compress the UTF-8 bytes
        json_bytes = json.dumps(obj).encode('utf-8')
        return gzip.compress(json_bytes)

    def decode(self, data):
        # Reverse order: decompress, then parse the JSON text
        json_bytes = gzip.decompress(data)
        return json.loads(json_bytes.decode('utf-8'))

    @property
    def mime_type(self):
        return 'application/json+gzip'

# Register custom codec
faust.codecs.register('compressed_json', CompressedJSONCodec())

# Use custom codec
compressed_topic = app.topic(
    'compressed-data',
    value_serializer='compressed_json'
)
```
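
The gzip and JSON round trip that `CompressedJSONCodec` relies on can be checked with the standard library alone, without a running Faust application. The payload below is made up for illustration. It is deliberately repetitive, since gzip adds a fixed header and trailer and can make very small payloads larger.

```python
import gzip
import json

# Hypothetical, highly repetitive payload so that compression pays off.
payload = {"items": ["event"] * 200}

raw = json.dumps(payload).encode("utf-8")
compressed = gzip.compress(raw)

# Decoding reverses the two steps: decompress first, then parse JSON.
restored = json.loads(gzip.decompress(compressed).decode("utf-8"))

saved_bytes = len(raw) - len(compressed)
```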

### Model-based Serialization

```python
from datetime import datetime, timezone

import faust

# isodates=True parses ISO 8601 strings back into datetime on deserialization
class User(faust.Record, serializer='json', isodates=True):
    id: int
    name: str
    email: str
    created_at: datetime

class UserEvent(faust.Record, serializer='json', isodates=True):
    user: User
    event_type: str
    timestamp: datetime

# Topics with model types
users_topic = app.topic('users', value_type=User)
events_topic = app.topic('user-events', value_type=UserEvent)

@app.agent(users_topic)
async def handle_users(users):
    async for user in users:
        # Automatic deserialization to User model
        print(f"User {user.id}: {user.name} ({user.email})")

        # Create related event
        event = UserEvent(
            user=user,
            event_type='created',
            timestamp=datetime.now(timezone.utc)
        )
        await events_topic.send(value=event)
```

### Advanced Serialization Configuration

```python
import faust

# Configure app-wide serializer defaults
app = faust.App(
    'my-app',
    broker='kafka://localhost:9092',
    key_serializer='raw',
    value_serializer='json'
)

# Topic with custom serializers
special_topic = app.topic(
    'special-data',
    key_type=str,
    value_type=bytes,
    key_serializer='json',  # Override app default
    value_serializer='raw'  # Override app default
)

# Message-level serializer override; `await` is only valid inside a coroutine
async def send_special():
    await special_topic.send(
        key={'user_id': 123},
        value=b'raw binary data',
        key_serializer='pickle',   # Override topic default
        value_serializer='binary'  # Override topic default
    )
```

### Error Handling

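```python
from faust import SerializationError

# Hypothetical dead-letter topic and handler, defined so the example is complete
dead_letter_topic = app.topic('dead-letter', value_serializer='json')

def process_data(data):
    """Placeholder for application-specific processing."""

@app.agent()
async def resilient_processor(stream):
    # events() yields Event objects, which expose key, value and the raw message
    async for event in stream.events():
        try:
            # Process the event
            data = event.value
            process_data(data)
        except SerializationError as e:
            # Handle serialization errors
            print(f"Serialization error: {e}")
            # Log to dead letter queue or skip
            await dead_letter_topic.send(
                key=event.key,
                # repr() keeps the raw bytes JSON-serializable
                value={'error': str(e), 'raw_data': repr(event.message.value)}
            )
```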

## Type Interfaces

```python { .api }
from typing import Protocol, Any, Optional

class CodecT(Protocol):
    """Type interface for Codec."""

    mime_type: str

    def encode(self, obj: Any) -> bytes: ...
    def decode(self, data: bytes) -> Any: ...

class SchemaT(Protocol):
    """Type interface for Schema."""

    key_type: Optional[type]
    value_type: Optional[type]
    key_serializer: Optional[str]
    value_serializer: Optional[str]

    def loads_key(self, app: Any, message: bytes, **kwargs) -> Any: ...
    def loads_value(self, app: Any, message: bytes, **kwargs) -> Any: ...
    def dumps_key(self, app: Any, key: Any, **kwargs) -> bytes: ...
    def dumps_value(self, app: Any, value: Any, **kwargs) -> bytes: ...

class RegistryT(Protocol):
    """Type interface for codec Registry."""

    def register(self, name: str, codec: CodecT) -> None: ...
    def get(self, name: str) -> CodecT: ...
    def list_codecs(self) -> list: ...
```
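
Because these interfaces are `Protocol` classes, a class satisfies them by shape rather than by inheritance. The sketch below shows this with a hypothetical `CodecLike` protocol and `HexCodec` class. `CodecLike` is a simplified, runtime-checkable variant of `CodecT` that omits `mime_type`, so it is not a drop-in replacement for the interface above.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class CodecLike(Protocol):
    """Hypothetical runtime-checkable variant of the CodecT interface."""

    def encode(self, obj: Any) -> bytes: ...
    def decode(self, data: bytes) -> Any: ...


class HexCodec:
    """Inherits from nothing; matches the protocol by having both methods."""

    def encode(self, obj: bytes) -> bytes:
        return obj.hex().encode("ascii")

    def decode(self, data: bytes) -> bytes:
        return bytes.fromhex(data.decode("ascii"))


# isinstance() only checks that the methods exist, not their signatures.
is_codec = isinstance(HexCodec(), CodecLike)
is_not_codec = isinstance(object(), CodecLike)
hex_bytes = HexCodec().encode(b"\x01\xff")
```

A static type checker performs the stricter signature comparison. The runtime check is a coarse guard that is mainly useful at registration boundaries.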