CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-faust

Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines

Pending
Overview
Eval results
Files

docs/data-management.md

Data Management

Stateful data management through tables and models in Faust applications. Tables provide distributed key-value storage with changelog-based replication, while models offer structured data definitions with type-safe serialization and validation capabilities.

Capabilities

Table Storage

Distributed key-value storage interface with changelog-based replication for stateful stream processing. Tables automatically maintain consistency across application instances and provide both local and global access patterns.

from typing import Any, Callable, Iterator, Optional

class Table:
    """Distributed key-value table replicated through a changelog topic.

    Exposes a dict-like surface (subscript access, ``get``, ``pop``,
    ``update``, iteration helpers) plus read-only properties describing
    how the table was configured.
    """

    def __init__(
        self,
        app: App,
        *,
        name: str,
        default: Optional[Callable[[], Any]] = None,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        partitions: Optional[int] = None,
        window: Optional[Window] = None,
        changelog_topic: Optional[Topic] = None,
        help: Optional[str] = None,
        **kwargs
    ):
        """
        Create a new distributed table.

        Args:
            app: The Faust application instance
            name: Table name (used for changelog topic)
            default: Zero-argument factory producing the value for a
                missing key (e.g. ``int`` or ``lambda: {...}``)
            key_type: Type for table keys
            value_type: Type for table values
            partitions: Number of changelog partitions
            window: Window specification for windowed tables
            changelog_topic: Custom changelog topic
            help: Help text for CLI
            **kwargs: Additional options (not described in this listing)
        """

    def __getitem__(self, key: Any) -> Any:
        """
        Get value by key.

        Args:
            key: Table key

        Returns:
            Value associated with key

        Raises:
            KeyError: If key not found and no default
        """

    def __setitem__(self, key: Any, value: Any) -> None:
        """
        Set key-value pair.

        Args:
            key: Table key
            value: Value to store
        """

    def __delitem__(self, key: Any) -> None:
        """
        Delete key from table.

        Args:
            key: Key to delete

        Raises:
            KeyError: If key not found
        """

    def __contains__(self, key: Any) -> bool:
        """
        Check if key exists in table.

        Args:
            key: Key to check

        Returns:
            True if key exists
        """

    def get(self, key: Any, default: Any = None) -> Any:
        """
        Get value by key with optional default.

        Args:
            key: Table key
            default: Default value if key not found

        Returns:
            Value or default
        """

    def setdefault(self, key: Any, default: Any = None) -> Any:
        """
        Get value or set and return default.

        Args:
            key: Table key
            default: Default value to set if key missing

        Returns:
            Existing value or newly set default
        """

    def pop(self, key: Any, *default) -> Any:
        """
        Remove key and return value.

        Args:
            key: Key to remove
            *default: Optional default if key not found

        Returns:
            Value that was removed, or the supplied default when the key
            is missing

        Raises:
            KeyError: If key not found and no default provided
        """

    def update(self, *args, **kwargs) -> None:
        """
        Update table with key-value pairs.

        Args:
            *args: Mapping or iterable of pairs
            **kwargs: Keyword arguments as key-value pairs
        """

    def clear(self) -> None:
        """Remove all items from table."""

    def items(self) -> Iterator:
        """
        Iterate over key-value pairs.

        Returns:
            Iterator of (key, value) tuples
        """

    def keys(self) -> Iterator:
        """
        Iterate over keys.

        Returns:
            Iterator of keys
        """

    def values(self) -> Iterator:
        """
        Iterate over values.

        Returns:
            Iterator of values
        """

    def copy(self) -> dict:
        """
        Create a dictionary copy of table contents.

        Returns:
            Dictionary with current table state
        """

    @property
    def name(self) -> str:
        """Table name."""

    @property
    def default(self) -> Optional[Callable[[], Any]]:
        """Default value factory."""

    @property
    def key_type(self) -> Optional[type]:
        """Type for table keys."""

    @property
    def value_type(self) -> Optional[type]:
        """Type for table values."""

    @property
    def changelog_topic(self) -> Topic:
        """Changelog topic for replication."""

Global Table

Global table providing read-only access to table data across all application instances, regardless of partition assignment. Useful for lookup tables and reference data that all instances need access to.

from typing import Any, Callable, Optional

