Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines
—
Stateful data management through tables and models in Faust applications. Tables provide distributed key-value storage with changelog-based replication, while models offer structured data definitions with type-safe serialization and validation capabilities.
Distributed key-value storage interface with changelog-based replication for stateful stream processing. Tables automatically maintain consistency across application instances and provide both local and global access patterns.
from typing import Any, Callable, Dict, Iterator, Optional, Tuple


class Table:
    """
    Distributed key-value table replicated through a changelog topic.

    Offers the familiar mapping operations (``t[k]``, ``t[k] = v``,
    ``del t[k]``, ``k in t``, ``get``, ``update``, ...). ``App``, ``Window``
    and ``Topic`` are written as forward-reference strings because they are
    not imported in this module.
    """

    def __init__(
        self,
        app: "App",
        *,
        name: str,
        default: Optional[Callable[[], Any]] = None,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        partitions: Optional[int] = None,
        window: Optional["Window"] = None,
        changelog_topic: Optional["Topic"] = None,
        help: Optional[str] = None,  # shadows builtin help(); kept for API compatibility
        **kwargs: Any,
    ) -> None:
        """
        Create a new distributed table.

        Args:
            app: The Faust application instance
            name: Table name (used for changelog topic)
            default: Zero-argument factory producing the value for a missing
                key (e.g. ``int`` or ``lambda: {...}``)
            key_type: Type for table keys
            value_type: Type for table values
            partitions: Number of changelog partitions
            window: Window specification for windowed tables
            changelog_topic: Custom changelog topic
            help: Help text for CLI
        """

    def __getitem__(self, key: Any) -> Any:
        """
        Get value by key.

        Args:
            key: Table key
        Returns:
            Value associated with key
        Raises:
            KeyError: If key not found and no default
        """

    def __setitem__(self, key: Any, value: Any) -> None:
        """
        Set key-value pair.

        Args:
            key: Table key
            value: Value to store
        """

    def __delitem__(self, key: Any) -> None:
        """
        Delete key from table.

        Args:
            key: Key to delete
        Raises:
            KeyError: If key not found
        """

    def __contains__(self, key: Any) -> bool:
        """
        Check if key exists in table.

        Args:
            key: Key to check
        Returns:
            True if key exists
        """

    def get(self, key: Any, default: Any = None) -> Any:
        """
        Get value by key with optional default.

        Args:
            key: Table key
            default: Default value if key not found
        Returns:
            Value or default
        """

    def setdefault(self, key: Any, default: Any = None) -> Any:
        """
        Get value or set and return default.

        Args:
            key: Table key
            default: Default value to set if key missing
        Returns:
            Existing value or newly set default
        """

    def pop(self, key: Any, *default: Any) -> Any:
        """
        Remove key and return value.

        Args:
            key: Key to remove
            *default: Optional default if key not found
        Returns:
            Value that was removed, or the supplied default when the key is
            missing and a default was given
        Raises:
            KeyError: If key not found and no default provided
        """

    def update(self, *args: Any, **kwargs: Any) -> None:
        """
        Update table with key-value pairs.

        Args:
            *args: Mapping or iterable of pairs
            **kwargs: Keyword arguments as key-value pairs
        """

    def clear(self) -> None:
        """Remove all items from table."""

    def items(self) -> Iterator[Tuple[Any, Any]]:
        """
        Iterate over key-value pairs.

        Returns:
            Iterator of (key, value) tuples
        """

    def keys(self) -> Iterator[Any]:
        """
        Iterate over keys.

        Returns:
            Iterator of keys
        """

    def values(self) -> Iterator[Any]:
        """
        Iterate over values.

        Returns:
            Iterator of values
        """

    def copy(self) -> Dict[Any, Any]:
        """
        Create a dictionary copy of table contents.

        Returns:
            Dictionary with current table state
        """

    @property
    def name(self) -> str:
        """Table name."""

    @property
    def default(self) -> Optional[Callable[[], Any]]:
        """Default value factory."""

    @property
    def key_type(self) -> Optional[type]:
        """Type for table keys."""

    @property
    def value_type(self) -> Optional[type]:
        """Type for table values."""

    @property
    def changelog_topic(self) -> "Topic":
        """Changelog topic for replication."""


# Global table providing read-only access to table data across all
# application instances, regardless of partition assignment. Useful for
# lookup tables and reference data that all instances need access to.
from typing import Any, Callable, Optional


