CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-faust

Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines

Pending
Overview
Eval results
Files

docs/core-application.md

Core Application Framework

The foundational components for creating and managing Faust applications. The App class serves as the central orchestrator, providing decorators and methods for defining stream processing logic, data storage, web endpoints, and CLI commands.

Capabilities

Application Class

The main Faust application class that coordinates all components including agents, topics, tables, services, and web endpoints. Acts as the entry point for building distributed stream processing applications.

class App:
    def __init__(
        self,
        id: str,
        *,
        broker: str = None,
        autodiscover: bool = True,
        origin: str = None,
        canonical_url: str = None,
        broker_consumer: str = None,
        broker_producer: str = None,
        cache: str = None,
        web: str = None,
        web_enabled: bool = True,
        web_transport: str = None,
        web_cors_options: dict = None,
        logging_config: dict = None,
        loghandlers: list = None,
        datadir: str = None,
        tabledir: str = None,
        debug: bool = None,
        quiet: bool = None,
        no_color: bool = None,
        blocking_timeout: float = 10.0,
        broker_heartbeat: float = 0.0,
        broker_commit_interval: float = 2.8,
        broker_commit_livelock_soft_timeout: float = 300.0,
        broker_commit_livelock_hard_timeout: float = 600.0,
        broker_session_timeout: float = None,
        broker_request_timeout: float = None,
        broker_retry_backoff_type: str = 'exponential',
        broker_retry_max_delay: float = 32.0,
        broker_retry_delay: float = 0.1,
        broker_api_version: str = 'auto',
        topic_replication_factor: int = None,
        topic_partitions: int = None,
        topic_allow_declare: bool = True,
        topic_disable_leader: bool = False,
        id_format: str = '{id}-{self.version}',
        stream_buffer_maxsize: int = 4096,
        stream_wait_empty: bool = True,
        stream_ack_cancelled_tasks: bool = True,
        stream_ack_exceptions: bool = True,
        stream_publish_on_commit: bool = False,
        stream_recovery_delay: float = 10.0,
        producer_linger: float = 0.0,
        producer_max_batch_size: int = 16384,
        producer_acks: int = -1,
        producer_max_request_size: int = 1000000,
        producer_compression_type: str = None,
        producer_partitioner: str = None,
        producer_request_timeout: float = None,
        producer_api_version: str = None,
        consumer_max_fetch_size: int = 4194304,
        consumer_auto_offset_reset: str = 'earliest',
        consumer_connections_max_idle: float = None,
        consumer_request_timeout: float = None,
        consumer_api_version: str = None,
        consumer_session_timeout: float = None,
        consumer_heartbeat_interval: float = None,
        consumer_max_poll_records: int = None,
        consumer_max_poll_interval: float = None,
        consumer_rebalance_timeout: float = None,
        consumer_group_instance_id: str = None,
        web_bind: str = 'localhost',
        web_port: int = 6066,
        web_host: str = None,
        web_in_thread: bool = False,
        worker_redirect_stdouts: bool = None,
        worker_redirect_stdouts_level: int = None,
        reply_to: str = None,
        reply_to_prefix: str = None,
        reply_expires: float = 120.0,
        reply_create_topic: bool = False,
        ssl_context: object = None,
        store_check_exists: bool = True,
        table_cleanup_interval: float = 30.0,
        table_key_index_size: int = 1000,
        table_standby_replicas: int = 1,
        timezone: str = None,
        **kwargs
    ):
        """
        Create a new Faust application.

        ``id`` is the only positional argument; every other option is
        keyword-only. The options below are the most commonly used. Every
        other keyword is a configuration setting whose default is shown in
        the signature above.

        Args:
            id: Unique application identifier.
            broker: Kafka broker URL (e.g., 'kafka://localhost:9092').
            autodiscover: Enable automatic discovery of agents and tasks
                (default True).
            datadir: Directory for storing application data.
            logging_config: Logging configuration dict.
                NOTE(review): presumably in ``logging.config.dictConfig``
                format. Confirm against the faust settings reference.
            web_enabled: Enable the web server for HTTP endpoints
                (default True).
            web_bind: Interface the web server binds to
                (default 'localhost').
            web_port: Port for the web server (default 6066).
            topic_partitions: Default number of partitions for new topics.
            id_format: Format string used to build the final app id.
                NOTE(review): the default references ``self.version``,
                which is not a parameter of this signature. Confirm how
                ``version`` is supplied (likely via ``**kwargs``).
            **kwargs: Additional configuration options.
        """

