tessl/pypi-faust

Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines

Describes: pkg:pypi/faust@1.10.x

To install, run

npx @tessl/cli install tessl/pypi-faust@1.10.0

Faust

A Python stream processing library that ports the ideas from Kafka Streams to Python. Faust enables building distributed systems and real-time data pipelines that process streams of data with state management, automatic failover, and changelog-based replication.

Package Information

  • Package Name: faust
  • Language: Python
  • Python Version: >=3.6.0
  • Installation: pip install faust
  • License: BSD-3-Clause

Core Imports

import faust

Common imports for stream processing:

from faust import App, Agent, Stream, Table, Record

Basic Usage

import faust

# Create a Faust application
app = faust.App('word-count-app', broker='kafka://localhost:9092')

# Define a data model
class WordCount(faust.Record):
    word: str
    count: int

# Create a topic and a table
words_topic = app.topic('words', value_type=str)
word_counts_table = app.Table('word-counts', default=int)

# Define a stream processing agent
@app.agent(words_topic)
async def count_words(words):
    async for word in words:
        word_counts_table[word] += 1
        
# Define a timer to print current counts
@app.timer(interval=10.0)
async def print_counts():
    for word, count in word_counts_table.items():
        print(f'{word}: {count}')

# Run the application
if __name__ == '__main__':
    app.main()

Architecture

Faust follows a decorator-based architecture centered around the App class:

  • App: Main application container that manages agents, topics, tables, and services
  • Agents: Stream processing functions decorated with @app.agent() that consume from channels
  • Topics: Named channels backed by Kafka topics for message distribution
  • Streams: Async iterators over events in channels with transformation capabilities
  • Tables: Distributed key-value stores with changelog-based replication for stateful processing
  • Events: Message containers with key, value, headers, and acknowledgment capabilities
  • Models: Structured data classes for type-safe serialization/deserialization

This design enables building scalable, fault-tolerant stream processing applications that integrate seamlessly with the Python ecosystem.

Capabilities

Core Application Framework

The foundational components for creating and managing Faust applications, including the main App class and decorators for defining agents, topics, tables, web endpoints, and CLI commands.

class App:
    def __init__(self, id: str, *, broker: str = None, **kwargs): ...
    def agent(self, channel=None, *, name=None, concurrency=1, **kwargs): ...
    def topic(self, topic: str, *, key_type=None, value_type=None, **kwargs): ...
    def Table(self, name: str, *, default=None, window=None, **kwargs): ...
    def timer(self, interval: float, *, on_leader: bool = False, **kwargs): ...
    def crontab(self, cron_format: str, *, timezone=None, **kwargs): ...
    def command(self, *options, base=None, **kwargs): ...
    def page(self, path: str, *, cors_options=None, **kwargs): ...
    def main(self): ...
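
As a rough sketch of how these decorators compose, here is an example combining a scheduled task and a web endpoint (the schedule, path, and handler names are illustrative, not part of Faust):

import faust

app = faust.App('example-app', broker='kafka://localhost:9092')

# Scheduled task: run daily at 08:00 (cron syntax).
@app.crontab('0 8 * * *')
async def daily_report():
    print('generating report')

# Web endpoint served by the embedded web server.
@app.page('/health/')
async def health(web, request):
    return web.json({'status': 'ok'})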

See docs/core-application.md for details.

Stream Processing

Stream processing components including agents for consuming data streams, stream transformation operations, and event handling for building real-time data processing pipelines.

class Agent:
    async def send(self, value=None, *, key=None, partition=None): ...
    async def ask(self, value=None, *, key=None, reply_to=None, **kwargs): ...
    async def cast(self, value=None, *, key=None, partition=None): ...

class Stream:
    def filter(self, fun): ...
    def map(self, fun): ...
    def group_by(self, key, *, name=None): ...
    def take(self, max_: int, *, within=None): ...
    def rate_limit(self, rate: float, *, per=1.0): ...
    def through(self, channel, **kwargs): ...

class Event:
    def ack(self): ...
    def reject(self): ...
    async def send(self, channel, key=None, value=None, **kwargs): ...
    async def forward(self, channel, *, key=None, value=None, **kwargs): ...

def current_event() -> Event: ...
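
A minimal sketch of how agents, streams, and agent messaging fit together, assuming a hypothetical orders topic and Order model (neither is part of the library):

import faust

app = faust.App('orders-app', broker='kafka://localhost:9092')

class Order(faust.Record):
    account: str
    amount: float

orders_topic = app.topic('orders', value_type=Order)

@app.agent(orders_topic)
async def process_order(orders):
    # Stream operators chain onto the async iterator.
    async for order in orders.filter(lambda o: o.amount > 0):
        yield order.amount  # reply value, consumed by ask()

# From other async code:
#   await process_order.send(value=Order(account='a1', amount=9.99))         # fire-and-forget
#   amount = await process_order.ask(value=Order(account='a1', amount=9.99)) # wait for reply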

See docs/stream-processing.md for details.

Topics and Channels

Topic and channel management for message distribution, including configuration, partitioning, serialization, and integration with Kafka infrastructure.

class Topic:
    async def send(self, key=None, value=None, *, partition=None, **kwargs): ...
    def send_soon(self, key=None, value=None, *, partition=None, **kwargs): ...
    def stream(self, **kwargs) -> Stream: ...
    def events(self, **kwargs) -> Stream: ...
    def get_partition_key(self, key, partition): ...

class Channel:
    async def send(self, value=None, *, key=None, partition=None, **kwargs): ...
    def send_soon(self, value=None, *, key=None, partition=None, **kwargs): ...
    def stream(self, **kwargs) -> Stream: ...
    def events(self, **kwargs) -> Stream: ...
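
For example (names illustrative): send awaits delivery, while send_soon enqueues a message from synchronous code. Channels provide the same interface without a Kafka backing:

import faust

app = faust.App('channels-demo', broker='kafka://localhost:9092')

# Kafka-backed topic with explicit key/value types.
greetings_topic = app.topic('greetings', key_type=str, value_type=str)

@app.timer(interval=5.0)
async def produce():
    await greetings_topic.send(key='hello', value='world')

# In-memory channel, useful for tests and local fan-out.
echo_channel = app.channel(value_type=str)

@app.agent(echo_channel)
async def echo(stream):
    async for value in stream:
        print(value)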

See docs/topics-channels.md for details.

Data Management

Stateful data management through tables and models, including distributed key-value storage, windowing operations, and structured data modeling with serialization.

class Table:
    def __getitem__(self, key): ...
    def __setitem__(self, key, value): ...
    def get(self, key, default=None): ...
    def setdefault(self, key, default=None): ...
    def pop(self, key, *default): ...
    def items(self): ...
    def keys(self): ...
    def values(self): ...
    def clear(self): ...

class GlobalTable(Table): ...
class SetTable: ...
class SetGlobalTable: ...

class Model:
    def dumps(self, *, serializer=None) -> bytes: ...
    @classmethod
    def loads(cls, s: bytes, *, serializer=None): ...
    def asdict(self) -> dict: ...
    def derive(self, **fields): ...

class Record(Model): ...
class ModelOptions: ...

class FieldDescriptor: ...
class StringField(FieldDescriptor): ...

def maybe_model(arg: Any) -> Any: ...

registry: dict = {}
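
A short sketch of model round-tripping (the Point model is hypothetical):

import faust

class Point(faust.Record, serializer='json'):
    x: int
    y: int

point = Point(x=1, y=2)
data = point.dumps()             # JSON bytes, including Faust metadata
restored = Point.loads(data)     # Point(x=1, y=2)
plain = point.asdict()           # {'x': 1, 'y': 2}
moved = point.derive(x=10)       # new Point with x replaced, y kept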

See docs/data-management.md for details.

Serialization

Data serialization and schema management for type-safe message handling, including codecs for different data formats and schema definitions for structured data.

class Schema:
    def loads_key(self, app, message, *, loads=None, serializer=None): ...
    def loads_value(self, app, message, *, loads=None, serializer=None): ...
    def dumps_key(self, app, key, *, serializer=None) -> bytes: ...
    def dumps_value(self, app, value, *, serializer=None) -> bytes: ...

class Codec:
    def encode(self, obj) -> bytes: ...
    def decode(self, data: bytes): ...
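
A Schema bundles key/value types and serializers so a topic can declare them in one place; a sketch, assuming a hypothetical Order model:

import faust

app = faust.App('schema-demo', broker='kafka://localhost:9092')

class Order(faust.Record):
    account: str
    amount: float

schema = faust.Schema(
    key_type=str,
    value_type=Order,
    key_serializer='json',
    value_serializer='json',
)

orders_topic = app.topic('orders', schema=schema)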

See docs/serialization.md for details.

Authentication

Security and authentication mechanisms for secure connections to Kafka brokers, including SSL, SASL, and GSSAPI credential management.

class SSLCredentials:
    def __init__(self, *, context=None, purpose=None, cafile=None, **kwargs): ...

class SASLCredentials:
    def __init__(self, *, mechanism=None, username=None, password=None, **kwargs): ...

class GSSAPICredentials:
    def __init__(self, *, kerberos_service_name='kafka', **kwargs): ...
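
Credentials are passed to the App through the broker_credentials setting; for example (host and secrets are placeholders):

import faust

app = faust.App(
    'secure-app',
    broker='kafka://broker.example.com:9092',
    broker_credentials=faust.SASLCredentials(
        username='myuser',
        password='mypassword',
    ),
)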

See docs/authentication.md for details.

Windowing

Time-based windowing operations for temporal data aggregation, including tumbling, hopping, and sliding window implementations for stream analytics.

class Window: ...

class TumblingWindow(Window):
    def __init__(self, size: float, *, expires=None): ...

class HoppingWindow(Window):
    def __init__(self, size: float, step: float, *, expires=None): ...

class SlidingWindow(Window):
    def __init__(self, before: float, after: float, *, expires=None): ...
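
Windows are usually attached to tables; a sketch of a tumbling-window count with relative-to-now lookups (topic and table names are illustrative):

import faust
from datetime import timedelta

app = faust.App('windowing-demo', broker='kafka://localhost:9092')

page_views = app.topic('page-views', value_type=str)

# Counts per key within 10-second tumbling windows, kept for 10 minutes.
views = app.Table('views', default=int).tumbling(
    10.0, expires=timedelta(minutes=10),
).relative_to_now()

@app.agent(page_views)
async def count_views(stream):
    async for page in stream:
        views[page] += 1
        print(views[page].now())  # count in the window containing the current time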

See docs/windowing.md for details.

Monitoring and Sensors

Monitoring, metrics collection, and sensor framework for observability, including custom sensor implementations and integration with monitoring systems.

class Sensor:
    def on_message_in(self, tp, offset, message): ...
    def on_message_out(self, tp, offset, message): ...
    def on_table_get(self, table, key): ...
    def on_table_set(self, table, key, value): ...
    def on_commit_completed(self, consumer, state): ...

class Monitor(Sensor): ...
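
Custom sensors subclass Sensor, override the callbacks they care about, and are registered on the app; a minimal sketch (the sensor class is hypothetical):

import faust

class CountingSensor(faust.Sensor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.messages_in = 0

    def on_message_in(self, tp, offset, message):
        # Called for every message the consumer receives.
        self.messages_in += 1

app = faust.App('monitored-app', broker='kafka://localhost:9092')
app.sensors.add(CountingSensor())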

See docs/monitoring.md for details.

CLI Framework

Command-line interface framework for building application-specific commands, including argument parsing, option handling, and integration with the Faust CLI system.

class Command:
    def run(self, *args, **kwargs): ...

class AppCommand(Command):
    def run(self, *args, **kwargs): ...

class argument:
    def __init__(self, *args, **kwargs): ...
    def __call__(self, fun): ...

class option:
    def __init__(self, *args, **kwargs): ...
    def __call__(self, fun): ...

def call_command(command: str, args=None, **kwargs) -> tuple: ...
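
App-specific commands are declared with @app.command plus option/argument from faust.cli; the command name and option below are illustrative:

from faust.cli import option

@app.command(
    option('--count', type=int, default=1, help='Number of greetings.'),
)
async def hello(self, count: int):
    """Run with: faust -A myapp hello --count 3"""
    for _ in range(count):
        print('hello')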

See docs/cli-framework.md for details.

Worker Management

Worker process management and service coordination, including application lifecycle management, process coordination, and service orchestration.

class Worker:
    async def start(self): ...
    async def stop(self): ...
    async def restart(self): ...
    def execute_from_commandline(self): ...

class Service: ...
class ServiceT: ...
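
Programmatic startup usually goes through Worker rather than calling start() directly; a sketch:

import faust

app = faust.App('worker-app', broker='kafka://localhost:9092')

if __name__ == '__main__':
    # Equivalent to `faust -A module worker`; blocks until shutdown.
    worker = faust.Worker(app, loglevel='INFO')
    worker.execute_from_commandline()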

See docs/worker-management.md for details.

Type Interfaces

Faust provides comprehensive type interfaces for static type checking. These are abstract base classes importable from faust.types:

class AppT: ...
class AgentT: ...
class ChannelT: ...
class EventT: ...
class StreamT: ...
class TopicT: ...
class ServiceT: ...
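
These are handy for annotating helpers that should accept any implementation, e.g.:

from faust.types import AppT, StreamT

async def log_values(app: AppT, stream: StreamT) -> None:
    async for value in stream:
        print(app.conf.id, value)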

Configuration

Application configuration through the Settings class:

class Settings:
    # Configuration options for brokers, serialization, etc.
    pass
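
In practice, settings are passed as keyword arguments to the App constructor and read back via app.conf; a sketch with a few common options:

import faust

app = faust.App(
    'config-demo',
    broker='kafka://localhost:9092',
    store='memory://',        # use 'rocksdb://' for persistent tables
    value_serializer='json',
    topic_partitions=8,       # default partition count for new topics
)

print(app.conf.id, app.conf.topic_partitions)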

Utilities

Utility functions and helpers:

def uuid() -> str: ...
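
For example, generating unique identifiers for message keys or reply topics:

import faust

request_id = faust.uuid()  # random UUID rendered as a string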