# Monitoring and Sensors

Monitoring, metrics collection, and sensor framework for observability in Faust applications. Provides comprehensive instrumentation for tracking message flow, table operations, consumer lag, performance metrics, and custom application-specific measurements.

## Capabilities

### Sensor Interface

Base sensor interface for collecting metrics and monitoring events throughout the Faust application lifecycle. Sensors receive callbacks for various system events and can implement custom monitoring logic.

```python { .api }
class Sensor:
    def __init__(self, **kwargs):
        """
        Base sensor implementation for collecting metrics.

        Args:
            **kwargs: Sensor-specific configuration
        """

    def on_message_in(self, tp: str, offset: int, message: Any) -> None:
        """
        Called when message is received from broker.

        Args:
            tp: Topic partition identifier
            offset: Message offset
            message: Raw message object
        """

    def on_message_out(self, tp: str, offset: int, message: Any) -> None:
        """
        Called when message is sent to broker.

        Args:
            tp: Topic partition identifier
            offset: Message offset
            message: Raw message object
        """

    def on_stream_event_in(self, tp: str, offset: int, stream: Any, event: Any) -> None:
        """
        Called when event enters stream processing.

        Args:
            tp: Topic partition
            offset: Message offset
            stream: Stream instance
            event: Stream event
        """

    def on_stream_event_out(self, tp: str, offset: int, stream: Any, event: Any) -> None:
        """
        Called when event exits stream processing.

        Args:
            tp: Topic partition
            offset: Message offset
            stream: Stream instance
            event: Stream event
        """

    def on_table_get(self, table: Any, key: Any) -> None:
        """
        Called when table key is accessed.

        Args:
            table: Table instance
            key: Accessed key
        """

    def on_table_set(self, table: Any, key: Any, value: Any) -> None:
        """
        Called when table key is modified.

        Args:
            table: Table instance
            key: Modified key
            value: New value
        """

    def on_table_del(self, table: Any, key: Any) -> None:
        """
        Called when table key is deleted.

        Args:
            table: Table instance
            key: Deleted key
        """

    def on_commit_initiated(self, consumer: Any) -> None:
        """
        Called when consumer commit starts.

        Args:
            consumer: Consumer instance
        """

    def on_commit_completed(self, consumer: Any, state: dict) -> None:
        """
        Called when consumer commit completes.

        Args:
            consumer: Consumer instance
            state: Commit state information
        """

    def on_send_initiated(self, producer: Any, topic: str, message: Any) -> None:
        """
        Called when producer send starts.

        Args:
            producer: Producer instance
            topic: Target topic
            message: Message being sent
        """

    def on_send_completed(self, producer: Any, state: dict) -> None:
        """
        Called when producer send completes.

        Args:
            producer: Producer instance
            state: Send completion state
        """

    def on_send_error(self, producer: Any, exc: Exception, state: dict) -> None:
        """
        Called when producer send fails.

        Args:
            producer: Producer instance
            exc: Exception that occurred
            state: Send error state
        """

    def asdict(self) -> dict:
        """
        Return sensor metrics as dictionary.

        Returns:
            Dictionary of collected metrics
        """
```

### Monitor Implementation

Comprehensive monitoring implementation that extends the sensor interface with built-in metrics collection, performance tracking, and health monitoring capabilities.

