tessl/pypi-bytewax

Python Stateful Stream Processing Framework

Windowing Operations

Time-based windowing operators for processing streams in temporal buckets. Supports tumbling, sliding, and session windows with various aggregation functions for real-time analytics and temporal pattern detection.

Capabilities

Window Aggregation Operations

Core windowing operators that apply aggregation functions over time-based windows.

def collect_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower) -> KeyedStream[List[V]]: ...

def fold_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...

def reduce_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...

def count_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, key: Callable[[V], str]) -> KeyedStream[int]: ...

def max_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

def min_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

Common Parameters:

  • step_id (str): Unique identifier for this step within the dataflow
  • up (KeyedStream[V]): Input keyed stream
  • clock (Clock): Clock for extracting timestamps
  • windower (Windower): Window creation strategy

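Operator Summary:

  • collect_window: Collects all items in each window into a list
  • fold_window: Creates an accumulator with builder, then combines it with each item in the window using folder
  • reduce_window: Combines the items in each window pairwise using reducer
  • count_window: Counts the items in each window, grouped by the string that key returns
  • max_window / min_window: Finds the maximum / minimum item in each window, compared by the value that by returns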
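To make the difference between these aggregations concrete, the sketch below applies each one to the items of a single window using only the standard library. It does not call Bytewax: the `window_items` list and the helper names are hypothetical, and the real operators perform this work per key and per window as items stream in.

```python
from functools import reduce

# Hypothetical items that landed in one window for one key.
window_items = [3, 9, 4]

# collect_window: gather every item into a list.
collected = list(window_items)

# fold_window: start from builder() and combine with each item via folder.
def builder():
    return 0

def folder(acc, item):
    return acc + item

folded = reduce(folder, window_items, builder())

# reduce_window: combine items pairwise; the first item seeds the result.
reduced = reduce(lambda a, b: a * b, window_items)

# count_window: count items grouped by a string key.
counts = {}
for item in window_items:
    group = "odd" if item % 2 else "even"
    counts[group] = counts.get(group, 0) + 1

# max_window / min_window: pick the extremes, compared through a `by` function.
largest = max(window_items, key=lambda v: v)
smallest = min(window_items, key=lambda v: v)
```

Fold is the more general of the two combiners: its accumulator may have a different type from the items, whereas reduce always produces a value of the item type.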

Usage Examples:

from datetime import timedelta
import bytewax.operators.windowing as win

# Collect events into 5-minute tumbling windows
windowed_events = win.collect_window(
    "5min_windows",
    keyed_events,
    win.EventClock(lambda e: e.timestamp),
    win.TumblingWindower(timedelta(minutes=5))
)

# Sum transaction amounts per key in 1-hour windows that slide every 15 minutes
def create_total():
    return 0

def add_to_total(total, event):
    return total + event.amount

sliding_totals = win.fold_window(
    "sliding_totals",
    keyed_transactions,
    win.EventClock(lambda t: t.timestamp),
    win.SlidingWindower(timedelta(hours=1), timedelta(minutes=15)),
    create_total,
    add_to_total
)

Window Join Operations

Join multiple keyed streams within time windows.

def join_window(step_id: str, *sides: KeyedStream[Any], clock: Clock, windower: Windower, insert_mode: JoinInsertMode = "last", emit_mode: JoinEmitMode = "complete") -> KeyedStream[Tuple]: ...

Parameters:

  • step_id (str): Unique identifier for this step within the dataflow
  • *sides (KeyedStream[Any]): Multiple keyed streams to join
  • clock (Clock): Clock for extracting timestamps
  • windower (Windower): Window creation strategy
  • insert_mode (JoinInsertMode): How to handle multiple values per side (default: "last")
  • emit_mode (JoinEmitMode): When to emit join results (default: "complete")

Usage Example:

# Join user actions with user profiles within 1-hour windows
user_activity = win.join_window(
    "activity_join",
    user_actions, user_profiles,
    clock=win.EventClock(lambda e: e.timestamp),
    windower=win.TumblingWindower(timedelta(hours=1)),
    emit_mode="complete"
)
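The idea of joining per key and per window can be illustrated without Bytewax. The sketch below is a simplified batch model, not the operator's implementation: it buckets events into 1-hour tumbling windows, keeps the last value seen on each side, and returns only the (key, window) pairs that have a value on every side. It ignores when results are emitted, which is what emit_mode controls in the real operator. `ALIGN_TO`, `window_index`, and `join_in_windows` are hypothetical names.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

ALIGN_TO = datetime(2024, 1, 1, tzinfo=timezone.utc)  # hypothetical window origin
LENGTH = timedelta(hours=1)

def window_index(ts):
    """Index of the tumbling window that contains ts."""
    return (ts - ALIGN_TO) // LENGTH

def join_in_windows(sides):
    """Join per (key, window), keeping the last value seen on each side.

    `sides` is a list of lists of (key, timestamp, value) tuples.
    Only (key, window) pairs with a value on every side are returned.
    """
    state = defaultdict(lambda: [None] * len(sides))
    seen = defaultdict(set)
    for side_no, events in enumerate(sides):
        for key, ts, value in events:
            slot = (key, window_index(ts))
            state[slot][side_no] = value  # a later value overwrites an earlier one
            seen[slot].add(side_no)
    return {
        slot: tuple(values)
        for slot, values in state.items()
        if len(seen[slot]) == len(sides)
    }

def at(hour, minute):
    return datetime(2024, 1, 1, hour, minute, tzinfo=timezone.utc)

actions = [
    ("alice", at(9, 5), "login"),
    ("alice", at(9, 40), "purchase"),
    ("bob", at(10, 10), "login"),
]
profiles = [("alice", at(9, 20), {"tier": "gold"})]

joined = join_in_windows([actions, profiles])
```

Here only alice's 09:00 window has data on both sides, so bob's action produces no output, and alice's later action replaces her earlier one.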

Clock Implementations

Classes for extracting timestamps from events or using system time.

class Clock:
    def extract_ts(self, value: Any) -> datetime: ...

class EventClock(Clock):
    def __init__(self, ts_getter: Callable[[Any], datetime], wait_for_system_duration: timedelta = timedelta(0)): ...

class SystemClock(Clock):
    def __init__(self): ...

EventClock Parameters:

  • ts_getter (Callable): Function to extract timestamp from event
  • wait_for_system_duration (timedelta): How long, in system time, to wait for out-of-order events before treating them as late (default: no wait)

Usage Examples:

from datetime import datetime, timedelta
import bytewax.operators.windowing as win

# Extract timestamp from event field
event_clock = win.EventClock(lambda event: event.created_at)

# Extract from nested structure with late event tolerance
complex_clock = win.EventClock(
    lambda msg: datetime.fromisoformat(msg.metadata.timestamp),
    wait_for_system_duration=timedelta(seconds=30)
)

# Use system time (processing time)
system_clock = win.SystemClock()
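A timestamp getter is an ordinary callable, so it can be written and tested separately before it is passed to a clock. The sketch below assumes a hypothetical message shape (a dict with an ISO 8601 string under `metadata.timestamp`) and assumes that timezone-aware UTC timestamps are wanted; naive timestamps are treated as UTC, which is a choice made for this example only.

```python
from datetime import datetime, timezone

def extract_ts(msg):
    """Parse the ISO 8601 timestamp of a message and normalize it to UTC."""
    ts = datetime.fromisoformat(msg["metadata"]["timestamp"])
    if ts.tzinfo is None:
        # Assumption for this sketch: naive timestamps are already in UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)

with_offset = extract_ts({"metadata": {"timestamp": "2024-01-01T12:00:00+02:00"}})
naive = extract_ts({"metadata": {"timestamp": "2024-01-01T12:00:00"}})
```

Normalizing in the getter keeps every downstream comparison between timestamps consistent, regardless of the offset the producer used.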

Windower Implementations

Classes defining different window creation strategies.

class Windower:
    def windows_for_time(self, timestamp: datetime) -> List[Tuple[str, datetime, datetime]]: ...

class TumblingWindower(Windower):
    def __init__(self, length: timedelta): ...

class SlidingWindower(Windower):
    def __init__(self, length: timedelta, offset: timedelta): ...

class SessionWindower(Windower):
    def __init__(self, gap: timedelta): ...

TumblingWindower Parameters:

  • length (timedelta): Size of each window (non-overlapping)

SlidingWindower Parameters:

  • length (timedelta): Size of each window
  • offset (timedelta): Time between the start of one window and the start of the next; an offset shorter than length produces overlapping windows

SessionWindower Parameters:

  • gap (timedelta): Maximum gap between consecutive events in the same session; a longer gap starts a new session
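The length and offset parameters determine which windows a given timestamp falls into. The sketch below computes this with plain `datetime` arithmetic. It is an illustration rather than Bytewax's implementation: the alignment origin `ALIGN_TO`, the half-open `[start, end)` convention, and both function names are assumptions made for the example.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical alignment origin; windows are laid out relative to this instant.
ALIGN_TO = datetime(2024, 1, 1, tzinfo=timezone.utc)

def tumbling_window(ts, length):
    """Return the (start, end) of the single tumbling window containing ts."""
    index = (ts - ALIGN_TO) // length
    start = ALIGN_TO + index * length
    return start, start + length

def sliding_windows(ts, length, offset):
    """Return every (start, end) sliding window containing ts, earliest first."""
    # Index of the latest window whose start is at or before ts.
    index = (ts - ALIGN_TO) // offset
    windows = []
    # Walk backwards while the window still covers ts (end is exclusive).
    while ALIGN_TO + index * offset + length > ts:
        start = ALIGN_TO + index * offset
        windows.append((start, start + length))
        index -= 1
    return list(reversed(windows))

ts = datetime(2024, 1, 1, 10, 20, tzinfo=timezone.utc)
tumbling = tumbling_window(ts, timedelta(minutes=15))
sliding = sliding_windows(ts, timedelta(hours=1), timedelta(minutes=15))
```

With a 1-hour length and a 15-minute offset, each timestamp belongs to four windows (length divided by offset), so each event is aggregated four times.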

Usage Examples:

# 15-minute tumbling windows (no overlap)
tumbling = win.TumblingWindower(timedelta(minutes=15))

# 1-hour windows sliding every 15 minutes (overlap)
sliding = win.SlidingWindower(
    length=timedelta(hours=1),
    offset=timedelta(minutes=15)
)

# Session windows with 30-minute timeout
sessions = win.SessionWindower(timedelta(minutes=30))

Window Metadata Types

Types and structures for working with window information.

WindowMetadata = Tuple[str, datetime, datetime]  # (window_id, start_time, end_time)

class WindowConfig:
    def __init__(self, clock: Clock, windower: Windower): ...

Advanced Windowing Patterns

Tumbling Windows Example:

# Non-overlapping 5-minute windows for real-time metrics
metrics = win.fold_window(
    "realtime_metrics",
    sensor_data,
    win.EventClock(lambda reading: reading.timestamp),
    win.TumblingWindower(timedelta(minutes=5)),
    lambda: {"sum": 0, "count": 0, "min": float('inf'), "max": float('-inf')},
    lambda acc, reading: {
        "sum": acc["sum"] + reading.value,
        "count": acc["count"] + 1,
        "min": min(acc["min"], reading.value),
        "max": max(acc["max"], reading.value)
    }
)
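The builder and folder in this example are plain functions, so they can be exercised on sample data without running a dataflow. The sketch below reuses the same accumulator shape with named functions and derives a mean from the folded result. The `readings` list and the function names are hypothetical, and `functools.reduce` stands in for what the operator does inside one window.

```python
from functools import reduce
from types import SimpleNamespace

def new_stats():
    """Builder: an empty accumulator for one window."""
    return {"sum": 0, "count": 0, "min": float("inf"), "max": float("-inf")}

def add_reading(acc, reading):
    """Folder: return a new accumulator that includes one more reading."""
    return {
        "sum": acc["sum"] + reading.value,
        "count": acc["count"] + 1,
        "min": min(acc["min"], reading.value),
        "max": max(acc["max"], reading.value),
    }

# Hypothetical readings that fell into one 5-minute window.
readings = [SimpleNamespace(value=v) for v in (20.0, 23.5, 19.5)]

stats = reduce(add_reading, readings, new_stats())

# Sum and count are kept separately so the mean can be derived after the fold.
mean = stats["sum"] / stats["count"] if stats["count"] else None
```

Keeping sum and count in the accumulator, rather than a running mean, avoids accumulating rounding error on each update.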

Sliding Windows Example:

# Overlapping windows for trend analysis
trends = win.collect_window(
    "trend_analysis",
    stock_prices,
    win.EventClock(lambda price: price.timestamp),
    win.SlidingWindower(
        length=timedelta(hours=4),    # 4-hour windows
        offset=timedelta(hours=1)     # New window every hour
    )
)

Session Windows Example:

# Group user interactions into sessions
user_sessions = win.collect_window(
    "user_sessions",
    user_events,
    win.EventClock(lambda event: event.timestamp),
    win.SessionWindower(timedelta(minutes=30))  # 30-minute session timeout
)
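Session boundaries depend on the data rather than on fixed clock intervals. The sketch below shows the grouping rule on a sorted list of timestamps. It is a batch illustration with hypothetical names, and it assumes that a gap exactly equal to the limit keeps the session open; the streaming operator's boundary handling may differ.

```python
from datetime import datetime, timedelta, timezone

def split_sessions(timestamps, gap):
    """Group timestamps into sessions separated by more than `gap`."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)  # close enough: extend the current session
        else:
            sessions.append([ts])    # gap exceeded: start a new session
    return sessions

base = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
# Hypothetical click times, in minutes after 09:00.
clicks = [base + timedelta(minutes=m) for m in (0, 10, 25, 70, 80)]

sessions = split_sessions(clicks, timedelta(minutes=30))
session_sizes = [len(s) for s in sessions]
```

The 45-minute silence between minute 25 and minute 70 exceeds the 30-minute gap, so the five clicks split into one session of three events and one of two.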

Late Event Handling:

# Handle events arriving up to 1 minute late
tolerant_clock = win.EventClock(
    lambda event: event.event_time,
    wait_for_system_duration=timedelta(minutes=1)
)

late_tolerant_windows = win.collect_window(
    "late_tolerant",
    events,
    tolerant_clock,
    win.TumblingWindower(timedelta(minutes=5))
)
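The effect of a wait duration can be pictured with a watermark that trails the newest timestamp seen so far. The sketch below is a deliberately simplified model and not Bytewax's algorithm: it ignores the passage of system time and treats an event as late when its timestamp is older than the watermark. The `classify` function and the sample arrivals are hypothetical.

```python
from datetime import datetime, timedelta, timezone

def classify(timestamps, wait):
    """Label each event 'on_time' or 'late' against a simple watermark.

    The watermark trails the largest timestamp seen so far by `wait`
    and never moves backwards.
    """
    watermark = None
    labels = []
    for ts in timestamps:
        if watermark is not None and ts < watermark:
            labels.append("late")
        else:
            labels.append("on_time")
        candidate = ts - wait
        if watermark is None or candidate > watermark:
            watermark = candidate
    return labels

base = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
# Event times in arrival order, in seconds after 12:00 (out of order on purpose).
arrivals = [base + timedelta(seconds=s) for s in (0, 90, 45, 20, 200, 120)]

labels = classify(arrivals, timedelta(minutes=1))
```

A longer wait admits more out-of-order events but delays window results, because a window cannot close until the watermark has passed its end.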

Install with Tessl CLI

npx tessl i tessl/pypi-bytewax
