Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines
—
The foundational components for creating and managing Faust applications. The App class serves as the central orchestrator, providing decorators and methods for defining stream processing logic, data storage, web endpoints, and CLI commands.
The main Faust application class that coordinates all components including agents, topics, tables, services, and web endpoints. Acts as the entry point for building distributed stream processing applications.
class App:
def __init__(
self,
id: str,
*,
broker: str = None,
autodiscover: bool = True,
origin: str = None,
canonical_url: str = None,
broker_consumer: str = None,
broker_producer: str = None,
cache: str = None,
web: str = None,
web_enabled: bool = True,
web_transport: str = None,
web_cors_options: dict = None,
logging_config: dict = None,
loghandlers: list = None,
datadir: str = None,
tabledir: str = None,
debug: bool = None,
quiet: bool = None,
no_color: bool = None,
blocking_timeout: float = 10.0,
broker_heartbeat: float = 0.0,
broker_commit_interval: float = 2.8,
broker_commit_livelock_soft_timeout: float = 300.0,
broker_commit_livelock_hard_timeout: float = 600.0,
broker_session_timeout: float = None,
broker_request_timeout: float = None,
broker_retry_backoff_type: str = 'exponential',
broker_retry_max_delay: float = 32.0,
broker_retry_delay: float = 0.1,
broker_api_version: str = 'auto',
topic_replication_factor: int = None,
topic_partitions: int = None,
topic_allow_declare: bool = True,
topic_disable_leader: bool = False,
id_format: str = '{id}-{self.version}',
stream_buffer_maxsize: int = 4096,
stream_wait_empty: bool = True,
stream_ack_cancelled_tasks: bool = True,
stream_ack_exceptions: bool = True,
stream_publish_on_commit: bool = False,
stream_recovery_delay: float = 10.0,
producer_linger: float = 0.0,
producer_max_batch_size: int = 16384,
producer_acks: int = -1,
producer_max_request_size: int = 1000000,
producer_compression_type: str = None,
producer_partitioner: str = None,
producer_request_timeout: float = None,
producer_api_version: str = None,
consumer_max_fetch_size: int = 4194304,
consumer_auto_offset_reset: str = 'earliest',
consumer_connections_max_idle: float = None,
consumer_request_timeout: float = None,
consumer_api_version: str = None,
consumer_session_timeout: float = None,
consumer_heartbeat_interval: float = None,
consumer_max_poll_records: int = None,
consumer_max_poll_interval: float = None,
consumer_rebalance_timeout: float = None,
consumer_group_instance_id: str = None,
web_bind: str = 'localhost',
web_port: int = 6066,
web_host: str = None,
web_in_thread: bool = False,
worker_redirect_stdouts: bool = None,
worker_redirect_stdouts_level: int = None,
reply_to: str = None,
reply_to_prefix: str = None,
reply_expires: float = 120.0,
reply_create_topic: bool = False,
ssl_context: object = None,
store_check_exists: bool = True,
table_cleanup_interval: float = 30.0,
table_key_index_size: int = 1000,
table_standby_replicas: int = 1,
timezone: str = None,
**kwargs
):
"""
Create a new Faust application.
Args:
id: Unique application identifier
broker: Kafka broker URL (e.g., 'kafka://localhost:9092')
autodiscover: Enable automatic discovery of agents and tasks
datadir: Directory for storing application data
web_enabled: Enable web server for HTTP endpoints
web_port: Port for web server
topic_partitions: Default number of partitions for new topics
**kwargs: Additional configuration options
"""Usage Example:
import faust
# Basic application
app = faust.App('my-app', broker='kafka://localhost:9092')
# Application with custom configuration
app = faust.App(
'my-app',
broker='kafka://localhost:9092',
web_port=8080,
topic_partitions=8,
datadir='/var/faust-data',
logging_config={'level': 'INFO'}
)

Decorator for creating stream processing agents that consume from channels or topics. Agents are async functions that process streams of data with automatic scaling and fault tolerance.
def agent(
self,
channel=None,
*,
name: str = None,
concurrency: int = 1,
sink: list = None,
on_error: callable = None,
supervisor_strategy: str = None,
help: str = None,
**kwargs
):
"""
Decorator to define a stream processing agent.
Args:
channel: Channel or topic to consume from
name: Agent name (defaults to function name)
concurrency: Number of concurrent instances
sink: List of channels to forward results to
on_error: Error handler function
supervisor_strategy: Strategy for handling agent failures
help: Help text for CLI
**kwargs: Additional agent options
Returns:
Agent decorator function
"""Usage Example:
# Basic agent
@app.agent(app.topic('orders'))
async def process_orders(orders):
async for order in orders:
print(f'Processing order: {order}')
# Agent with concurrency and error handling
@app.agent(
app.topic('payments'),
concurrency=5,
on_error=lambda agent, exc: print(f'Error: {exc}')
)
async def process_payments(payments):
async for payment in payments:
# Process payment
await process_payment(payment)