class GlobalTable(Table):
    """Table whose contents are readable from every application instance."""

    def __init__(
        self,
        app: App,
        *,
        name: str,
        default: Optional[Callable[[], Any]] = None,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        changelog_topic: Optional[Topic] = None,
        help: Optional[str] = None,
        **kwargs
    ):
        """
        Create a new global table with read access from all instances.

        Args:
            app: The Faust application instance
            name: Table name
            default: Zero-argument factory producing the value for a
                missing key
            key_type: Type for table keys
            value_type: Type for table values
            changelog_topic: Custom changelog topic
            help: Help text for CLI
            **kwargs: Additional options (not described in this listing)
        """

    # NOTE(review): the two overrides below document global tables as
    # read-only. Upstream Faust appears to declare GlobalTable as a plain
    # Table subclass without such overrides -- confirm that writes really
    # raise before relying on this.
    def __setitem__(self, key: Any, value: Any) -> None:
        """
        Set operations not supported on global tables.

        Args:
            key: Table key (unused)
            value: Value (unused)

        Raises:
            NotImplementedError: Global tables are read-only
        """

    def __delitem__(self, key: Any) -> None:
        """
        Delete operations not supported on global tables.

        Args:
            key: Table key (unused)

        Raises:
            NotImplementedError: Global tables are read-only
        """

Set Tables

Specialized table implementations for storing sets of values, providing set operations and membership testing with distributed consistency.

from typing import Any, Optional

# NOTE(review): the usage example for set tables also calls ``keys()`` and
# subscript access on a SetTable, neither of which is declared in this
# listing -- presumably SetTable derives from Table; confirm.
class SetTable:
    """Distributed table that stores a set of elements under each key."""

    def __init__(
        self,
        app: App,
        *,
        name: str,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        partitions: Optional[int] = None,
        changelog_topic: Optional[Topic] = None,
        help: Optional[str] = None,
        **kwargs
    ):
        """
        Create a distributed set table.

        Args:
            app: The Faust application instance
            name: Table name
            key_type: Type for set keys
            value_type: Type for set elements
            partitions: Number of changelog partitions
            changelog_topic: Custom changelog topic
            help: Help text for CLI
            **kwargs: Additional options (not described in this listing)
        """

    def add(self, key: Any, value: Any) -> None:
        """
        Add element to set at key.

        Args:
            key: Set key
            value: Element to add
        """

    def discard(self, key: Any, value: Any) -> None:
        """
        Remove element from set at key if present.

        Args:
            key: Set key
            value: Element to remove
        """

    def remove(self, key: Any, value: Any) -> None:
        """
        Remove element from set at key.

        Args:
            key: Set key
            value: Element to remove

        Raises:
            KeyError: If element not in set
        """

    def __contains__(self, item: tuple) -> bool:
        """
        Test membership of (key, value) pair.

        Args:
            item: (key, value) tuple to test

        Returns:
            True if value is in set at key
        """

    def intersection(self, key: Any, *others) -> set:
        """
        Return intersection with other sets.

        Args:
            key: Set key
            *others: Other sets to intersect with

        Returns:
            Set intersection
        """

    def union(self, key: Any, *others) -> set:
        """
        Return union with other sets.

        Args:
            key: Set key
            *others: Other sets to union with

        Returns:
            Set union
        """

class SetGlobalTable(SetTable, GlobalTable):
    """Set table with global access.

    Inherits the set operations of SetTable and the global access of
    GlobalTable; it declares nothing of its own.
    """

Data Models

Structured data classes for type-safe serialization and deserialization of messages and table values. Models provide schema validation, field typing, and automatic serialization support.

from typing import Any, Dict, Optional

class Model:
    """Base class for structured data that can be serialized to bytes."""

    def __init__(self, *args, **kwargs):
        """
        Create model instance with field values.

        Args:
            *args: Positional field values
            **kwargs: Named field values
        """

    def dumps(self, *, serializer: Optional[str] = None) -> bytes:
        """
        Serialize model to bytes.

        Args:
            serializer: Serializer to use (defaults to model serializer)

        Returns:
            Serialized model data
        """

    @classmethod
    def loads(
        cls,
        s: bytes,
        *,
        serializer: Optional[str] = None,
        default_serializer: Optional[str] = None
    ) -> 'Model':
        """
        Deserialize model from bytes.

        Args:
            s: Serialized data
            serializer: Serializer to use
            default_serializer: Fallback serializer

        Returns:
            Model instance
        """

    def asdict(self) -> Dict[str, Any]:
        """
        Convert model to dictionary.

        Returns:
            Dictionary representation of model
        """

    def derive(self, **fields) -> 'Model':
        """
        Create new model instance with updated fields.

        Args:
            **fields: Fields to update

        Returns:
            New model instance with changes
        """

    @property
    def _options(self) -> 'ModelOptions':
        """Model configuration options."""

