Python Stateful Stream Processing Framework
npx @tessl/cli install tessl/pypi-bytewax@0.21.0

A Python framework that simplifies event and stream processing by pairing the stream-processing capabilities of systems like Flink, Spark, and Kafka Streams with a familiar Python interface. Built with a Rust distributed processing engine using PyO3 bindings, it provides parallelizable stream processing through a dataflow computational model that supports stateful transformations, windowing operations, joins, and real-time aggregations.
pip install bytewax
pip install bytewax[kafka]  # for Kafka connectors

# Core dataflow construction
from bytewax.dataflow import Dataflow
# Built-in operators for stream processing
import bytewax.operators as op
# Windowing operators for time-based processing
import bytewax.operators.windowing as win
# Runtime execution
import bytewax.run
# Testing utilities
import bytewax.testing
# Input and output interfaces
from bytewax.inputs import Source
from bytewax.outputs import Sink
# Built-in connectors
from bytewax.connectors.stdio import StdOutSink, StdInSource
from bytewax.connectors.files import FileSource, FileSink, CSVSource, DirSource, DirSink
from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSourceMessage, KafkaSinkMessage
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.demo import RandomMetricSource

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink
# Create a dataflow
flow = Dataflow("basic_example")
# Add input data
input_data = [1, 2, 3, 4, 5]
stream = op.input("input", flow, TestingSource(input_data))
# Transform the data
doubled = op.map("double", stream, lambda x: x * 2)
# Filter even results
evens = op.filter("evens", doubled, lambda x: x % 4 == 0)
# Output results
op.output("output", evens, StdOutSink())
# Run the dataflow
run_main(flow)

Bytewax's architecture consists of several key components, described in the sections that follow, that work together to provide scalable stream processing.
This design enables seamless scaling from single-process development to multi-worker distributed deployments across multiple hosts, with support for exactly-once processing guarantees through state recovery mechanisms.
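For example, the same flow object can be executed in-process during development and later launched as one process of a multi-host cluster using the execution entry points documented later in this document; the addresses and process id below are placeholders.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main, cluster_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("scaling_example")
nums = op.input("in", flow, TestingSource(range(3)))
op.output("out", nums, StdOutSink())

# Single-process, single-worker execution for local development:
run_main(flow)

# Or, the same flow as process 0 of a two-process cluster (placeholder
# addresses); each host runs this with its own proc_id:
# cluster_main(flow, addresses=["10.0.0.1:2101", "10.0.0.2:2101"], proc_id=0)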
Core functionality for building and configuring stream processing dataflows. Includes the main Dataflow class and Stream abstractions that form the foundation of all processing topologies.
class Dataflow:
def __init__(self, name: str): ...
class Stream[X]:
def __init__(self, id: str, scope): ...

Essential operators for transforming, filtering, and routing data through processing pipelines. These operators provide the building blocks for most stream processing use cases, including map, filter, flat_map, branch, merge, and more.
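As a brief illustration of routing, the following sketch splits a stream with branch and recombines it with merge; it follows the signatures listed below and assumes BranchOut exposes trues and falses streams.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("branch_merge_example")
nums = op.input("nums", flow, TestingSource(range(10)))
# Split the stream on a predicate; matching items go to `trues`,
# the rest to `falses`.
split = op.branch("split", nums, lambda x: x % 2 == 0)
# Label each side, then recombine into a single stream.
evens = op.map("label_even", split.trues, lambda x: ("even", x))
odds = op.map("label_odd", split.falses, lambda x: ("odd", x))
labeled = op.merge("merge", evens, odds)
op.output("out", labeled, StdOutSink())
run_main(flow)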
def input(step_id: str, flow: Dataflow, source: Source[X]) -> Stream[X]: ...
def output(step_id: str, up: Stream[X], sink: Sink[X]) -> None: ...
def map(step_id: str, up: Stream[X], mapper: Callable[[X], Y]) -> Stream[Y]: ...
def filter(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> Stream[X]: ...
def flat_map(step_id: str, up: Stream[X], mapper: Callable[[X], Iterable[Y]]) -> Stream[Y]: ...
def branch(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> BranchOut[X, X]: ...
def merge(step_id: str, *ups: Stream[Any]) -> Stream[Any]: ...

Advanced operators for maintaining state across events, including reduce, fold, join, and custom stateful transformations. These operators enable complex event processing patterns such as aggregations, session tracking, and multi-stream correlations.
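For instance, a running total per key can be kept with stateful_map; this sketch follows the signatures listed below and additionally assumes op.key_on (not listed here) for keying the stream.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("running_total")
orders = op.input("orders", flow, TestingSource([
    {"user": "a", "amount": 10},
    {"user": "b", "amount": 5},
    {"user": "a", "amount": 7},
]))
# Key the stream so state is partitioned per user.
keyed = op.key_on("key_by_user", orders, lambda o: o["user"])

def add_to_total(total, order):
    # State is None the first time a key is seen; returning None as the
    # new state would discard the state for that key.
    total = (total or 0) + order["amount"]
    return total, total

totals = op.stateful_map("total", keyed, add_to_total)
op.output("out", totals, StdOutSink())
run_main(flow)  # emits ("a", 10), ("b", 5), ("a", 17)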
def stateful_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], W]]) -> KeyedStream[W]: ...
def stateful_flat_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], Iterable[W]]]) -> KeyedStream[W]: ...
def reduce_final(step_id: str, up: KeyedStream[V], reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...
def fold_final(step_id: str, up: KeyedStream[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...
def join(step_id: str, *sides: KeyedStream[Any], insert_mode: JoinInsertMode = "last", emit_mode: JoinEmitMode = "complete") -> KeyedStream[Tuple]: ...
def collect(step_id: str, up: KeyedStream[V], timeout: timedelta, max_size: int) -> KeyedStream[List[V]]: ...
class StatefulLogic[V, W, S]:
def on_item(self, value: V) -> Tuple[Iterable[W], bool]: ...
def on_notify(self) -> Tuple[Iterable[W], bool]: ...
def on_eof(self) -> Tuple[Iterable[W], bool]: ...
def notify_at(self) -> Optional[datetime]: ...
def snapshot(self) -> S: ...
JoinInsertMode = Literal["first", "last", "product"]
JoinEmitMode = Literal["complete", "final", "running"]

Time-based windowing operators for processing streams in temporal buckets. Supports tumbling, sliding, and session windows with various aggregation functions for real-time analytics and temporal pattern detection.
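A minimal sketch of a per-key tumbling count, assuming the clock and windower constructors listed below (with align_to as the window alignment timestamp) and, per the fold_window signature below, that the operator yields the keyed per-window accumulators downstream.

from datetime import datetime, timedelta, timezone

from bytewax.dataflow import Dataflow
import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("window_counts")
align_to = datetime(2024, 1, 1, tzinfo=timezone.utc)
events = op.input("events", flow, TestingSource([
    {"user": "a", "ts": align_to + timedelta(seconds=1)},
    {"user": "a", "ts": align_to + timedelta(seconds=3)},
    {"user": "a", "ts": align_to + timedelta(seconds=7)},
]))
keyed = op.key_on("key_by_user", events, lambda e: e["user"])
# Use event timestamps rather than wall-clock time, and close 5-second
# tumbling windows aligned to a fixed epoch.
clock = win.EventClock(lambda e: e["ts"], wait_for_system_duration=timedelta(0))
windower = win.TumblingWindower(length=timedelta(seconds=5), align_to=align_to)
counts = win.fold_window("count", keyed, clock, windower,
                         builder=lambda: 0, folder=lambda c, _e: c + 1)
op.output("out", counts, StdOutSink())
run_main(flow)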
def collect_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower) -> KeyedStream[List[V]]: ...
def fold_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...
def reduce_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...
def count_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, key: Callable[[V], str]) -> KeyedStream[int]: ...
def max_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
def min_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
class SystemClock:
def __init__(self): ...
class EventClock[V]:
def __init__(self, ts_getter: Callable[[V], datetime], wait_for_system_duration: timedelta = timedelta(0)): ...
class TumblingWindower:
def __init__(self, length: timedelta, align_to: datetime): ...
class SlidingWindower:
def __init__(self, length: timedelta, offset: timedelta, align_to: datetime): ...
class SessionWindower:
def __init__(self, gap: timedelta): ...
class WindowMetadata:
open_time: datetime
close_time: datetime

Interfaces and implementations for reading data from external systems. Includes abstract base classes for building custom sources and built-in sources for common data systems.
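A custom source pairs a Source subclass with partition objects. The sketch below is a loose illustration assuming a DynamicSource whose build hook returns a StatelessSourcePartition with a next_batch method; the exact hook parameters are not listed here and may differ between versions.

from typing import List

from bytewax.inputs import DynamicSource, StatelessSourcePartition


class CounterPartition(StatelessSourcePartition):
    """One per-worker partition that emits an incrementing counter."""

    def __init__(self, limit: int):
        self._i = 0
        self._limit = limit

    def next_batch(self) -> List[int]:
        # Returning batches keeps per-item overhead low; raising
        # StopIteration signals that the partition is exhausted.
        if self._i >= self._limit:
            raise StopIteration()
        batch = [self._i]
        self._i += 1
        return batch


class CounterSource(DynamicSource):
    """Builds one identical partition on every worker."""

    def __init__(self, limit: int):
        self._limit = limit

    def build(self, step_id, worker_index, worker_count):
        # Hook parameters are illustrative; see the bytewax.inputs ABCs
        # for the exact signature.
        return CounterPartition(self._limit)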
class Source[X]: ...
class StatefulSourcePartition[X, S]: ...
class StatelessSourcePartition[X]: ...
class FixedPartitionedSource[X, S]: ...
class DynamicSource[X]: ...

Interfaces and implementations for writing data to external systems. Includes abstract base classes for building custom sinks and patterns for exactly-once output delivery.
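Symmetrically, a custom sink pairs a Sink subclass with partition objects that receive batches of items. This is a rough sketch assuming a DynamicSink whose build hook returns a StatelessSinkPartition with a write_batch method; the hook parameters shown are illustrative.

from typing import List

from bytewax.outputs import DynamicSink, StatelessSinkPartition


class ListAppendPartition(StatelessSinkPartition):
    """Appends every item it receives to a shared in-memory list."""

    def __init__(self, target: list):
        self._target = target

    def write_batch(self, items: List) -> None:
        # Items arrive in batches; a durable sink would flush here.
        self._target.extend(items)


class ListAppendSink(DynamicSink):
    """Builds one partition per worker, all writing to the same list."""

    def __init__(self, target: list):
        self._target = target

    def build(self, step_id, worker_index, worker_count):
        # Hook parameters are illustrative; see the bytewax.outputs ABCs.
        return ListAppendPartition(self._target)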
class Sink[X]: ...
class StatefulSinkPartition[X, S]: ...
class StatelessSinkPartition[X]: ...
class FixedPartitionedSink[X, S]: ...
class DynamicSink[X]: ...

Pre-built connectors for common external systems including Kafka, files, stdio, and demo sources. These connectors provide production-ready integration with popular data systems.
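For example, a Kafka-to-Kafka pipeline can be assembled from the built-in connectors; the snippet below follows the constructor signatures listed here, and the broker address and topic names are placeholders. It assumes KafkaSinkMessage is the item type KafkaSink expects.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSinkMessage

flow = Dataflow("kafka_passthrough")
# Consume from one topic (broker address and topics are placeholders)...
msgs = op.input("in", flow, KafkaSource(["localhost:9092"], ["input-topic"]))
# ...re-wrap each consumed message as a producer message...
out_msgs = op.map("rewrap", msgs, lambda m: KafkaSinkMessage(m.key, m.value))
# ...and produce to another topic.
op.output("out", out_msgs, KafkaSink(["localhost:9092"], "output-topic"))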
# Kafka connectors
class KafkaSource:
def __init__(self, brokers: List[str], topics: List[str], **kwargs): ...
class KafkaSink:
def __init__(self, brokers: List[str], topic: str, **kwargs): ...
class KafkaSourceMessage[K, V]:
key: K
value: V
timestamp: datetime
partition: int
offset: int
class KafkaSinkMessage[K, V]:
key: K
value: V
# File connectors
class FileSource:
def __init__(self, path: str, **kwargs): ...
class FileSink:
def __init__(self, path: str, **kwargs): ...
class CSVSource:
def __init__(self, path: str, **kwargs): ...
class DirSource:
def __init__(self, dir_path: str, **kwargs): ...
class DirSink:
def __init__(self, dir_path: str, **kwargs): ...
# Stdio connectors
class StdOutSink: ...
class StdInSource: ...
# Demo connectors
class RandomMetricSource:
def __init__(self, **kwargs): ...

Functions and classes for executing dataflows in various environments, from single-threaded testing to distributed production clusters. Includes configuration for worker processes, recovery, and distributed coordination.
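Concretely, a flow defined at module level can be handed to any of these entry points. The sketch below uses cli_main as listed; the module name is a placeholder, and the CLI invocation in the comment is the usual wrapper for launching a flow by import path (flag behavior may vary by version).

# my_module.py (module path is a placeholder)
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.run import cli_main
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("example")
nums = op.input("in", flow, TestingSource(range(5)))
op.output("out", nums, StdOutSink())

if __name__ == "__main__":
    # Run two workers in this process; roughly what the CLI wrapper
    #   python -m bytewax.run my_module:flow
    # does when launching the flow by import path.
    cli_main(flow, workers_per_process=2)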
def cli_main(flow: Dataflow, workers_per_process: int = 1, process_id: Optional[int] = None, addresses: Optional[List[str]] = None, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...
def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...
def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1): ...

Tools and utilities for testing dataflows, including test sources, sinks, and execution helpers. Provides deterministic testing environments and data capture utilities for validating stream processing logic.
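A typical unit test wires a TestingSource to a TestingSink and drives the flow to completion with run_main; this assumes TestingSink appends emitted items to the list passed to it.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, TestingSink, run_main


def test_doubling():
    flow = Dataflow("test_doubling")
    nums = op.input("in", flow, TestingSource([1, 2, 3]))
    doubled = op.map("double", nums, lambda x: x * 2)

    captured = []
    op.output("out", doubled, TestingSink(captured))

    # run_main executes the whole flow in the current thread, so the
    # assertion can run as soon as it returns.
    run_main(flow)
    assert captured == [2, 4, 6]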
class TestingSource[X]:
def __init__(self, data: Iterable[X]): ...
class EOF: ...
class ABORT: ...
class PAUSE:
def __init__(self, duration: timedelta): ...
class TestingSink[X]:
def __init__(self, ls: List[X]): ...  # emitted items are appended to ls
class TimeTestingGetter:
def __init__(self, now: datetime): ...
def advance(self, td: timedelta) -> None: ...
def now(self) -> datetime: ...
def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None) -> None: ...
def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1) -> None: ...
def poll_next_batch(partition, timeout: timedelta) -> List[Any]: ...

Recovery mechanisms for fault tolerance, including state snapshotting, partition management, and resume capabilities. Enables exactly-once processing guarantees in distributed environments.
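In practice, recovery is enabled by pre-creating a set of partition databases with init_db_dir and then passing a RecoveryConfig when executing the flow; the directory path and partition count below are illustrative.

from datetime import timedelta
from pathlib import Path

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.recovery import RecoveryConfig, init_db_dir
from bytewax.testing import TestingSource, run_main

flow = Dataflow("recoverable")
nums = op.input("in", flow, TestingSource(range(10)))
op.output("out", nums, StdOutSink())

db_dir = Path("./recovery-db")  # illustrative location
if not db_dir.exists():
    db_dir.mkdir()
    # Create 4 recovery partitions up front; the count is fixed at init time.
    init_db_dir(db_dir, 4)

recovery = RecoveryConfig(db_dir, backup_interval=timedelta(minutes=10))
# Snapshots are written to db_dir so a restarted worker can resume from
# the last consistent epoch instead of reprocessing everything.
run_main(flow, recovery_config=recovery)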
class RecoveryConfig:
def __init__(self, db_dir: Path, backup_interval: Optional[timedelta] = None): ...
def init_db_dir(db_dir: Path, count: int): ...
class InconsistentPartitionsError(ValueError): ...
class MissingPartitionsError(FileNotFoundError): ...
class NoPartitionsError(FileNotFoundError): ...

Logging, tracing, and monitoring capabilities for observing dataflow execution. Supports integration with Jaeger, OpenTelemetry, and other observability platforms for production monitoring.
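Tracing is configured once at process start, before the dataflow runs; the service name and collector URL below are placeholders.

from bytewax.tracing import OtlpTracingConfig, setup_tracing

# Export spans to an OpenTelemetry collector (URL is a placeholder),
# sample 10% of traces, and raise Bytewax's own log level. Keep a
# reference to the returned object for the lifetime of the process.
tracer = setup_tracing(
    tracing_config=OtlpTracingConfig(
        service_name="my-dataflow",
        url="grpc://127.0.0.1:4317",
        sampling_ratio=0.1,
    ),
    log_level="INFO",
)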
class TracingConfig: ...
class JaegerConfig(TracingConfig):
def __init__(self, service_name: str, endpoint: Optional[str] = None, sampling_ratio: float = 1.0): ...
class OtlpTracingConfig(TracingConfig):
def __init__(self, service_name: str, url: Optional[str] = None, sampling_ratio: float = 1.0): ...
def setup_tracing(tracing_config: Optional[TracingConfig] = None, log_level: Optional[str] = None): ...