Method for defining Kafka topics with type safety, serialization, and partitioning configuration.
def topic(
self,
topic: str,
*,
key_type: type = None,
value_type: type = None,
key_serializer: str = None,
value_serializer: str = None,
partitions: int = None,
retention: float = None,
compacting: bool = None,
deleting: bool = None,
replicas: int = None,
acks: bool = True,
delivery_guarantee: str = 'at_least_once',
maxsize: int = None,
root: str = None,
config: dict = None,
**kwargs
):
"""
Define a Kafka topic for the application.
Args:
topic: Topic name
key_type: Type for message keys
value_type: Type for message values
key_serializer: Serializer for keys ('json', 'raw', etc.)
value_serializer: Serializer for values ('json', 'pickle', etc.)
partitions: Number of partitions
retention: Message retention time in seconds
compacting: Enable log compaction
replicas: Replication factor
acks: Require broker acknowledgment
delivery_guarantee: 'at_least_once', 'at_most_once', 'exactly_once'
config: Additional Kafka topic configuration
Returns:
Topic object
"""Usage Example:
# Basic topic
orders_topic = app.topic('orders', value_type=str)
# Typed topic with custom serialization
from faust import Record
class Order(Record):
id: int
amount: float
customer: str
orders_topic = app.topic(
'orders',
key_type=int,
value_type=Order,
partitions=16,
retention=86400.0 # 24 hours
)

Method for creating distributed key-value tables with changelog-based replication and windowing support.
def Table(
self,
name: str,
*,
default: callable = None,
window: object = None,
partitions: int = None,
help: str = None,
**kwargs
):
"""
Create a distributed table for stateful processing.
Args:
name: Table name
default: Default value factory function
window: Window specification for windowed tables
partitions: Number of partitions
help: Help text for CLI
**kwargs: Additional table options
Returns:
Table object
"""Usage Example:
# Basic table
word_counts = app.Table('word-counts', default=int)
# Table with custom default
user_profiles = app.Table('user-profiles', default=dict)
# Windowed table for time-based aggregation
from faust import TumblingWindow
windowed_counts = app.Table(
'hourly-counts',
default=int,
window=TumblingWindow(3600.0) # 1 hour windows
)

Decorator for creating periodic background tasks that execute at regular intervals.
def timer(
self,
interval: float,
*,
on_error: callable = None,
**kwargs
):
"""
Decorator for periodic timer tasks.
Args:
interval: Interval between executions in seconds
on_error: Error handler function
**kwargs: Additional timer options
Returns:
Timer decorator function
"""Usage Example:
# Basic timer
@app.timer(interval=30.0)
async def cleanup_task():
print("Running cleanup...")
# Cleanup logic here
# Timer with error handling
@app.timer(
interval=60.0,
on_error=lambda timer, exc: print(f'Timer error: {exc}')
)
async def health_check():
# Health check logic
await check_system_health()

Decorator for creating cron-style scheduled tasks using cron expressions.
def crontab(
self,
cron_format: str,
*,
timezone: str = None,
on_error: callable = None,
**kwargs
):
"""
Decorator for cron-scheduled tasks.
Args:
cron_format: Cron expression (e.g., '0 */2 * * *')
timezone: Timezone for scheduling
on_error: Error handler function
**kwargs: Additional cron options
Returns:
Cron decorator function
"""Usage Example:
# Daily task at midnight
@app.crontab('0 0 * * *')
async def daily_report():
print("Generating daily report...")
# Every 15 minutes with timezone
@app.crontab('*/15 * * * *', timezone='UTC')
async def sync_data():
await synchronize_external_data()

Decorator for creating HTTP endpoints and web pages integrated with the Faust web server.
def page(
self,
path: str,
*,
base: object = None,
cors_options: dict = None,
name: str = None
):
"""
Decorator for HTTP endpoints.
Args:
path: URL path pattern
base: Base view class
cors_options: CORS configuration
name: Endpoint name
Returns:
Web page decorator function
"""Usage Example:
# Basic web endpoint
@app.page('/health')
async def health_check(web, request):
return web.json({'status': 'healthy'})
# Endpoint with parameters
@app.page('/orders/{order_id}')
async def get_order(web, request, order_id):
order = await get_order_by_id(order_id)
return web.json(order.asdict())

Decorator for creating application-specific CLI commands integrated with the Faust command-line interface.
def command(
self,
*options,
base: object = None,
**kwargs
):
"""
Decorator for CLI commands.
Args:
*options: Click command options
base: Base command class
**kwargs: Additional command options
Returns:
Command decorator function
"""Usage Example:
import click
# Basic command
@app.command()
async def hello():
print("Hello from Faust!")
# Command with arguments
@app.command(
click.argument('name'),
click.option('--count', default=1, help='Number of greetings')
)
async def greet(name, count):
for i in range(count):
print(f"Hello {name}!")Methods for managing the application lifecycle including startup, shutdown, and main execution.
def main(self):
"""
Main entry point for running the application.
Handles command-line arguments and starts the application.
"""
async def start(self):
"""
Start the application and all its services.
"""
async def stop(self):
"""
Stop the application and clean up resources.
"""
def loop(self):
"""
Get the asyncio event loop for the application.
Returns:
Event loop instance
"""Usage Example:
# Standard application entry point
if __name__ == '__main__':
app.main()
# Programmatic control
import asyncio
async def run_app():
await app.start()
# Application running...
await app.stop()
asyncio.run(run_app())

from typing import Protocol
class AppT(Protocol):
"""Type interface for Faust applications."""
id: str
broker: str
def agent(self, channel=None, **kwargs): ...
def topic(self, topic: str, **kwargs): ...
def table(self, name: str, **kwargs): ...
def main(self): ...

Install with Tessl CLI
npx tessl i tessl/pypi-faust