```python { .api }
class Monitor(Sensor):
    def __init__(
        self,
        *,
        max_avg_history: int = 100,
        max_commit_latency_history: int = 30,
        max_send_latency_history: int = 30,
        **kwargs
    ):
        """
        Enhanced sensor with built-in metrics collection.

        Args:
            max_avg_history: Size of averaging window for metrics
            max_commit_latency_history: History size for commit latency
            max_send_latency_history: History size for send latency
        """

    def messages_received_total(self) -> int:
        """Total number of messages received."""

    def messages_sent_total(self) -> int:
        """Total number of messages sent."""

    def messages_received_per_second(self) -> float:
        """Messages received per second (recent average)."""

    def messages_sent_per_second(self) -> float:
        """Messages sent per second (recent average)."""

    def events_total(self) -> int:
        """Total number of stream events processed."""

    def events_per_second(self) -> float:
        """Stream events per second (recent average)."""

    def tables_contains_total(self) -> int:
        """Total table key lookups."""

    def tables_get_total(self) -> int:
        """Total table get operations."""

    def tables_set_total(self) -> int:
        """Total table set operations."""

    def tables_del_total(self) -> int:
        """Total table delete operations."""

    def commit_latency_avg(self) -> float:
        """Average commit latency in seconds."""

    def send_latency_avg(self) -> float:
        """Average send latency in seconds."""

    def commit_latency_max(self) -> float:
        """Maximum commit latency in seconds."""

    def send_latency_max(self) -> float:
        """Maximum send latency in seconds."""

    def assignment_latency_avg(self) -> float:
        """Average partition assignment latency."""

    def assignment_error_total(self) -> int:
        """Total partition assignment errors."""

    def rebalances_total(self) -> int:
        """Total consumer rebalances."""

    def rebalance_return_latency_avg(self) -> float:
        """Average rebalance completion latency."""

    def topic_buffer_full_total(self) -> int:
        """Total topic buffer full events."""

    @property
    def max_avg_history(self) -> int:
        """Size of averaging window."""

    @property
    def max_commit_latency_history(self) -> int:
        """Commit latency history size."""

    @property
    def max_send_latency_history(self) -> int:
        """Send latency history size."""
```

### Custom Sensors

Framework for implementing custom sensors tailored to specific monitoring requirements, including application-specific metrics, external system integration, and alerting capabilities.

```python { .api }
class CustomSensor(Sensor):
    def __init__(self, name: str, **kwargs):
        """
        Base class for custom sensor implementations.

        Args:
            name: Sensor name for identification
            **kwargs: Custom configuration
        """
        super().__init__(**kwargs)
        self.name = name
        self._metrics = {}

    def record_metric(self, key: str, value: Any, *, timestamp: float = None) -> None:
        """
        Record custom metric value.

        Args:
            key: Metric name
            value: Metric value
            timestamp: Optional timestamp (defaults to current time)
        """

    def increment_counter(self, key: str, delta: int = 1) -> None:
        """
        Increment counter metric.

        Args:
            key: Counter name
            delta: Increment amount
        """

    def record_histogram(self, key: str, value: float) -> None:
        """
        Record histogram value.

        Args:
            key: Histogram name
            value: Sample value
        """

    def set_gauge(self, key: str, value: float) -> None:
        """
        Set gauge metric value.

        Args:
            key: Gauge name
            value: Current value
        """

    def get_metrics(self) -> dict:
        """
        Get all collected metrics.

        Returns:
            Dictionary of metrics
        """

    @property
    def name(self) -> str:
        """Sensor name."""

class PrometheusMonitor(Monitor):
    """Monitor that exports metrics in Prometheus format."""

    def __init__(self, *, registry=None, namespace='faust', **kwargs):
        """
        Prometheus metrics exporter.

        Args:
            registry: Prometheus registry (optional)
            namespace: Metric namespace prefix
        """
        super().__init__(**kwargs)

    def setup_prometheus_metrics(self) -> None:
        """Initialize Prometheus metric objects."""

    def export_metrics(self) -> str:
        """
        Export metrics in Prometheus format.

        Returns:
            Prometheus-formatted metrics string
        """

class StatsDMonitor(Monitor):
    """Monitor that sends metrics to StatsD."""

    def __init__(self, *, host='localhost', port=8125, prefix='faust', **kwargs):
        """
        StatsD metrics exporter.

        Args:
            host: StatsD server host
            port: StatsD server port
            prefix: Metric prefix
        """
        super().__init__(**kwargs)

    def send_metric(self, name: str, value: Any, metric_type: str) -> None:
        """Send metric to StatsD server."""
```

### Logging Integration

Integration with Python logging system for structured logging, log correlation, and centralized log management with contextual information from stream processing.