class Record(Model):
    """
    Record model with automatic field detection from type annotations.

    Example:
        class User(faust.Record):
            id: int
            name: str
            email: Optional[str] = None
    """

    def __init_subclass__(cls, **kwargs):
        """Initialize record subclass with field introspection."""
        # Forward the class keyword arguments up the hierarchy unchanged.
        super().__init_subclass__(**kwargs)

class ModelOptions:
    """Container for the per-model configuration flags listed below."""

    def __init__(
        self,
        *,
        serializer: Optional[str] = None,
        include_metadata: bool = True,
        polymorphic_fields: bool = False,
        allow_blessed_key: bool = False,
        isodates: bool = False,
        decimals: bool = False,
        validation: bool = False,
        **kwargs
    ):
        """
        Model configuration options.

        Args:
            serializer: Default serializer
            include_metadata: Include type metadata in serialization
            polymorphic_fields: Support polymorphic field types
            allow_blessed_key: Allow blessed key optimization
            isodates: Parse ISO date strings to datetime objects
            decimals: Use decimal.Decimal for float fields
            validation: Enable field validation
            **kwargs: Additional options (not described in this listing)
        """

    @property
    def serializer(self) -> str:
        """Default serializer name."""

    @property
    def include_metadata(self) -> bool:
        """Whether to include type metadata."""

Field Types

Type definitions and validation for model fields with automatic conversion and validation support.

from datetime import datetime
from decimal import Decimal
from typing import Any, Callable, Dict, List, Optional

class FieldDescriptor:
    """Describes how a single model attribute is defaulted, coerced and validated."""

    def __init__(
        self,
        *,
        required: bool = True,
        default: Any = None,
        default_factory: Optional[Callable[..., Any]] = None,
        coerce: bool = True,
        validator: Optional[Callable[..., Any]] = None,
        exclude: bool = False,
        **kwargs
    ):
        """
        Field descriptor for model attributes.

        Args:
            required: Field is required (no None values)
            default: Default value
            default_factory: Factory for default values
            coerce: Attempt type coercion
            validator: Validation function
            exclude: Exclude from serialization
            **kwargs: Additional options (not described in this listing)
        """

def DatetimeField(*, timezone: Optional[str] = None, **kwargs) -> 'FieldDescriptor':
    """
    Datetime field with timezone support.

    Args:
        timezone: Timezone name (e.g., 'UTC')
        **kwargs: Additional field options

    Returns:
        Field descriptor for datetime values
    """

def DecimalField(
    *,
    max_digits: Optional[int] = None,
    decimal_places: Optional[int] = None,
    **kwargs
) -> 'FieldDescriptor':
    """
    Decimal field for precise numeric values.

    Args:
        max_digits: Maximum number of digits
        decimal_places: Number of decimal places
        **kwargs: Additional field options

    Returns:
        Field descriptor for Decimal values
    """

class StringField(FieldDescriptor):
    """
    Field descriptor specialised for text values.

    Supplies the validation and processing applied to string-typed
    model fields.
    """

def maybe_model(arg: Any) -> Any:
    """
    Convert dictionary to model instance if it has model metadata.

    Checks if the argument is a dictionary with Faust model metadata
    and converts it to the appropriate model instance.

    Args:
        arg: Value to potentially convert to model

    Returns:
        Model instance if arg contains model metadata, otherwise arg unchanged
    """

# Global registry of model classes, keyed by namespace.
#
# Each namespace string maps to the model class registered under it, which
# is what allows a serialized model to be turned back into the right class.
registry: dict = dict()

Usage Examples

Basic Table Operations

import faust

app = faust.App('table-app', broker='kafka://localhost:9092')

# Running score per user; ``default=int`` supplies the value for unseen keys
user_scores = app.Table('user-scores', default=int)

@app.agent()
async def update_scores(stream):
    """Add each event's points to that user's running score and print it."""
    async for event in stream:
        user_id, points = event['user_id'], event['points']

        user_scores[user_id] += points
        print(f"User {user_id} now has {user_scores[user_id]} points")

@app.timer(interval=30.0)
async def print_leaderboard():
    """Every 30 seconds, print the ten highest-scoring users."""
    # Rank all (user_id, score) pairs by score, best first
    ranked = sorted(user_scores.items(), key=lambda pair: pair[1], reverse=True)

    for user_id, score in ranked[:10]:
        print(f"{user_id}: {score}")

