Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines
—
Time-based windowing operations for temporal data aggregation in Faust applications. Provides tumbling, hopping, and sliding window implementations for stream analytics with configurable time boundaries, expiration policies, and efficient state management.
Abstract base class for all window implementations. Defines the common interface and behavior for time-based data partitioning and aggregation operations.
class Window:
    """Common interface for time-based window implementations."""

    def __init__(self, *, expires: float = None, **kwargs):
        """
        Base window implementation.

        Args:
            expires: Window expiration time in seconds
        """

    def ranges(self, timestamp: float) -> list:
        """
        Get window ranges for a given timestamp.

        Args:
            timestamp: Event timestamp in seconds

        Returns:
            List of (start, end) tuples for applicable windows
        """

    def stale(self, timestamp: float, latest_timestamp: float) -> bool:
        """
        Check if window is stale and should be expired.

        Args:
            timestamp: Window timestamp
            latest_timestamp: Latest observed timestamp

        Returns:
            True if window should be expired
        """

    def current(self, timestamp: float) -> tuple:
        """
        Get current window range for timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple for current window
        """

    @property
    def expires(self) -> float:
        """Window expiration time in seconds."""

    @property
    def ident(self) -> str:
        """Unique identifier for this window type."""


# Fixed-size, non-overlapping windows that partition time into discrete intervals. Each event belongs to exactly one window, making them ideal for aggregations like counts, sums, and averages over regular time periods.
class TumblingWindow(Window):
    """Fixed-size, non-overlapping window."""

    def __init__(self, size: float, *, expires: float = None):
        """
        Create tumbling window with fixed size.

        Args:
            size: Window size in seconds
            expires: Window expiration time (defaults to size * 2)

        Example:
            # 5-minute tumbling windows
            window = TumblingWindow(300)  # 300 seconds = 5 minutes
        """

    def ranges(self, timestamp: float) -> list:
        """
        Get single window range for timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            List containing single (start, end) tuple
        """

    def current(self, timestamp: float) -> tuple:
        """
        Get current window boundaries.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple for window containing timestamp
        """

    @property
    def size(self) -> float:
        """Window size in seconds."""

    @property
    def ident(self) -> str:
        """Window identifier: 'tumbling_{size}'."""


# Fixed-size, overlapping windows that advance by a smaller step size. Events can belong to multiple windows, enabling sliding aggregations and overlapping time-based analysis.
class HoppingWindow(Window):
    """Fixed-size window that advances by a step smaller than its size."""

    def __init__(self, size: float, step: float, *, expires: float = None):
        """
        Create hopping window with size and step.

        Args:
            size: Window size in seconds
            step: Step size (advance interval) in seconds
            expires: Window expiration time (defaults to size * 2)

        Example:
            # 10-minute windows advancing every 5 minutes
            window = HoppingWindow(size=600, step=300)
        """

    def ranges(self, timestamp: float) -> list:
        """
        Get multiple overlapping window ranges.

        Args:
            timestamp: Event timestamp

        Returns:
            List of (start, end) tuples for overlapping windows
        """

    def current(self, timestamp: float) -> tuple:
        """
        Get most recent window containing timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple for latest applicable window
        """

    @property
    def size(self) -> float:
        """Window size in seconds."""

    @property
    def step(self) -> float:
        """Step size in seconds."""

    @property
    def ident(self) -> str:
        """Window identifier: 'hopping_{size}_{step}'."""


# Variable-size windows that expand around each event timestamp. Useful for time-range queries and event correlation within flexible time boundaries.
class SlidingWindow(Window):
    """Window spanning a configurable range before and after each event."""

    def __init__(self, before: float, after: float, *, expires: float = None):
        """
        Create sliding window with before/after ranges.

        Args:
            before: Time range before event timestamp (seconds)
            after: Time range after event timestamp (seconds)
            expires: Window expiration time (defaults to before + after + 60)

        Example:
            # 5 minutes before, 2 minutes after each event
            window = SlidingWindow(before=300, after=120)
        """

    def ranges(self, timestamp: float) -> list:
        """
        Get window range centered on timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            List containing single (start, end) tuple
        """

    def current(self, timestamp: float) -> tuple:
        """
        Get window boundaries around timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple: (timestamp - before, timestamp + after)
        """

    @property
    def before(self) -> float:
        """Time range before event in seconds."""

    @property
    def after(self) -> float:
        """Time range after event in seconds."""

    @property
    def total_size(self) -> float:
        """Total window size (before + after)."""

    @property
    def ident(self) -> str:
        """Window identifier: 'sliding_{before}_{after}'."""


