# Windowing Operations

Time-based windowing operators group stream items into temporal buckets. The module supports tumbling, sliding, and session windows, each combinable with aggregation functions for real-time analytics and temporal pattern detection.

## Capabilities

### Window Aggregation Operations

Core windowing operators that apply aggregation functions over time-based windows.

```python { .api }
def collect_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower) -> KeyedStream[List[V]]: ...

def fold_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...

def reduce_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...

def count_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, key: Callable[[V], str]) -> KeyedStream[int]: ...

def max_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

def min_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
```

**Common Parameters:**
- `step_id` (str): Unique identifier for the step
- `up` (KeyedStream[V]): Input keyed stream
- `clock` (Clock): Clock that assigns a timestamp to each item
- `windower` (Windower): Strategy that assigns items to windows

**Operators:**
- **collect_window**: Collects all items in each window into a list
- **fold_window**: Folds the items in each window into an accumulator created by `builder` and updated by `folder`
- **reduce_window**: Combines the items in each window pairwise with `reducer`
- **count_window**: Counts the items in each window, grouped by `key`
- **max_window/min_window**: Finds the maximum/minimum item in each window, compared using `by`

**Usage Examples:**
```python
from datetime import timedelta
import bytewax.operators.windowing as win

# Collect events into 5-minute tumbling windows
windowed_events = win.collect_window(
    "5min_windows",
    keyed_events,
    win.EventClock(lambda e: e.timestamp),
    win.TumblingWindower(timedelta(minutes=5)),
)

# Sum transaction amounts within each sliding window
def create_total():
    return 0

def add_to_total(total, event):
    return total + event.amount

sliding_totals = win.fold_window(
    "sliding_totals",
    keyed_transactions,
    win.EventClock(lambda t: t.timestamp),
    win.SlidingWindower(timedelta(hours=1), timedelta(minutes=15)),
    create_total,
    add_to_total,
)
```

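The aggregation operators differ mainly in how they combine the items of a single window. The following is a minimal pure-Python sketch of that difference, assuming the items of one window have already been gathered into a list. The helpers `fold_items` and `reduce_items` are hypothetical names used for illustration and are not part of the library.

```python
from functools import reduce
from typing import Callable, List, TypeVar

V = TypeVar("V")
S = TypeVar("S")


def fold_items(items: List[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> S:
    """Fold: start from builder() and combine one item at a time."""
    acc = builder()
    for item in items:
        acc = folder(acc, item)
    return acc


def reduce_items(items: List[V], reducer: Callable[[V, V], V]) -> V:
    """Reduce: like fold, but the first item serves as the initial accumulator."""
    return reduce(reducer, items)


# Stand-in for the values that landed in one window (illustrative data)
window_items = [3, 1, 4, 1, 5]

collected = list(window_items)  # roughly what collect_window yields
total = fold_items(window_items, lambda: 0, lambda acc, v: acc + v)  # fold_window
largest = reduce_items(window_items, max)  # reduce_window
count = len(window_items)  # count_window
```

A fold can change the output type (for example, items in, a dictionary of statistics out), while a reduce always returns a value of the same type as the items.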
### Window Join Operations

Join multiple keyed streams within time windows.

```python { .api }
def join_window(step_id: str, *sides: KeyedStream[Any], clock: Clock, windower: Windower, insert_mode: JoinInsertMode = "last", emit_mode: JoinEmitMode = "complete") -> KeyedStream[Tuple]: ...
```

**Parameters:**
- `step_id` (str): Unique identifier for the step
- `*sides` (KeyedStream[Any]): Keyed streams to join
- `clock` (Clock): Clock that assigns a timestamp to each item
- `windower` (Windower): Strategy that assigns items to windows
- `insert_mode` (JoinInsertMode): How to handle multiple values arriving on the same side
- `emit_mode` (JoinEmitMode): When to emit join results

**Usage Example:**
```python
# Join user actions with user profiles within 1-hour windows
user_activity = win.join_window(
    "activity_join",
    user_actions, user_profiles,
    clock=win.EventClock(lambda e: e.timestamp),
    windower=win.TumblingWindower(timedelta(hours=1)),
    emit_mode="complete",
)
```

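To make the roles of `insert_mode` and `emit_mode` more concrete, here is a simplified pure-Python sketch of a join over the items of one window. It assumes that `"last"` means a later value on a side overwrites an earlier one, and that `"complete"` means a row is emitted only once every side has a value. Both readings are assumptions for illustration, `join_complete` is a hypothetical helper, and the sketch works on a finished batch rather than on a live stream.

```python
from typing import Any, Dict, List, Optional, Tuple


def join_complete(sides: List[List[Tuple[str, Any]]]) -> Dict[str, Tuple[Any, ...]]:
    """Join keyed (key, value) items from several sides within one window."""
    slots: Dict[str, List[Optional[Any]]] = {}
    for side_index, side in enumerate(sides):
        for key, value in side:
            row = slots.setdefault(key, [None] * len(sides))
            # Assumed "last" behavior: a later value replaces an earlier one
            row[side_index] = value
    # Assumed "complete" behavior: keep only keys with a value on every side
    # (None is treated as "missing" in this sketch)
    return {
        key: tuple(row)
        for key, row in slots.items()
        if all(value is not None for value in row)
    }


# Illustrative data: two actions for u1, one for u2, and a profile for u1 only
actions = [("u1", "click"), ("u1", "scroll"), ("u2", "click")]
profiles = [("u1", {"name": "Ada"})]

joined = join_complete([actions, profiles])
```

In this sketch `u2` is dropped because it never receives a profile, and `u1` keeps only its most recent action.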
### Clock Implementations

Classes for extracting timestamps from events or using system time.

```python { .api }
class Clock:
    def extract_ts(self, value: Any) -> datetime: ...

class EventClock(Clock):
    def __init__(self, ts_getter: Callable[[Any], datetime], wait_for_system_duration: timedelta = timedelta(0)): ...

class SystemClock(Clock):
    def __init__(self): ...
```

**EventClock Parameters:**
- `ts_getter` (Callable): Function that extracts the timestamp from an event
- `wait_for_system_duration` (timedelta): How long to wait, in system time, for late-arriving events (default: no wait)

**Usage Examples:**
```python
from datetime import datetime, timedelta
import bytewax.operators.windowing as win

# Extract timestamp from event field
event_clock = win.EventClock(lambda event: event.created_at)

# Parse an ISO 8601 string from a nested structure, tolerating late events
complex_clock = win.EventClock(
    lambda msg: datetime.fromisoformat(msg.metadata.timestamp),
    wait_for_system_duration=timedelta(seconds=30),
)

# Use system time (processing time)
system_clock = win.SystemClock()
```

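The effect of a waiting duration on late events can be illustrated with a small watermark model. This is a simplified sketch and not the library's implementation: it assumes the watermark is the largest timestamp seen so far minus the waiting duration, and it ignores the passage of system time entirely. `WatermarkSketch` is a hypothetical class introduced only for this example.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


class WatermarkSketch:
    """Toy lateness check: an event is late if it falls behind the watermark."""

    def __init__(self, wait: timedelta) -> None:
        self.wait = wait
        self.max_seen: Optional[datetime] = None

    def is_late(self, event_ts: datetime) -> bool:
        # Watermark (assumed model): newest timestamp seen minus the wait
        late = self.max_seen is not None and event_ts < self.max_seen - self.wait
        # Only newer timestamps move the watermark forward
        if self.max_seen is None or event_ts > self.max_seen:
            self.max_seen = event_ts
        return late


t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)  # illustrative start time
clock = WatermarkSketch(wait=timedelta(seconds=30))

results = [
    clock.is_late(t0),                          # first event, never late
    clock.is_late(t0 + timedelta(seconds=60)),  # newest so far
    clock.is_late(t0 + timedelta(seconds=40)),  # 20 s behind, within the wait
    clock.is_late(t0 + timedelta(seconds=10)),  # 50 s behind, beyond the wait
]
```

A longer wait admits more out-of-order events at the cost of delaying window results.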
### Windower Implementations

Classes defining different window creation strategies.

```python { .api }
class Windower:
    def windows_for_time(self, timestamp: datetime) -> List[Tuple[str, datetime, datetime]]: ...

class TumblingWindower(Windower):
    def __init__(self, length: timedelta): ...

class SlidingWindower(Windower):
    def __init__(self, length: timedelta, offset: timedelta): ...

class SessionWindower(Windower):
    def __init__(self, gap: timedelta): ...
```

**TumblingWindower Parameters:**
- `length` (timedelta): Size of each window; windows do not overlap

**SlidingWindower Parameters:**
- `length` (timedelta): Size of each window
- `offset` (timedelta): Distance between the starts of consecutive windows; windows overlap when `offset` is smaller than `length`

**SessionWindower Parameters:**
- `gap` (timedelta): Maximum gap between consecutive events in the same session

**Usage Examples:**
```python
# 15-minute tumbling windows (no overlap)
tumbling = win.TumblingWindower(timedelta(minutes=15))

# 1-hour windows sliding every 15 minutes (overlap)
sliding = win.SlidingWindower(
    length=timedelta(hours=1),
    offset=timedelta(minutes=15),
)

# Session windows with 30-minute timeout
sessions = win.SessionWindower(timedelta(minutes=30))
```

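The difference between tumbling and sliding windows comes down to how many windows contain a given timestamp. The sketch below computes this with plain `datetime` arithmetic. It assumes half-open windows `[start, end)` aligned to a fixed origin; `ALIGN_TO`, `tumbling_window`, and `sliding_windows` are hypothetical names chosen for illustration and are not part of the documented API.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

# Assumed origin that all window boundaries are aligned to
ALIGN_TO = datetime(2024, 1, 1, tzinfo=timezone.utc)

Window = Tuple[datetime, datetime]  # half-open interval [start, end)


def tumbling_window(ts: datetime, length: timedelta) -> Window:
    """Return the single tumbling window that contains ts."""
    index = (ts - ALIGN_TO) // length  # floor division of timedeltas gives an int
    start = ALIGN_TO + index * length
    return start, start + length


def sliding_windows(ts: datetime, length: timedelta, offset: timedelta) -> List[Window]:
    """Return every sliding window that contains ts, earliest first."""
    index = (ts - ALIGN_TO) // offset
    start = ALIGN_TO + index * offset  # latest window start at or before ts
    windows = []
    while start + length > ts:  # walk back while the window still covers ts
        windows.append((start, start + length))
        start -= offset
    return sorted(windows)


ts = datetime(2024, 1, 1, 10, 20, tzinfo=timezone.utc)
one = tumbling_window(ts, timedelta(minutes=15))
many = sliding_windows(ts, timedelta(hours=1), timedelta(minutes=15))
```

With a 1-hour length and a 15-minute offset, each timestamp falls into `length / offset = 4` windows, so every item is aggregated four times.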
### Window Metadata Types

Types and structures for working with window information.

```python { .api }
WindowMetadata = Tuple[str, datetime, datetime]  # (window_id, start_time, end_time)

class WindowConfig:
    def __init__(self, clock: Clock, windower: Windower): ...
```

### Advanced Windowing Patterns

**Tumbling Windows Example:**
```python
# Non-overlapping 5-minute windows for real-time metrics
metrics = win.fold_window(
    "realtime_metrics",
    sensor_data,
    win.EventClock(lambda reading: reading.timestamp),
    win.TumblingWindower(timedelta(minutes=5)),
    # Builder: empty accumulator for each new window
    lambda: {"sum": 0, "count": 0, "min": float("inf"), "max": float("-inf")},
    # Folder: returns an updated accumulator; the mean is sum / count
    lambda acc, reading: {
        "sum": acc["sum"] + reading.value,
        "count": acc["count"] + 1,
        "min": min(acc["min"], reading.value),
        "max": max(acc["max"], reading.value),
    },
)
```

**Sliding Windows Example:**
```python
# Overlapping windows for trend analysis
trends = win.collect_window(
    "trend_analysis",
    stock_prices,
    win.EventClock(lambda price: price.timestamp),
    win.SlidingWindower(
        length=timedelta(hours=4),  # 4-hour windows
        offset=timedelta(hours=1),  # New window every hour
    ),
)
```

**Session Windows Example:**
```python
# Group user interactions into sessions
user_sessions = win.collect_window(
    "user_sessions",
    user_events,
    win.EventClock(lambda event: event.timestamp),
    win.SessionWindower(timedelta(minutes=30)),  # 30-minute session timeout
)
```

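Session windows have no fixed boundaries: a session ends when no event arrives within the gap. The grouping rule can be sketched in plain Python as follows. `split_sessions` is a hypothetical helper, and treating a gap exactly equal to the timeout as part of the same session is an assumption of this sketch.

```python
from datetime import datetime, timedelta
from typing import List


def split_sessions(timestamps: List[datetime], gap: timedelta) -> List[List[datetime]]:
    """Group timestamps into sessions separated by more than `gap` of inactivity."""
    sessions: List[List[datetime]] = []
    for ts in sorted(timestamps):
        # Extend the current session if ts is close enough to its last event
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])  # otherwise start a new session
    return sessions


base = datetime(2024, 1, 1, 9, 0)  # illustrative start time
events = [base + timedelta(minutes=m) for m in (0, 10, 25, 70, 80)]

sessions = split_sessions(events, gap=timedelta(minutes=30))
session_sizes = [len(s) for s in sessions]
```

The 45-minute pause between the third and fourth events exceeds the 30-minute gap, so the events split into two sessions.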
**Late Event Handling:**
```python
# Handle events arriving up to 1 minute late
tolerant_clock = win.EventClock(
    lambda event: event.event_time,
    wait_for_system_duration=timedelta(minutes=1),
)

late_tolerant_windows = win.collect_window(
    "late_tolerant",
    events,
    tolerant_clock,
    win.TumblingWindower(timedelta(minutes=5)),
)
```