Python Stateful Stream Processing Framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Advanced operators for maintaining state across events, enabling complex event processing patterns like aggregations, session tracking, and multi-stream correlations. All stateful operators work on keyed streams to ensure correct state partitioning in distributed environments.
Simple stateful transformations that maintain state per key and emit results based on state updates.
def stateful_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], W]]) -> KeyedStream[W]: ...
def stateful_flat_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], Iterable[W]]]) -> KeyedStream[W]: ...

stateful_map Parameters:
- step_id (str): Unique identifier
- up (KeyedStream[V]): Input keyed stream
- mapper (Callable): Function receiving current state and value, returning (new_state, output_value)

stateful_flat_map Parameters:
- step_id (str): Unique identifier
- up (KeyedStream[V]): Input keyed stream
- mapper (Callable): Function receiving current state and value, returning (new_state, output_values)

Usage Examples:
# Running count per key
def counter(state, value):
    """Per-key running count for op.stateful_map.

    Receives the previous count (or None on the first event for a key)
    and returns (new_state, output) — here both are the updated count.
    """
    previous = state or 0  # None on first event -> start from 0
    total = previous + 1
    return total, total
counts = op.stateful_map("count", keyed_stream, counter)
# Session tracking with multiple outputs
def session_tracker(state, event):
    """Per-key session accumulator for op.stateful_flat_map.

    Collects events into a session dict until an event of type
    "session_end" arrives; then discards the state (returns None) and
    emits a single summary dict with "duration" and "event_count".
    While the session is open, the state is retained and nothing is
    emitted.
    """
    session = state if state is not None else {"start": event.timestamp, "events": []}
    session["events"].append(event)

    if event.type != "session_end":
        # Session still open: keep state, no output yet.
        return session, []

    summary = {
        "duration": event.timestamp - session["start"],
        "event_count": len(session["events"]),
    }
    return None, [summary]  # Discard state, emit summary
sessions = op.stateful_flat_map("sessions", keyed_events, session_tracker)

