tessl/pypi-bytewax

Python Stateful Stream Processing Framework

docs/stateful.md

Stateful Processing

Advanced operators for maintaining state across events, enabling complex event processing patterns like aggregations, session tracking, and multi-stream correlations. All stateful operators work on keyed streams to ensure correct state partitioning in distributed environments.
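To see why keying matters, the per-key bookkeeping these operators perform can be simulated in plain Python. This is an illustrative sketch of the semantics, not bytewax internals; `run_keyed` and its argument shapes are invented for the example:

```python
# Minimal sketch of how a stateful operator partitions state by key.
# Each key gets its own state slot, so updates for one key never
# touch another key's state.
def run_keyed(events, mapper):
    """events: iterable of (key, value); mapper: (state, value) -> (state, out)."""
    states = {}   # per-key state, as a stateful operator would maintain it
    outputs = []
    for key, value in events:
        new_state, out = mapper(states.get(key), value)
        states[key] = new_state
        outputs.append((key, out))
    return outputs

# A running count per key: "a" and "b" advance independently.
counts = run_keyed(
    [("a", 1), ("b", 1), ("a", 1)],
    lambda state, _: ((state or 0) + 1, (state or 0) + 1),
)
# counts == [("a", 1), ("b", 1), ("a", 2)]
```

In a distributed run, bytewax routes all events for a given key to the same worker, so each key's state lives in exactly one place.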

Capabilities

Basic Stateful Operations

Simple stateful transformations that maintain state per key and emit results based on state updates.

def stateful_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], W]]) -> KeyedStream[W]: ...

def stateful_flat_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], Iterable[W]]]) -> KeyedStream[W]: ...

stateful_map Parameters:

  • step_id (str): Unique identifier
  • up (KeyedStream[V]): Input keyed stream
  • mapper (Callable): Function receiving current state and value, returning (new_state, output_value)

stateful_flat_map Parameters:

  • step_id (str): Unique identifier
  • up (KeyedStream[V]): Input keyed stream
  • mapper (Callable): Function receiving current state and value, returning (new_state, output_values)

Usage Examples:

import bytewax.operators as op

# Running count per key
def counter(state, value):
    count = (state or 0) + 1
    return count, count

counts = op.stateful_map("count", keyed_stream, counter)

# Session tracking with multiple outputs
def session_tracker(state, event):
    if state is None:
        state = {"start": event.timestamp, "events": []}
    
    state["events"].append(event)
    
    if event.type == "session_end":
        summary = {"duration": event.timestamp - state["start"], "event_count": len(state["events"])}
        return None, [summary]  # Discard state, emit summary
    else:
        return state, []  # Keep state, no output yet

sessions = op.stateful_flat_map("sessions", keyed_events, session_tracker)
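Because the mapper is a plain function of `(state, value)`, its contract can be exercised by hand, with no dataflow running. Below, `Event` is a hypothetical stand-in for your event type; the session tracker mirrors the one above:

```python
from dataclasses import dataclass

@dataclass
class Event:            # hypothetical event type for this sketch
    type: str
    timestamp: int

def session_tracker(state, event):
    if state is None:
        state = {"start": event.timestamp, "events": []}
    state["events"].append(event)
    if event.type == "session_end":
        summary = {"duration": event.timestamp - state["start"],
                   "event_count": len(state["events"])}
        return None, [summary]   # discard state, emit summary
    return state, []             # keep state, nothing to emit yet

# Thread the state through two calls, as the operator would per key:
state, out = session_tracker(None, Event("click", 10))
state, out = session_tracker(state, Event("session_end", 25))
# out == [{"duration": 15, "event_count": 2}], state is None
```

Returning `None` as the new state tells the operator to drop that key's state, so the next event for the key starts a fresh session.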

Aggregation Operations (Finite Streams)

Aggregation operators that work on finite streams and emit results only when the upstream completes.

def reduce_final(step_id: str, up: KeyedStream[V], reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...

def fold_final(step_id: str, up: KeyedStream[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...

def count_final(step_id: str, up: Stream[X], key: Callable[[X], str]) -> KeyedStream[int]: ...

def max_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

def min_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

reduce_final Parameters:

  • step_id (str): Unique identifier
  • up (KeyedStream[V]): Input keyed stream
  • reducer (Callable[[V, V], V]): Function to combine two values

fold_final Parameters:

  • step_id (str): Unique identifier
  • up (KeyedStream[V]): Input keyed stream
  • builder (Callable[[], S]): Function to create initial accumulator
  • folder (Callable[[S, V], S]): Function to combine accumulator with new value

Usage Examples:

# Sum all values per key
totals = op.reduce_final("sum", keyed_numbers, lambda acc, val: acc + val)

# Build complex aggregations
def create_stats():
    return {"sum": 0, "count": 0, "min": float('inf'), "max": float('-inf')}

def update_stats(stats, value):
    return {
        "sum": stats["sum"] + value,
        "count": stats["count"] + 1,
        "min": min(stats["min"], value),
        "max": max(stats["max"], value)
    }

stats = op.fold_final("statistics", keyed_numbers, create_stats, update_stats)

# Count occurrences
word_counts = op.count_final("word_count", words, lambda word: word.lower())
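The builder/folder contract of fold_final is an initial-value-plus-reduce pattern. Driven by hand for a single key (a sketch of what the operator computes, not the operator itself):

```python
def create_stats():
    return {"sum": 0, "count": 0, "min": float("inf"), "max": float("-inf")}

def update_stats(stats, value):
    return {"sum": stats["sum"] + value,
            "count": stats["count"] + 1,
            "min": min(stats["min"], value),
            "max": max(stats["max"], value)}

# What fold_final emits for one key once the upstream completes:
stats = create_stats()
for v in [3, 1, 4]:
    stats = update_stats(stats, v)
# stats == {"sum": 8, "count": 3, "min": 1, "max": 4}
```

Unlike reduce_final, the accumulator type can differ from the value type, which is what allows building a stats dict from plain numbers.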

Collection Operations

Operators for collecting items into batches based on size or time constraints.

def collect(step_id: str, up: KeyedStream[V], timeout: timedelta, max_size: int) -> KeyedStream[List[V]]: ...

Parameters:

  • step_id (str): Unique identifier
  • up (KeyedStream[V]): Input keyed stream
  • timeout (timedelta): Maximum time to wait before emitting collected items
  • max_size (int): Maximum number of items to collect before emitting

Usage Example:

from datetime import timedelta

# Collect events into batches of up to 10 items or every 5 seconds
batches = op.collect("batch_events", keyed_events, timedelta(seconds=5), 10)
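The size trigger of collect is easy to simulate in plain Python; the timeout trigger additionally needs a clock and is omitted here. `collect_by_size` is an illustrative name, not a bytewax function:

```python
def collect_by_size(values, max_size):
    """Emit batches of up to max_size items; flush the remainder at end."""
    batch, batches = [], []
    for v in values:
        batch.append(v)
        if len(batch) == max_size:
            batches.append(batch)
            batch = []
    if batch:                      # partial batch emitted at end-of-stream
        batches.append(batch)
    return batches

# collect_by_size(list(range(7)), 3) -> [[0, 1, 2], [3, 4, 5], [6]]
```

In the real operator, whichever trigger fires first (timeout or max_size) flushes the batch, so downstream latency is bounded even for slow keys.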

Join Operations

Operators for correlating data across multiple keyed streams.

def join(step_id: str, *sides: KeyedStream[Any], insert_mode: JoinInsertMode = "last", emit_mode: JoinEmitMode = "complete") -> KeyedStream[Tuple]: ...

JoinInsertMode = Literal["first", "last", "product"]
JoinEmitMode = Literal["complete", "final", "running"]

Parameters:

  • step_id (str): Unique identifier
  • *sides (KeyedStream[Any]): Multiple keyed streams to join
  • insert_mode (JoinInsertMode): How to handle multiple values from same side
    • "first": Keep only first value per side
    • "last": Keep only last value per side
    • "product": Cross-product of all values
  • emit_mode (JoinEmitMode): When to emit join results
    • "complete": Emit when all sides have values, then discard state
    • "final": Emit when upstream ends (finite streams only)
    • "running": Emit every time any side updates

Usage Examples:

# Inner join - emit when both sides have data
user_orders = op.join("user_order_join", users, orders, emit_mode="complete")

# Outer join - at end of stream, emit whatever arrived (missing sides are None)
all_users = op.join("outer_join", users, orders, emit_mode="final")

# Running join - emit updates as they arrive
live_correlation = op.join("live_join", stream1, stream2, emit_mode="running")
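The "complete" emit mode combined with the default "last" insert mode can be contrasted in a tiny two-sided simulation for one key (a sketch of the semantics, not the operator):

```python
def join_complete(events, n_sides):
    """events: (side_index, value) pairs for one key. Emit a tuple whenever
    all sides are filled, then clear the slots ("complete" emit mode)."""
    slots = [None] * n_sides
    emitted = []
    for side, value in events:
        slots[side] = value            # "last" insert mode: overwrite
        if all(s is not None for s in slots):
            emitted.append(tuple(slots))
            slots = [None] * n_sides   # state discarded after emitting
    return emitted

# Two updates on side 0, then side 1 arrives and completes the pair:
pairs = join_complete([(0, "u1"), (0, "u1b"), (1, "order7")], 2)
# pairs == [("u1b", "order7")]
```

With insert_mode="first" the earlier "u1" would win instead, and with emit_mode="running" every event would produce an output tuple with whatever slots are currently filled.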

Advanced Stateful Operations

Low-level stateful operators providing maximum control over state management and lifecycle.

def stateful(step_id: str, up: KeyedStream[V], builder: Callable[[Optional[S]], StatefulLogic[V, W, S]]) -> KeyedStream[W]: ...

def stateful_batch(step_id: str, up: KeyedStream[V], builder: Callable[[Optional[S]], StatefulBatchLogic[V, W, S]]) -> KeyedStream[W]: ...

class StatefulLogic[V, W, S]:
    RETAIN: bool = False
    DISCARD: bool = True
    
    def on_item(self, value: V) -> Tuple[Iterable[W], bool]: ...
    def on_notify(self) -> Tuple[Iterable[W], bool]: ...
    def on_eof(self) -> Tuple[Iterable[W], bool]: ...
    def notify_at(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...

class StatefulBatchLogic[V, W, S]:
    RETAIN: bool = False
    DISCARD: bool = True
    
    def on_batch(self, values: List[V]) -> Tuple[Iterable[W], bool]: ...
    def on_notify(self) -> Tuple[Iterable[W], bool]: ...
    def on_eof(self) -> Tuple[Iterable[W], bool]: ...
    def notify_at(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...

StatefulLogic Methods:

  • on_item: Called for each new value, returns (outputs, should_discard_state)
  • on_notify: Called when notification time is reached
  • on_eof: Called when upstream ends
  • notify_at: Returns next notification time or None
  • snapshot: Returns state for recovery (must be pickle-able and immutable)

Usage Example:

import copy
from datetime import datetime, timedelta

import bytewax.operators as op
from bytewax.operators import StatefulLogic

class SlidingWindowLogic(StatefulLogic):
    def __init__(self, window_size):
        self.window_size = window_size
        self.items = []
        
    def on_item(self, value):
        now = datetime.now()
        self.items.append((now, value))
        
        # Remove old items
        cutoff = now - self.window_size
        self.items = [(ts, val) for ts, val in self.items if ts > cutoff]
        
        # Emit current window sum
        total = sum(val for _, val in self.items)
        return [total], StatefulLogic.RETAIN
        
    def notify_at(self):
        if self.items:
            # Next cleanup time
            return self.items[0][0] + self.window_size
        return None
        
    def on_notify(self):
        # Clean up expired items
        now = datetime.now()
        cutoff = now - self.window_size
        old_count = len(self.items)
        self.items = [(ts, val) for ts, val in self.items if ts > cutoff]
        
        if len(self.items) != old_count:
            total = sum(val for _, val in self.items)
            return [total], StatefulLogic.RETAIN
        return [], StatefulLogic.RETAIN
        
    def snapshot(self):
        return copy.deepcopy(self.items)

def build_sliding_window(resume_state):
    logic = SlidingWindowLogic(timedelta(minutes=5))
    if resume_state:
        logic.items = resume_state
    return logic

windowed = op.stateful("sliding_window", keyed_stream, build_sliding_window)
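The StatefulLogic lifecycle (on_item, notify_at, on_notify) can be exercised with a plain class that mirrors the interface, no bytewax required. This sketch injects timestamps instead of calling datetime.now() so the behavior is deterministic; `WindowSum` is an illustrative name:

```python
class WindowSum:
    """Plain-Python mirror of the StatefulLogic contract, with an injected clock."""
    RETAIN = False  # same convention as StatefulLogic.RETAIN

    def __init__(self, window_size):
        self.window_size = window_size
        self.items = []   # list of (timestamp, value)

    def on_item(self, now, value):
        self.items.append((now, value))
        cutoff = now - self.window_size
        self.items = [(ts, v) for ts, v in self.items if ts > cutoff]
        return [sum(v for _, v in self.items)], self.RETAIN

    def notify_at(self):
        # Next moment the oldest item expires, or None if empty.
        return self.items[0][0] + self.window_size if self.items else None

logic = WindowSum(window_size=5)
out1, _ = logic.on_item(0, 10)   # window: [10]         -> sum 10
out2, _ = logic.on_item(3, 2)    # window: [10, 2]      -> sum 12
out3, _ = logic.on_item(6, 1)    # item at ts=0 expired -> sum 3
# out1 == [10], out2 == [12], out3 == [3]; next notification at ts=8
```

Keeping the clock out of the logic (as a parameter rather than datetime.now()) also makes the real operator easier to unit test; the production version above uses wall-clock time for simplicity.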

Install with Tessl CLI

npx tessl i tessl/pypi-bytewax