Usage Example:

import faust

# Basic application
app = faust.App('my-app', broker='kafka://localhost:9092')

# Application with custom configuration.
# (This rebinds `app`. A real module would define only one App.)
app = faust.App(
    'my-app',
    broker='kafka://localhost:9092',
    web_port=8080,          # overrides the default port 6066
    topic_partitions=8,     # default partition count for new topics
    datadir='/var/faust-data',
    # NOTE(review): if faust hands this dict to logging.config.dictConfig,
    # a top-level 'level' key is not part of that schema. dictConfig wants
    # 'version': 1, with levels under 'root' or 'loggers'. Confirm how
    # faust consumes logging_config before copying this example.
    logging_config={'level': 'INFO'}
)

Agent Decorator

Decorator for creating stream processing agents that consume from channels or topics. Agents are async functions that process streams of data with automatic scaling and fault tolerance.

def agent(
    self,
    channel=None,
    *,
    name: str = None,
    concurrency: int = 1,
    sink: list = None,
    on_error: callable = None,
    supervisor_strategy: str = None,
    help: str = None,
    **kwargs
):
    """
    Decorator to define a stream processing agent.

    Apply the returned decorator to an ``async def`` function that receives
    the stream and iterates it with ``async for``, e.g.
    ``@app.agent(app.topic('orders'))``.

    Args:
        channel: Channel or topic to consume from.
        name: Agent name (defaults to the decorated function's name).
        concurrency: Number of concurrent instances (default 1).
        sink: List of channels to forward results to.
        on_error: Error handler function.
        supervisor_strategy: Strategy for handling agent failures.
        help: Help text for the CLI.
        **kwargs: Additional agent options.

    Returns:
        Agent decorator function.
    """

Usage Example:

# Basic agent. Its name defaults to the function name, 'process_orders'.
@app.agent(app.topic('orders'))
async def process_orders(orders):
    async for order in orders:
        print(f'Processing order: {order}')

# Agent with concurrency and error handling
@app.agent(
    app.topic('payments'),
    concurrency=5,  # run five concurrent instances of this agent
    on_error=lambda agent, exc: print(f'Error: {exc}')
)
async def process_payments(payments):
    async for payment in payments:
        # Process payment. `process_payment` is a placeholder coroutine
        # and is not defined in this example.
        await process_payment(payment)

Topic Definition

Method for defining Kafka topics with type safety, serialization, and partitioning configuration.

def topic(
    self,
    topic: str,
    *,
    key_type: type = None,
    value_type: type = None,
    key_serializer: str = None,
    value_serializer: str = None,
    partitions: int = None,
    retention: float = None,
    compacting: bool = None,
    deleting: bool = None,
    replicas: int = None,
    acks: bool = True,
    delivery_guarantee: str = 'at_least_once',
    maxsize: int = None,
    root: str = None,
    config: dict = None,
    **kwargs
):
    """
    Define a Kafka topic for the application.

    Args:
        topic: Topic name.
        key_type: Type for message keys.
        value_type: Type for message values.
        key_serializer: Serializer for keys ('json', 'raw', etc.).
        value_serializer: Serializer for values ('json', 'pickle', etc.).
        partitions: Number of partitions.
        retention: Message retention time in seconds.
        compacting: Enable log compaction.
        deleting: Cleanup-policy flag alongside ``compacting``.
            NOTE(review): presumably maps to Kafka's 'delete' cleanup
            policy. Confirm.
        replicas: Replication factor.
        acks: Require broker acknowledgment (default True).
        delivery_guarantee: One of 'at_least_once' (default),
            'at_most_once', 'exactly_once'.
        maxsize: NOTE(review): presumably the maximum size of this topic's
            in-memory buffer. Confirm.
        root: NOTE(review): purpose is not evident from this signature.
            Confirm against the faust API reference.
        config: Additional Kafka topic configuration.
        **kwargs: Additional topic options.

    Returns:
        Topic object.
    """

Usage Example:

# Basic topic whose values are typed as str
orders_topic = app.topic('orders', value_type=str)

