CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-faust

Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines

Pending
Overview
Eval results
Files

monitoring.mddocs/

Monitoring and Sensors

Monitoring, metrics collection, and sensor framework for observability in Faust applications. Provides comprehensive instrumentation for tracking message flow, table operations, consumer lag, performance metrics, and custom application-specific measurements.

Capabilities

Sensor Interface

Base sensor interface for collecting metrics and monitoring events throughout the Faust application lifecycle. Sensors receive callbacks for various system events and can implement custom monitoring logic.

from typing import Any


class Sensor:
    """Base sensor interface for collecting metrics and monitoring events.

    Subclasses override only the ``on_*`` callbacks they care about; the
    base implementations are no-ops, so every override is optional.
    Annotations use ``typing.Any`` — the lowercase builtin ``any`` used
    previously is a function, not a type.
    """

    def __init__(self, **kwargs: Any) -> None:
        """
        Base sensor implementation for collecting metrics.

        Args:
            **kwargs: Sensor-specific configuration.
        """

    def on_message_in(self, tp: str, offset: int, message: Any) -> None:
        """
        Called when a message is received from the broker.

        Args:
            tp: Topic partition identifier.
            offset: Message offset.
            message: Raw message object.
        """

    def on_message_out(self, tp: str, offset: int, message: Any) -> None:
        """
        Called when a message is sent to the broker.

        Args:
            tp: Topic partition identifier.
            offset: Message offset.
            message: Raw message object.
        """

    def on_stream_event_in(self, tp: str, offset: int, stream: Any, event: Any) -> None:
        """
        Called when an event enters stream processing.

        Args:
            tp: Topic partition.
            offset: Message offset.
            stream: Stream instance.
            event: Stream event.
        """

    def on_stream_event_out(self, tp: str, offset: int, stream: Any, event: Any) -> None:
        """
        Called when an event exits stream processing.

        Args:
            tp: Topic partition.
            offset: Message offset.
            stream: Stream instance.
            event: Stream event.
        """

    def on_table_get(self, table: Any, key: Any) -> None:
        """
        Called when a table key is accessed.

        Args:
            table: Table instance.
            key: Accessed key.
        """

    def on_table_set(self, table: Any, key: Any, value: Any) -> None:
        """
        Called when a table key is modified.

        Args:
            table: Table instance.
            key: Modified key.
            value: New value.
        """

    def on_table_del(self, table: Any, key: Any) -> None:
        """
        Called when a table key is deleted.

        Args:
            table: Table instance.
            key: Deleted key.
        """

    def on_commit_initiated(self, consumer: Any) -> None:
        """
        Called when a consumer commit starts.

        Args:
            consumer: Consumer instance.
        """

    def on_commit_completed(self, consumer: Any, state: dict) -> None:
        """
        Called when a consumer commit completes.

        Args:
            consumer: Consumer instance.
            state: Commit state information.
        """

    def on_send_initiated(self, producer: Any, topic: str, message: Any) -> None:
        """
        Called when a producer send starts.

        Args:
            producer: Producer instance.
            topic: Target topic.
            message: Message being sent.
        """

    def on_send_completed(self, producer: Any, state: dict) -> None:
        """
        Called when a producer send completes.

        Args:
            producer: Producer instance.
            state: Send completion state.
        """

    def on_send_error(self, producer: Any, exc: Exception, state: dict) -> None:
        """
        Called when a producer send fails.

        Args:
            producer: Producer instance.
            exc: Exception that occurred.
            state: Send error state.
        """

    def asdict(self) -> dict:
        """
        Return sensor metrics as a dictionary.

        Returns:
            Dictionary of collected metrics.
        """

Monitor Implementation

Comprehensive monitoring implementation that extends the sensor interface with built-in metrics collection, performance tracking, and health monitoring capabilities.