```python { .api }
class LoggingSensor(Sensor):
    def __init__(
        self,
        *,
        logger_name: str = 'faust.sensor',
        level: int = logging.INFO,
        format_string: str = None,
        **kwargs
    ):
        """
        Sensor that logs events to Python logging system.

        Args:
            logger_name: Logger name
            level: Default logging level
            format_string: Custom log format
        """

    def log_message_in(self, tp: str, offset: int, message: Any) -> None:
        """Log incoming message details."""

    def log_message_out(self, tp: str, offset: int, message: Any) -> None:
        """Log outgoing message details."""

    def log_table_operation(self, operation: str, table: Any, key: Any, value: Any = None) -> None:
        """Log table operations with context."""

    def log_performance_metric(self, metric_name: str, value: float, context: dict = None) -> None:
        """Log performance metrics with context."""

def configure_sensor_logging(
    app: App,
    *,
    level: int = logging.INFO,
    format_string: str = None,
    include_tracing: bool = True
) -> LoggingSensor:
    """
    Configure logging sensor for application.

    Args:
        app: Faust application
        level: Logging level
        format_string: Log format
        include_tracing: Include trace information

    Returns:
        Configured logging sensor
    """
```

### Health Monitoring

Health check and application status monitoring with automatic detection of unhealthy conditions, consumer lag monitoring, and system resource tracking.

```python { .api }
class HealthMonitor(Monitor):
    def __init__(
        self,
        *,
        lag_threshold: float = 1000,
        error_rate_threshold: float = 0.1,
        check_interval: float = 30.0,
        **kwargs
    ):
        """
        Health monitoring sensor with automatic alerting.

        Args:
            lag_threshold: Consumer lag threshold for alerts
            error_rate_threshold: Error rate threshold (0.0-1.0)
            check_interval: Health check interval in seconds
        """

    def check_consumer_lag(self) -> dict:
        """
        Check consumer lag across all partitions.

        Returns:
            Dictionary of partition lag information
        """

    def check_error_rate(self) -> float:
        """
        Calculate recent error rate.

        Returns:
            Error rate as a fraction (0.0-1.0)
        """

    def check_memory_usage(self) -> dict:
        """
        Check application memory usage.

        Returns:
            Memory usage statistics
        """

    def is_healthy(self) -> bool:
        """
        Overall health status check.

        Returns:
            True if application is healthy
        """

    def get_health_report(self) -> dict:
        """
        Generate comprehensive health report.

        Returns:
            Dictionary with health metrics and status
        """

    def alert_on_unhealthy_condition(self, condition: str, details: dict) -> None:
        """
        Trigger alert for unhealthy condition.

        Args:
            condition: Condition type (lag, errors, memory, etc.)
            details: Condition details for alert
        """
```

### Sensor Management

Utilities for managing multiple sensors, sensor registration, and coordinated metrics collection across different monitoring systems.

```python { .api }
class SensorDelegate:
    def __init__(self, *sensors: Sensor):
        """
        Delegate sensor that forwards events to multiple sensors.

        Args:
            *sensors: Sensor instances to delegate to
        """

    def add_sensor(self, sensor: Sensor) -> None:
        """
        Add sensor to delegation list.

        Args:
            sensor: Sensor instance to add
        """

    def remove_sensor(self, sensor: Sensor) -> None:
        """
        Remove sensor from delegation list.

        Args:
            sensor: Sensor instance to remove
        """

    def get_combined_metrics(self) -> dict:
        """
        Get combined metrics from all sensors.

        Returns:
            Merged metrics dictionary
        """

def setup_monitoring(
    app: App,
    *,
    prometheus: bool = False,
    statsd: bool = False,
    logging: bool = True,
    health_checks: bool = True,
    custom_sensors: list = None
) -> SensorDelegate:
    """
    Setup comprehensive monitoring for application.

    Args:
        app: Faust application
        prometheus: Enable Prometheus metrics
        statsd: Enable StatsD metrics
        logging: Enable logging sensor
        health_checks: Enable health monitoring
        custom_sensors: Additional custom sensors

    Returns:
        Configured sensor delegate
    """
```

