0
# Windowing
1
2
Time-based windowing operations for aggregating temporal data in Faust applications. Provides tumbling, hopping, and sliding window implementations for stream analytics, with configurable window sizes, expiration policies, and cleanup of expired window state.
3
4
## Capabilities
5
6
### Window Base Class
7
8
Abstract base class for all window implementations. Defines the common interface and behavior for time-based data partitioning and aggregation operations.
9
10
```python { .api }
11
from typing import Any, List, Optional, Tuple


class Window:
    def __init__(self, *, expires: Optional[float] = None, **kwargs: Any):
        """
        Base window implementation.

        Args:
            expires: Window expiration time in seconds, or None when no
                explicit expiration is given
            **kwargs: Additional keyword options
        """

    def ranges(self, timestamp: float) -> List[Tuple[float, float]]:
        """
        Get window ranges for a given timestamp.

        Args:
            timestamp: Event timestamp in seconds

        Returns:
            List of (start, end) tuples, one for every window that the
            timestamp falls into
        """

    def stale(self, timestamp: float, latest_timestamp: float) -> bool:
        """
        Check if window is stale and should be expired.

        Args:
            timestamp: Timestamp identifying the window being checked
            latest_timestamp: Latest observed timestamp

        Returns:
            True if window should be expired
        """

    def current(self, timestamp: float) -> Tuple[float, float]:
        """
        Get current window range for timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple for current window
        """

    @property
    def expires(self) -> Optional[float]:
        """Window expiration time in seconds (None if not set)."""

    @property
    def ident(self) -> str:
        """Unique identifier for this window type."""
61
```
62
63
### Tumbling Windows
64
65
Fixed-size, non-overlapping windows that partition time into discrete intervals. Each event belongs to exactly one window, making them ideal for aggregations like counts, sums, and averages over regular time periods.
66
67
```python { .api }
68
from typing import List, Optional, Tuple


class TumblingWindow(Window):
    def __init__(self, size: float, *, expires: Optional[float] = None):
        """
        Create tumbling window with fixed size.

        Args:
            size: Window size in seconds
            expires: Window expiration time in seconds (defaults to size * 2)

        Example:
            # 5-minute tumbling windows
            window = TumblingWindow(300)  # 300 seconds = 5 minutes
        """

    def ranges(self, timestamp: float) -> List[Tuple[float, float]]:
        """
        Get single window range for timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            List containing exactly one (start, end) tuple, because tumbling
            windows do not overlap
        """

    def current(self, timestamp: float) -> Tuple[float, float]:
        """
        Get current window boundaries.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple for window containing timestamp
        """

    @property
    def size(self) -> float:
        """Window size in seconds."""

    @property
    def ident(self) -> str:
        """Window identifier: 'tumbling_{size}'."""
111
```
112
113
### Hopping Windows
114
115
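Fixed-size, overlapping windows that advance by a step smaller than the window size. An event can belong to several windows at once: when `step` evenly divides `size`, each event falls into `size / step` windows (for example, 600-second windows with a 300-second step place every event in two windows). This enables overlapping, sliding-style aggregations.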
116
117
```python { .api }
118
from typing import List, Optional, Tuple


class HoppingWindow(Window):
    def __init__(
        self, size: float, step: float, *, expires: Optional[float] = None
    ):
        """
        Create hopping window with size and step.

        Args:
            size: Window size in seconds
            step: Step size (advance interval) in seconds
            expires: Window expiration time in seconds (defaults to size * 2)

        Example:
            # 10-minute windows advancing every 5 minutes
            window = HoppingWindow(size=600, step=300)
        """

    def ranges(self, timestamp: float) -> List[Tuple[float, float]]:
        """
        Get multiple overlapping window ranges.

        Args:
            timestamp: Event timestamp

        Returns:
            List of (start, end) tuples, one for every overlapping window
            that contains the timestamp
        """

    def current(self, timestamp: float) -> Tuple[float, float]:
        """
        Get most recent window containing timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple for latest applicable window
        """

    @property
    def size(self) -> float:
        """Window size in seconds."""

    @property
    def step(self) -> float:
        """Step size in seconds."""

    @property
    def ident(self) -> str:
        """Window identifier: 'hopping_{size}_{step}'."""
166
```
167
168
### Sliding Windows
169
170
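Windows anchored on each event's timestamp, spanning from `before` seconds earlier to `after` seconds later, so every window has the same total size (`before + after`). Useful for time-range queries and for correlating events that occur close together in time.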
171
172
```python { .api }
173
from typing import List, Optional, Tuple


class SlidingWindow(Window):
    def __init__(
        self, before: float, after: float, *, expires: Optional[float] = None
    ):
        """
        Create sliding window with before/after ranges.

        Args:
            before: Time range before event timestamp (seconds)
            after: Time range after event timestamp (seconds)
            expires: Window expiration time in seconds
                (defaults to before + after + 60)

        Example:
            # 5 minutes before, 2 minutes after each event
            window = SlidingWindow(before=300, after=120)
        """

    def ranges(self, timestamp: float) -> List[Tuple[float, float]]:
        """
        Get the window range anchored on timestamp.

        The window extends `before` seconds earlier and `after` seconds
        later, so it is centered on the timestamp only when before == after.

        Args:
            timestamp: Event timestamp

        Returns:
            List containing a single (timestamp - before, timestamp + after)
            tuple
        """

    def current(self, timestamp: float) -> Tuple[float, float]:
        """
        Get window boundaries around timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple: (timestamp - before, timestamp + after)
        """

    @property
    def before(self) -> float:
        """Time range before event in seconds."""

    @property
    def after(self) -> float:
        """Time range after event in seconds."""

    @property
    def total_size(self) -> float:
        """Total window size (before + after)."""

    @property
    def ident(self) -> str:
        """Window identifier: 'sliding_{before}_{after}'."""
225
```
226
227
### Windowed Tables
228
229
Integration between windows and tables for time-based stateful aggregations. Windowed tables automatically partition data by time windows and manage window lifecycle.
230
231
```python { .api }
232
from typing import Any, List, Optional, Tuple


class WindowedTable:
    def __init__(
        self,
        table: Table,
        window: Window,
        *,
        key_index_size: Optional[int] = None,
    ):
        """
        Create windowed table wrapper.

        Args:
            table: Base table for storage
            window: Window specification
            key_index_size: Size of key index for cleanup (None if unset)
        """

    def __getitem__(self, key_and_timestamp: Tuple[Any, float]) -> Any:
        """
        Get value for key at specific timestamp.

        Args:
            key_and_timestamp: (key, timestamp) tuple

        Returns:
            Value in applicable window
        """

    def __setitem__(
        self, key_and_timestamp: Tuple[Any, float], value: Any
    ) -> None:
        """
        Set value for key at specific timestamp.

        Args:
            key_and_timestamp: (key, timestamp) tuple
            value: Value to store
        """

    def get_window(
        self, key: Any, window_range: Tuple[float, float]
    ) -> Any:
        """
        Get value for specific window range.

        Args:
            key: Table key
            window_range: (start, end) window tuple

        Returns:
            Value in specified window
        """

    def set_window(
        self, key: Any, window_range: Tuple[float, float], value: Any
    ) -> None:
        """
        Set value for specific window range.

        Args:
            key: Table key
            window_range: (start, end) window tuple
            value: Value to store
        """

    def expire_windows(self, latest_timestamp: float) -> int:
        """
        Expire stale windows based on latest timestamp.

        Args:
            latest_timestamp: Latest observed timestamp

        Returns:
            Number of windows expired
        """

    def windows_for_key(
        self, key: Any
    ) -> List[Tuple[Tuple[float, float], Any]]:
        """
        Get all active windows for a key.

        Args:
            key: Table key

        Returns:
            List of (window_range, value) tuples, where window_range is a
            (start, end) tuple
        """

    @property
    def window(self) -> Window:
        """Window specification."""

    @property
    def table(self) -> Table:
        """Underlying table."""
320
```
321
322
### Window Operations
323
324
Utility functions and operations for working with windowed data, including aggregation helpers and window management utilities.
325
326
```python { .api }
327
from typing import Any, Callable, Tuple


def current_window() -> Tuple[float, float]:
    """
    Get current window range from stream context.

    Returns:
        (start, end) tuple for current window

    Raises:
        RuntimeError: If called outside windowed stream context
    """


def windowed_count(table: Table, window: Window) -> Callable[..., Any]:
    """
    Create windowed counting aggregator.

    Args:
        table: Table for storing counts
        window: Window specification

    Returns:
        Function that increments count for windowed keys
    """


def windowed_sum(table: Table, window: Window) -> Callable[..., Any]:
    """
    Create windowed sum aggregator.

    Args:
        table: Table for storing sums
        window: Window specification

    Returns:
        Function that adds values to windowed sums
    """


def windowed_average(
    sum_table: Table,
    count_table: Table,
    window: Window,
) -> Callable[..., Any]:
    """
    Create windowed average aggregator.

    Args:
        sum_table: Table for storing sums
        count_table: Table for storing counts
        window: Window specification

    Returns:
        Function that maintains windowed averages
    """


class WindowRange:
    def __init__(self, start: float, end: float):
        """
        Window range representation.

        Args:
            start: Window start timestamp
            end: Window end timestamp
        """

    def contains(self, timestamp: float) -> bool:
        """Check if timestamp falls within window range.

        NOTE(review): whether the start/end boundaries are inclusive is not
        specified here — confirm against the implementation.
        """

    def overlaps(self, other: 'WindowRange') -> bool:
        """Check if this window overlaps with another."""

    @property
    def duration(self) -> float:
        """Window duration in seconds."""

    @property
    def midpoint(self) -> float:
        """Window midpoint timestamp."""
402
```
403
404
## Usage Examples
405
406
### Tumbling Window Aggregation
407
408
```python
409
import time

import faust
from faust import TumblingWindow

app = faust.App('windowing-app', broker='kafka://localhost:9092')

# Window size in seconds, shared by the table definition and the reporting
# timer so the two cannot drift apart.
WINDOW_SIZE = 300  # 5 minutes

# 5-minute tumbling windows for counting events
event_counts = app.Table(
    'event-counts',
    default=int,
    window=TumblingWindow(WINDOW_SIZE),
)

events_topic = app.topic('events', value_type=dict)


@app.agent(events_topic)
async def count_events(events):
    async for event in events:
        # Count events per category in 5-minute windows
        category = event['category']
        event_counts[category] += 1


@app.timer(interval=60.0)
async def print_window_stats():
    """Print counts for current windows."""
    # Start of the window containing "now". It is the same for every
    # category, so compute it once rather than once per row.
    window_start = (time.time() // WINDOW_SIZE) * WINDOW_SIZE

    for category, count in event_counts.items():
        print(f"Category {category}: {count} events in window {window_start}")
439
```
440
441
### Hopping Window Analytics
442
443
```python
444
from faust import HoppingWindow

# 10-minute windows advancing every 5 minutes
sliding_averages = app.Table(
    'sliding-averages',
    default=lambda: {'sum': 0, 'count': 0},
    window=HoppingWindow(size=600, step=300),
)

metrics_topic = app.topic('metrics', value_type=dict)


@app.agent(metrics_topic)
async def compute_sliding_average(metrics):
    """Accumulate a running sum and count per metric name."""
    async for metric in metrics:
        name, reading = metric['name'], metric['value']

        # Read-modify-write the per-metric totals, then assign the dict back
        # to the table (presumably so the update is persisted — confirm).
        totals = sliding_averages[name]
        totals['sum'] += reading
        totals['count'] += 1
        sliding_averages[name] = totals


@app.timer(interval=300.0)  # Every 5 minutes (step size)
async def report_sliding_averages():
    """Print the average for every metric that has recorded a value."""
    for name, totals in sliding_averages.items():
        if totals['count'] <= 0:
            continue  # nothing recorded yet; avoid dividing by zero
        print(f"Sliding average for {name}: {totals['sum'] / totals['count']}")
473
```
474
475
### Session Tracking with a Sliding Window
476
477
```python
478
from faust import SlidingWindow

# Inactivity gap in seconds after which a user's next event starts a new
# session; also used as the sliding window's look-back.
SESSION_TIMEOUT = 1800  # 30 minutes

# Session tracking with 30-minute timeout
user_sessions = app.Table(
    'user-sessions',
    default=lambda: {'start_time': None, 'last_activity': None, 'events': []},
    window=SlidingWindow(before=SESSION_TIMEOUT, after=0),  # 30 minutes before
)

activity_topic = app.topic('user-activity', value_type=dict)


@app.agent(activity_topic)
async def track_sessions(activities):
    """Group each user's activity into sessions separated by inactivity gaps."""
    async for activity in activities:
        user_id = activity['user_id']
        timestamp = activity['timestamp']

        # Get session data
        session = user_sessions[user_id]
        last_activity = session['last_activity']

        # No prior activity, or a gap longer than the timeout: start a new
        # session.
        if last_activity is None or timestamp - last_activity > SESSION_TIMEOUT:
            session['start_time'] = timestamp
            session['events'] = []
            last_activity = timestamp

        # Events may arrive out of timestamp order. Never move last_activity
        # backwards: a late event would otherwise shrink the session and make
        # the next on-time event look like the start of a new session.
        session['last_activity'] = max(last_activity, timestamp)
        session['events'].append(activity)
        user_sessions[user_id] = session
509
```
510
511
### Custom Window Implementation
512
513
```python
514
class BusinessHoursWindow(faust.Window):
    """Custom window that only includes business hours."""

    def __init__(self, *, start_hour=9, end_hour=17, timezone='UTC'):
        # Both hours are used as datetime hour values (0-23). An empty or
        # inverted range would otherwise silently match nothing, and
        # end_hour=24 would crash inside ranges().
        if not 0 <= start_hour < end_hour <= 23:
            raise ValueError(
                "expected 0 <= start_hour < end_hour <= 23, "
                f"got start_hour={start_hour}, end_hour={end_hour}"
            )
        super().__init__()
        self.start_hour = start_hour
        self.end_hour = end_hour
        self.timezone = timezone

    def ranges(self, timestamp):
        """Return the business-hours window containing *timestamp*, if any.

        Returns a one-element list holding the (start, end) epoch timestamps
        of that day's business hours in ``self.timezone``, or an empty list
        when the timestamp falls outside business hours.
        """
        from datetime import datetime

        import pytz

        tz = pytz.timezone(self.timezone)
        dt = datetime.fromtimestamp(timestamp, tz)

        if not self.start_hour <= dt.hour < self.end_hour:
            return []  # Outside business hours

        # Build each boundary as a naive local time and attach the zone with
        # tz.localize(). pytz requires this so each boundary gets the UTC
        # offset in effect at that moment; dt.replace() would reuse dt's
        # offset instead.
        day_start = tz.localize(
            datetime(dt.year, dt.month, dt.day, self.start_hour)
        )
        day_end = tz.localize(
            datetime(dt.year, dt.month, dt.day, self.end_hour)
        )
        return [(day_start.timestamp(), day_end.timestamp())]


# Use custom window
business_metrics = app.Table(
    'business-metrics',
    default=int,
    window=BusinessHoursWindow(start_hour=9, end_hour=17, timezone='US/Eastern'),
)
549
```
550
551
### Window Expiration and Cleanup
552
553
```python
554
from faust import TumblingWindow

# Configure window expiration: one-hour tumbling windows whose state is
# kept for two hours.
hourly_stats = app.Table(
    'hourly-stats',
    default=int,
    window=TumblingWindow(size=3600, expires=7200),  # 1 h windows, kept 2 h
)


@app.timer(interval=300.0)  # Every 5 minutes
async def cleanup_expired_windows():
    """Periodically force an expiration pass so old window state is freed."""
    import time
    now = time.time()

    # Only tables that expose expire_windows() can be swept; skip the rest.
    if not hasattr(hourly_stats, 'expire_windows'):
        return

    expired_count = hourly_stats.expire_windows(now)
    if expired_count > 0:
        print(f"Expired {expired_count} old windows")
577
```
578
579
### Multi-Window Analysis
580
581
```python
582
from faust import TumblingWindow, HoppingWindow

# Multiple window sizes for different analysis
minute_counts = app.Table('minute-counts', default=int,
                          window=TumblingWindow(60))
hour_counts = app.Table('hour-counts', default=int,
                        window=TumblingWindow(3600))
sliding_counts = app.Table('sliding-counts', default=int,
                           window=HoppingWindow(size=600, step=60))


@app.agent()
async def multi_window_analysis(events):
    async for event in events:
        event_type = event['type']

        # Update all window types simultaneously
        minute_counts[event_type] += 1
        hour_counts[event_type] += 1
        sliding_counts[event_type] += 1


@app.timer(interval=60.0)
async def report_multi_window_stats():
    """Print each event type's count from every windowed table."""
    print("=== Multi-Window Analysis ===")

    # Collect keys from all three tables so no event type is skipped.
    event_types = (
        set(minute_counts.keys())
        | set(hour_counts.keys())
        | set(sliding_counts.keys())
    )

    for event_type in event_types:
        minute_count = minute_counts.get(event_type, 0)
        hour_count = hour_counts.get(event_type, 0)
        sliding_count = sliding_counts.get(event_type, 0)

        # Tumbling windows count events since the current window started,
        # not over a trailing minute/hour, so label them accordingly.
        print(f"{event_type}:")
        print(f"  Current 1-min window: {minute_count}")
        print(f"  Current 1-hour window: {hour_count}")
        print(f"  Hopping 10-min window: {sliding_count}")
615
```
616
617
## Type Interfaces
618
619
```python { .api }
620
from typing import List, Optional, Protocol, Tuple

# (start, end) timestamp pair shared by the range-returning methods below.
_Range = Tuple[float, float]


class WindowT(Protocol):
    """Structural type satisfied by any window implementation."""

    expires: Optional[float]
    ident: str

    def ranges(self, timestamp: float) -> List[_Range]: ...

    def stale(self, timestamp: float, latest_timestamp: float) -> bool: ...

    def current(self, timestamp: float) -> _Range: ...


class TumblingWindowT(WindowT, Protocol):
    """Structural type for fixed-size, non-overlapping windows."""

    size: float


class HoppingWindowT(WindowT, Protocol):
    """Structural type for fixed-size windows advancing by ``step``."""

    size: float
    step: float


class SlidingWindowT(WindowT, Protocol):
    """Structural type for windows spanning ``before``/``after`` an event."""

    before: float
    after: float
    total_size: float
649
```