class GlobalTable(Table):
    """
    Table whose contents are readable from every application instance,
    regardless of which partitions that instance is assigned.

    Item assignment and deletion are documented as unsupported here.
    """

    def __init__(
        self,
        app: "App",
        *,
        name: str,
        default: Optional[Callable[[], Any]] = None,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        changelog_topic: Optional["Topic"] = None,
        help: Optional[str] = None,  # shadows builtin help(); kept for API compatibility
        **kwargs: Any,
    ) -> None:
        """
        Create a new global table with read access from all instances.

        Args:
            app: The Faust application instance
            name: Table name
            default: Default value factory function
            key_type: Type for table keys
            value_type: Type for table values
            changelog_topic: Custom changelog topic
            help: Help text for CLI
        """

    def __setitem__(self, key: Any, value: Any) -> None:
        """
        Set operations not supported on global tables.

        Raises:
            NotImplementedError: Global tables are read-only
        """

    def __delitem__(self, key: Any) -> None:
        """
        Delete operations not supported on global tables.

        Raises:
            NotImplementedError: Global tables are read-only
        """


# Specialized table implementations for storing sets of values, providing set
# operations and membership testing with distributed consistency.
from typing import Any, Optional, Tuple


class SetTable:
    """
    Distributed table mapping each key to a set of elements.

    Elements are added and removed per key with ``add``/``discard``/``remove``;
    membership is tested with ``(key, value) in table``.
    """

    def __init__(
        self,
        app: "App",
        *,
        name: str,
        key_type: Optional[type] = None,
        value_type: Optional[type] = None,
        partitions: Optional[int] = None,
        changelog_topic: Optional["Topic"] = None,
        help: Optional[str] = None,  # shadows builtin help(); kept for API compatibility
        **kwargs: Any,
    ) -> None:
        """
        Create a distributed set table.

        Args:
            app: The Faust application instance
            name: Table name
            key_type: Type for set keys
            value_type: Type for set elements
            partitions: Number of changelog partitions
            changelog_topic: Custom changelog topic
            help: Help text for CLI
        """

    def add(self, key: Any, value: Any) -> None:
        """
        Add element to set at key.

        Args:
            key: Set key
            value: Element to add
        """

    def discard(self, key: Any, value: Any) -> None:
        """
        Remove element from set at key if present.

        Args:
            key: Set key
            value: Element to remove
        """

    def remove(self, key: Any, value: Any) -> None:
        """
        Remove element from set at key.

        Args:
            key: Set key
            value: Element to remove
        Raises:
            KeyError: If element not in set
        """

    def __contains__(self, item: Tuple[Any, Any]) -> bool:
        """
        Test membership of (key, value) pair.

        Args:
            item: (key, value) tuple to test
        Returns:
            True if value is in set at key
        """

    def intersection(self, key: Any, *others: Any) -> set:
        """
        Return intersection with other sets.

        Args:
            key: Set key
            *others: Other sets to intersect with
        Returns:
            Set intersection
        """

    def union(self, key: Any, *others: Any) -> set:
        """
        Return union with other sets.

        Args:
            key: Set key
            *others: Other sets to union with
        Returns:
            Set union
        """
class SetGlobalTable(SetTable, GlobalTable):
    """Global set table combining set operations with global access."""


# Structured data classes for type-safe serialization and deserialization of
# messages and table values. Models provide schema validation, field typing,
# and automatic serialization support.
from typing import Any, Dict, Optional


class Model:
    """Base class for structured, serializable message and table values."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Create model instance with field values.

        Args:
            *args: Positional field values
            **kwargs: Named field values
        """

    def dumps(self, *, serializer: Optional[str] = None) -> bytes:
        """
        Serialize model to bytes.

        Args:
            serializer: Serializer to use (defaults to model serializer)
        Returns:
            Serialized model data
        """

    @classmethod
    def loads(
        cls,
        s: bytes,
        *,
        serializer: Optional[str] = None,
        default_serializer: Optional[str] = None,
    ) -> "Model":
        """
        Deserialize model from bytes.

        Args:
            s: Serialized data
            serializer: Serializer to use
            default_serializer: Fallback serializer
        Returns:
            Model instance
        """

    def asdict(self) -> Dict[str, Any]:
        """
        Convert model to dictionary.

        Returns:
            Dictionary representation of model
        """

    def derive(self, **fields: Any) -> "Model":
        """
        Create new model instance with updated fields.

        Args:
            **fields: Fields to update
        Returns:
            New model instance with changes
        """

    @property
    def _options(self) -> "ModelOptions":
        """Model configuration options."""