# Integration between windows and tables for time-based stateful aggregations. Windowed tables automatically partition data by time windows and manage window lifecycle.
class WindowedTable:
    """Table wrapper that stores values per time window."""

    def __init__(
        self,
        table: Table,
        window: Window,
        *,
        key_index_size: int = None
    ):
        """
        Create windowed table wrapper.

        Args:
            table: Base table for storage
            window: Window specification
            key_index_size: Size of key index for cleanup
        """

    def __getitem__(self, key_and_timestamp: tuple) -> any:
        """
        Get value for key at specific timestamp.

        Args:
            key_and_timestamp: (key, timestamp) tuple

        Returns:
            Value in applicable window
        """

    def __setitem__(self, key_and_timestamp: tuple, value: any) -> None:
        """
        Set value for key at specific timestamp.

        Args:
            key_and_timestamp: (key, timestamp) tuple
            value: Value to store
        """

    def get_window(self, key: any, window_range: tuple) -> any:
        """
        Get value for specific window range.

        Args:
            key: Table key
            window_range: (start, end) window tuple

        Returns:
            Value in specified window
        """

    def set_window(self, key: any, window_range: tuple, value: any) -> None:
        """
        Set value for specific window range.

        Args:
            key: Table key
            window_range: (start, end) window tuple
            value: Value to store
        """

    def expire_windows(self, latest_timestamp: float) -> int:
        """
        Expire stale windows based on latest timestamp.

        Args:
            latest_timestamp: Latest observed timestamp

        Returns:
            Number of windows expired
        """

    def windows_for_key(self, key: any) -> list:
        """
        Get all active windows for a key.

        Args:
            key: Table key

        Returns:
            List of (window_range, value) tuples
        """

    @property
    def window(self) -> Window:
        """Window specification."""

    @property
    def table(self) -> Table:
        """Underlying table."""


# Utility functions and operations for working with windowed data, including aggregation helpers and window management utilities.
def current_window() -> tuple:
    """
    Get current window range from stream context.

    Returns:
        (start, end) tuple for current window

    Raises:
        RuntimeError: If called outside windowed stream context
    """
def windowed_count(table: Table, window: Window) -> callable:
    """
    Create windowed counting aggregator.

    Args:
        table: Table for storing counts
        window: Window specification

    Returns:
        Function that increments count for windowed keys
    """
def windowed_sum(table: Table, window: Window) -> callable:
    """
    Create windowed sum aggregator.

    Args:
        table: Table for storing sums
        window: Window specification

    Returns:
        Function that adds values to windowed sums
    """
def windowed_average(
    sum_table: Table,
    count_table: Table,
    window: Window
) -> callable:
    """
    Create windowed average aggregator.

    Args:
        sum_table: Table for storing sums
        count_table: Table for storing counts
        window: Window specification

    Returns:
        Function that maintains windowed averages
    """
class WindowRange:
    """A (start, end) pair describing one window."""

    def __init__(self, start: float, end: float):
        """
        Window range representation.

        Args:
            start: Window start timestamp
            end: Window end timestamp
        """

    def contains(self, timestamp: float) -> bool:
        """Check if timestamp falls within window range."""

    def overlaps(self, other: 'WindowRange') -> bool:
        """Check if this window overlaps with another."""

    @property
    def duration(self) -> float:
        """Window duration in seconds."""

    @property
    def midpoint(self) -> float:
        """Window midpoint timestamp."""


import faust
from faust import TumblingWindow

app = faust.App('windowing-app', broker='kafka://localhost:9092')

# Single source of truth for the window length: used both by the table
# definition and by the window-start calculation in the timer below.
WINDOW_SECONDS = 300  # 5 minutes