Aggregation operators that work on finite streams and emit results only when the upstream completes.
def reduce_final(step_id: str, up: KeyedStream[V], reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...
def fold_final(step_id: str, up: KeyedStream[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...
def count_final(step_id: str, up: Stream[X], key: Callable[[X], str]) -> KeyedStream[int]: ...
def max_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
def min_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

reduce_final Parameters:
- step_id (str): Unique identifier
- up (KeyedStream[V]): Input keyed stream
- reducer (Callable[[V, V], V]): Function to combine two values

fold_final Parameters:
- step_id (str): Unique identifier
- up (KeyedStream[V]): Input keyed stream
- builder (Callable[[], S]): Function to create initial accumulator
- folder (Callable[[S, V], S]): Function to combine accumulator with new value

Usage Examples:
# Sum all values per key
totals = op.reduce_final("sum", keyed_numbers, lambda acc, val: acc + val)
# Build complex aggregations
def create_stats():
    """Builder for op.fold_final: the empty statistics accumulator.

    min/max start at +/-inf so the first folded value always replaces them.
    """
    return {
        "sum": 0,
        "count": 0,
        "min": float("inf"),
        "max": float("-inf"),
    }
def update_stats(stats, value):
    """Folder for op.fold_final: fold one value into the statistics.

    Returns a fresh accumulator dict (the input is not mutated) with
    the running sum, count, min, and max updated for `value`.
    """
    updated = dict(stats)  # copy so the previous accumulator is untouched
    updated["sum"] += value
    updated["count"] += 1
    updated["min"] = min(updated["min"], value)
    updated["max"] = max(updated["max"], value)
    return updated
stats = op.fold_final("statistics", keyed_numbers, create_stats, update_stats)
# Count occurrences
word_counts = op.count_final("word_count", words, lambda word: word.lower())

Operators for collecting items into batches based on size or time constraints.

def collect(step_id: str, up: KeyedStream[V], timeout: timedelta, max_size: int) -> KeyedStream[List[V]]: ...

Parameters:
- step_id (str): Unique identifier
- up (KeyedStream[V]): Input keyed stream
- timeout (timedelta): Maximum time to wait before emitting collected items
- max_size (int): Maximum number of items to collect before emitting

Usage Example:
from datetime import timedelta
# Collect events into batches of up to 10 items or every 5 seconds
batches = op.collect("batch_events", keyed_events, timedelta(seconds=5), 10)

Operators for correlating data across multiple keyed streams.
def join(step_id: str, *sides: KeyedStream[Any], insert_mode: JoinInsertMode = "last", emit_mode: JoinEmitMode = "complete") -> KeyedStream[Tuple]: ...
JoinInsertMode = Literal["first", "last", "product"]
JoinEmitMode = Literal["complete", "final", "running"]

Parameters:
- step_id (str): Unique identifier
- *sides (KeyedStream[Any]): Multiple keyed streams to join
- insert_mode (JoinInsertMode): How to handle multiple values from same side
  - "first": Keep only first value per side
  - "last": Keep only last value per side
  - "product": Cross-product of all values
- emit_mode (JoinEmitMode): When to emit join results
  - "complete": Emit when all sides have values, then discard state
  - "final": Emit when upstream ends (finite streams only)
  - "running": Emit every time any side updates

Usage Examples:
# Inner join - emit when both sides have data
user_orders = op.join("user_order_join", users, orders, emit_mode="complete")
# Left join - emit final results including partial matches
all_users = op.join("left_join", users, orders, emit_mode="final")
# Running join - emit updates as they arrive
live_correlation = op.join("live_join", stream1, stream2, emit_mode="running")

Low-level stateful operators providing maximum control over state management and lifecycle.
def stateful(step_id: str, up: KeyedStream[V], builder: Callable[[Optional[S]], StatefulLogic[V, W, S]]) -> KeyedStream[W]: ...
def stateful_batch(step_id: str, up: KeyedStream[V], builder: Callable[[Optional[S]], StatefulBatchLogic[V, W, S]]) -> KeyedStream[W]: ...
class StatefulLogic[V, W, S]:
RETAIN: bool = False
DISCARD: bool = True
def on_item(self, value: V) -> Tuple[Iterable[W], bool]: ...
def on_notify(self) -> Tuple[Iterable[W], bool]: ...
def on_eof(self) -> Tuple[Iterable[W], bool]: ...
def notify_at(self) -> Optional[datetime]: ...
def snapshot(self) -> S: ...
class StatefulBatchLogic[V, W, S]:
    RETAIN: bool = False
    DISCARD: bool = True
    def on_batch(self, values: List[V]) -> Tuple[Iterable[W], bool]: ...
    def on_notify(self) -> Tuple[Iterable[W], bool]: ...
    def on_eof(self) -> Tuple[Iterable[W], bool]: ...
    def notify_at(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...

StatefulLogic Methods:
- on_item: Called for each new value, returns (outputs, should_discard_state)
- on_notify: Called when notification time is reached
- on_eof: Called when upstream ends
- notify_at: Returns next notification time or None
- snapshot: Returns state for recovery (must be pickle-able and immutable)

Usage Example:
from datetime import datetime, timedelta
class SlidingWindowLogic(StatefulLogic):
    """Example StatefulLogic: emit the rolling sum of values seen within
    the last `window_size` (wall-clock time via datetime.now(), naive —
    confirm timezone expectations against your pipeline).
    """

    def __init__(self, window_size):
        # window_size: timedelta controlling how long items stay in the window.
        self.window_size = window_size
        self.items = []  # list of (arrival_time, value), oldest first

    def _prune(self, now):
        """Drop entries older than the window; return True if any were dropped."""
        horizon = now - self.window_size
        kept = [entry for entry in self.items if entry[0] > horizon]
        changed = len(kept) != len(self.items)
        self.items = kept
        return changed

    def _window_sum(self):
        """Sum of all values currently inside the window."""
        return sum(value for _, value in self.items)

    def on_item(self, value):
        now = datetime.now()
        self.items.append((now, value))
        self._prune(now)
        # Emit the current window sum; always keep the state alive.
        return [self._window_sum()], StatefulLogic.RETAIN

    def notify_at(self):
        # Wake up when the oldest entry is due to expire; None when empty.
        if not self.items:
            return None
        return self.items[0][0] + self.window_size

    def on_notify(self):
        # Periodic cleanup: re-emit the sum only if something expired.
        if self._prune(datetime.now()):
            return [self._window_sum()], StatefulLogic.RETAIN
        return [], StatefulLogic.RETAIN

    def snapshot(self):
        # Recovery state; deep-copied so later in-place mutation can't alias it.
        return copy.deepcopy(self.items)
def build_sliding_window(resume_state):
    """Builder for op.stateful: create a 5-minute sliding-window logic.

    When a non-empty snapshot is supplied (recovery path), restore it as
    the window contents; otherwise start with an empty window.
    """
    window = SlidingWindowLogic(timedelta(minutes=5))
    if resume_state:
        window.items = resume_state
    return window
windowed = op.stateful("sliding_window", keyed_stream, build_sliding_window)

Install with Tessl CLI
npx tessl i tessl/pypi-bytewax