Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines
```
npx @tessl/cli install tessl/pypi-faust@1.10.0
```

A Python stream processing library that ports the ideas from Kafka Streams to Python. Faust enables building distributed systems and real-time data pipelines that process streams of data with state management, automatic failover, and changelog-based replication.
```
pip install faust
```

```python
import faust
```

Common imports for stream processing:
```python
from faust import App, Agent, Stream, Table, Record
```

A basic word-count application:

```python
import faust
# Create a Faust application
app = faust.App('word-count-app', broker='kafka://localhost:9092')

# Define a data model
class WordCount(faust.Record):
    word: str
    count: int

# Create a topic and a table
words_topic = app.topic('words', value_type=str)
word_counts_table = app.Table('word-counts', default=int)

# Define a stream processing agent
@app.agent(words_topic)
async def count_words(words):
    async for word in words:
        word_counts_table[word] += 1

# Define a timer to print current counts
@app.timer(interval=10.0)
async def print_counts():
    for word, count in word_counts_table.items():
        print(f'{word}: {count}')

# Run the application
if __name__ == '__main__':
    app.main()
```

Faust follows a decorator-based architecture centered around the `App` class:
- Agents defined with `@app.agent()` that consume from channels and topics
- Topics and tables created with `app.topic()` and `app.Table()`
- Timers and cron jobs scheduled with `@app.timer()` and `@app.crontab()`
- Web endpoints and CLI commands registered with `@app.page()` and `@app.command()`

This design enables building scalable, fault-tolerant stream processing applications that integrate seamlessly with the Python ecosystem.
The foundational components for creating and managing Faust applications, including the main App class and decorators for defining agents, topics, tables, web endpoints, and CLI commands.
```python
class App:
    def __init__(self, id: str, *, broker: str = None, **kwargs): ...
    def agent(self, channel=None, *, name=None, concurrency=1, **kwargs): ...
    def topic(self, topic: str, *, key_type=None, value_type=None, **kwargs): ...
    def Table(self, name: str, *, default=None, window=None, **kwargs): ...
    def timer(self, interval: float, *, on_leader=False, **kwargs): ...
    def crontab(self, cron_format: str, *, timezone=None, **kwargs): ...
    def command(self, *options, base=None, **kwargs): ...
    def page(self, path: str, *, cors_options=None, **kwargs): ...
    def main(self): ...
```
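The two registration decorators not shown in the word-count example are `@app.page()` and `@app.command()`. A minimal sketch (the endpoint path and command name are illustrative):

```python
import faust

app = faust.App('example-app', broker='kafka://localhost:9092')

# Serve an HTTP endpoint from the worker's embedded web server
@app.page('/health/')
async def health(web, request):
    return web.json({'status': 'ok'})

# Add a subcommand to the faust CLI (run with: faust -A <module> example)
@app.command()
async def example():
    """The docstring becomes the command's --help text."""
    print('hello from the CLI')
```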
Stream processing components including agents for consuming data streams, stream transformation operations, and event handling for building real-time data processing pipelines.

```python
class Agent:
    async def send(self, value=None, *, key=None, partition=None): ...
    async def ask(self, value=None, *, key=None, reply_to=None, **kwargs): ...
    async def cast(self, value=None, *, key=None, partition=None): ...

class Stream:
    def filter(self, fun): ...
    def map(self, fun): ...
    def group_by(self, key, *, name=None): ...
    def take(self, max_: int, *, within=None): ...
    def rate_limit(self, rate: float, *, per=1.0): ...
    def through(self, channel, **kwargs): ...

class Event:
    def ack(self): ...
    def reject(self): ...
    async def send(self, channel, key=None, value=None, **kwargs): ...
    async def forward(self, channel, *, key=None, value=None, **kwargs): ...

def current_event() -> Event: ...
```
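Stream operations compose inside an agent. A sketch assuming an `orders` topic and an `Order` model (both illustrative):

```python
import faust

app = faust.App('orders-app', broker='kafka://localhost:9092')

class Order(faust.Record):
    account_id: str
    amount: float

orders_topic = app.topic('orders', value_type=Order)

# Repartition by account so each account is handled by one worker,
# then buffer up to 100 orders arriving within 10 seconds
@app.agent(orders_topic)
async def process_orders(orders):
    async for batch in orders.group_by(Order.account_id).take(100, within=10.0):
        print(f'processing {len(batch)} orders')
```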
Topic and channel management for message distribution, including configuration, partitioning, serialization, and integration with Kafka infrastructure.

```python
class Topic:
    async def send(self, key=None, value=None, *, partition=None, **kwargs): ...
    def send_soon(self, key=None, value=None, *, partition=None, **kwargs): ...
    def stream(self, **kwargs) -> Stream: ...
    def events(self, **kwargs) -> Stream: ...
    def get_partition_key(self, key, partition): ...

class Channel:
    async def send(self, value=None, *, key=None, partition=None, **kwargs): ...
    def send_soon(self, value=None, *, key=None, partition=None, **kwargs): ...
    def stream(self, **kwargs) -> Stream: ...
    def events(self, **kwargs) -> Stream: ...
```
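Producing to a topic from a coroutine is an `await topic.send(...)`; `send_soon()` schedules the same send from synchronous code. A minimal sketch reusing the `words` topic from the opening example:

```python
import faust

app = faust.App('producer-app', broker='kafka://localhost:9092')
words_topic = app.topic('words', value_type=str)

# Produce a value every five seconds
@app.timer(interval=5.0)
async def produce():
    await words_topic.send(value='hello')
```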
Stateful data management through tables and models, including distributed key-value storage, windowing operations, and structured data modeling with serialization.

```python
from typing import Any

class Table:
    def __getitem__(self, key): ...
    def __setitem__(self, key, value): ...
    def get(self, key, default=None): ...
    def setdefault(self, key, default=None): ...
    def pop(self, key, *default): ...
    def items(self): ...
    def keys(self): ...
    def values(self): ...
    def clear(self): ...

class GlobalTable(Table): ...
class SetTable: ...
class SetGlobalTable: ...

class Model:
    def dumps(self, *, serializer=None) -> bytes: ...
    @classmethod
    def loads(cls, s: bytes, *, serializer=None): ...
    def asdict(self) -> dict: ...
    def derive(self, **fields): ...

class Record(Model): ...
class ModelOptions: ...
class FieldDescriptor: ...
class StringField(FieldDescriptor): ...

def maybe_model(arg) -> Any: ...  # convert a deserialized payload back into a Model, if it is one

registry: dict = {}  # global registry mapping model namespace to Model subclass
```
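Models round-trip through `dumps()`/`loads()`. A quick sketch with an illustrative `Point` record:

```python
import faust

class Point(faust.Record):
    x: int
    y: int

p = Point(x=10, y=20)
data = p.dumps(serializer='json')                 # serialize to bytes
restored = Point.loads(data, serializer='json')   # deserialize back to a Point
assert restored.x == 10 and restored.y == 20
assert p.asdict() == {'x': 10, 'y': 20}
```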
Data serialization and schema management for type-safe message handling, including codecs for different data formats and schema definitions for structured data.

```python
class Schema:
    def loads_key(self, app, message, *, loads=None, serializer=None): ...
    def loads_value(self, app, message, *, loads=None, serializer=None): ...
    def dumps_key(self, app, key, *, serializer=None) -> bytes: ...
    def dumps_value(self, app, value, *, serializer=None) -> bytes: ...

class Codec:
    def encode(self, obj) -> bytes: ...
    def decode(self, data: bytes): ...
```
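Custom codecs can be registered under a name and then referenced from topic or model serializer settings. A sketch using the `_dumps`/`_loads` hooks of `faust.serializers.codecs.Codec` (the codec name `raw_json` is illustrative):

```python
import json
from typing import Any

from faust.serializers import codecs

class RawJson(codecs.Codec):
    def _dumps(self, obj: Any) -> bytes:
        return json.dumps(obj).encode()

    def _loads(self, s: bytes) -> Any:
        return json.loads(s)

# Usable afterwards as key_serializer='raw_json' / value_serializer='raw_json'
codecs.register('raw_json', RawJson())
```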
Security and authentication mechanisms for secure connections to Kafka brokers, including SSL, SASL, and GSSAPI credential management.

```python
class SSLCredentials:
    def __init__(self, *, context=None, purpose=None, cafile=None, **kwargs): ...

class SASLCredentials:
    def __init__(self, *, mechanism=None, username=None, password=None, **kwargs): ...

class GSSAPICredentials:
    def __init__(self, *, kerberos_service_name='kafka', **kwargs): ...
```
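Credentials are passed to the app through the `broker_credentials` setting. For example, with SASL (broker address and credentials are placeholders):

```python
import faust

app = faust.App(
    'secure-app',
    broker='kafka://broker.example.com:9092',
    broker_credentials=faust.SASLCredentials(
        username='service-account',
        password='secret',
    ),
)
```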
Time-based windowing operations for temporal data aggregation, including tumbling, hopping, and sliding window implementations for stream analytics.

```python
class Window: ...

class TumblingWindow(Window):
    def __init__(self, size: float, *, expires=None): ...

class HoppingWindow(Window):
    def __init__(self, size: float, step: float, *, expires=None): ...

class SlidingWindow(Window):
    def __init__(self, before: float, after: float, *, expires=None): ...
```
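Windows are usually attached to a table. A sketch counting clicks per URL in 60-second tumbling windows (topic and table names illustrative):

```python
import faust

app = faust.App('click-stats', broker='kafka://localhost:9092')
clicks_topic = app.topic('clicks', key_type=str, value_type=int)

# Windowed table: one counter per URL per 60-second window, kept for an hour
click_counts = app.Table('click-counts', default=int).tumbling(60.0, expires=3600.0)

@app.agent(clicks_topic)
async def count_clicks(stream):
    async for url, n in stream.items():
        click_counts[url] += n
        # .current() reads the value for the window of the current event
        print(url, click_counts[url].current())
```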
Monitoring, metrics collection, and sensor framework for observability, including custom sensor implementations and integration with monitoring systems.

```python
class Sensor:
    def on_message_in(self, tp, offset, message): ...
    def on_message_out(self, tp, offset, message): ...
    def on_table_get(self, table, key): ...
    def on_table_set(self, table, key, value): ...
    def on_commit_completed(self, consumer, state): ...

class Monitor(Sensor): ...
```
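Custom sensors subclass `faust.Sensor`, override the callbacks they care about, and are attached with `app.sensors.add()`. A minimal sketch:

```python
import faust

class MessageCounter(faust.Sensor):
    """Count every message the worker consumes."""

    messages_in = 0

    def on_message_in(self, tp, offset, message):
        self.messages_in += 1

app = faust.App('monitored-app', broker='kafka://localhost:9092')
app.sensors.add(MessageCounter())
```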
Command-line interface framework for building application-specific commands, including argument parsing, option handling, and integration with the Faust CLI system.

```python
class Command:
    def run(self, *args, **kwargs): ...

class AppCommand(Command):
    def run(self, *args, **kwargs): ...

class argument:
    def __init__(self, *args, **kwargs): ...
    def __call__(self, fun): ...

class option:
    def __init__(self, *args, **kwargs): ...
    def __call__(self, fun): ...

def call_command(command: str, args=None, **kwargs) -> tuple: ...
```
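Options declared with `option()` are passed to `@app.command()` and delivered as keyword arguments. A sketch (command and option names illustrative):

```python
import faust
from faust.cli import option

app = faust.App('myapp', broker='kafka://localhost:9092')

@app.command(
    option('--count', type=int, default=1,
           help='Number of greetings to print.'),
)
async def greet(self, count: int):
    """Print greetings (run as: faust -A <module> greet --count=3)."""
    for _ in range(count):
        print('hello')
```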
Worker process management and service coordination, including application lifecycle management, process coordination, and service orchestration.

```python
class Worker:
    def start(self): ...
    def stop(self): ...
    def restart(self): ...

class Service: ...
class ServiceT: ...
```
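Apps are typically run with the `faust worker` command, but a worker can also be embedded and started programmatically. A sketch:

```python
import faust

app = faust.App('myapp', broker='kafka://localhost:9092')

if __name__ == '__main__':
    # Equivalent to: faust -A myapp worker --loglevel=INFO
    worker = faust.Worker(app, loglevel='INFO')
    worker.execute_from_commandline()
```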
Faust provides comprehensive type interfaces (importable from `faust.types`) for static type checking:

```python
from typing import Protocol

class AppT(Protocol): ...
class AgentT(Protocol): ...
class ChannelT(Protocol): ...
class EventT(Protocol): ...
class StreamT(Protocol): ...
class TopicT(Protocol): ...
class ServiceT(Protocol): ...
```
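These let library code accept any Faust app without depending on the concrete class, for example:

```python
from faust.types import AppT

def register_agents(app: AppT) -> None:
    """Attach agents to any object satisfying the AppT interface."""
    ...
```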
Application configuration through the Settings class:

```python
class Settings:
    # Configuration options for brokers, serialization, etc.
    pass
```
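Settings are supplied as keyword arguments to `App` and read back through `app.conf`:

```python
import faust

app = faust.App(
    'config-demo',
    broker='kafka://localhost:9092',
    value_serializer='json',
    topic_partitions=8,
)
print(app.conf.value_serializer, app.conf.topic_partitions)
```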
Utility functions and helpers:

```python
def uuid() -> str: ...
```
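For example, `faust.uuid()` returns a random UUID rendered as a string:

```python
import faust

key = faust.uuid()  # e.g. useful as a message key
```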