# 5-minute tumbling windows for counting events
event_counts = app.Table(
    'event-counts',
    default=int,
    window=TumblingWindow(WINDOW_SECONDS),
)

events_topic = app.topic('events', value_type=dict)


@app.agent(events_topic)
async def count_events(events):
    """Count events per category in 5-minute windows."""
    async for event in events:
        category = event['category']
        event_counts[category] += 1


@app.timer(interval=60.0)
async def print_window_stats():
    """Print counts for current windows once a minute."""
    import time

    current_time = time.time()
    # Floor the current time to a multiple of the window size; this does not
    # depend on the category, so compute it once outside the loop.
    window_start = (current_time // WINDOW_SECONDS) * WINDOW_SECONDS
    for category, count in event_counts.items():
        print(f"Category {category}: {count} events in window {window_start}")


from faust import HoppingWindow
# 10-minute windows advancing every 5 minutes
sliding_averages = app.Table(
    'sliding-averages',
    default=lambda: {'sum': 0, 'count': 0},
    window=HoppingWindow(size=600, step=300),
)

metrics_topic = app.topic('metrics', value_type=dict)


@app.agent(metrics_topic)
async def compute_sliding_average(metrics):
    """Accumulate a running sum and count per metric name."""
    async for metric in metrics:
        metric_name = metric['name']
        value = metric['value']

        # Update sliding average for overlapping windows.
        # NOTE(review): in Faust, indexing a windowed table may yield a
        # window-set wrapper rather than the stored dict; confirm whether
        # `.value()` / `.current()` is needed before subscripting.
        stats = sliding_averages[metric_name]
        stats['sum'] += value
        stats['count'] += 1
        # Write the dict back so the table records the change.
        sliding_averages[metric_name] = stats


@app.timer(interval=300.0)  # Every 5 minutes (step size)
async def report_sliding_averages():
    """Print the average of every metric that has at least one sample."""
    for metric_name, stats in sliding_averages.items():
        if stats['count'] > 0:  # guard against division by zero
            avg = stats['sum'] / stats['count']
            print(f"Sliding average for {metric_name}: {avg}")


from faust import SlidingWindow
# Session tracking with 30-minute timeout
SESSION_TIMEOUT = 1800  # seconds; shared by the window and the timeout check

user_sessions = app.Table(
    'user-sessions',
    default=lambda: {'start_time': None, 'last_activity': None, 'events': []},
    window=SlidingWindow(before=SESSION_TIMEOUT, after=0),  # 30 minutes before
)

activity_topic = app.topic('user-activity', value_type=dict)


@app.agent(activity_topic)
async def track_sessions(activities):
    """Group each user's activity into sessions split by a 30-minute gap."""
    async for activity in activities:
        user_id = activity['user_id']
        timestamp = activity['timestamp']

        # Get session data
        session = user_sessions[user_id]

        # Check if this continues existing session or starts new one
        last_activity = session['last_activity']
        if last_activity is None or timestamp - last_activity > SESSION_TIMEOUT:
            # New session
            session['start_time'] = timestamp
            session['events'] = []

        # Update session
        session['last_activity'] = timestamp
        session['events'].append(activity)
        user_sessions[user_id] = session


class BusinessHoursWindow(faust.Window):
    """Custom window that only includes business hours.

    NOTE(review): the Window interface listed in this document also declares
    ``stale()`` and ``current()``; only ``ranges()`` is overridden here.
    Confirm the base class provides usable implementations of the others.
    """

    def __init__(self, *, start_hour=9, end_hour=17, timezone='UTC'):
        """
        Args:
            start_hour: First hour (inclusive) of the business day, local time
            end_hour: Hour (exclusive) at which the business day ends
            timezone: Timezone name passed to ``pytz.timezone``
        """
        super().__init__()
        self.start_hour = start_hour
        self.end_hour = end_hour
        self.timezone = timezone

    def ranges(self, timestamp):
        """
        Return the business-hours window containing ``timestamp``.

        Args:
            timestamp: Event timestamp in seconds since the epoch

        Returns:
            A one-element list ``[(start, end)]`` of epoch timestamps for that
            day's business hours, or ``[]`` when the event is outside them.
        """
        from datetime import datetime

        import pytz

        tz = pytz.timezone(self.timezone)
        dt = datetime.fromtimestamp(timestamp, tz)

        # Check if timestamp is within business hours
        if not self.start_hour <= dt.hour < self.end_hour:
            return []  # Outside business hours

        # Build the boundaries from the naive wall-clock date and localize
        # each one separately. Calling replace(hour=...) on the aware value
        # would keep dt's UTC offset, which is wrong when a DST transition
        # falls between the boundary and the event time.
        midnight = dt.replace(
            tzinfo=None, hour=0, minute=0, second=0, microsecond=0
        )
        day_start = tz.localize(midnight.replace(hour=self.start_hour))
        day_end = tz.localize(midnight.replace(hour=self.end_hour))

        # Return daily business hours window
        return [(day_start.timestamp(), day_end.timestamp())]


# Use custom window
business_metrics = app.Table(
    'business-metrics',
    default=int,
    window=BusinessHoursWindow(start_hour=9, end_hour=17, timezone='US/Eastern'),
)


from faust import TumblingWindow
# Configure window expiration
hourly_stats = app.Table(
    'hourly-stats',
    default=int,
    window=TumblingWindow(
        size=3600,     # 1 hour windows
        expires=7200,  # Keep windows for 2 hours
    ),
)


@app.timer(interval=300.0)  # Every 5 minutes
async def cleanup_expired_windows():
    """Clean up expired windows to manage memory."""
    import time

    # Force window expiration check; skipped when the table object does not
    # expose an expire_windows() method.
    if not hasattr(hourly_stats, 'expire_windows'):
        return

    current_time = time.time()
    expired_count = hourly_stats.expire_windows(current_time)
    if expired_count > 0:
        print(f"Expired {expired_count} old windows")


from faust import TumblingWindow, HoppingWindow
# Multiple window sizes for different analysis
minute_counts = app.Table('minute-counts', default=int,
                          window=TumblingWindow(60))
hour_counts = app.Table('hour-counts', default=int,
                        window=TumblingWindow(3600))
sliding_counts = app.Table('sliding-counts', default=int,
                           window=HoppingWindow(size=600, step=60))


# NOTE(review): no channel or topic is passed to @app.agent(); confirm the
# intended event source for this agent.
@app.agent()
async def multi_window_analysis(events):
    """Increment the per-type counter in every window table."""
    async for event in events:
        event_type = event['type']

        # Update all window types simultaneously
        minute_counts[event_type] += 1
        hour_counts[event_type] += 1
        sliding_counts[event_type] += 1


@app.timer(interval=60.0)
async def report_multi_window_stats():
    """Print the counts from all three tables for every known event type."""
    print("=== Multi-Window Analysis ===")

    for event_type in set(minute_counts.keys()) | set(hour_counts.keys()):
        minute_count = minute_counts.get(event_type, 0)
        hour_count = hour_counts.get(event_type, 0)
        sliding_count = sliding_counts.get(event_type, 0)

        print(f"{event_type}:")
        print(f"  Last minute: {minute_count}")
        print(f"  Last hour: {hour_count}")
        print(f"  Sliding 10min: {sliding_count}")


from typing import Protocol, List, Tuple, Optional
class WindowT(Protocol):
    """Type interface for Window."""

    expires: Optional[float]
    ident: str

    def ranges(self, timestamp: float) -> List[Tuple[float, float]]: ...

    def stale(self, timestamp: float, latest_timestamp: float) -> bool: ...

    def current(self, timestamp: float) -> Tuple[float, float]: ...


class TumblingWindowT(WindowT, Protocol):
    """Type interface for TumblingWindow."""

    size: float


class HoppingWindowT(WindowT, Protocol):
    """Type interface for HoppingWindow."""

    size: float
    step: float


class SlidingWindowT(WindowT, Protocol):
    """Type interface for SlidingWindow."""

    before: float
    after: float
    total_size: float


# Install with Tessl CLI
npx tessl i tessl/pypi-faust