## Usage Examples

### Basic Monitoring Setup

```python
import faust

app = faust.App('monitored-app', broker='kafka://localhost:9092')

# Enable built-in monitoring
monitor = faust.Monitor()
app.monitor = monitor

events_topic = app.topic('events', value_type=dict)

@app.agent(events_topic)
async def process_events(events):
    async for event in events:
        # Processing logic here
        print(f"Processing event: {event}")

@app.timer(interval=30.0)
async def print_metrics():
    """Print monitoring metrics every 30 seconds."""
    print(f"Messages received: {monitor.messages_received_total()}")
    print(f"Events per second: {monitor.events_per_second():.2f}")
    print(f"Table operations: {monitor.tables_get_total()}")
    print(f"Commit latency: {monitor.commit_latency_avg():.3f}s")
```

### Custom Sensor Implementation

```python
import time
from faust import Sensor

class BusinessMetricsSensor(Sensor):
    def __init__(self):
        super().__init__()
        self.order_count = 0
        self.revenue_total = 0.0
        self.processing_times = []
        self.start_time = time.time()

    def on_stream_event_in(self, tp, offset, stream, event):
        # Track event processing start
        event._processing_start = time.time()

    def on_stream_event_out(self, tp, offset, stream, event):
        # Calculate processing time
        if hasattr(event, '_processing_start'):
            processing_time = time.time() - event._processing_start
            self.processing_times.append(processing_time)

            # Keep only recent processing times
            if len(self.processing_times) > 1000:
                self.processing_times = self.processing_times[-1000:]

    def record_order(self, amount: float):
        """Record business order metrics."""
        self.order_count += 1
        self.revenue_total += amount

    def get_business_metrics(self):
        """Get business-specific metrics."""
        uptime = time.time() - self.start_time
        avg_processing_time = (
            sum(self.processing_times) / len(self.processing_times)
            if self.processing_times else 0
        )

        return {
            'orders_total': self.order_count,
            'revenue_total': self.revenue_total,
            'orders_per_hour': self.order_count / (uptime / 3600),
            'avg_processing_time': avg_processing_time,
            'uptime_seconds': uptime
        }

# Use custom sensor
business_sensor = BusinessMetricsSensor()
app.monitor = business_sensor

@app.agent()
async def process_orders(orders):
    async for order in orders:
        # Process order
        amount = order['amount']
        business_sensor.record_order(amount)
```

### Prometheus Integration

```python
import faust
from prometheus_client import Counter, Histogram, Gauge, generate_latest

class PrometheusMonitor(faust.Monitor):
    def __init__(self):
        super().__init__()

        # Prometheus metrics
        self.messages_received = Counter('faust_messages_received_total', 'Total received messages')
        self.messages_sent = Counter('faust_messages_sent_total', 'Total sent messages')
        self.processing_duration = Histogram('faust_processing_duration_seconds', 'Processing duration')
        self.consumer_lag = Gauge('faust_consumer_lag', 'Consumer lag', ['topic', 'partition'])

    def on_message_in(self, tp, offset, message):
        super().on_message_in(tp, offset, message)
        self.messages_received.inc()

    def on_message_out(self, tp, offset, message):
        super().on_message_out(tp, offset, message)
        self.messages_sent.inc()

    def on_stream_event_in(self, tp, offset, stream, event):
        super().on_stream_event_in(tp, offset, stream, event)
        event._start_time = time.time()

    def on_stream_event_out(self, tp, offset, stream, event):
        super().on_stream_event_out(tp, offset, stream, event)
        if hasattr(event, '_start_time'):
            duration = time.time() - event._start_time
            self.processing_duration.observe(duration)

    def export_metrics(self):
        """Export metrics in Prometheus format."""
        return generate_latest()

# Setup Prometheus endpoint
prometheus_monitor = PrometheusMonitor()
app.monitor = prometheus_monitor

@app.page('/metrics')
async def metrics_endpoint(web, request):
    """Expose Prometheus metrics endpoint."""
    return web.Response(
        text=prometheus_monitor.export_metrics(),
        content_type='text/plain'
    )
```