Windowed Tables

from faust import TumblingWindow

# NOTE(review): Faust's user guide builds windowed tables with
# ``app.Table(...).tumbling(...)`` and reads them through ``.value()`` /
# ``.current()`` -- confirm that plain indexing below yields the dict.
# Stats table bucketed into 3600-second (one hour) tumbling windows
hourly_stats = app.Table(
    'hourly-stats',
    window=TumblingWindow(3600),
    default=lambda: {'count': 0, 'total': 0},
)

@app.agent()
async def collect_stats(stream):
    """Keep a per-category event count and value total."""
    async for event in stream:
        category, amount = event['category'], event['value']

        bucket = hourly_stats[category]
        bucket['count'] += 1
        bucket['total'] += amount
        # Write the dict back explicitly (presumably so the table records
        # the change rather than only the local dict being mutated -- confirm)
        hourly_stats[category] = bucket

Structured Data Models

from datetime import datetime, timezone

# Record options are given as class keyword arguments; a nested ``Meta``
# class is not how Faust models are configured. ``isodates=True`` makes the
# datetime fields parse back from their ISO string form.
class Order(faust.Record, serializer='json', isodates=True):
    id: int
    customer_id: str
    product_id: str
    quantity: int
    price: float
    timestamp: datetime

class OrderStatus(faust.Record, isodates=True):
    order_id: int
    status: str
    updated_at: datetime

# Use models with topics and tables
orders_topic = app.topic('orders', value_type=Order)
order_status_table = app.Table('order-status', value_type=OrderStatus)

@app.agent(orders_topic)
async def process_orders(orders):
    """Record a 'processing' status for every incoming order."""
    async for order in orders:
        # Type-safe access to order fields
        print(f"Processing order {order.id} for {order.quantity} units")

        # Store order status, stamped with a timezone-aware UTC time
        # (``datetime.utcnow()`` is deprecated and returns a naive value)
        status = OrderStatus(
            order_id=order.id,
            status='processing',
            updated_at=datetime.now(timezone.utc)
        )
        order_status_table[order.id] = status

Set Table Operations

# One set of session ids per user
user_sessions = app.SetTable('user-sessions')

@app.agent()
async def track_sessions(events):
    """Mirror login/logout events into each user's session set."""
    async for event in events:
        user_id = event['user_id']
        session_id = event['session_id']
        action = event['action']

        # Any action other than login/logout is ignored
        if action == 'logout':
            user_sessions.discard(user_id, session_id)
        elif action == 'login':
            user_sessions.add(user_id, session_id)

@app.timer(interval=60.0)
async def monitor_sessions():
    """Every 60 seconds, report users holding more than five sessions."""
    for user_id in user_sessions.keys():
        sessions = user_sessions[user_id]
        if len(sessions) <= 5:
            continue
        print(f"User {user_id} has {len(sessions)} active sessions")

Type Interfaces

from typing import Protocol, Iterator, Any, Optional, Callable, Dict

class TableT(Protocol):
    """Structural type describing the mapping-style surface of a Table.

    Covers the identifying attributes, subscript/membership access and
    the lookup and iteration helpers.
    """

    # Identifying attributes
    name: str
    key_type: Optional[type]
    value_type: Optional[type]

    # Lookup and iteration helpers
    def get(self, key: Any, default: Any = None) -> Any: ...
    def keys(self) -> Iterator: ...
    def values(self) -> Iterator: ...
    def items(self) -> Iterator: ...

    # Subscript and membership protocol
    def __contains__(self, key: Any) -> bool: ...
    def __getitem__(self, key: Any) -> Any: ...
    def __setitem__(self, key: Any, value: Any) -> None: ...
    def __delitem__(self, key: Any) -> None: ...

class ModelT(Protocol):
    """Structural type for Model: byte (de)serialization, dict export and derivation."""

    @classmethod
    def loads(cls, s: bytes, **kwargs) -> 'ModelT': ...

    def dumps(self, *, serializer: Optional[str] = None) -> bytes: ...

    def derive(self, **fields) -> 'ModelT': ...
    def asdict(self) -> Dict[str, Any]: ...

Install with Tessl CLI

npx tessl i tessl/pypi-faust

docs

authentication.md

cli-framework.md

core-application.md

data-management.md

index.md

monitoring.md

serialization.md

stream-processing.md

topics-channels.md

windowing.md

worker-management.md

tile.json