class Monitor(Sensor):
    """Sensor with built-in metrics collection.

    Extends the base ``Sensor`` callbacks with counters, rate metrics,
    and bounded latency histories for commits, sends, and assignments.
    """

    def __init__(
        self,
        *,
        max_avg_history: int = 100,
        max_commit_latency_history: int = 30,
        max_send_latency_history: int = 30,
        **kwargs
    ):
        """
        Enhanced sensor with built-in metrics collection.

        Args:
            max_avg_history: Size of the averaging window for rate metrics.
            max_commit_latency_history: Number of commit latency samples kept.
            max_send_latency_history: Number of send latency samples kept.
            **kwargs: Forwarded to ``Sensor.__init__``.
        """

    def messages_received_total(self) -> int:
        """Total number of messages received."""

    def messages_sent_total(self) -> int:
        """Total number of messages sent."""

    def messages_received_per_second(self) -> float:
        """Messages received per second (recent average over the window)."""

    def messages_sent_per_second(self) -> float:
        """Messages sent per second (recent average over the window)."""

    def events_total(self) -> int:
        """Total number of stream events processed."""

    def events_per_second(self) -> float:
        """Stream events per second (recent average over the window)."""

    def tables_contains_total(self) -> int:
        """Total table key membership lookups."""

    def tables_get_total(self) -> int:
        """Total table get operations."""

    def tables_set_total(self) -> int:
        """Total table set operations."""

    def tables_del_total(self) -> int:
        """Total table delete operations."""

    def commit_latency_avg(self) -> float:
        """Average commit latency in seconds (over the history window)."""

    def send_latency_avg(self) -> float:
        """Average send latency in seconds (over the history window)."""

    def commit_latency_max(self) -> float:
        """Maximum commit latency in seconds."""

    def send_latency_max(self) -> float:
        """Maximum send latency in seconds."""

    def assignment_latency_avg(self) -> float:
        """Average partition assignment latency."""

    def assignment_error_total(self) -> int:
        """Total partition assignment errors."""

    def rebalances_total(self) -> int:
        """Total consumer rebalances."""

    def rebalance_return_latency_avg(self) -> float:
        """Average rebalance completion latency."""

    def topic_buffer_full_total(self) -> int:
        """Total topic buffer full events."""

    @property
    def max_avg_history(self) -> int:
        """Size of the averaging window (as passed to the constructor)."""

    @property
    def max_commit_latency_history(self) -> int:
        """Commit latency history size (as passed to the constructor)."""

    @property
    def max_send_latency_history(self) -> int:
        """Send latency history size (as passed to the constructor)."""

Custom Sensors

Framework for implementing custom sensors tailored to specific monitoring requirements, including application-specific metrics, external system integration, and alerting capabilities.

class CustomSensor(Sensor):
    """Base class for custom sensor implementations.

    Stores the sensor name privately and exposes it through the read-only
    ``name`` property; metrics are accumulated in ``self._metrics``.
    """

    def __init__(self, name: str, **kwargs):
        """
        Base class for custom sensor implementations.

        Args:
            name: Sensor name for identification
            **kwargs: Custom configuration
        """
        super().__init__(**kwargs)
        # Stored privately: assigning ``self.name = name`` directly would
        # raise AttributeError, because ``name`` below is a property with
        # no setter.
        self._name = name
        self._metrics = {}

    def record_metric(self, key: str, value: any, *, timestamp: float = None) -> None:
        """
        Record custom metric value.

        Args:
            key: Metric name
            value: Metric value
            timestamp: Optional timestamp (defaults to current time)
        """

    def increment_counter(self, key: str, delta: int = 1) -> None:
        """
        Increment counter metric.

        Args:
            key: Counter name
            delta: Increment amount
        """

    def record_histogram(self, key: str, value: float) -> None:
        """
        Record histogram value.

        Args:
            key: Histogram name
            value: Sample value
        """

    def set_gauge(self, key: str, value: float) -> None:
        """
        Set gauge metric value.

        Args:
            key: Gauge name
            value: Current value
        """

    def get_metrics(self) -> dict:
        """
        Get all collected metrics.

        Returns:
            Dictionary of metrics
        """

    @property
    def name(self) -> str:
        """Sensor name (read-only, set at construction)."""
        return self._name

