Python stream processing library, a port of Kafka Streams to Python, for building distributed systems and real-time data pipelines
—
Data serialization and schema management for type-safe message handling in Faust applications. Provides codecs for different data formats, schema definitions for structured data, and flexible serialization pipelines with support for custom serializers and data transformation.
Schema definitions for structured data serialization and deserialization. Schemas provide type-safe message handling with automatic key/value serialization, custom serializer selection, and metadata preservation.
class Schema:
def __init__(
self,
*,
key_type: type = None,
value_type: type = None,
key_serializer: str = None,
value_serializer: str = None,
allow_empty: bool = False,
**kwargs
):
"""
Create a message schema definition.
Args:
key_type: Type for message keys
value_type: Type for message values
key_serializer: Serializer name for keys
value_serializer: Serializer name for values
allow_empty: Allow None/empty values
"""
def loads_key(
self,
app: App,
message: bytes,
*,
loads: callable = None,
serializer: str = None
) -> any:
"""
Deserialize message key.
Args:
app: Faust application instance
message: Raw message data
loads: Custom deserialization function
serializer: Override serializer
Returns:
Deserialized key object
"""
def loads_value(
self,
app: App,
message: bytes,
*,
loads: callable = None,
serializer: str = None
) -> any:
"""
Deserialize message value.
Args:
app: Faust application instance
message: Raw message data
loads: Custom deserialization function
serializer: Override serializer
Returns:
Deserialized value object
"""
def dumps_key(
self,
app: App,
key: any,
*,
serializer: str = None
) -> bytes:
"""
Serialize message key.
Args:
app: Faust application instance
key: Key object to serialize
serializer: Override serializer
Returns:
Serialized key bytes
"""
def dumps_value(
self,
app: App,
value: any,
*,
serializer: str = None
) -> bytes:
"""
Serialize message value.
Args:
app: Faust application instance
value: Value object to serialize
serializer: Override serializer
Returns:
Serialized value bytes
"""
@property
def key_type(self) -> type:
"""Type for message keys."""
@property
def value_type(self) -> type:
"""Type for message values."""
@property
def key_serializer(self) -> str:
"""Serializer name for keys."""
@property
def value_serializer(self) -> str:
"""Serializer name for values."""Low-level serialization codec interface for implementing custom data formats and transformation pipelines. Codecs handle the actual byte-level encoding and decoding operations.
class Codec:
def __init__(self, **kwargs):
"""
Create a serialization codec.
Args:
**kwargs: Codec-specific configuration
"""
def encode(self, obj: any) -> bytes:
"""
Encode object to bytes.
Args:
obj: Object to encode
Returns:
Encoded bytes
Raises:
SerializationError: If encoding fails
"""
def decode(self, data: bytes) -> any:
"""
Decode bytes to object.
Args:
data: Bytes to decode
Returns:
Decoded object
Raises:
SerializationError: If decoding fails
"""
@property
def mime_type(self) -> str:
"""MIME type for this codec."""Pre-implemented codecs for common data formats including JSON, pickle, raw bytes, and structured formats with optimized performance and error handling.
class JSONCodec(Codec):
def __init__(
self,
*,
ensure_ascii: bool = False,
sort_keys: bool = False,
separators: tuple = None,
**kwargs
):
"""
JSON serialization codec.
Args:
ensure_ascii: Escape non-ASCII characters
sort_keys: Sort dictionary keys
            separators: Custom separators as (item_separator, key_separator)
"""
def encode(self, obj: any) -> bytes:
"""Encode object as JSON bytes."""
def decode(self, data: bytes) -> any:
"""Decode JSON bytes to object."""
class PickleCodec(Codec):
def __init__(self, *, protocol: int = None, **kwargs):
"""
Python pickle serialization codec.
Args:
protocol: Pickle protocol version
"""
def encode(self, obj: any) -> bytes:
"""Encode object using pickle."""
def decode(self, data: bytes) -> any:
"""Decode pickle bytes to object."""
class RawCodec(Codec):
def encode(self, obj: bytes) -> bytes:
"""Pass-through for raw bytes."""
def decode(self, data: bytes) -> bytes:
"""Pass-through for raw bytes."""
class BinaryCodec(Codec):
def encode(self, obj: any) -> bytes:
"""Encode as binary representation."""
def decode(self, data: bytes) -> any:
"""Decode from binary representation."""Codec registry for managing available serializers and their configuration. Provides dynamic codec selection, registration of custom codecs, and serializer name resolution.
class Registry:
def __init__(self):
"""Create codec registry with built-in codecs."""
def register(self, name: str, codec: Codec) -> None:
"""
Register a codec with given name.
Args:
name: Codec name for lookup
codec: Codec instance or class
"""
def get(self, name: str) -> Codec:
"""
Get codec by name.
Args:
name: Codec name
Returns:
Codec instance
Raises:
KeyError: If codec not found
"""
def list_codecs(self) -> list:
"""
List all registered codec names.
Returns:
List of codec names
"""
# Default codec registry
codecs = Registry()
def register_codec(name: str, codec: Codec) -> None:
"""
Register codec in default registry.
Args:
name: Codec name
codec: Codec instance or class
"""
def get_codec(name: str) -> Codec:
"""
Get codec from default registry.
Args:
name: Codec name
Returns:
Codec instance
"""Configuration utilities for setting up serialization behavior at the application, topic, and message level with inheritance and override support.
class SerializerSettings:
def __init__(
self,
*,
key: str = None,
value: str = None,
allow_empty: bool = True,
**kwargs
):
"""
Serializer configuration settings.
Args:
key: Default key serializer name
value: Default value serializer name
allow_empty: Allow empty/None values
"""
@property
def key_serializer(self) -> str:
"""Default key serializer."""
@property
def value_serializer(self) -> str:
"""Default value serializer."""
def configure_serializers(
app: App,
*,
key: str = None,
value: str = None,
**kwargs
) -> None:
"""
Configure default serializers for application.
Args:
app: Faust application
key: Default key serializer
value: Default value serializer
"""Framework for implementing custom serialization formats with proper error handling, type validation, and performance optimization.
class CustomCodec(Codec):
def __init__(self, **config):
"""
Base class for custom codecs.
Args:
**config: Codec configuration
"""
super().__init__(**config)
def validate(self, obj: any) -> bool:
"""
Validate object before serialization.
Args:
obj: Object to validate
Returns:
True if valid for this codec
"""
def transform_encode(self, obj: any) -> any:
"""
Transform object before encoding.
Args:
obj: Object to transform
Returns:
Transformed object
"""
def transform_decode(self, obj: any) -> any:
"""
Transform object after decoding.
Args:
obj: Decoded object
Returns:
Transformed object
"""
class SerializationError(Exception):
"""Raised when serialization/deserialization fails."""
pass
class SchemaError(Exception):
"""Raised when schema validation fails."""
    pass
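
For instance, a hypothetical uppercase-text codec built on these hooks (a sketch; only the hook names defined above are assumed):

class UpperTextCodec(CustomCodec):
    """Text codec that upper-cases values on encode (illustrative only)."""

    def validate(self, obj):
        # Only plain strings are valid for this codec
        return isinstance(obj, str)

    def transform_encode(self, obj):
        return obj.upper()

    def encode(self, obj):
        if not self.validate(obj):
            raise SerializationError(f'expected str, got {type(obj).__name__}')
        return self.transform_encode(obj).encode('utf-8')

    def decode(self, data):
        return data.decode('utf-8')

import faust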
app = faust.App('serialization-app', broker='kafka://localhost:9092')
# Topic with JSON serialization
json_topic = app.topic(
'json-events',
value_type=dict,
value_serializer='json'
)
# Topic with pickle serialization
pickle_topic = app.topic(
'pickle-data',
value_serializer='pickle'
)
@app.agent(json_topic)
async def handle_json_events(events):
async for event in events:
# Automatically deserialized from JSON
print(f"Event type: {event['type']}, data: {event['data']}")
# Send JSON data
await json_topic.send(value={
'type': 'user_login',
'data': {'user_id': 123, 'timestamp': '2024-01-01T00:00:00Z'}
})

from faust import Schema
from datetime import datetime
class EventSchema(Schema):
def __init__(self):
super().__init__(
key_type=str,
value_type=dict,
key_serializer='raw',
value_serializer='json'
)
# Topic with custom schema
events_topic = app.topic(
'events',
schema=EventSchema()
)
@app.agent(events_topic)
async def process_events(events):
    # Keys are strings, values are dicts; both arrive already deserialized
    async for key, data in events.items():
        print(f"Processing {key}: {data}")

import json
import gzip
from faust import Codec
class CompressedJSONCodec(Codec):
"""JSON codec with gzip compression."""
def encode(self, obj):
json_bytes = json.dumps(obj).encode('utf-8')
return gzip.compress(json_bytes)
def decode(self, data):
json_bytes = gzip.decompress(data)
return json.loads(json_bytes.decode('utf-8'))
@property
def mime_type(self):
return 'application/json+gzip'
# Register custom codec
faust.codecs.register('compressed_json', CompressedJSONCodec())
# Use custom codec
compressed_topic = app.topic(
'compressed-data',
value_serializer='compressed_json'
)
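
# Optional sanity check: round-trip through the custom codec directly
codec = CompressedJSONCodec()
assert codec.decode(codec.encode({'a': 1})) == {'a': 1}

class User(faust.Record, serializer='json'):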
id: int
name: str
email: str
created_at: datetime
class UserEvent(faust.Record, serializer='json'):
user: User
event_type: str
timestamp: datetime
# Topics with model types
users_topic = app.topic('users', value_type=User)
events_topic = app.topic('user-events', value_type=UserEvent)
@app.agent(users_topic)
async def handle_users(users):
async for user in users:
# Automatic deserialization to User model
print(f"User {user.id}: {user.name} ({user.email})")
# Create related event
event = UserEvent(
user=user,
event_type='created',
timestamp=datetime.utcnow()
)
        await events_topic.send(value=event)

# Configure app-wide serializer defaults
app = faust.App(
'my-app',
broker='kafka://localhost:9092',
key_serializer='raw',
value_serializer='json'
)
# Topic with custom serializers
special_topic = app.topic(
'special-data',
key_type=str,
value_type=bytes,
key_serializer='json', # Override app default
value_serializer='raw' # Override app default
)
# Message-level serializer override
await special_topic.send(
key={'user_id': 123},
value=b'raw binary data',
key_serializer='pickle', # Override topic default
value_serializer='binary' # Override topic default
)

from faust import SerializationError
# Dead letter topic for events that fail processing (defined here for completeness)
dead_letter_topic = app.topic('dead-letter', value_serializer='json')

@app.agent()
async def resilient_processor(stream):
    # Iterate over full events to keep access to the key and raw message
    async for event in stream.events():
        try:
            # Process the deserialized value
            data = event.value
            process_data(data)  # application-specific processing
        except SerializationError as e:
            # Handle serialization errors: log and forward to the dead letter queue
            print(f"Serialization error: {e}")
            await dead_letter_topic.send(
                key=event.key,
                value={'error': str(e), 'raw_data': repr(event.message.value)}
            )

from typing import Protocol, Any, Optional
class CodecT(Protocol):
"""Type interface for Codec."""
mime_type: str
def encode(self, obj: Any) -> bytes: ...
def decode(self, data: bytes) -> Any: ...
class SchemaT(Protocol):
"""Type interface for Schema."""
key_type: Optional[type]
value_type: Optional[type]
key_serializer: Optional[str]
value_serializer: Optional[str]
def loads_key(self, app: Any, message: bytes, **kwargs) -> Any: ...
def loads_value(self, app: Any, message: bytes, **kwargs) -> Any: ...
def dumps_key(self, app: Any, key: Any, **kwargs) -> bytes: ...
def dumps_value(self, app: Any, value: Any, **kwargs) -> bytes: ...
class RegistryT(Protocol):
"""Type interface for codec Registry."""
def register(self, name: str, codec: CodecT) -> None: ...
def get(self, name: str) -> CodecT: ...
    def list_codecs(self) -> list: ...
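
These protocols allow type-checking helpers without importing concrete classes, e.g. (a sketch):

def encode_with(codec: CodecT, obj: Any) -> bytes:
    """Encode obj with any object satisfying the codec interface."""
    return codec.encode(obj)

Install with Tessl CLI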
npx tessl i tessl/pypi-faust