CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-faust

Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines

Pending
Overview
Eval results
Files

docs/serialization.md

Serialization

Data serialization and schema management for type-safe message handling in Faust applications. Provides codecs for different data formats, schema definitions for structured data, and flexible serialization pipelines with support for custom serializers and data transformation.

Capabilities

Schema Management

Schema definitions for structured data serialization and deserialization. Schemas provide type-safe message handling with automatic key/value serialization, custom serializer selection, and metadata preservation.

from typing import Any, Callable, Optional


class Schema:
    def __init__(
        self,
        *,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        key_serializer: Optional[str] = None,
        value_serializer: Optional[str] = None,
        allow_empty: bool = False,
        **kwargs: Any
    ) -> None:
        """
        Create a message schema definition.
        
        Args:
            key_type: Type for message keys (``None`` when unspecified)
            value_type: Type for message values (``None`` when unspecified)
            key_serializer: Serializer name for keys
            value_serializer: Serializer name for values
            allow_empty: Allow None/empty values
            **kwargs: Additional schema options
        """

    def loads_key(
        self,
        app: App,
        message: bytes,
        *,
        loads: Optional[Callable[..., Any]] = None,
        serializer: Optional[str] = None
    ) -> Any:
        """
        Deserialize message key.
        
        Args:
            app: Faust application instance
            message: Raw message data
            loads: Custom deserialization function
            serializer: Override serializer
            
        Returns:
            Deserialized key object
        """

    def loads_value(
        self,
        app: App,
        message: bytes,
        *,
        loads: Optional[Callable[..., Any]] = None,
        serializer: Optional[str] = None
    ) -> Any:
        """
        Deserialize message value.
        
        Args:
            app: Faust application instance
            message: Raw message data
            loads: Custom deserialization function
            serializer: Override serializer
            
        Returns:
            Deserialized value object
        """

    def dumps_key(
        self,
        app: App,
        key: Any,
        *,
        serializer: Optional[str] = None
    ) -> bytes:
        """
        Serialize message key.
        
        Args:
            app: Faust application instance
            key: Key object to serialize
            serializer: Override serializer
            
        Returns:
            Serialized key bytes
        """

    def dumps_value(
        self,
        app: App,
        value: Any,
        *,
        serializer: Optional[str] = None
    ) -> bytes:
        """
        Serialize message value.
        
        Args:
            app: Faust application instance
            value: Value object to serialize
            serializer: Override serializer
            
        Returns:
            Serialized value bytes
        """

    # The constructor defaults for these four settings are ``None``, so the
    # properties are typed Optional (matching the SchemaT protocol).
    @property
    def key_type(self) -> Optional[type]:
        """Type for message keys."""

    @property
    def value_type(self) -> Optional[type]:
        """Type for message values."""

    @property
    def key_serializer(self) -> Optional[str]:
        """Serializer name for keys."""

    @property
    def value_serializer(self) -> Optional[str]:
        """Serializer name for values."""

Codec Interface

Low-level serialization codec interface for implementing custom data formats and transformation pipelines. Codecs handle the actual byte-level encoding and decoding operations.

from typing import Any


class Codec:
    def __init__(self, **kwargs: Any) -> None:
        """
        Create a serialization codec.
        
        Args:
            **kwargs: Codec-specific configuration
        """

    def encode(self, obj: Any) -> bytes:
        """
        Encode object to bytes.
        
        Args:
            obj: Object to encode
            
        Returns:
            Encoded bytes
            
        Raises:
            SerializationError: If encoding fails
        """

    def decode(self, data: bytes) -> Any:
        """
        Decode bytes to object.
        
        Args:
            data: Bytes to decode
            
        Returns:
            Decoded object
            
        Raises:
            SerializationError: If decoding fails
        """

    @property
    def mime_type(self) -> str:
        """MIME type for this codec."""

Built-in Codecs

Ready-made codecs for common data formats: JSON, Python pickle, raw bytes (pass-through), and binary encoding. Like every Codec, they report encoding and decoding failures as SerializationError.

from typing import Any, Optional, Tuple


class JSONCodec(Codec):
    def __init__(
        self,
        *,
        ensure_ascii: bool = False,
        sort_keys: bool = False,
        separators: Optional[Tuple[str, str]] = None,
        **kwargs: Any
    ) -> None:
        """
        JSON serialization codec.
        
        Args:
            ensure_ascii: Escape non-ASCII characters
            sort_keys: Sort dictionary keys
            separators: Custom ``(item_separator, key_separator)`` pair
        """

    def encode(self, obj: Any) -> bytes:
        """Encode object as JSON bytes."""

    def decode(self, data: bytes) -> Any:
        """Decode JSON bytes to object."""

class PickleCodec(Codec):
    def __init__(self, *, protocol: Optional[int] = None, **kwargs: Any) -> None:
        """
        Python pickle serialization codec.
        
        Security: only decode data from trusted producers. Unpickling
        attacker-controlled bytes can execute arbitrary code.
        
        Args:
            protocol: Pickle protocol version
        """

    def encode(self, obj: Any) -> bytes:
        """Encode object using pickle."""

    def decode(self, data: bytes) -> Any:
        """Decode pickle bytes to object (trusted input only)."""

class RawCodec(Codec):
    def encode(self, obj: bytes) -> bytes:
        """Pass-through for raw bytes."""

    def decode(self, data: bytes) -> bytes:
        """Pass-through for raw bytes."""

class BinaryCodec(Codec):
    def encode(self, obj: Any) -> bytes:
        """Encode as binary representation."""

    def decode(self, data: bytes) -> Any:
        """Decode from binary representation."""

Registry Management

Codec registry for managing available serializers and their configuration. Provides dynamic codec selection, registration of custom codecs, and serializer name resolution.

class Registry:
    """Name-to-codec lookup table, pre-populated with the built-in codecs."""

    def __init__(self):
        """Build a registry that already contains the built-in codecs."""

    def register(self, name: str, codec: Codec) -> None:
        """
        Store ``codec`` under ``name`` so it can be looked up later.

        Args:
            name: Lookup key for the codec
            codec: Codec instance or class to store
        """

    def get(self, name: str) -> Codec:
        """
        Look up a codec by its registered name.

        Args:
            name: Registered codec name

        Returns:
            The matching codec instance

        Raises:
            KeyError: When no codec is registered under ``name``
        """

    def list_codecs(self) -> list:
        """
        Report which codec names are currently registered.

        Returns:
            List of registered codec names
        """

# Module-wide default codec registry; this is the "default registry" that
# register_codec() and get_codec() refer to.
codecs: Registry = Registry()

def register_codec(name: str, codec: Codec) -> None:
    """
    Add ``codec`` to the default registry under ``name``.

    Args:
        name: Lookup key for the codec
        codec: Codec instance or class to register
    """

def get_codec(name: str) -> Codec:
    """
    Look up a codec by name in the default registry.

    Args:
        name: Registered codec name

    Returns:
        The matching codec instance
    """

Serializer Configuration

Configuration utilities for setting up serialization behavior at the application, topic, and message level with inheritance and override support.

from typing import Any, Optional


class SerializerSettings:
    def __init__(
        self,
        *,
        key: Optional[str] = None,
        value: Optional[str] = None,
        allow_empty: bool = True,
        **kwargs: Any
    ) -> None:
        """
        Serializer configuration settings.
        
        Args:
            key: Default key serializer name (``None`` when unspecified)
            value: Default value serializer name (``None`` when unspecified)
            allow_empty: Allow empty/None values
        """

    @property
    def key_serializer(self) -> str:
        """Default key serializer."""

    @property
    def value_serializer(self) -> str:
        """Default value serializer."""

def configure_serializers(
    app: App,
    *,
    key: Optional[str] = None,
    value: Optional[str] = None,
    **kwargs: Any
) -> None:
    """
    Configure default serializers for application.
    
    Args:
        app: Faust application
        key: Default key serializer (``None`` when unspecified)
        value: Default value serializer (``None`` when unspecified)
    """

Custom Serializers

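Base class and exceptions for implementing custom serialization formats. CustomCodec adds hooks to validate objects before serialization and to transform them before encoding and after decoding. SerializationError and SchemaError report serialization and schema-validation failures.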

from typing import Any


class CustomCodec(Codec):
    def __init__(self, **config: Any) -> None:
        """
        Base class for custom codecs.
        
        Args:
            **config: Codec configuration, forwarded to ``Codec.__init__``
        """
        super().__init__(**config)

    def validate(self, obj: Any) -> bool:
        """
        Validate object before serialization.
        
        Args:
            obj: Object to validate
            
        Returns:
            True if valid for this codec
        """

    def transform_encode(self, obj: Any) -> Any:
        """
        Transform object before encoding.
        
        Args:
            obj: Object to transform
            
        Returns:
            Transformed object
        """

    def transform_decode(self, obj: Any) -> Any:
        """
        Transform object after decoding.
        
        Args:
            obj: Decoded object
            
        Returns:
            Transformed object
        """

class SerializationError(Exception):
    """Raised when serialization/deserialization fails."""

class SchemaError(Exception):
    """Raised when schema validation fails."""

Usage Examples

Basic Serialization

import faust
from faust import JSONCodec, PickleCodec

app = faust.App('serialization-app', broker='kafka://localhost:9092')

# Topic with JSON serialization
json_topic = app.topic(
    'json-events',
    value_type=dict,
    value_serializer='json',
)

# Topic with pickle serialization.
# Only consume pickle from trusted producers: unpickling can run arbitrary code.
pickle_topic = app.topic(
    'pickle-data',
    value_serializer='pickle',
)


@app.agent(json_topic)
async def handle_json_events(events):
    async for event in events:
        # Automatically deserialized from JSON
        print(f"Event type: {event['type']}, data: {event['data']}")


@app.task
async def send_example_event():
    # Send JSON data once the app starts. A bare top-level ``await`` is a
    # SyntaxError in a regular module, so the send runs inside an app task.
    await json_topic.send(value={
        'type': 'user_login',
        'data': {'user_id': 123, 'timestamp': '2024-01-01T00:00:00Z'},
    })

Custom Schema Definition

from faust import Schema
from datetime import datetime


class EventSchema(Schema):
    """Schema with ``str`` keys sent raw and ``dict`` values encoded as JSON."""

    def __init__(self):
        super().__init__(
            key_type=str,
            value_type=dict,
            key_serializer='raw',
            value_serializer='json',
        )


# Topic with custom schema
events_topic = app.topic(
    'events',
    schema=EventSchema(),
)


@app.agent(events_topic)
async def process_events(events):
    # Iterating the stream directly yields only the deserialized values;
    # items() yields (key, value) pairs so the key is available as well.
    async for key, data in events.items():
        # Keys are strings, values are dicts (both already deserialized)
        print(f"Processing {key}: {data}")

Custom Codec Implementation

import gzip
import json
import zlib

import faust
from faust import Codec, SerializationError


class CompressedJSONCodec(Codec):
    """JSON codec with gzip compression."""

    def encode(self, obj):
        """Serialize ``obj`` to JSON and gzip-compress the UTF-8 bytes.

        Raises:
            SerializationError: If ``obj`` cannot be represented as JSON.
        """
        try:
            json_bytes = json.dumps(obj).encode('utf-8')
        except (TypeError, ValueError) as exc:
            # TypeError: unserializable type; ValueError: e.g. circular reference.
            raise SerializationError(
                f"cannot encode {type(obj).__name__} as JSON: {exc}"
            ) from exc
        return gzip.compress(json_bytes)

    def decode(self, data):
        """Decompress gzip ``data`` and parse the contained JSON document.

        Raises:
            SerializationError: If ``data`` is not valid gzip-compressed
                UTF-8 JSON, as the Codec contract requires.
        """
        try:
            json_bytes = gzip.decompress(data)
            return json.loads(json_bytes.decode('utf-8'))
        except (OSError, EOFError, zlib.error, ValueError) as exc:
            # OSError covers gzip.BadGzipFile, EOFError a truncated stream,
            # zlib.error corrupt deflate data, and ValueError both
            # UnicodeDecodeError and json.JSONDecodeError.
            raise SerializationError(
                f"cannot decode gzip-compressed JSON: {exc}"
            ) from exc

    @property
    def mime_type(self):
        return 'application/json+gzip'


# Register custom codec
faust.codecs.register('compressed_json', CompressedJSONCodec())

# Use custom codec
compressed_topic = app.topic(
    'compressed-data',
    value_serializer='compressed_json',
)

Model-based Serialization

from datetime import datetime, timezone

import faust


class User(faust.Record, serializer='json'):
    id: int
    name: str
    email: str
    created_at: datetime


class UserEvent(faust.Record, serializer='json'):
    user: User
    event_type: str
    timestamp: datetime


# Topics with model types. The distinct name avoids rebinding the
# ``events_topic`` defined in the custom-schema example.
users_topic = app.topic('users', value_type=User)
user_events_topic = app.topic('user-events', value_type=UserEvent)


@app.agent(users_topic)
async def handle_users(users):
    async for user in users:
        # Automatic deserialization to User model
        print(f"User {user.id}: {user.name} ({user.email})")

        # Create related event. Use a timezone-aware timestamp:
        # datetime.utcnow() is deprecated since Python 3.12 and returns a
        # naive datetime.
        event = UserEvent(
            user=user,
            event_type='created',
            timestamp=datetime.now(timezone.utc),
        )
        await user_events_topic.send(value=event)

Advanced Serialization Configuration

# Configure app-wide serializer defaults
app = faust.App(
    'my-app',
    broker='kafka://localhost:9092',
    key_serializer='raw',
    value_serializer='json'
)

# Topic with custom serializers
special_topic = app.topic(
    'special-data',
    key_type=str,
    value_type=bytes,
    key_serializer='json',  # Override app default
    value_serializer='raw'  # Override app default
)

# Message-level serializer override
await special_topic.send(
    key={'user_id': 123},
    value=b'raw binary data',
    key_serializer='pickle',    # Override topic default
    value_serializer='binary'   # Override topic default
)

Error Handling

import base64

from faust import JSONCodec, SerializationError

# Values on a topic with value_serializer='json' reach the agent already
# deserialized, so a decode failure never surfaces inside the agent loop.
# Consume raw bytes instead and decode explicitly where the error can be handled.
raw_events_topic = app.topic('raw-events', value_serializer='raw')
dead_letter_topic = app.topic('dead-letters', value_serializer='json')
json_codec = JSONCodec()


def process_data(data):
    """Placeholder for application-specific processing."""
    print(f"Processing: {data}")


@app.agent(raw_events_topic)
async def resilient_processor(stream):
    # events() yields full events (key, value, message), not just values.
    async for event in stream.events():
        raw = event.message.value
        try:
            data = json_codec.decode(raw)
        except SerializationError as e:
            # Handle serialization errors
            print(f"Serialization error: {e}")
            # Route the undecodable payload to the dead letter topic. Raw bytes
            # are not JSON-serializable, so base64-encode them first.
            await dead_letter_topic.send(
                key=event.key,
                value={
                    'error': str(e),
                    'raw_data': base64.b64encode(raw).decode('ascii'),
                },
            )
            continue
        # Kept outside the try so only decoding errors are dead-lettered.
        process_data(data)

Type Interfaces

from typing import Protocol, Any, Optional, Union, Dict


class CodecT(Protocol):
    """Type interface for Codec."""

    # Declared as a read-only property rather than a plain attribute: a plain
    # protocol attribute is settable, so an implementation exposing
    # ``mime_type`` via ``@property`` (as Codec does) would not match.
    @property
    def mime_type(self) -> str: ...

    def encode(self, obj: Any) -> bytes: ...
    def decode(self, data: bytes) -> Any: ...


class SchemaT(Protocol):
    """Type interface for Schema."""

    # Read-only properties, matching Schema's @property accessors.
    @property
    def key_type(self) -> Optional[type]: ...

    @property
    def value_type(self) -> Optional[type]: ...

    @property
    def key_serializer(self) -> Optional[str]: ...

    @property
    def value_serializer(self) -> Optional[str]: ...

    def loads_key(self, app: Any, message: bytes, **kwargs) -> Any: ...
    def loads_value(self, app: Any, message: bytes, **kwargs) -> Any: ...
    def dumps_key(self, app: Any, key: Any, **kwargs) -> bytes: ...
    def dumps_value(self, app: Any, value: Any, **kwargs) -> bytes: ...


class RegistryT(Protocol):
    """Type interface for codec Registry."""

    def register(self, name: str, codec: CodecT) -> None: ...
    def get(self, name: str) -> CodecT: ...
    def list_codecs(self) -> list: ...

Install with Tessl CLI

npx tessl i tessl/pypi-faust

docs

authentication.md

cli-framework.md

core-application.md

data-management.md

index.md

monitoring.md

serialization.md

stream-processing.md

topics-channels.md

windowing.md

worker-management.md

tile.json