class PrometheusMonitor(Monitor):
    """Monitor that exports metrics in Prometheus format."""
    
    def __init__(self, *, registry=None, namespace='faust', **kwargs):
        """
        Prometheus metrics exporter.
        
        Args:
            registry: Prometheus registry (optional; presumably the default
                registry is used when None — verify against implementation)
            namespace: Metric namespace prefix
            **kwargs: Forwarded to ``Monitor.__init__``
        """
        super().__init__(**kwargs)

    def setup_prometheus_metrics(self) -> None:
        """Initialize Prometheus metric objects."""

    def export_metrics(self) -> str:
        """
        Export metrics in Prometheus format.
        
        Returns:
            Prometheus-formatted metrics string (text exposition format)
        """

class StatsDMonitor(Monitor):
    """Monitor that sends metrics to StatsD."""
    
    def __init__(self, *, host='localhost', port=8125, prefix='faust', **kwargs):
        """
        StatsD metrics exporter.
        
        Args:
            host: StatsD server host
            port: StatsD server port (8125 is the conventional StatsD port)
            prefix: Metric prefix
            **kwargs: Forwarded to ``Monitor.__init__``
        """
        super().__init__(**kwargs)

    def send_metric(self, name: str, value: any, metric_type: str) -> None:
        """Send a metric to the StatsD server.

        ``metric_type`` is presumably a StatsD type code such as ``'c'``
        (counter), ``'g'`` (gauge) or ``'ms'`` (timing) — confirm against
        the implementation.
        """

Logging Integration

Integration with Python logging system for structured logging, log correlation, and centralized log management with contextual information from stream processing.

class LoggingSensor(Sensor):
    """Sensor that logs monitoring events via the Python logging system."""

    def __init__(
        self,
        *,
        logger_name: str = 'faust.sensor',
        level: int = logging.INFO,
        format_string: str = None,
        **kwargs
    ):
        """
        Sensor that logs events to Python logging system.
        
        Args:
            logger_name: Logger name
            level: Default logging level (a ``logging`` level constant)
            format_string: Custom log format, or None for the default
            **kwargs: Forwarded to ``Sensor.__init__``
        """

    def log_message_in(self, tp: str, offset: int, message: any) -> None:
        """Log incoming message details."""

    def log_message_out(self, tp: str, offset: int, message: any) -> None:
        """Log outgoing message details."""

    def log_table_operation(self, operation: str, table: any, key: any, value: any = None) -> None:
        """Log table operations with context (value is omitted for get/del)."""

    def log_performance_metric(self, metric_name: str, value: float, context: dict = None) -> None:
        """Log performance metrics with optional contextual information."""

def configure_sensor_logging(
    app: App,
    *,
    level: int = logging.INFO,
    format_string: str = None,
    include_tracing: bool = True
) -> LoggingSensor:
    """
    Configure a logging sensor for the application.

    Creates a ``LoggingSensor`` and attaches it to *app* — presumably via
    ``app.monitor``; confirm against the implementation.
    
    Args:
        app: Faust application
        level: Logging level
        format_string: Log format, or None for the default
        include_tracing: Include trace information
        
    Returns:
        Configured logging sensor
    """

Health Monitoring

Health check and application status monitoring with automatic detection of unhealthy conditions, consumer lag monitoring, and system resource tracking.