# Typed topic with custom serialization
from faust import Record

# Record subclass describing the message value schema
class Order(Record):
    id: int
    amount: float
    customer: str

# Rebinds `orders_topic`. Keys are typed as int and values as Order.
orders_topic = app.topic(
    'orders',
    key_type=int,
    value_type=Order,
    partitions=16,
    retention=86400.0  # 24 hours
)

Table Creation

Method for creating distributed key-value tables with changelog-based replication and windowing support. The usage examples below call it as `app.Table`.

def table(
    self,
    name: str,
    *,
    default: callable = None,
    window: object = None,
    partitions: int = None,
    help: str = None,
    **kwargs
):
    """
    Create a distributed table for stateful processing.

    NOTE(review): the usage examples call this as ``app.Table`` (capital T),
    which does not match the lowercase name here. Confirm the exported name
    before relying on either spelling.

    Args:
        name: Table name.
        default: Default value factory function (e.g. ``int``, ``dict``).
        window: Window specification for windowed tables
            (e.g. ``TumblingWindow(3600.0)``).
        partitions: Number of partitions.
        help: Help text for the CLI.
        **kwargs: Additional table options.

    Returns:
        Table object.
    """

Usage Example:

# Basic table. `default` is the value factory (here `int`).
word_counts = app.Table('word-counts', default=int)

# Table with custom default
user_profiles = app.Table('user-profiles', default=dict)

# Windowed table for time-based aggregation
from faust import TumblingWindow

windowed_counts = app.Table(
    'hourly-counts',
    default=int,
    window=TumblingWindow(3600.0)  # 1 hour windows (size in seconds)
)

Timer Decorator

Decorator for creating periodic background tasks that execute at regular intervals.

def timer(
    self,
    interval: float,
    *,
    on_error: callable = None,
    **kwargs
):
    """
    Decorator for periodic timer tasks.

    Args:
        interval: Interval between executions, in seconds. The parameter
            is named ``interval`` to match how it is passed at call sites,
            e.g. ``@app.timer(interval=30.0)``. It can also be passed
            positionally, e.g. ``@app.timer(30.0)``.
        on_error: Error handler function.
        **kwargs: Additional timer options.

    Returns:
        Timer decorator function.
    """

Usage Example:

# Basic timer: runs every 30 seconds
@app.timer(interval=30.0)
async def cleanup_task():
    print("Running cleanup...")
    # Cleanup logic here

# Timer with error handling: runs every 60 seconds.
# NOTE: the web-page example further down also defines `health_check`.
# Rename one of them if both live in the same module.
@app.timer(
    interval=60.0,
    on_error=lambda timer, exc: print(f'Timer error: {exc}')
)
async def health_check():
    # Health check logic. `check_system_health` is a placeholder coroutine.
    await check_system_health()

Cron Job Decorator

Decorator for creating cron-style scheduled tasks using cron expressions.

def crontab(
    self,
    cron_format: str,
    *,
    timezone: 'datetime.tzinfo' = None,
    on_error: callable = None,
    **kwargs
):
    """
    Decorator for cron-scheduled tasks.

    Args:
        cron_format: Cron expression (e.g., '0 */2 * * *').
        timezone: Timezone used to evaluate the schedule. This must be a
            ``datetime.tzinfo`` instance (e.g. ``datetime.timezone.utc``),
            not a zone-name string such as 'UTC'. faust computes the next
            run time with ``datetime.now(tz)``, which rejects strings.
            When omitted, the app's configured timezone is used.
        on_error: Error handler function.
        **kwargs: Additional cron options.

    Returns:
        Cron decorator function.
    """

Usage Example:

# Daily task at midnight
@app.crontab('0 0 * * *')
async def daily_report():
    print("Generating daily report...")

# Every 15 minutes, evaluated in UTC.
# `timezone` takes a tzinfo object, not a zone-name string like 'UTC'.
from datetime import timezone

@app.crontab('*/15 * * * *', timezone=timezone.utc)
async def sync_data():
    # `synchronize_external_data` is a placeholder coroutine.
    await synchronize_external_data()

Web Page Decorator

Decorator for creating HTTP endpoints and web pages integrated with the Faust web server.

