Python Stateful Stream Processing Framework
Time-based windowing operators for processing streams in temporal buckets. Supports tumbling, sliding, and session windows with collect, fold, reduce, count, minimum, and maximum aggregations, plus windowed joins, for real-time analytics and temporal pattern detection.
Core windowing operators that apply aggregation functions over time-based windows.
def collect_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower) -> KeyedStream[List[V]]: ...
def fold_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...
def reduce_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...
def count_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, key: Callable[[V], str]) -> KeyedStream[int]: ...
def max_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
def min_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
Common Parameters:
step_id (str): Unique identifier for the step
up (KeyedStream[V]): Input keyed stream
clock (Clock): Clock for extracting timestamps
windower (Windower): Window creation strategy
Operator Behavior:
collect_window: Collects all items in each window into a list
fold_window: Starts each window with builder() and applies folder to every item in it
reduce_window: Combines the items in each window pairwise with reducer
count_window: Counts the items in each window, grouped by the key function
max_window/min_window: Finds the maximum/minimum item in each window, compared by the by function (the item itself by default)
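All of these operators follow one model: items are grouped by key and by window, and each operator summarizes one group. The plain-Python sketch below illustrates that model using epoch-aligned tumbling windows over a finished list. It does not use bytewax. The helper names (`bucket_start`, `group_by_window`, `fold_windows`) and the epoch alignment are hypothetical. The real operators also process a live stream incrementally instead of working from a complete list.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from functools import reduce
from typing import Any, Callable, Dict, Iterable, List, Tuple

# Assumption: tumbling windows are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

WindowKey = Tuple[str, datetime]  # (stream key, window start)


def bucket_start(ts: datetime, length: timedelta) -> datetime:
    """Start of the tumbling window of size `length` that contains `ts`."""
    return EPOCH + ((ts - EPOCH) // length) * length


def group_by_window(
    items: Iterable[Tuple[str, Any]],
    ts_getter: Callable[[Any], datetime],
    length: timedelta,
) -> Dict[WindowKey, List[Any]]:
    """Like collect_window: gather each key's values into per-window lists."""
    groups: Dict[WindowKey, List[Any]] = defaultdict(list)
    for key, value in items:
        groups[(key, bucket_start(ts_getter(value), length))].append(value)
    return dict(groups)


def fold_windows(groups, builder, folder):
    """Like fold_window: start each window from builder() and fold its values in."""
    return {wk: reduce(folder, values, builder()) for wk, values in groups.items()}


# Example input: (key, (timestamp, amount)) pairs.
t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
five = timedelta(minutes=5)
events = [
    ("alice", (t0 + timedelta(minutes=1), 10)),
    ("alice", (t0 + timedelta(minutes=3), 5)),
    ("alice", (t0 + timedelta(minutes=7), 2)),
    ("bob", (t0 + timedelta(minutes=4), 8)),
]

groups = group_by_window(events, lambda v: v[0], five)
totals = fold_windows(groups, lambda: 0, lambda acc, v: acc + v[1])
# reduce_window corresponds to reduce() with no initial value: keep the latest event.
latest = {wk: reduce(lambda a, b: b if b[0] >= a[0] else a, vs) for wk, vs in groups.items()}
# count_window, max_window and min_window are special cases over the same groups.
counts = {wk: len(vs) for wk, vs in groups.items()}
maxes = {wk: max(vs, key=lambda v: v[1]) for wk, vs in groups.items()}
mins = {wk: min(vs, key=lambda v: v[1]) for wk, vs in groups.items()}

for (key, start), total in sorted(totals.items()):
    print(key, start.strftime("%H:%M"), total)
```

Alice's event at 12:07 lands in the 12:05 window, so her total is split across two windows. Bob's single event forms a window of its own.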
Usage Examples:
from datetime import datetime, timedelta
import bytewax.operators.windowing as win
# Collect events into 5-minute tumbling windows
windowed_events = win.collect_window(
"5min_windows",
keyed_events,
win.EventClock(lambda e: e.timestamp),
win.TumblingWindower(timedelta(minutes=5))
)
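# Sum transaction amounts in 1-hour windows that slide every 15 minutes
def create_total():
    # Starting value for each new window
    return 0
def add_to_total(total, event):
    # Add one transaction's amount to the window's total
    return total + event.amount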
sliding_totals = win.fold_window(
"sliding_totals",
keyed_transactions,
win.EventClock(lambda t: t.timestamp),
win.SlidingWindower(timedelta(hours=1), timedelta(minutes=15)),
create_total,
add_to_total
)
Join multiple keyed streams within time windows.
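To show what joining "within time windows" means, here is a plain-Python model with no bytewax dependency. The helper `windowed_join` is hypothetical. Its insert and emit behavior is an assumption chosen to match the names `insert_mode="last"` and `emit_mode="complete"`. Check the library's JoinInsertMode and JoinEmitMode documentation for the exact rules.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Sequence, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)  # assumed window alignment


def window_start(ts: datetime, length: timedelta) -> datetime:
    return EPOCH + ((ts - EPOCH) // length) * length


def windowed_join(
    sides: Sequence[Sequence[Tuple[str, datetime, Any]]], length: timedelta
) -> List[Tuple[str, Tuple[Any, ...]]]:
    """Join (key, timestamp, value) records from several sides within tumbling windows.

    Assumed semantics: each side keeps only its latest value per key and window
    (like insert_mode="last"), and a row is emitted as soon as every side has a
    value (one reading of emit_mode="complete"), after which that state is cleared.
    Values must not be None, because None marks an empty side here.
    """
    n = len(sides)
    # Replay all sides in timestamp order, remembering which side each record came from.
    merged = sorted(
        ((ts, i, key, value) for i, side in enumerate(sides) for key, ts, value in side),
        key=lambda rec: rec[0],
    )
    state: Dict[Tuple[str, datetime], List[Optional[Any]]] = {}
    out: List[Tuple[str, Tuple[Any, ...]]] = []
    for ts, i, key, value in merged:
        slot = state.setdefault((key, window_start(ts, length)), [None] * n)
        slot[i] = value  # a newer value on the same side replaces the older one
        if all(v is not None for v in slot):
            out.append((key, tuple(slot)))
            slot[:] = [None] * n  # clear so the next complete row starts fresh
    return out


t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
actions = [
    ("u1", t0 + timedelta(minutes=5), "click"),
    ("u2", t0 + timedelta(minutes=10), "view"),
    ("u1", t0 + timedelta(minutes=50), "buy"),
]
profiles = [
    ("u1", t0 + timedelta(minutes=20), {"tier": "gold"}),
    ("u2", t0 + timedelta(minutes=70), {"tier": "free"}),  # falls in the next hour
]
rows = windowed_join([actions, profiles], timedelta(hours=1))
print(rows)
```

Under these assumptions, u1's later "buy" has no profile left to pair with in that hour. u2 never joins, because its action and its profile fall into different one-hour windows.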
def join_window(step_id: str, *sides: KeyedStream[Any], clock: Clock, windower: Windower, insert_mode: JoinInsertMode = "last", emit_mode: JoinEmitMode = "complete") -> KeyedStream[Tuple]: ...
Parameters:
step_id (str): Unique identifier
*sides (KeyedStream[Any]): Multiple keyed streams to join
clock (Clock): Clock for extracting timestamps
windower (Windower): Window creation strategy
insert_mode (JoinInsertMode): How to handle multiple values per side
emit_mode (JoinEmitMode): When to emit join results
Usage Example:
# Join user actions with user profiles within 1-hour windows
user_activity = win.join_window(
"activity_join",
user_actions, user_profiles,
clock=win.EventClock(lambda e: e.timestamp),
windower=win.TumblingWindower(timedelta(hours=1)),
emit_mode="complete"
)
Classes for extracting timestamps from events or using system time.
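The ts_getter passed to EventClock is ordinary Python, so timestamp parsing can be tested on its own. This is a minimal sketch that assumes each message carries an ISO-8601 string at msg.metadata.timestamp. The Message and Metadata classes are hypothetical. Treating a timestamp without an offset as UTC is a policy assumption. In Python, comparing a naive datetime with an aware one raises TypeError, so normalizing every timestamp to aware UTC keeps window arithmetic consistent.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class Metadata:  # hypothetical message shape
    timestamp: str


@dataclass
class Message:  # hypothetical message shape
    metadata: Metadata


def parse_event_time(raw: str) -> datetime:
    """Parse an ISO-8601 string into a timezone-aware UTC datetime."""
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on.
    if raw.endswith("Z"):
        raw = raw[:-1] + "+00:00"
    ts = datetime.fromisoformat(raw)
    if ts.tzinfo is None:
        # Assumption: timestamps without an offset are already UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)


def ts_getter(msg: Message) -> datetime:
    return parse_event_time(msg.metadata.timestamp)


same_instant = [
    ts_getter(Message(Metadata("2024-01-01T12:00:00Z"))),
    ts_getter(Message(Metadata("2024-01-01T13:00:00+01:00"))),
    ts_getter(Message(Metadata("2024-01-01T12:00:00"))),
]
print(same_instant)
```

With the EventClock signature documented below, this function would be passed as `win.EventClock(ts_getter, wait_for_system_duration=timedelta(seconds=30))`.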
class Clock:
    def extract_ts(self, value: Any) -> datetime: ...
class EventClock(Clock):
    def __init__(self, ts_getter: Callable[[Any], datetime], wait_for_system_duration: timedelta = timedelta(0)): ...
class SystemClock(Clock):
    def __init__(self): ...
EventClock Parameters:
ts_getter (Callable): Function that extracts a datetime timestamp from each event
wait_for_system_duration (timedelta): How long to wait for late events (default: no wait)
Usage Examples:
# Extract timestamp from event field
event_clock = win.EventClock(lambda event: event.created_at)
# Extract from nested structure with late event tolerance
complex_clock = win.EventClock(
lambda msg: datetime.fromisoformat(msg.metadata.timestamp),
wait_for_system_duration=timedelta(seconds=30)
)
# Use system time (processing time)
system_clock = win.SystemClock()
Classes defining different window creation strategies.
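The windowers differ mainly in which windows a timestamp falls into. The plain-Python sketch below models that assignment and returns the (window_id, start, end) shape that windows_for_time is documented to produce. The epoch alignment, the string window ids, and the function names are assumptions, not bytewax internals. Session windows are shown separately because a single timestamp cannot determine its session.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

WindowMeta = Tuple[str, datetime, datetime]  # (window_id, start, end), as in windows_for_time
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)  # assumed alignment point


def tumbling_windows_for_time(ts: datetime, length: timedelta) -> List[WindowMeta]:
    # Exactly one half-open window [start, start + length) contains ts.
    n = (ts - EPOCH) // length
    start = EPOCH + n * length
    return [(str(n), start, start + length)]


def sliding_windows_for_time(
    ts: datetime, length: timedelta, offset: timedelta
) -> List[WindowMeta]:
    # Assumes positive length and offset. A window opens every `offset` and stays
    # open for `length`, so ts belongs to each window whose start lies in (ts - length, ts].
    n = (ts - EPOCH) // offset  # latest window that opened at or before ts
    out: List[WindowMeta] = []
    while EPOCH + n * offset + length > ts:
        start = EPOCH + n * offset
        out.append((str(n), start, start + length))
        n -= 1
    return out[::-1]  # oldest window first


def session_windows(
    timestamps: List[datetime], gap: timedelta
) -> List[Tuple[datetime, datetime]]:
    # A new event may extend or merge sessions, so this groups a sorted batch.
    sessions: List[Tuple[datetime, datetime]] = []
    for ts in sorted(timestamps):
        # Assumption: a gap exactly equal to `gap` still continues the session.
        if sessions and ts - sessions[-1][1] <= gap:
            sessions[-1] = (sessions[-1][0], ts)
        else:
            sessions.append((ts, ts))
    return sessions


t = datetime(2024, 1, 1, 12, 20, tzinfo=timezone.utc)
print(tumbling_windows_for_time(t, timedelta(minutes=15)))
for _, start, end in sliding_windows_for_time(t, timedelta(hours=1), timedelta(minutes=15)):
    print(start.strftime("%H:%M"), "-", end.strftime("%H:%M"))
```

With 1-hour windows sliding every 15 minutes, each timestamp lands in length / offset = 4 windows. A sliding fold therefore counts every event once in each of those windows.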
class Windower:
    def windows_for_time(self, timestamp: datetime) -> List[Tuple[str, datetime, datetime]]: ...
class TumblingWindower(Windower):
    def __init__(self, length: timedelta): ...
class SlidingWindower(Windower):
    def __init__(self, length: timedelta, offset: timedelta): ...
class SessionWindower(Windower):
    def __init__(self, gap: timedelta): ...
TumblingWindower Parameters:
length (timedelta): Size of each window (windows do not overlap)
SlidingWindower Parameters:
length (timedelta): Size of each window
offset (timedelta): Distance between the start times of consecutive windows (windows overlap when offset is smaller than length)
SessionWindower Parameters:
gap (timedelta): Maximum gap between consecutive events in the same session; a longer gap starts a new session
Usage Examples:
# 15-minute tumbling windows (no overlap)
tumbling = win.TumblingWindower(timedelta(minutes=15))
# 1-hour windows sliding every 15 minutes (overlap)
sliding = win.SlidingWindower(
length=timedelta(hours=1),
offset=timedelta(minutes=15)
)
# Session windows with 30-minute timeout
sessions = win.SessionWindower(timedelta(minutes=30))Types and structures for working with window information.
WindowMetadata = Tuple[str, datetime, datetime] # (window_id, start_time, end_time)
class WindowConfig:
    def __init__(self, clock: Clock, windower: Windower): ...
Tumbling Windows Example:
# Non-overlapping 5-minute windows for real-time metrics
metrics = win.fold_window(
"realtime_metrics",
sensor_data,
win.EventClock(lambda reading: reading.timestamp),
win.TumblingWindower(timedelta(minutes=5)),
lambda: {"sum": 0, "count": 0, "min": float('inf'), "max": float('-inf')},
lambda acc, reading: {
"sum": acc["sum"] + reading.value,
"count": acc["count"] + 1,
"min": min(acc["min"], reading.value),
"max": max(acc["max"], reading.value)
}
)
Sliding Windows Example:
# Overlapping windows for trend analysis
trends = win.collect_window(
"trend_analysis",
stock_prices,
win.EventClock(lambda price: price.timestamp),
win.SlidingWindower(
length=timedelta(hours=4), # 4-hour windows
offset=timedelta(hours=1) # New window every hour
)
)
Session Windows Example:
# Group user interactions into sessions
user_sessions = win.collect_window(
"user_sessions",
user_events,
win.EventClock(lambda event: event.timestamp),
win.SessionWindower(timedelta(minutes=30)) # 30-minute session timeout
)
Late Event Handling:
# Handle events arriving up to 1 minute late
tolerant_clock = win.EventClock(
lambda event: event.event_time,
wait_for_system_duration=timedelta(minutes=1)
)
late_tolerant_windows = win.collect_window(
"late_tolerant",
events,
tolerant_clock,
win.TumblingWindower(timedelta(minutes=5))
)
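The trade-off behind wait_for_system_duration can be explored offline. This is a simplified sketch under stated assumptions. `split_late` is a hypothetical helper. It ignores any advancement of the watermark by system time, and it defines "late" as "older than the watermark". The library's exact lateness rule may differ.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional, Tuple


def split_late(
    timestamps: Iterable[datetime], wait: timedelta
) -> Tuple[List[datetime], List[datetime]]:
    """Simplified watermark model (hypothetical helper, not a bytewax API).

    The watermark trails the newest event time seen so far by `wait`; an event
    older than the current watermark counts as late. A real clock may also move
    the watermark forward as system time passes, which is not modelled here.
    """
    watermark: Optional[datetime] = None
    on_time: List[datetime] = []
    late: List[datetime] = []
    for ts in timestamps:
        if watermark is not None and ts < watermark:
            late.append(ts)
            continue
        on_time.append(ts)
        # The watermark never moves backwards.
        if watermark is None or ts - wait > watermark:
            watermark = ts - wait
    return on_time, late


t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
arrivals = [
    t0,
    t0 + timedelta(minutes=5),
    t0 + timedelta(minutes=4, seconds=30),  # 30 s out of order
    t0 + timedelta(minutes=3),  # 2 min out of order
]
tolerant = split_late(arrivals, timedelta(minutes=1))
strict = split_late(arrivals, timedelta(0))
print(tolerant[1], strict[1])
```

With a one-minute wait, only the event that is two minutes out of order counts as late. With no wait, the 30-second straggler is late too. A longer wait accepts more stragglers, but it keeps windows open longer.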