class HealthMonitor(Monitor):
    """Monitor with health checks and automatic alerting on unhealthy conditions."""

    def __init__(
        self,
        *,
        lag_threshold: float = 1000,
        error_rate_threshold: float = 0.1,
        check_interval: float = 30.0,
        **kwargs
    ):
        """
        Health monitoring sensor with automatic alerting.
        
        Args:
            lag_threshold: Consumer lag threshold for alerts (messages)
            error_rate_threshold: Error rate threshold as a fraction (0.0-1.0)
            check_interval: Health check interval in seconds
            **kwargs: Forwarded to ``Monitor.__init__``
        """

    def check_consumer_lag(self) -> dict:
        """
        Check consumer lag across all partitions.
        
        Returns:
            Dictionary of partition lag information
        """

    def check_error_rate(self) -> float:
        """
        Calculate recent error rate.
        
        Returns:
            Error rate as a fraction in the range 0.0-1.0
        """

    def check_memory_usage(self) -> dict:
        """
        Check application memory usage.
        
        Returns:
            Memory usage statistics
        """

    def is_healthy(self) -> bool:
        """
        Overall health status check.
        
        Returns:
            True if application is healthy
        """

    def get_health_report(self) -> dict:
        """
        Generate comprehensive health report.
        
        Returns:
            Dictionary with health metrics and status
        """

    def alert_on_unhealthy_condition(self, condition: str, details: dict) -> None:
        """
        Trigger alert for unhealthy condition.
        
        Args:
            condition: Condition type (lag, errors, memory, etc.)
            details: Condition details for alert
        """

Sensor Management

Utilities for managing multiple sensors, sensor registration, and coordinated metrics collection across different monitoring systems.

class SensorDelegate:
    """Fan-out sensor that forwards every event to a list of child sensors."""

    def __init__(self, *sensors: Sensor):
        """
        Delegate sensor that forwards events to multiple sensors.
        
        Args:
            *sensors: Sensor instances to delegate to
        """

    def add_sensor(self, sensor: Sensor) -> None:
        """
        Add sensor to delegation list.
        
        Args:
            sensor: Sensor instance to add
        """

    def remove_sensor(self, sensor: Sensor) -> None:
        """
        Remove sensor from delegation list.
        
        Args:
            sensor: Sensor instance to remove
        """

    def get_combined_metrics(self) -> dict:
        """
        Get combined metrics from all sensors.
        
        Returns:
            Merged metrics dictionary (merge precedence between sensors
            is not specified here — confirm against the implementation)
        """

def setup_monitoring(
    app: App,
    *,
    prometheus: bool = False,
    statsd: bool = False,
    logging: bool = True,
    health_checks: bool = True,
    custom_sensors: list = None
) -> SensorDelegate:
    """
    Setup comprehensive monitoring for application.

    Note: the ``logging`` flag shadows the stdlib ``logging`` module name
    inside this function's scope.
    
    Args:
        app: Faust application
        prometheus: Enable Prometheus metrics
        statsd: Enable StatsD metrics
        logging: Enable logging sensor
        health_checks: Enable health monitoring
        custom_sensors: Additional custom sensors, or None for none
        
    Returns:
        Configured sensor delegate
    """

Usage Examples

Basic Monitoring Setup

import faust

app = faust.App('monitored-app', broker='kafka://localhost:9092')

# Enable built-in monitoring
monitor = faust.Monitor()
app.monitor = monitor

events_topic = app.topic('events', value_type=dict)

@app.agent(events_topic)
async def process_events(events):
    async for event in events:
        # Processing logic here
        print(f"Processing event: {event}")

# The module-level ``monitor`` is captured by this closure, so metrics
# can be read directly without going through ``app.monitor``.
@app.timer(interval=30.0)
async def print_metrics():
    """Print monitoring metrics every 30 seconds."""
    print(f"Messages received: {monitor.messages_received_total()}")
    print(f"Events per second: {monitor.events_per_second():.2f}")
    print(f"Table operations: {monitor.tables_get_total()}")
    print(f"Commit latency: {monitor.commit_latency_avg():.3f}s")

Custom Sensor Implementation

import time
from collections import deque

from faust import Sensor