def page(
    self,
    path: str,
    *,
    base: object = None,
    cors_options: dict = None,
    name: str = None
):
    """
    Decorator for HTTP endpoints.

    Apply the returned decorator to an ``async def`` handler that receives
    the web view and the request, plus any path parameters, and returns a
    response such as ``web.json({...})``.

    Args:
        path: URL path pattern. ``{placeholder}`` segments are passed to
            the handler as arguments (e.g. '/orders/{order_id}').
        base: Base view class.
        cors_options: CORS configuration.
        name: Endpoint name.

    Returns:
        Web page decorator function.
    """

Usage Example:

# Basic web endpoint
@app.page('/health')
async def health_check(web, request):
    return web.json({'status': 'healthy'})

# Endpoint with parameters: the {order_id} path segment arrives as the
# `order_id` argument.
@app.page('/orders/{order_id}')
async def get_order(web, request, order_id):
    # `get_order_by_id` is a placeholder coroutine returning a Record.
    order = await get_order_by_id(order_id)
    return web.json(order.asdict())

CLI Command Decorator

Decorator for creating application-specific CLI commands integrated with the Faust command-line interface.

def command(
    self,
    *options,
    base: object = None,
    **kwargs
):
    """
    Decorator for CLI commands.

    NOTE(review): in faust, a handler that declares parameters appears to
    receive the command instance as its first argument (``self``), followed
    by the parsed CLI values. A handler with no parameters is called with
    none. Confirm against the faust CLI documentation.

    Args:
        *options: Click argument/option decorators
            (e.g. ``click.argument('name')``, ``click.option('--count')``).
        base: Base command class.
        **kwargs: Additional command options.

    Returns:
        Command decorator function.
    """

Usage Example:

import click

# Basic command: a handler that declares no parameters is called with
# no arguments.
@app.command()
async def hello():
    print("Hello from Faust!")

# Command with arguments. A handler that declares parameters receives the
# command instance as its first argument (conventionally `self`), followed
# by the parsed CLI values. Omitting `self` shifts every value by one
# position.
@app.command(
    click.argument('name'),
    click.option('--count', default=1, help='Number of greetings')
)
async def greet(self, name, count):
    for _ in range(count):
        print(f"Hello {name}!")

Application Lifecycle

Methods for managing the application lifecycle including startup, shutdown, and main execution.

def main(self):
    """
    Main entry point for running the application.

    Handles command-line arguments and starts the application. It is meant
    to be called from an ``if __name__ == '__main__':`` guard, as in the
    usage example.
    """

async def start(self):
    """
    Start the application and all its services.

    This is a coroutine and must be awaited. Pair it with ``stop()``.
    """

async def stop(self):
    """
    Stop the application and clean up resources.

    This is a coroutine and must be awaited. It is the counterpart of
    ``start()``.
    """

def loop(self):
    """
    Get the asyncio event loop for the application.

    NOTE(review): in faust this appears to be exposed as an attribute
    (``app.loop``) rather than a method to call. Confirm before writing
    ``app.loop()``.

    Returns:
        Event loop instance.
    """

Usage Example:

# Standard application entry point: hands control to the faust CLI
if __name__ == '__main__':
    app.main()

# Programmatic control (an alternative to app.main(); use one or the other)
import asyncio

async def run_app():
    await app.start()
    # Application running...
    await app.stop()

# Runs at import time as written. Guard it in real code.
asyncio.run(run_app())

Type Interfaces

from typing import Protocol

class AppT(Protocol):
    """Type interface for Faust applications.

    This is a structural (``typing.Protocol``) type. Any object exposing
    these attributes and methods satisfies it without inheriting from it.
    It mirrors a subset of the ``App`` API documented above.
    """
    id: str  # unique application identifier
    broker: str  # Kafka broker URL, e.g. 'kafka://localhost:9092'
    
    def agent(self, channel=None, **kwargs): ...  # agent decorator factory
    def topic(self, topic: str, **kwargs): ...  # returns a Topic object
    def table(self, name: str, **kwargs): ...  # returns a Table object
    def main(self): ...  # CLI entry point

Install with Tessl CLI

npx tessl i tessl/pypi-faust

docs

authentication.md

cli-framework.md

core-application.md

data-management.md

index.md

monitoring.md

serialization.md

stream-processing.md

topics-channels.md

windowing.md

worker-management.md

tile.json