class Record(Model):
    """
    Model whose fields are discovered from the subclass's type annotations.

    Example:
        class User(faust.Record):
            id: int
            name: str
            email: Optional[str] = None
    """

    def __init_subclass__(cls, **options):
        """Hook run for each Record subclass (field introspection); it simply
        forwards all keyword options to the parent class hook."""
        super(Record, cls).__init_subclass__(**options)
from typing import Any, Optional


class ModelOptions:
    """Configuration flags controlling how a model is serialized and parsed."""

    def __init__(
        self,
        *,
        serializer: Optional[str] = None,
        include_metadata: bool = True,
        polymorphic_fields: bool = False,
        allow_blessed_key: bool = False,
        isodates: bool = False,
        decimals: bool = False,
        validation: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Model configuration options.

        Args:
            serializer: Default serializer
            include_metadata: Include type metadata in serialization
            polymorphic_fields: Support polymorphic field types
            allow_blessed_key: Allow blessed key optimization
            isodates: Parse ISO date strings to datetime objects
            decimals: Use decimal.Decimal for float fields
            validation: Enable field validation
        """

    @property
    def serializer(self) -> Optional[str]:
        """Default serializer name."""

    @property
    def include_metadata(self) -> bool:
        """Whether to include type metadata."""


# Type definitions and validation for model fields with automatic conversion
# and validation support.
from datetime import datetime
from decimal import Decimal
from typing import Any, Callable, Dict, List, Optional
class FieldDescriptor:
    """Per-attribute configuration for a model field."""

    def __init__(
        self,
        *,
        required: bool = True,
        default: Any = None,
        default_factory: Optional[Callable[..., Any]] = None,
        coerce: bool = True,
        validator: Optional[Callable[..., Any]] = None,
        exclude: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Field descriptor for model attributes.

        Args:
            required: Field is required (no None values)
            default: Default value
            default_factory: Factory for default values
            coerce: Attempt type coercion
            validator: Validation function
            exclude: Exclude from serialization
        """
def DatetimeField(*, timezone: Optional[str] = None, **kwargs: Any) -> datetime:
    """
    Datetime field with timezone support.

    NOTE(review): the ``-> datetime`` annotation names the attribute type the
    field is declared for, while the docs below say a field descriptor is
    returned; presumably this lets ``ts: datetime = DatetimeField()``
    type-check — confirm.

    Args:
        timezone: Timezone name (e.g., 'UTC')
        **kwargs: Additional field options
    Returns:
        Field descriptor for datetime values
    """
def DecimalField(
    *,
    max_digits: Optional[int] = None,
    decimal_places: Optional[int] = None,
    **kwargs: Any,
) -> Decimal:
    """
    Decimal field for precise numeric values.

    Args:
        max_digits: Maximum number of digits
        decimal_places: Number of decimal places
        **kwargs: Additional field options
    Returns:
        Field descriptor for Decimal values
    """
class StringField(FieldDescriptor):
    """
    Field descriptor specialised for string-typed model fields.

    Supplies validation and processing specific to text values.
    """
def maybe_model(arg: Any) -> Any:
    """
    Convert dictionary to model instance if it has model metadata.

    Checks if the argument is a dictionary with Faust model metadata
    and converts it to the appropriate model instance.

    Args:
        arg: Value to potentially convert to model
    Returns:
        Model instance if arg contains model metadata, otherwise arg unchanged
    """
# Global registry of model classes by namespace.
# Maps model namespace strings to their corresponding model classes, enabling
# deserialization of models from their serialized representations.
registry: Dict[str, type] = {}


import faust
import heapq
from operator import itemgetter

app = faust.App('table-app', broker='kafka://localhost:9092')

# Create a table with default values: default=int makes a missing key read as
# 0, which is what lets the ``+=`` below work for a user's first event.
user_scores = app.Table('user-scores', default=int)


@app.agent()
async def update_scores(stream):
    """Add each event's ``points`` to the running score of its ``user_id``."""
    async for event in stream:
        user_id = event['user_id']
        points = event['points']
        # Increment user score
        user_scores[user_id] += points
        print(f"User {user_id} now has {user_scores[user_id]} points")


# Access table data
@app.timer(interval=30.0)
async def print_leaderboard():
    """Every 30 seconds, print the ten highest scores in the table."""
    # heapq.nlargest(n, it, key) is documented as equivalent to
    # sorted(it, key=key, reverse=True)[:n] without sorting the whole table.
    # NOTE(review): with several workers this presumably only sees the
    # partitions assigned to this worker — confirm before relying on it.
    top_users = heapq.nlargest(10, user_scores.items(), key=itemgetter(1))
    for user_id, score in top_users:
        print(f"{user_id}: {score}")


from faust import TumblingWindow
# Table with time-based windows
hourly_stats = app.Table(
    'hourly-stats',
    default=lambda: {'count': 0, 'total': 0},
    window=TumblingWindow(3600)  # 1 hour windows
)


@app.agent()
async def collect_stats(stream):
    """Accumulate an event count and value total per category, per hour."""
    async for event in stream:
        key = event['category']
        value = event['value']
        # Per the Faust windowing docs, indexing a windowed table yields a
        # per-key view over its windows rather than a stored value;
        # .current() reads the value for the window of the current event.
        stats = hourly_stats[key].current()
        stats['count'] += 1
        stats['total'] += value
        # Write back so the update is recorded for the current window.
        hourly_stats[key] = stats


from datetime import datetime, timezone


# Model options are passed as class keyword arguments, which
# Record.__init_subclass__ receives. NOTE(review): an inner ``class Meta``
# does not appear to be read as model configuration by Faust — confirm for
# the version in use. isodates=True (False by default) converts ISO-8601
# strings back into datetime objects when a message is deserialized.
class Order(faust.Record, serializer='json', isodates=True):
    id: int
    customer_id: str
    product_id: str
    quantity: int
    price: float
    timestamp: datetime


class OrderStatus(faust.Record, isodates=True):
    order_id: int
    status: str
    updated_at: datetime


# Use models with topics and tables
orders_topic = app.topic('orders', value_type=Order)
order_status_table = app.Table('order-status', value_type=OrderStatus)


@app.agent(orders_topic)
async def process_orders(orders):
    """Record a 'processing' status for every incoming order."""
    async for order in orders:
        # Type-safe access to order fields
        print(f"Processing order {order.id} for {order.quantity} units")
        # Store order status. datetime.utcnow() returns a naive value and is
        # deprecated since Python 3.12; use an explicit UTC-aware timestamp.
        status = OrderStatus(
            order_id=order.id,
            status='processing',
            updated_at=datetime.now(timezone.utc),
        )
        order_status_table[order.id] = status


# Track user sessions
user_sessions = app.SetTable('user-sessions')


@app.agent()
async def track_sessions(events):
    """Add a session id on 'login' events and drop it on 'logout' events."""
    async for msg in events:
        user_id = msg['user_id']
        session_id = msg['session_id']
        kind = msg['action']
        if kind == 'login':
            user_sessions.add(user_id, session_id)
        elif kind == 'logout':
            user_sessions.discard(user_id, session_id)


# Check active sessions
@app.timer(interval=60.0)
async def monitor_sessions():
    """Every minute, report users holding more than five sessions."""
    # NOTE(review): SetTable as documented above defines no keys() or
    # __getitem__ — confirm these exist on the real SetTable.
    for user_id in user_sessions.keys():
        sessions = user_sessions[user_id]
        if len(sessions) <= 5:
            continue  # within the limit; nothing to report
        print(f"User {user_id} has {len(sessions)} active sessions")


from typing import Protocol, Iterator, Any, Optional, Callable, Dict
class TableT(Protocol):
    """
    Type interface for Table.

    name, key_type and value_type are declared as read-only properties:
    Table exposes them via @property, and a read-only property does not
    satisfy a protocol member declared as a plain (settable) attribute.
    """

    @property
    def name(self) -> str: ...

    @property
    def key_type(self) -> Optional[type]: ...

    @property
    def value_type(self) -> Optional[type]: ...

    def __getitem__(self, key: Any) -> Any: ...

    def __setitem__(self, key: Any, value: Any) -> None: ...

    def __delitem__(self, key: Any) -> None: ...

    def __contains__(self, key: Any) -> bool: ...

    def get(self, key: Any, default: Any = None) -> Any: ...

    def items(self) -> Iterator: ...

    def keys(self) -> Iterator: ...

    def values(self) -> Iterator: ...


class ModelT(Protocol):
    """
    Type interface for Model.

    loads() lists the same keyword-only options as Model.loads; a protocol
    member taking **kwargs would require implementations to accept arbitrary
    keywords, which Model.loads does not.
    """

    def dumps(self, *, serializer: Optional[str] = None) -> bytes: ...

    @classmethod
    def loads(
        cls,
        s: bytes,
        *,
        serializer: Optional[str] = None,
        default_serializer: Optional[str] = None,
    ) -> 'ModelT': ...

    def asdict(self) -> Dict[str, Any]: ...

    def derive(self, **fields: Any) -> 'ModelT': ...


# Install with Tessl CLI
npx tessl i tessl/pypi-faust