class BusinessMetricsSensor(Sensor):
    """Sensor tracking business-level metrics alongside stream processing."""

    def __init__(self):
        super().__init__()
        self.order_count = 0
        self.revenue_total = 0.0
        # Bounded history: deque(maxlen=...) drops the oldest samples
        # automatically, replacing the manual list-slicing trim.
        self.processing_times = deque(maxlen=1000)
        self.start_time = time.time()

    def on_stream_event_in(self, tp, offset, stream, event):
        # Track event processing start
        event._processing_start = time.time()

    def on_stream_event_out(self, tp, offset, stream, event):
        # Only events that passed through on_stream_event_in carry the
        # start timestamp.
        if hasattr(event, '_processing_start'):
            self.processing_times.append(time.time() - event._processing_start)

    def record_order(self, amount: float):
        """Record business order metrics."""
        self.order_count += 1
        self.revenue_total += amount

    def get_business_metrics(self):
        """Get business-specific metrics as a plain dictionary."""
        uptime = time.time() - self.start_time
        avg_processing_time = (
            sum(self.processing_times) / len(self.processing_times)
            if self.processing_times else 0
        )

        return {
            'orders_total': self.order_count,
            'revenue_total': self.revenue_total,
            # Guard against zero uptime on an immediate first call.
            'orders_per_hour': (
                self.order_count / (uptime / 3600) if uptime else 0.0
            ),
            'avg_processing_time': avg_processing_time,
            'uptime_seconds': uptime
        }

# Use custom sensor
business_sensor = BusinessMetricsSensor()
app.monitor = business_sensor

@app.agent()
async def process_orders(orders):
    async for order in orders:
        # Process order
        amount = order['amount']
        business_sensor.record_order(amount)

Prometheus Integration

import time  # required by the stream-event hooks below (missing before)

import faust
from prometheus_client import Counter, Histogram, Gauge, generate_latest

class PrometheusMonitor(faust.Monitor):
    """Monitor that mirrors Faust events into prometheus_client metrics."""

    def __init__(self):
        super().__init__()
        
        # Prometheus metrics
        self.messages_received = Counter('faust_messages_received_total', 'Total received messages')
        self.messages_sent = Counter('faust_messages_sent_total', 'Total sent messages')
        self.processing_duration = Histogram('faust_processing_duration_seconds', 'Processing duration')
        self.consumer_lag = Gauge('faust_consumer_lag', 'Consumer lag', ['topic', 'partition'])

    def on_message_in(self, tp, offset, message):
        super().on_message_in(tp, offset, message)
        self.messages_received.inc()

    def on_message_out(self, tp, offset, message):
        super().on_message_out(tp, offset, message)
        self.messages_sent.inc()

    def on_stream_event_in(self, tp, offset, stream, event):
        super().on_stream_event_in(tp, offset, stream, event)
        event._start_time = time.time()

    def on_stream_event_out(self, tp, offset, stream, event):
        super().on_stream_event_out(tp, offset, stream, event)
        # Only events that were seen entering the stream carry _start_time.
        if hasattr(event, '_start_time'):
            duration = time.time() - event._start_time
            self.processing_duration.observe(duration)

    def export_metrics(self):
        """Export metrics in Prometheus text exposition format."""
        return generate_latest()

# Setup Prometheus endpoint (``app`` is the application created earlier)
prometheus_monitor = PrometheusMonitor()
app.monitor = prometheus_monitor

@app.page('/metrics')
async def metrics_endpoint(web, request):
    """Expose Prometheus metrics endpoint."""
    return web.Response(
        text=prometheus_monitor.export_metrics(),
        content_type='text/plain'
    )

Health Monitoring

import time  # required throughout this example (missing before)

import psutil
from faust import Monitor

