CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-faust

Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines

Pending
Overview
Eval results
Files

docs/windowing.md

Windowing

Time-based windowing operations for temporal data aggregation in Faust applications. Provides tumbling, hopping, and sliding window implementations for stream analytics with configurable time boundaries, expiration policies, and efficient state management.

Capabilities

Window Base Class

Abstract base class for all window implementations. Defines the common interface and behavior for time-based data partitioning and aggregation operations.

class Window:
    """
    Abstract base class for all window implementations.

    Defines the common interface for time-based data partitioning:
    mapping a timestamp to window ranges and deciding when a window
    is stale.
    """

    def __init__(self, *, expires: Optional[float] = None, **kwargs):
        """
        Base window implementation.

        Args:
            expires: Window expiration time in seconds (None when no
                expiration time is given)
            **kwargs: Additional keyword arguments accepted by the
                base initializer
        """

    def ranges(self, timestamp: float) -> List[Tuple[float, float]]:
        """
        Get window ranges for a given timestamp.

        Args:
            timestamp: Event timestamp in seconds

        Returns:
            List of (start, end) tuples for applicable windows
        """

    def stale(self, timestamp: float, latest_timestamp: float) -> bool:
        """
        Check if window is stale and should be expired.

        Args:
            timestamp: Window timestamp
            latest_timestamp: Latest observed timestamp

        Returns:
            True if window should be expired
        """

    def current(self, timestamp: float) -> Tuple[float, float]:
        """
        Get current window range for timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple for current window
        """

    @property
    def expires(self) -> Optional[float]:
        """Window expiration time in seconds (None when not set)."""

    @property
    def ident(self) -> str:
        """Unique identifier for this window type."""

Tumbling Windows

Fixed-size, non-overlapping windows that partition time into discrete intervals. Each event belongs to exactly one window, making them ideal for aggregations like counts, sums, and averages over regular time periods.

class TumblingWindow(Window):
    """
    Fixed-size, non-overlapping window.

    Time is partitioned into discrete intervals of ``size`` seconds, so
    each event belongs to exactly one window.
    """

    def __init__(self, size: float, *, expires: Optional[float] = None):
        """
        Create tumbling window with fixed size.

        Args:
            size: Window size in seconds
            expires: Window expiration time (defaults to size * 2)

        Example:
            # 5-minute tumbling windows
            window = TumblingWindow(300)  # 300 seconds = 5 minutes
        """

    def ranges(self, timestamp: float) -> List[Tuple[float, float]]:
        """
        Get single window range for timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            List containing single (start, end) tuple
        """

    def current(self, timestamp: float) -> Tuple[float, float]:
        """
        Get current window boundaries.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple for window containing timestamp
        """

    @property
    def size(self) -> float:
        """Window size in seconds."""

    @property
    def ident(self) -> str:
        """Window identifier: 'tumbling_{size}'."""

Hopping Windows

Fixed-size, overlapping windows that advance by a smaller step size. Events can belong to multiple windows, enabling sliding aggregations and overlapping time-based analysis.

class HoppingWindow(Window):
    """
    Fixed-size, overlapping window that advances by ``step`` seconds.

    Because the step is smaller than the size, one event can belong to
    several windows.
    """

    def __init__(
        self, size: float, step: float, *, expires: Optional[float] = None
    ):
        """
        Create hopping window with size and step.

        Args:
            size: Window size in seconds
            step: Step size (advance interval) in seconds
            expires: Window expiration time (defaults to size * 2)

        Example:
            # 10-minute windows advancing every 5 minutes
            window = HoppingWindow(size=600, step=300)
        """

    def ranges(self, timestamp: float) -> List[Tuple[float, float]]:
        """
        Get multiple overlapping window ranges.

        Args:
            timestamp: Event timestamp

        Returns:
            List of (start, end) tuples for overlapping windows
        """

    def current(self, timestamp: float) -> Tuple[float, float]:
        """
        Get most recent window containing timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple for latest applicable window
        """

    @property
    def size(self) -> float:
        """Window size in seconds."""

    @property
    def step(self) -> float:
        """Step size in seconds."""

    @property
    def ident(self) -> str:
        """Window identifier: 'hopping_{size}_{step}'."""

