Python Stateful Stream Processing Framework
Core functionality for building and configuring stream processing dataflows. The Dataflow class serves as the main container for defining processing topologies, while Stream represents typed data flows between operators.
Dataflow is the primary container class: it manages the overall processing graph and provides the execution context for all operators.

    class Dataflow:
        def __init__(self, flow_id: str): ...

Parameters:
flow_id (str): Unique identifier for the dataflow, used for logging and monitoring

Usage Example:

    from bytewax.dataflow import Dataflow

    # Create a new dataflow
    flow = Dataflow("my_processing_pipeline")

Stream represents a typed stream of data flowing between operators in the dataflow graph.
    class Stream[X]:
        def __init__(self, id: str, scope): ...

Type Parameters:
X: The type of items flowing through the stream

Parameters:
id (str): Internal identifier for the stream
scope: Internal scope management object

Note: Stream objects are typically created by operators rather than instantiated directly.
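The note above can be illustrated with a toy model in plain Python. This is only a sketch of the general pattern and not bytewax's implementation: `ToyDataflow`, `ToyStream`, `toy_input`, and `toy_map` are hypothetical names invented here, and the stream id format is an assumption.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Tuple


@dataclass
class ToyDataflow:
    """Hypothetical stand-in for a dataflow: it only records the steps added."""

    flow_id: str
    steps: List[Tuple[str, Optional[Callable]]] = field(default_factory=list)


@dataclass(frozen=True)
class ToyStream:
    """Hypothetical handle naming the output of one step."""

    stream_id: str
    flow: ToyDataflow


def toy_input(step_id: str, flow: ToyDataflow) -> ToyStream:
    # An input-like operator attaches to the flow and returns the first stream.
    flow.steps.append((step_id, None))
    return ToyStream(f"{flow.flow_id}.{step_id}", flow)


def toy_map(step_id: str, up: ToyStream, mapper: Callable) -> ToyStream:
    # A transforming operator takes an upstream handle, registers its step,
    # and returns a new handle; callers never construct ToyStream themselves.
    up.flow.steps.append((step_id, mapper))
    return ToyStream(f"{up.flow.flow_id}.{step_id}", up.flow)


flow = ToyDataflow("my_processing_pipeline")
numbers = toy_input("inp", flow)
labels = toy_map("format", numbers, lambda x: f"Value: {x}")

print(labels.stream_id)
print([step_id for step_id, _ in flow.steps])
```

In this model each operator call both extends the graph held by the flow and hands back a handle for the next operator, which is why user code chains operator calls instead of building stream objects by hand.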
KeyedStream is a specialized stream type for stateful operations that require data to be partitioned by key.

    KeyedStream = Stream[Tuple[str, V]]

Type Definition:
KeyedStream[V]: A stream of (key, value) tuples where keys are strings and values are of type V

Usage Pattern:

    import bytewax.operators as op

    # Convert a regular stream to a keyed stream
    keyed_stream = op.key_on("add_keys", stream, lambda item: item.user_id)

    # Stateful operators can now be used. The reducer combines two values of the
    # same type, so extract the amounts before summing them.
    amounts = op.map_value("get_amount", keyed_stream, lambda item: item.amount)
    aggregated = op.reduce_final("sum", amounts, lambda acc, val: acc + val)

The operator decorator creates custom operators that integrate with the dataflow system.
    def operator(_core: bool = False): ...

Parameters:
_core (bool): Internal flag indicating core operators (default: False)

Usage Example:

    import bytewax.operators as op
    from bytewax.dataflow import Stream, operator

    @operator
    def my_custom_operator(step_id: str, up: Stream[int]) -> Stream[str]:
        """Convert integers to strings with custom formatting."""
        return op.map("format", up, lambda x: f"Value: {x}")

The following type variables are used throughout the dataflow system for generic type annotations.
    P = ParamSpec("P")  # Signature of an operator function
    R = TypeVar("R")  # Return type of an operator function
    N = TypeVar("N")  # Type of name of each stream
    X_co = TypeVar("X_co", covariant=True)  # Type contained within a Stream
    F = TypeVar("F", bound=Callable[..., Any])  # Function type bound

The module also provides helper functions for working with dataflow components.
    def f_repr(func: Callable) -> str: ...

Parameters:
func (Callable): Function to get string representation of

Returns:
str: Human-readable representation of the function

Usage: Used internally for error messages and debugging output.
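To give a sense of what a helper of this kind might do, here is a minimal standalone sketch. It is not the library's `f_repr`: the name `describe_callable` and the output format are assumptions chosen for illustration.

```python
from types import FunctionType
from typing import Callable


def describe_callable(func: Callable) -> str:
    """Hypothetical helper: a short, readable label for a callable."""
    if isinstance(func, FunctionType):
        # Plain Python functions and lambdas know their name and source line.
        line = func.__code__.co_firstlineno
        return f"<function {func.__qualname__} at line {line}>"
    # Builtins, partials, and callable objects fall back to repr().
    return repr(func)


def double(x: int) -> int:
    return x * 2


print(describe_callable(double))
print(describe_callable(len))
```

A label built from the qualified name and line number tends to be more useful in an error message than the default repr, which includes a memory address that changes between runs.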