class HealthMonitor(Monitor):
    """Monitor with a simple error/memory/throughput health check."""

    def __init__(self):
        super().__init__()
        self.error_count = 0
        # Construction time; used to report uptime in the health status.
        self.start_time = time.time()
        self.last_health_check = time.time()
        self.health_status = {'healthy': True, 'issues': []}

    def on_send_error(self, producer, exc, state):
        super().on_send_error(producer, exc, state)
        self.error_count += 1

    def check_health(self):
        """Comprehensive health check."""
        issues = []
        
        # Check error rate
        if self.error_count > 10:  # More than 10 errors
            issues.append(f"High error count: {self.error_count}")
        
        # Check memory usage
        memory_percent = psutil.virtual_memory().percent
        if memory_percent > 90:
            issues.append(f"High memory usage: {memory_percent}%")
        
        # Check processing lag
        events_per_sec = self.events_per_second()
        if events_per_sec == 0:  # No events being processed
            issues.append("No events being processed")
        
        # Update health status. Uptime is measured from construction —
        # the previous version derived it from last_health_check, which
        # was never refreshed, so the value was mislabelled.
        self.last_health_check = time.time()
        self.health_status = {
            'healthy': len(issues) == 0,
            'issues': issues,
            'timestamp': time.time(),
            'uptime': time.time() - self.start_time,
            'memory_usage': memory_percent,
            'events_per_second': events_per_sec
        }
        
        return self.health_status

app.monitor = HealthMonitor()

@app.timer(interval=60.0)
async def health_check():
    """Periodic health monitoring."""
    health = app.monitor.check_health()
    
    if not health['healthy']:
        print(f"HEALTH WARNING: {health['issues']}")
        
        # Could send alerts to external systems here
        # await send_alert_to_slack(health)
        # await send_alert_to_pagerduty(health)

@app.page('/health')
async def health_endpoint(web, request):
    """Health check endpoint."""
    health = app.monitor.check_health()
    status_code = 200 if health['healthy'] else 503
    
    return web.json_response(health, status=status_code)

Multi-Sensor Setup

from faust import SensorDelegate

# Create multiple specialized sensors.
# PrometheusMonitor, BusinessMetricsSensor and HealthMonitor are the
# classes defined in the preceding examples on this page.
prometheus_sensor = PrometheusMonitor()
business_sensor = BusinessMetricsSensor()
health_sensor = HealthMonitor()
logging_sensor = faust.LoggingSensor()

# Combine all sensors: SensorDelegate forwards each event to every child.
multi_sensor = SensorDelegate(
    prometheus_sensor,
    business_sensor, 
    health_sensor,
    logging_sensor
)

app.monitor = multi_sensor

@app.timer(interval=300.0)  # Every 5 minutes
async def comprehensive_monitoring():
    """Comprehensive monitoring report."""
    
    # Get metrics from all sensors
    prometheus_metrics = prometheus_sensor.export_metrics()
    business_metrics = business_sensor.get_business_metrics()
    health_status = health_sensor.check_health()
    
    print("=== Monitoring Report ===")
    print(f"Business: {business_metrics}")
    print(f"Health: {health_status}")
    
    # Could send to monitoring dashboard
    # await send_to_monitoring_dashboard({
    #     'business': business_metrics,
    #     'health': health_status,
    #     'timestamp': time.time()
    # })

Type Interfaces

from typing import Protocol, Dict, Any, Optional

class SensorT(Protocol):
    """Structural type interface for Sensor (duck-typed via Protocol)."""
    
    def on_message_in(self, tp: str, offset: int, message: Any) -> None: ...
    def on_message_out(self, tp: str, offset: int, message: Any) -> None: ...
    def on_table_get(self, table: Any, key: Any) -> None: ...
    def on_table_set(self, table: Any, key: Any, value: Any) -> None: ...
    # Dict is parameterized: commit state is a string-keyed mapping,
    # matching the asdict() return type below.
    def on_commit_completed(self, consumer: Any, state: Dict[str, Any]) -> None: ...
    def asdict(self) -> Dict[str, Any]: ...

class MonitorT(SensorT, Protocol):
    """Structural type interface for Monitor (extends SensorT)."""
    
    def messages_received_total(self) -> int: ...
    def messages_sent_total(self) -> int: ...
    def events_per_second(self) -> float: ...
    def commit_latency_avg(self) -> float: ...

Install with Tessl CLI

npx tessl i tessl/pypi-faust

docs

authentication.md

cli-framework.md

core-application.md

data-management.md

index.md

monitoring.md

serialization.md

stream-processing.md

topics-channels.md

windowing.md

worker-management.md

tile.json