Sliding Windows

Windows positioned relative to each event timestamp, spanning a configurable interval before and after it (total size: before + after). Useful for time-range queries and event correlation within flexible time boundaries.

class SlidingWindow(Window):
    """
    Window positioned relative to each event timestamp.

    The range spans ``before`` seconds before and ``after`` seconds
    after the timestamp.
    """

    def __init__(
        self, before: float, after: float, *, expires: Optional[float] = None
    ):
        """
        Create sliding window with before/after ranges.

        Args:
            before: Time range before event timestamp (seconds)
            after: Time range after event timestamp (seconds)
            expires: Window expiration time (defaults to before + after + 60)

        Example:
            # 5 minutes before, 2 minutes after each event
            window = SlidingWindow(before=300, after=120)
        """

    def ranges(self, timestamp: float) -> List[Tuple[float, float]]:
        """
        Get the window range around timestamp.

        The range is only centered on the timestamp when ``before``
        equals ``after``.

        Args:
            timestamp: Event timestamp

        Returns:
            List containing single (start, end) tuple
        """

    def current(self, timestamp: float) -> Tuple[float, float]:
        """
        Get window boundaries around timestamp.

        Args:
            timestamp: Event timestamp

        Returns:
            (start, end) tuple: (timestamp - before, timestamp + after)
        """

    @property
    def before(self) -> float:
        """Time range before event in seconds."""

    @property
    def after(self) -> float:
        """Time range after event in seconds."""

    @property
    def total_size(self) -> float:
        """Total window size (before + after)."""

    @property
    def ident(self) -> str:
        """Window identifier: 'sliding_{before}_{after}'."""

Windowed Tables

Integration between windows and tables for time-based stateful aggregations. Windowed tables automatically partition data by time windows and manage window lifecycle.

class WindowedTable:
    """
    Wrapper that combines a table with a window specification.

    Values are addressed either by (key, timestamp) or by
    (key, window range).
    """

    def __init__(
        self,
        table: Table,
        window: Window,
        *,
        key_index_size: Optional[int] = None
    ):
        """
        Create windowed table wrapper.

        Args:
            table: Base table for storage
            window: Window specification
            key_index_size: Size of key index for cleanup
        """

    def __getitem__(self, key_and_timestamp: Tuple[Any, float]) -> Any:
        """
        Get value for key at specific timestamp.

        Args:
            key_and_timestamp: (key, timestamp) tuple

        Returns:
            Value in applicable window
        """

    def __setitem__(
        self, key_and_timestamp: Tuple[Any, float], value: Any
    ) -> None:
        """
        Set value for key at specific timestamp.

        Args:
            key_and_timestamp: (key, timestamp) tuple
            value: Value to store
        """

    def get_window(self, key: Any, window_range: Tuple[float, float]) -> Any:
        """
        Get value for specific window range.

        Args:
            key: Table key
            window_range: (start, end) window tuple

        Returns:
            Value in specified window
        """

    def set_window(
        self, key: Any, window_range: Tuple[float, float], value: Any
    ) -> None:
        """
        Set value for specific window range.

        Args:
            key: Table key
            window_range: (start, end) window tuple
            value: Value to store
        """

    def expire_windows(self, latest_timestamp: float) -> int:
        """
        Expire stale windows based on latest timestamp.

        Args:
            latest_timestamp: Latest observed timestamp

        Returns:
            Number of windows expired
        """

    def windows_for_key(
        self, key: Any
    ) -> List[Tuple[Tuple[float, float], Any]]:
        """
        Get all active windows for a key.

        Args:
            key: Table key

        Returns:
            List of (window_range, value) tuples
        """

    @property
    def window(self) -> Window:
        """Window specification."""

    @property
    def table(self) -> Table:
        """Underlying table."""

Window Operations

Utility functions and operations for working with windowed data, including aggregation helpers and window management utilities.

def current_window() -> Tuple[float, float]:
    """
    Get current window range from stream context.

    Returns:
        (start, end) tuple for current window

    Raises:
        RuntimeError: If called outside windowed stream context
    """