### Health Monitoring

```python
import psutil
from faust import Monitor

class HealthMonitor(Monitor):
    def __init__(self):
        super().__init__()
        self.error_count = 0
        self.last_health_check = time.time()
        self.health_status = {'healthy': True, 'issues': []}

    def on_send_error(self, producer, exc, state):
        super().on_send_error(producer, exc, state)
        self.error_count += 1

    def check_health(self):
        """Comprehensive health check."""
        issues = []

        # Check error rate
        if self.error_count > 10:  # More than 10 errors
            issues.append(f"High error count: {self.error_count}")

        # Check memory usage
        memory_percent = psutil.virtual_memory().percent
        if memory_percent > 90:
            issues.append(f"High memory usage: {memory_percent}%")

        # Check processing lag
        events_per_sec = self.events_per_second()
        if events_per_sec == 0:  # No events being processed
            issues.append("No events being processed")

        # Update health status
        self.health_status = {
            'healthy': len(issues) == 0,
            'issues': issues,
            'timestamp': time.time(),
            'uptime': time.time() - self.last_health_check,
            'memory_usage': memory_percent,
            'events_per_second': events_per_sec
        }

        return self.health_status

app.monitor = HealthMonitor()

@app.timer(interval=60.0)
async def health_check():
    """Periodic health monitoring."""
    health = app.monitor.check_health()

    if not health['healthy']:
        print(f"HEALTH WARNING: {health['issues']}")

        # Could send alerts to external systems here
        # await send_alert_to_slack(health)
        # await send_alert_to_pagerduty(health)

@app.page('/health')
async def health_endpoint(web, request):
    """Health check endpoint."""
    health = app.monitor.check_health()
    status_code = 200 if health['healthy'] else 503

    return web.json_response(health, status=status_code)
```

### Multi-Sensor Setup

```python
from faust import SensorDelegate

# Create multiple specialized sensors
prometheus_sensor = PrometheusMonitor()
business_sensor = BusinessMetricsSensor()
health_sensor = HealthMonitor()
logging_sensor = faust.LoggingSensor()

# Combine all sensors
multi_sensor = SensorDelegate(
    prometheus_sensor,
    business_sensor,
    health_sensor,
    logging_sensor
)

app.monitor = multi_sensor

@app.timer(interval=300.0)  # Every 5 minutes
async def comprehensive_monitoring():
    """Comprehensive monitoring report."""

    # Get metrics from all sensors
    prometheus_metrics = prometheus_sensor.export_metrics()
    business_metrics = business_sensor.get_business_metrics()
    health_status = health_sensor.check_health()

    print("=== Monitoring Report ===")
    print(f"Business: {business_metrics}")
    print(f"Health: {health_status}")

    # Could send to monitoring dashboard
    # await send_to_monitoring_dashboard({
    #     'business': business_metrics,
    #     'health': health_status,
    #     'timestamp': time.time()
    # })
```

## Type Interfaces

```python { .api }
from typing import Protocol, Dict, Any, Optional

class SensorT(Protocol):
    """Type interface for Sensor."""

    def on_message_in(self, tp: str, offset: int, message: Any) -> None: ...
    def on_message_out(self, tp: str, offset: int, message: Any) -> None: ...
    def on_table_get(self, table: Any, key: Any) -> None: ...
    def on_table_set(self, table: Any, key: Any, value: Any) -> None: ...
    def on_commit_completed(self, consumer: Any, state: Dict) -> None: ...
    def asdict(self) -> Dict[str, Any]: ...

class MonitorT(SensorT, Protocol):
    """Type interface for Monitor."""

    def messages_received_total(self) -> int: ...
    def messages_sent_total(self) -> int: ...
    def events_per_second(self) -> float: ...
    def commit_latency_avg(self) -> float: ...
```