Python Stateful Stream Processing Framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Essential operators for transforming, filtering, and routing data through processing pipelines. These operators provide the building blocks for most stream processing use cases and form the core vocabulary of Bytewax dataflows.
Operators for introducing data into dataflows and writing results to external systems.
def input(step_id: str, flow: Dataflow, source: Source[X]) -> Stream[X]: ...
def output(step_id: str, up: Stream[X], sink: Sink[X]) -> None: ...

input Parameters:
step_id (str): Unique identifier for this operator step
flow (Dataflow): The dataflow to add input to
source (Source[X]): Source to read items from

output Parameters:
step_id (str): Unique identifier for this operator step
up (Stream[X]): Stream of items to write
sink (Sink[X]): Sink to write items to

Usage Example:
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink
# Add input to dataflow
stream = op.input("data_input", flow, TestingSource([1, 2, 3]))
# Output results
op.output("results", processed_stream, StdOutSink())

Core operators for transforming data items one-by-one or one-to-many.
def map(step_id: str, up: Stream[X], mapper: Callable[[X], Y]) -> Stream[Y]: ...
def map_value(step_id: str, up: KeyedStream[V], mapper: Callable[[V], W]) -> KeyedStream[W]: ...
def flat_map(step_id: str, up: Stream[X], mapper: Callable[[X], Iterable[Y]]) -> Stream[Y]: ...
def flat_map_value(step_id: str, up: KeyedStream[V], mapper: Callable[[V], Iterable[W]]) -> KeyedStream[W]: ...
def flat_map_batch(step_id: str, up: Stream[X], mapper: Callable[[List[X]], Iterable[Y]]) -> Stream[Y]: ...

map Parameters:
step_id (str): Unique identifier
up (Stream[X]): Input stream
mapper (Callable[[X], Y]): Function to transform each item

flat_map Parameters:
step_id (str): Unique identifier
up (Stream[X]): Input stream
mapper (Callable[[X], Iterable[Y]]): Function returning an iterable of items

Usage Examples:
# Transform each item
doubled = op.map("double", numbers, lambda x: x * 2)
# Transform values in keyed stream
processed = op.map_value("process", keyed_stream, lambda val: val.upper())
# One-to-many transformation
words = op.flat_map("split", sentences, lambda s: s.split())
# Batch processing for efficiency
processed = op.flat_map_batch("batch_process", stream, lambda batch: [expensive_transform(x) for x in batch])