def windowed_count(table: Table, window: Window) -> Callable:
    """
    Create windowed counting aggregator.

    Args:
        table: Table for storing counts
        window: Window specification

    Returns:
        Function that increments count for windowed keys
    """

def windowed_sum(table: Table, window: Window) -> Callable:
    """
    Create windowed sum aggregator.

    Args:
        table: Table for storing sums
        window: Window specification

    Returns:
        Function that adds values to windowed sums
    """

def windowed_average(
    sum_table: Table,
    count_table: Table,
    window: Window
) -> Callable:
    """
    Create windowed average aggregator.

    Args:
        sum_table: Table for storing sums
        count_table: Table for storing counts
        window: Window specification

    Returns:
        Function that maintains windowed averages
    """

class WindowRange:
    """Window range representation: a start and an end timestamp."""

    def __init__(self, start: float, end: float):
        """
        Window range representation.

        Args:
            start: Window start timestamp
            end: Window end timestamp
        """

    def contains(self, timestamp: float) -> bool:
        """
        Check if timestamp falls within window range.

        NOTE(review): whether the start and end boundaries count as
        inside the range is not stated here; confirm.

        Args:
            timestamp: Timestamp to test

        Returns:
            True if the timestamp falls within the range
        """

    def overlaps(self, other: 'WindowRange') -> bool:
        """
        Check if this window overlaps with another.

        Args:
            other: Window range to compare against

        Returns:
            True if the two ranges overlap
        """

    @property
    def duration(self) -> float:
        """Window duration in seconds."""

    @property
    def midpoint(self) -> float:
        """Window midpoint timestamp."""

Usage Examples

Tumbling Window Aggregation

import faust
from faust import TumblingWindow

# Application object shared by the examples on this page
app = faust.App('windowing-app', broker='kafka://localhost:9092')

# 5-minute tumbling windows for counting events.
# Missing keys start at 0 because the default factory is `int`.
# NOTE(review): Faust's user guide builds windowed tables with
# `app.Table(...).tumbling(size, expires=...)`; confirm that passing
# `window=` to `app.Table` gives the same windowed behavior.
event_counts = app.Table(
    'event-counts',
    default=int,
    window=TumblingWindow(300)  # 5 minutes
)

# Source topic; values are handled as plain dicts
events_topic = app.topic('events', value_type=dict)

@app.agent(events_topic)
async def count_events(events):
    """Add one to the per-category counter for every incoming event."""
    async for event in events:
        # The event's 'category' field is the table key
        event_counts[event['category']] += 1

