tessl/pypi-bytewax

Python Stateful Stream Processing Framework

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/bytewax@0.21.x

To install, run

npx @tessl/cli install tessl/pypi-bytewax@0.21.0


Bytewax

A Python framework for event and stream processing that brings the capabilities of systems like Flink, Spark, and Kafka Streams to a familiar Python interface. Built on a Rust distributed processing engine exposed through PyO3 bindings, it provides parallelizable stream processing via a dataflow computational model that supports stateful transformations, windowing operations, joins, and real-time aggregations.

Package Information

  • Package Name: bytewax
  • Language: Python
  • Installation: pip install bytewax
  • Optional Dependencies: pip install bytewax[kafka] for Kafka connectors

Core Imports

# Core dataflow construction
from bytewax.dataflow import Dataflow

# Built-in operators for stream processing
import bytewax.operators as op

# Windowing operators for time-based processing
import bytewax.operators.windowing as win

# Runtime execution
import bytewax.run

# Testing utilities
import bytewax.testing

# Input and output interfaces
from bytewax.inputs import Source
from bytewax.outputs import Sink

# Built-in connectors
from bytewax.connectors.stdio import StdOutSink, StdInSource
from bytewax.connectors.files import FileSource, FileSink, CSVSource, DirSource, DirSink
from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSourceMessage, KafkaSinkMessage
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.demo import RandomMetricSource

Basic Usage

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

# Create a dataflow
flow = Dataflow("basic_example")

# Add input data
input_data = [1, 2, 3, 4, 5]
stream = op.input("input", flow, TestingSource(input_data))

# Transform the data
doubled = op.map("double", stream, lambda x: x * 2)

# Keep only values divisible by 4
evens = op.filter("evens", doubled, lambda x: x % 4 == 0)

# Output results
op.output("output", evens, StdOutSink())

# Run the dataflow
run_main(flow)

Architecture

Bytewax's architecture consists of several key components that work together to provide scalable stream processing:

  • Dataflow: The main container that defines the processing graph and topology
  • Operators: Processing primitives that transform, filter, aggregate, and route data
  • Streams: Typed data flows between operators that can be keyed for stateful operations
  • Sources/Sinks: Input and output connectors for external systems
  • Runtime: Rust-based execution engine that handles distribution, state management, and recovery
  • Recovery System: Built-in state snapshotting and recovery mechanisms for fault tolerance

This design enables seamless scaling from single-process development to multi-worker distributed deployments across multiple hosts, with support for exactly-once processing guarantees through state recovery mechanisms.

Capabilities

Dataflow Construction

Core functionality for building and configuring stream processing dataflows. Includes the main Dataflow class and Stream abstractions that form the foundation of all processing topologies.

class Dataflow:
    def __init__(self, name: str): ...

class Stream[X]:
    def __init__(self, id: str, scope): ...


Stream Processing Operators

Essential operators for transforming, filtering, and routing data through processing pipelines. These operators provide the building blocks for most stream processing use cases including map, filter, flat_map, branch, merge, and more.

def input(step_id: str, flow: Dataflow, source: Source[X]) -> Stream[X]: ...
def output(step_id: str, up: Stream[X], sink: Sink[X]) -> None: ...
def map(step_id: str, up: Stream[X], mapper: Callable[[X], Y]) -> Stream[Y]: ...
def filter(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> Stream[X]: ...
def flat_map(step_id: str, up: Stream[X], mapper: Callable[[X], Iterable[Y]]) -> Stream[Y]: ...
def branch(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> BranchOut[X, X]: ...
def merge(step_id: str, *ups: Stream[Any]) -> Stream[Any]: ...
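
As a small illustration of how these compose, the sketch below splits a stream with branch and recombines it with merge. The step names, threshold, and test data are illustrative only.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("branch_merge_example")
nums = op.input("nums", flow, TestingSource(list(range(10))))

# branch returns a BranchOut with .trues and .falses streams
split = op.branch("under_five", nums, lambda x: x < 5)
small = op.map("tag_small", split.trues, lambda x: ("small", x))
large = op.map("tag_large", split.falses, lambda x: ("large", x))

# merge recombines any number of streams into one
tagged = op.merge("recombine", small, large)
op.output("out", tagged, StdOutSink())

run_main(flow)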


Stateful Processing

Advanced operators for maintaining state across events, including reduce, fold, join, and custom stateful transformations. These operators enable complex event processing patterns like aggregations, session tracking, and multi-stream correlations.

def stateful_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], W]]) -> KeyedStream[W]: ...
def stateful_flat_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], Iterable[W]]]) -> KeyedStream[W]: ...
def reduce_final(step_id: str, up: KeyedStream[V], reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...
def fold_final(step_id: str, up: KeyedStream[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...
def join(step_id: str, *sides: KeyedStream[Any], insert_mode: JoinInsertMode = "last", emit_mode: JoinEmitMode = "complete") -> KeyedStream[Tuple]: ...
def collect(step_id: str, up: KeyedStream[V], timeout: timedelta, max_size: int) -> KeyedStream[List[V]]: ...

class StatefulLogic[V, W, S]:
    def on_item(self, value: V) -> Tuple[Iterable[W], bool]: ...
    def on_notify(self) -> Tuple[Iterable[W], bool]: ...
    def on_eof(self) -> Tuple[Iterable[W], bool]: ...
    def notify_at(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...

JoinInsertMode = Literal["first", "last", "product"]
JoinEmitMode = Literal["complete", "final", "running"]
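
A minimal sketch of a per-key running count with stateful_map. It keys the stream with op.key_on from bytewax.operators; the event shape and step names are illustrative.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

events = [{"user": "a"}, {"user": "b"}, {"user": "a"}]

flow = Dataflow("running_count")
stream = op.input("events", flow, TestingSource(events))

# Key the stream so state is partitioned per user
keyed = op.key_on("by_user", stream, lambda e: e["user"])

# State is None on the first event for a key; return (new_state, output)
def count(state, event):
    count_so_far = (state or 0) + 1
    return count_so_far, count_so_far

counts = op.stateful_map("count", keyed, count)
op.output("out", counts, StdOutSink())  # emits ("a", 1), ("b", 1), ("a", 2)

run_main(flow)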


Windowing Operations

Time-based windowing operators for processing streams in temporal buckets. Supports tumbling, sliding, and session windows with various aggregation functions for real-time analytics and temporal pattern detection.

def collect_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower) -> KeyedStream[List[V]]: ...
def fold_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...
def reduce_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...
def count_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, key: Callable[[V], str]) -> KeyedStream[int]: ...
def max_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
def min_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

class SystemClock:
    def __init__(self): ...

class EventClock[V]:
    def __init__(self, ts_getter: Callable[[V], datetime], wait_for_system_duration: timedelta = timedelta(0)): ...

class TumblingWindower:
    def __init__(self, length: timedelta, align_to: datetime): ...

class SlidingWindower:
    def __init__(self, length: timedelta, offset: timedelta, align_to: datetime): ...

class SessionWindower:
    def __init__(self, gap: timedelta): ...

class WindowMetadata:
    open_time: datetime
    close_time: datetime
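
A sketch of counting events per key in 10-second tumbling windows, following the simplified signatures above. The timestamps, alignment, and step names are illustrative, and the exact return shape of the windowing operators may be richer in the installed version; consult docs/windowing.md for details.

from datetime import datetime, timedelta, timezone

from bytewax.dataflow import Dataflow
import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

align_to = datetime(2024, 1, 1, tzinfo=timezone.utc)
events = [
    {"user": "a", "ts": align_to + timedelta(seconds=4)},
    {"user": "a", "ts": align_to + timedelta(seconds=8)},
    {"user": "a", "ts": align_to + timedelta(seconds=13)},
]

flow = Dataflow("windowed_counts")
stream = op.input("events", flow, TestingSource(events))
keyed = op.key_on("by_user", stream, lambda e: e["user"])

# Use the event's own timestamp and 10-second tumbling windows
clock = win.EventClock(lambda e: e["ts"], wait_for_system_duration=timedelta(0))
windower = win.TumblingWindower(length=timedelta(seconds=10), align_to=align_to)

counts = win.fold_window("count_per_window", keyed, clock, windower,
                         lambda: 0, lambda acc, _e: acc + 1)
op.output("out", counts, StdOutSink())

run_main(flow)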


Input Sources

Interfaces and implementations for reading data from external systems. Includes abstract base classes for building custom sources and built-in sources for common data systems.

class Source[X]: ...
class StatefulSourcePartition[X, S]: ...
class StatelessSourcePartition[X]: ...
class FixedPartitionedSource[X, S]: ...
class DynamicSource[X]: ...
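
A sketch of a custom worker-local source built on DynamicSource and StatelessSourcePartition. The build signature (step_id, worker_index, worker_count) and the partition method names reflect my reading of the 0.21 source ABCs and should be treated as assumptions; see docs/sources.md for the authoritative interface.

from datetime import datetime
from typing import List, Optional

from bytewax.inputs import DynamicSource, StatelessSourcePartition

class CountdownPartition(StatelessSourcePartition):
    """Emits a fixed countdown, then signals end of input."""

    def __init__(self, start: int):
        self._n = start

    def next_batch(self) -> List[int]:
        if self._n <= 0:
            raise StopIteration()  # assumed EOF signal for this partition
        self._n -= 1
        return [self._n]

    def next_awake(self) -> Optional[datetime]:
        return None  # poll again immediately

    def close(self) -> None:
        pass

class CountdownSource(DynamicSource):
    """Builds one independent partition per worker."""

    def __init__(self, start: int):
        self._start = start

    def build(self, step_id, worker_index, worker_count):
        return CountdownPartition(self._start)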


Output Sinks

Interfaces and implementations for writing data to external systems. Includes abstract base classes for building custom sinks and patterns for exactly-once output delivery.

class Sink[X]: ...
class StatefulSinkPartition[X, S]: ...
class StatelessSinkPartition[X]: ...
class FixedPartitionedSink[X, S]: ...
class DynamicSink[X]: ...
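
A matching sketch of a custom sink using DynamicSink and StatelessSinkPartition. As with the source example, the build and write_batch method names are my assumption about the 0.21 sink ABCs; see docs/sinks.md for the authoritative interface.

from typing import List

from bytewax.outputs import DynamicSink, StatelessSinkPartition

class ListAppendPartition(StatelessSinkPartition):
    """Appends each batch of items to a shared in-memory list."""

    def __init__(self, target: list):
        self._target = target

    def write_batch(self, items: List) -> None:
        self._target.extend(items)

    def close(self) -> None:
        pass

class ListAppendSink(DynamicSink):
    def __init__(self, target: list):
        self._target = target

    def build(self, step_id, worker_index, worker_count):
        return ListAppendPartition(self._target)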


Built-in Connectors

Pre-built connectors for common external systems including Kafka, files, stdio, and demo sources. These connectors provide production-ready integration with popular data systems.

# Kafka connectors
class KafkaSource:
    def __init__(self, brokers: List[str], topics: List[str], **kwargs): ...

class KafkaSink:
    def __init__(self, brokers: List[str], topic: str, **kwargs): ...

class KafkaSourceMessage[K, V]:
    key: K
    value: V
    timestamp: datetime
    partition: int
    offset: int

class KafkaSinkMessage[K, V]:
    key: K
    value: V

# File connectors  
class FileSource:
    def __init__(self, path: str, **kwargs): ...

class FileSink:
    def __init__(self, path: str, **kwargs): ...

class CSVSource:
    def __init__(self, path: str, **kwargs): ...

class DirSource:
    def __init__(self, dir_path: str, **kwargs): ...

class DirSink:
    def __init__(self, dir_path: str, **kwargs): ...

# Stdio connectors
class StdOutSink: ...
class StdInSource: ...

# Demo connectors
class RandomMetricSource:
    def __init__(self, **kwargs): ...
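
A sketch that consumes from one Kafka topic and produces to another using the connectors above. The broker address and topic names are placeholders; per the message classes listed above, consumed KafkaSourceMessage items are re-wrapped as KafkaSinkMessage for output.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSinkMessage

BROKERS = ["localhost:9092"]   # placeholder broker address

flow = Dataflow("kafka_passthrough")
msgs = op.input("in", flow, KafkaSource(BROKERS, ["input-topic"]))

# Re-wrap each consumed message as a message for the output topic
out_msgs = op.map("to_sink_msg", msgs, lambda msg: KafkaSinkMessage(msg.key, msg.value))

op.output("out", out_msgs, KafkaSink(BROKERS, "output-topic"))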


Runtime and Execution

Functions and classes for executing dataflows in various environments, from single-threaded testing to distributed production clusters. Includes configuration for worker processes, recovery, and distributed coordination.

def cli_main(flow: Dataflow, workers_per_process: int = 1, process_id: Optional[int] = None, addresses: Optional[List[str]] = None, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1): ...
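
During development run_main executes the whole flow in the current process; for deployments the dataflow is typically launched through the bytewax.run module, whose command-line options mirror the cli_main parameters above. A sketch, assuming the flow is defined at module level in a file named my_flow.py:

# my_flow.py
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("my_flow")
nums = op.input("in", flow, TestingSource([1, 2, 3]))
op.output("out", nums, StdOutSink())

# Run it from the command line:
#   python -m bytewax.run my_flow:flow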


Testing Utilities

Tools and utilities for testing dataflows including test sources, sinks, and execution helpers. Provides deterministic testing environments and data capture utilities for validating stream processing logic.

class TestingSource[X]:
    def __init__(self, data: Iterable[X]): ...
    class EOF: ...
    class ABORT: ...
    class PAUSE:
        def __init__(self, duration: timedelta): ...

class TestingSink[X]:
    def __init__(self, ls: List[X]): ...  # emitted items are appended to ls

class TimeTestingGetter:
    def __init__(self, now: datetime): ...
    def advance(self, td: timedelta) -> None: ...
    def now(self) -> datetime: ...

def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None) -> None: ...
def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1) -> None: ...
def poll_next_batch(partition, timeout: timedelta) -> List[Any]: ...
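
A sketch of a deterministic unit test that follows the signatures above: TestingSource feeds fixed data, TestingSink collects results into a provided list, and run_main executes the flow synchronously.

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, TestingSink, run_main

def test_doubling():
    out = []
    flow = Dataflow("test_doubling")
    nums = op.input("in", flow, TestingSource([1, 2, 3]))
    doubled = op.map("double", nums, lambda x: x * 2)
    op.output("out", doubled, TestingSink(out))

    run_main(flow)
    assert out == [2, 4, 6]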


State Recovery

Recovery mechanisms for fault tolerance including state snapshotting, partition management, and resume capabilities. Enables exactly-once processing guarantees in distributed environments.

class RecoveryConfig:
    def __init__(self, db_dir: Path, backup_interval: Optional[timedelta] = None): ...

def init_db_dir(db_dir: Path, count: int): ...

class InconsistentPartitionsError(ValueError): ...
class MissingPartitionsError(FileNotFoundError): ...
class NoPartitionsError(FileNotFoundError): ...
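
A sketch of enabling recovery, following the signatures above. The directory, partition count, and intervals are illustrative; the recovery directory must be initialized once before the first run, and the flow is assumed to be defined elsewhere.

from pathlib import Path
from datetime import timedelta

from bytewax.recovery import RecoveryConfig, init_db_dir

db_dir = Path("./recovery-db")      # placeholder location
if not db_dir.exists():
    db_dir.mkdir()
    init_db_dir(db_dir, 4)          # create 4 recovery partitions, once

recovery = RecoveryConfig(db_dir, backup_interval=timedelta(minutes=5))

# Pass the config at execution time; re-running after a crash resumes
# from the most recent snapshot:
#   run_main(flow, epoch_interval=timedelta(seconds=10), recovery_config=recovery)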


Tracing and Monitoring

Logging, tracing, and monitoring capabilities for observing dataflow execution. Supports integration with Jaeger, OpenTelemetry, and other observability platforms for production monitoring.

class TracingConfig: ...

class JaegerConfig(TracingConfig):
    def __init__(self, service_name: str, endpoint: Optional[str] = None, sampling_ratio: float = 1.0): ...

class OtlpTracingConfig(TracingConfig):
    def __init__(self, service_name: str, url: Optional[str] = None, sampling_ratio: float = 1.0): ...

def setup_tracing(tracing_config: Optional[TracingConfig] = None, log_level: Optional[str] = None): ...
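
A sketch of enabling tracing with the OTLP exporter, following the signatures above. The service name and collector URL are placeholders; my understanding is that the handle returned by setup_tracing should be kept alive for the life of the process.

from bytewax.tracing import OtlpTracingConfig, setup_tracing

tracer = setup_tracing(
    OtlpTracingConfig(service_name="my-dataflow", url="grpc://localhost:4317"),
    log_level="INFO",
)
# Keep `tracer` referenced while the dataflow runs so spans keep exporting.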