Operators for selectively keeping or transforming items based on predicates.
def filter(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> Stream[X]: ...
def filter_value(step_id: str, up: KeyedStream[V], predicate: Callable[[V], bool]) -> KeyedStream[V]: ...
def filter_map(step_id: str, up: Stream[X], mapper: Callable[[X], Optional[Y]]) -> Stream[Y]: ...
def filter_map_value(step_id: str, up: KeyedStream[V], mapper: Callable[[V], Optional[W]]) -> KeyedStream[W]: ...

filter Parameters:
step_id (str): Unique identifier
up (Stream[X]): Input stream
predicate (Callable[[X], bool]): Function returning True to keep an item

filter_map Parameters:
step_id (str): Unique identifier
up (Stream[X]): Input stream
mapper (Callable[[X], Optional[Y]]): Function returning None to filter out, or a value to keep

Usage Examples:
# Keep only even numbers
evens = op.filter("evens", numbers, lambda x: x % 2 == 0)
# Filter and transform in one step
valid_data = op.filter_map("validate", raw_data, lambda x: x.value if x.is_valid else None)

Operators for splitting, combining, and redistributing streams.
def branch(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> BranchOut[X, X]: ...
def merge(step_id: str, *ups: Stream[Any]) -> Stream[Any]: ...
def redistribute(step_id: str, up: Stream[X]) -> Stream[X]: ...
class BranchOut[X, Y]:
trues: Stream[X]
falses: Stream[Y]

branch Parameters:
step_id (str): Unique identifier
up (Stream[X]): Input stream to split
predicate (Callable[[X], bool]): Function to determine which branch an item goes to

merge Parameters:
step_id (str): Unique identifier
*ups (Stream[Any]): Multiple streams to combine

Usage Examples:
# Split stream based on condition
branches = op.branch("split_data", stream, lambda x: x.priority == "high")
high_priority = branches.trues
low_priority = branches.falses
# Combine multiple streams
combined = op.merge("combine", stream1, stream2, stream3)
# Redistribute for better parallelization
redistributed = op.redistribute("rebalance", stream)

Operators for working with keyed streams required for stateful operations.
def key_on(step_id: str, up: Stream[X], key: Callable[[X], str]) -> KeyedStream[X]: ...
def key_rm(step_id: str, up: KeyedStream[X]) -> Stream[X]: ...

key_on Parameters:
step_id (str): Unique identifier
up (Stream[X]): Input stream
key (Callable[[X], str]): Function to extract a key from each item

key_rm Parameters:
step_id (str): Unique identifier
up (KeyedStream[X]): Keyed stream to remove keys from

Usage Examples:
# Add keys for stateful operations
keyed = op.key_on("by_user", events, lambda e: e.user_id)
# Remove keys when no longer needed
unkeyed = op.key_rm("remove_keys", keyed_results)

Operators for working with nested data structures.
def flatten(step_id: str, up: Stream[Iterable[X]]) -> Stream[X]: ...

Parameters:
step_id (str): Unique identifier
up (Stream[Iterable[X]]): Stream of iterables to flatten

Usage Example:
# Flatten nested lists
flattened = op.flatten("flatten", stream_of_lists)

Operators for debugging, inspection, and error handling.
def inspect(step_id: str, up: Stream[X], inspector: Callable[[str, X], None] = _default_inspector) -> Stream[X]: ...
def inspect_debug(step_id: str, up: Stream[X], inspector: Callable[[str, X, int, int], None] = _default_debug_inspector) -> Stream[X]: ...
def raises(step_id: str, up: Stream[Any]) -> None: ...

inspect Parameters:
step_id (str): Unique identifier
up (Stream[X]): Stream to observe
inspector (Callable): Function called for each item (default prints to stdout)

inspect_debug Parameters:
step_id (str): Unique identifier
up (Stream[X]): Stream to observe
inspector (Callable): Function called with the item, epoch, and worker info

Usage Examples:
# Debug stream contents
debugged = op.inspect("debug_point", stream)
# Detailed debugging with worker info
detailed = op.inspect_debug("detailed_debug", stream)
# Crash on any item (for testing)
op.raises("should_be_empty", error_stream)

Operators for joining streams with external data sources.
def enrich_cached(step_id: str, up: Stream[X], getter: Callable[[DK], DV], mapper: Callable[[TTLCache[DK, DV], X], Y], ttl: timedelta = timedelta.max, _now_getter: Callable[[], datetime] = _get_system_utc) -> Stream[Y]: ...
class TTLCache[DK, DV]:
def __init__(self, v_getter: Callable[[DK], DV], now_getter: Callable[[], datetime], ttl: timedelta): ...
def get(self, k: DK) -> DV: ...
def remove(self, k: DK) -> None: ...

enrich_cached Parameters:
step_id (str): Unique identifier
up (Stream[X]): Input stream
getter (Callable[[DK], DV]): Function to fetch data on cache misses
mapper (Callable[[TTLCache, X], Y]): Function to enrich each item using the cache
ttl (timedelta): Time-to-live for cached values

Usage Example:
def lookup_user_data(user_id):
# Fetch from database or API
return user_database.get(user_id)
def enrich_event(cache, event):
user_data = cache.get(event.user_id)
return {**event, "user_name": user_data.name}
enriched = op.enrich_cached("enrich_users", events, lookup_user_data, enrich_event, ttl=timedelta(minutes=5))

Operators for performing final aggregations on finite streams.
def count_final(step_id: str, up: Stream[X], key: Callable[[X], str]) -> KeyedStream[int]: ...
def max_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
def min_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
def reduce_final(step_id: str, up: KeyedStream[V], reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...
def fold_final(step_id: str, up: KeyedStream[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...

count_final Parameters:
step_id (str): Unique identifier
up (Stream[X]): Stream of items to count
key (Callable[[X], str]): Function to convert items to count keys

reduce_final Parameters:
step_id (str): Unique identifier
up (KeyedStream[V]): Keyed stream to reduce
reducer (Callable[[V, V], V]): Function to combine two values

fold_final Parameters:
step_id (str): Unique identifier
up (KeyedStream[V]): Keyed stream to fold
builder (Callable[[], S]): Function to build the initial accumulator
folder (Callable[[S, V], S]): Function to combine the accumulator with a value

Usage Examples:
# Count occurrences by key
counts = op.count_final("count_words", words, lambda w: w)
# Find maximum value per key
max_values = op.max_final("find_max", keyed_numbers)
# Find minimum with custom comparison
min_by_priority = op.min_final("min_priority", tasks, lambda t: t.priority)
# Sum all values per key
sums = op.reduce_final("sum", keyed_numbers, lambda a, b: a + b)
# Build custom accumulator
def build_list():
return []
def add_to_list(lst, item):
return lst + [item]
lists = op.fold_final("collect", keyed_items, build_list, add_to_list)

Install with Tessl CLI
npx tessl i tessl/pypi-bytewax