@app.timer(interval=60.0)
async def print_window_stats():
    """Print the stored count for every category once a minute.

    The window start shown in the output is derived from the wall clock:
    the current time rounded down to a multiple of the window size.
    """
    import time

    # Must match the size given to TumblingWindow for `event_counts`
    window_size = 300

    # The window start depends only on the clock, so compute it once
    # rather than once per table entry.
    current_time = time.time()
    window_start = (current_time // window_size) * window_size

    for category, count in event_counts.items():
        print(f"Category {category}: {count} events in window {window_start}")

Hopping Window Analytics

from faust import HoppingWindow

# 10-minute windows advancing every 5 minutes.
# Each missing key starts as a fresh {'sum': 0, 'count': 0} dict.
# NOTE(review): Faust's user guide builds hopping tables with
# `app.Table(...).hopping(size, step, expires=...)`; confirm that
# passing `window=` to `app.Table` gives the same windowed behavior.
sliding_averages = app.Table(
    'sliding-averages',
    default=lambda: {'sum': 0, 'count': 0},
    window=HoppingWindow(size=600, step=300)
)

# Source topic; values are handled as plain dicts
metrics_topic = app.topic('metrics', value_type=dict)

@app.agent(metrics_topic)
async def compute_sliding_average(metrics):
    """Accumulate a running sum and sample count for each metric name."""
    async for sample in metrics:
        name, amount = sample['name'], sample['value']

        # Fetch the entry, fold in the new sample, then store it back
        # under the same key
        running = sliding_averages[name]
        running['sum'] += amount
        running['count'] += 1
        sliding_averages[name] = running

@app.timer(interval=300.0)  # Every 5 minutes (step size)
async def report_sliding_averages():
    """Print sum / count for every metric that has at least one sample."""
    for metric_name, totals in sliding_averages.items():
        samples = totals['count']
        # Skip entries without samples so the division below is safe
        if not (samples > 0):
            continue
        avg = totals['sum'] / samples
        print(f"Sliding average for {metric_name}: {avg}")

Session Tracking with a Sliding Window

from faust import SlidingWindow

# Session tracking with 30-minute timeout.
# Each missing key starts as an empty session record (no start time,
# no last activity, empty event list).
# NOTE(review): confirm that passing `window=` to `app.Table` yields
# windowed access; Faust's user guide wraps tables through helpers
# such as `.tumbling()` / `.hopping()`.
user_sessions = app.Table(
    'user-sessions',
    default=lambda: {'start_time': None, 'last_activity': None, 'events': []},
    window=SlidingWindow(before=1800, after=0)  # 30 minutes before
)

# Source topic; values are handled as plain dicts
activity_topic = app.topic('user-activity', value_type=dict)

@app.agent(activity_topic)
async def track_sessions(activities):
    """Group each user's activity into sessions split by a 30-minute gap."""
    async for activity in activities:
        user_id = activity['user_id']
        seen_at = activity['timestamp']

        session = user_sessions[user_id]

        # A session starts when there is no earlier activity, or when the
        # gap since the last activity exceeds 1800 seconds (30 minutes)
        previous = session['last_activity']
        starts_new_session = previous is None or seen_at - previous > 1800
        if starts_new_session:
            session['start_time'] = seen_at
            session['events'] = []

        # Record this activity and store the session back under the user
        session['last_activity'] = seen_at
        session['events'].append(activity)
        user_sessions[user_id] = session

Custom Window Implementation

class BusinessHoursWindow(faust.Window):
    """Custom window that only includes business hours.

    A timestamp inside business hours maps to one window covering that
    day's business hours in the configured timezone; a timestamp outside
    business hours maps to no window.
    """

    def __init__(self, *, start_hour=9, end_hour=17, timezone='UTC'):
        """
        Create a business-hours window.

        Args:
            start_hour: Local hour at which the window opens (inclusive)
            end_hour: Local hour at which the window closes (exclusive)
            timezone: pytz timezone name used to interpret timestamps
        """
        super().__init__()
        self.start_hour = start_hour
        self.end_hour = end_hour
        self.timezone = timezone

    def ranges(self, timestamp):
        """
        Get the business-hours window containing timestamp.

        Args:
            timestamp: Event timestamp in seconds

        Returns:
            List with one (start, end) tuple, or an empty list when the
            timestamp is outside business hours
        """
        from datetime import datetime, timedelta
        import pytz

        tz = pytz.timezone(self.timezone)
        dt = datetime.fromtimestamp(timestamp, tz)

        # Outside business hours: no window applies
        if not self.start_hour <= dt.hour < self.end_hour:
            return []

        # Build the boundaries from the naive local date and localize each
        # one separately. Calling dt.replace(hour=...) would keep the
        # event's UTC offset, which is wrong when a DST transition falls
        # between the boundary and the event. Using a timedelta also
        # permits end_hour=24 (midnight of the next day).
        midnight = datetime(dt.year, dt.month, dt.day)
        day_start = tz.localize(midnight + timedelta(hours=self.start_hour))
        day_end = tz.localize(midnight + timedelta(hours=self.end_hour))
        return [(day_start.timestamp(), day_end.timestamp())]

# Use custom window: counters (default 0) windowed by 09:00-17:00
# business hours in the US/Eastern timezone
business_metrics = app.Table(
    'business-metrics',
    default=int,
    window=BusinessHoursWindow(start_hour=9, end_hour=17, timezone='US/Eastern')
)

Window Expiration and Cleanup

from faust import TumblingWindow

# Configure window expiration: one-hour windows kept for two hours.
# NOTE(review): Faust's user guide builds windowed tables with
# `app.Table(...).tumbling(size, expires=...)`; confirm that passing
# `window=` to `app.Table` gives the same windowed behavior.
hourly_stats = app.Table(
    'hourly-stats',
    default=int,
    window=TumblingWindow(
        size=3600,  # 1 hour windows
        expires=7200  # Keep windows for 2 hours
    )
)

@app.timer(interval=300.0)  # Every 5 minutes
async def cleanup_expired_windows():
    """Ask the hourly table to expire old windows, if it supports that."""
    import time
    current_time = time.time()

    # Nothing to do when the table object has no expiry hook
    if not hasattr(hourly_stats, 'expire_windows'):
        return

    # Report only when something was actually removed
    expired_count = hourly_stats.expire_windows(current_time)
    if expired_count > 0:
        print(f"Expired {expired_count} old windows")

Multi-Window Analysis

from faust import TumblingWindow, HoppingWindow

# Multiple window sizes for different analysis: the same counters kept
# per minute, per hour, and per 10-minute window advancing every minute
minute_counts = app.Table('minute-counts', default=int, 
                         window=TumblingWindow(60))
hour_counts = app.Table('hour-counts', default=int,
                       window=TumblingWindow(3600))
sliding_counts = app.Table('sliding-counts', default=int,
                          window=HoppingWindow(size=600, step=60))

@app.agent()
async def multi_window_analysis(events):
    """Count each event's type in all three windowed tables."""
    async for event in events:
        event_type = event['type']

        # Same increment applied to every table, in declaration order
        for counts in (minute_counts, hour_counts, sliding_counts):
            counts[event_type] += 1

@app.timer(interval=60.0)
async def report_multi_window_stats():
    """Print the minute, hour and sliding counts for every event type."""
    print("=== Multi-Window Analysis ===")

    # Take keys from all three tables so that an event type present only
    # in the sliding table is still reported
    event_types = (
        set(minute_counts.keys())
        | set(hour_counts.keys())
        | set(sliding_counts.keys())
    )

    for event_type in event_types:
        # A table without an entry for this type reports 0
        minute_count = minute_counts.get(event_type, 0)
        hour_count = hour_counts.get(event_type, 0)
        sliding_count = sliding_counts.get(event_type, 0)

        print(f"{event_type}:")
        print(f"  Last minute: {minute_count}")
        print(f"  Last hour: {hour_count}")
        print(f"  Sliding 10min: {sliding_count}")

Type Interfaces

from typing import Any, Callable, List, Optional, Protocol, Tuple

class WindowT(Protocol):
    """Type interface for Window.

    Structural type: any object providing these members matches,
    whether or not it inherits from this class.
    """
    
    # NOTE(review): declared here as plain attributes, while the Window
    # stubs in this document expose `expires` and `ident` as read-only
    # properties; confirm which form is intended.
    expires: Optional[float]  # Expiration time in seconds, or None
    ident: str  # Identifier for the window type
    
    # Window ranges applicable to a timestamp, as (start, end) tuples
    def ranges(self, timestamp: float) -> List[Tuple[float, float]]: ...
    # Whether the window at `timestamp` should be expired
    def stale(self, timestamp: float, latest_timestamp: float) -> bool: ...
    # The (start, end) range of the current window for a timestamp
    def current(self, timestamp: float) -> Tuple[float, float]: ...

class TumblingWindowT(WindowT, Protocol):
    """Type interface for TumblingWindow.

    Extends the WindowT members with the window size.
    """
    
    size: float  # Window size in seconds

class HoppingWindowT(WindowT, Protocol):
    """Type interface for HoppingWindow.

    Extends the WindowT members with the window size and step.
    """
    
    size: float  # Window size in seconds
    step: float  # Step size (advance interval) in seconds

class SlidingWindowT(WindowT, Protocol):
    """Type interface for SlidingWindow.

    Extends the WindowT members with the before/after ranges and their
    combined size.
    """
    
    before: float  # Time range before the event, in seconds
    after: float  # Time range after the event, in seconds
    total_size: float  # Total window size (before + after)

Install with Tessl CLI

npx tessl i tessl/pypi-faust

docs

authentication.md

cli-framework.md

core-application.md

data-management.md

index.md

monitoring.md

serialization.md

stream-processing.md

topics-channels.md

windowing.md

worker-management.md

tile.json