or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md cli-framework.md core-application.md data-management.md index.md monitoring.md serialization.md stream-processing.md topics-channels.md windowing.md worker-management.md

docs/monitoring.md

# Monitoring and Sensors

Monitoring, metrics collection, and sensor framework for observability in Faust applications. Provides comprehensive instrumentation for tracking message flow, table operations, consumer lag, performance metrics, and custom application-specific measurements.

## Capabilities

### Sensor Interface

Base sensor interface for collecting metrics and monitoring events throughout the Faust application lifecycle. Sensors receive callbacks for various system events and can implement custom monitoring logic.

```python { .api }
class Sensor:
    def __init__(self, **kwargs):
        """
        Base sensor implementation for collecting metrics.

        Args:
            **kwargs: Sensor-specific configuration
        """

    def on_message_in(self, tp: str, offset: int, message: any) -> None:
        """
        Called when message is received from broker.

        Args:
            tp: Topic partition identifier
            offset: Message offset
            message: Raw message object
        """

    def on_message_out(self, tp: str, offset: int, message: any) -> None:
        """
        Called when message is sent to broker.

        Args:
            tp: Topic partition identifier
            offset: Message offset
            message: Raw message object
        """

    def on_stream_event_in(self, tp: str, offset: int, stream: any, event: any) -> None:
        """
        Called when event enters stream processing.

        Args:
            tp: Topic partition
            offset: Message offset
            stream: Stream instance
            event: Stream event
        """

    def on_stream_event_out(self, tp: str, offset: int, stream: any, event: any) -> None:
        """
        Called when event exits stream processing.

        Args:
            tp: Topic partition
            offset: Message offset
            stream: Stream instance
            event: Stream event
        """

    def on_table_get(self, table: any, key: any) -> None:
        """
        Called when table key is accessed.

        Args:
            table: Table instance
            key: Accessed key
        """

    def on_table_set(self, table: any, key: any, value: any) -> None:
        """
        Called when table key is modified.

        Args:
            table: Table instance
            key: Modified key
            value: New value
        """

    def on_table_del(self, table: any, key: any) -> None:
        """
        Called when table key is deleted.

        Args:
            table: Table instance
            key: Deleted key
        """

    def on_commit_initiated(self, consumer: any) -> None:
        """
        Called when consumer commit starts.

        Args:
            consumer: Consumer instance
        """

    def on_commit_completed(self, consumer: any, state: dict) -> None:
        """
        Called when consumer commit completes.

        Args:
            consumer: Consumer instance
            state: Commit state information
        """

    def on_send_initiated(self, producer: any, topic: str, message: any) -> None:
        """
        Called when producer send starts.

        Args:
            producer: Producer instance
            topic: Target topic
            message: Message being sent
        """

    def on_send_completed(self, producer: any, state: dict) -> None:
        """
        Called when producer send completes.

        Args:
            producer: Producer instance
            state: Send completion state
        """

    def on_send_error(self, producer: any, exc: Exception, state: dict) -> None:
        """
        Called when producer send fails.

        Args:
            producer: Producer instance
            exc: Exception that occurred
            state: Send error state
        """

    def asdict(self) -> dict:
        """
        Return sensor metrics as dictionary.

        Returns:
            Dictionary of collected metrics
        """
```

### Monitor Implementation

Comprehensive monitoring implementation that extends the sensor interface with built-in metrics collection, performance tracking, and health monitoring capabilities.

```python { .api }
class Monitor(Sensor):
    def __init__(
        self,
        *,
        max_avg_history: int = 100,
        max_commit_latency_history: int = 30,
        max_send_latency_history: int = 30,
        **kwargs
    ):
        """
        Enhanced sensor with built-in metrics collection.

        Args:
            max_avg_history: Size of averaging window for metrics
            max_commit_latency_history: History size for commit latency
            max_send_latency_history: History size for send latency
        """

    def messages_received_total(self) -> int:
        """Total number of messages received."""

    def messages_sent_total(self) -> int:
        """Total number of messages sent."""

    def messages_received_per_second(self) -> float:
        """Messages received per second (recent average)."""

    def messages_sent_per_second(self) -> float:
        """Messages sent per second (recent average)."""

    def events_total(self) -> int:
        """Total number of stream events processed."""

    def events_per_second(self) -> float:
        """Stream events per second (recent average)."""

    def tables_contains_total(self) -> int:
        """Total table key lookups."""

    def tables_get_total(self) -> int:
        """Total table get operations."""

    def tables_set_total(self) -> int:
        """Total table set operations."""

    def tables_del_total(self) -> int:
        """Total table delete operations."""

    def commit_latency_avg(self) -> float:
        """Average commit latency in seconds."""

    def send_latency_avg(self) -> float:
        """Average send latency in seconds."""

    def commit_latency_max(self) -> float:
        """Maximum commit latency in seconds."""

    def send_latency_max(self) -> float:
        """Maximum send latency in seconds."""

    def assignment_latency_avg(self) -> float:
        """Average partition assignment latency."""

    def assignment_error_total(self) -> int:
        """Total partition assignment errors."""

    def rebalances_total(self) -> int:
        """Total consumer rebalances."""

    def rebalance_return_latency_avg(self) -> float:
        """Average rebalance completion latency."""

    def topic_buffer_full_total(self) -> int:
        """Total topic buffer full events."""

    @property
    def max_avg_history(self) -> int:
        """Size of averaging window."""

    @property
    def max_commit_latency_history(self) -> int:
        """Commit latency history size."""

    @property
    def max_send_latency_history(self) -> int:
        """Send latency history size."""
```

### Custom Sensors

Framework for implementing custom sensors tailored to specific monitoring requirements, including application-specific metrics, external system integration, and alerting capabilities.

```python { .api }
class CustomSensor(Sensor):
    def __init__(self, name: str, **kwargs):
        """
        Base class for custom sensor implementations.

        Args:
            name: Sensor name for identification
            **kwargs: Custom configuration
        """
        super().__init__(**kwargs)
        self.name = name
        self._metrics = {}

    def record_metric(self, key: str, value: any, *, timestamp: float = None) -> None:
        """
        Record custom metric value.

        Args:
            key: Metric name
            value: Metric value
            timestamp: Optional timestamp (defaults to current time)
        """

    def increment_counter(self, key: str, delta: int = 1) -> None:
        """
        Increment counter metric.

        Args:
            key: Counter name
            delta: Increment amount
        """

    def record_histogram(self, key: str, value: float) -> None:
        """
        Record histogram value.

        Args:
            key: Histogram name
            value: Sample value
        """

    def set_gauge(self, key: str, value: float) -> None:
        """
        Set gauge metric value.

        Args:
            key: Gauge name
            value: Current value
        """

    def get_metrics(self) -> dict:
        """
        Get all collected metrics.

        Returns:
            Dictionary of metrics
        """

    @property
    def name(self) -> str:
        """Sensor name."""

class PrometheusMonitor(Monitor):
    """Monitor that exports metrics in Prometheus format."""

    def __init__(self, *, registry=None, namespace='faust', **kwargs):
        """
        Prometheus metrics exporter.

        Args:
            registry: Prometheus registry (optional)
            namespace: Metric namespace prefix
        """
        super().__init__(**kwargs)

    def setup_prometheus_metrics(self) -> None:
        """Initialize Prometheus metric objects."""

    def export_metrics(self) -> str:
        """
        Export metrics in Prometheus format.

        Returns:
            Prometheus-formatted metrics string
        """

class StatsDMonitor(Monitor):
    """Monitor that sends metrics to StatsD."""

    def __init__(self, *, host='localhost', port=8125, prefix='faust', **kwargs):
        """
        StatsD metrics exporter.

        Args:
            host: StatsD server host
            port: StatsD server port
            prefix: Metric prefix
        """
        super().__init__(**kwargs)

    def send_metric(self, name: str, value: any, metric_type: str) -> None:
        """Send metric to StatsD server."""
```

### Logging Integration

Integration with Python logging system for structured logging, log correlation, and centralized log management with contextual information from stream processing.

```python { .api }
class LoggingSensor(Sensor):
    def __init__(
        self,
        *,
        logger_name: str = 'faust.sensor',
        level: int = logging.INFO,
        format_string: str = None,
        **kwargs
    ):
        """
        Sensor that logs events to Python logging system.

        Args:
            logger_name: Logger name
            level: Default logging level
            format_string: Custom log format
        """

    def log_message_in(self, tp: str, offset: int, message: any) -> None:
        """Log incoming message details."""

    def log_message_out(self, tp: str, offset: int, message: any) -> None:
        """Log outgoing message details."""

    def log_table_operation(self, operation: str, table: any, key: any, value: any = None) -> None:
        """Log table operations with context."""

    def log_performance_metric(self, metric_name: str, value: float, context: dict = None) -> None:
        """Log performance metrics with context."""

def configure_sensor_logging(
    app: App,
    *,
    level: int = logging.INFO,
    format_string: str = None,
    include_tracing: bool = True
) -> LoggingSensor:
    """
    Configure logging sensor for application.

    Args:
        app: Faust application
        level: Logging level
        format_string: Log format
        include_tracing: Include trace information

    Returns:
        Configured logging sensor
    """
```

### Health Monitoring

Health check and application status monitoring with automatic detection of unhealthy conditions, consumer lag monitoring, and system resource tracking.

```python { .api }
class HealthMonitor(Monitor):
    def __init__(
        self,
        *,
        lag_threshold: float = 1000,
        error_rate_threshold: float = 0.1,
        check_interval: float = 30.0,
        **kwargs
    ):
        """
        Health monitoring sensor with automatic alerting.

        Args:
            lag_threshold: Consumer lag threshold for alerts
            error_rate_threshold: Error rate threshold (0.0-1.0)
            check_interval: Health check interval in seconds
        """

    def check_consumer_lag(self) -> dict:
        """
        Check consumer lag across all partitions.

        Returns:
            Dictionary of partition lag information
        """

    def check_error_rate(self) -> float:
        """
        Calculate recent error rate.

        Returns:
            Error rate as percentage (0.0-1.0)
        """

    def check_memory_usage(self) -> dict:
        """
        Check application memory usage.

        Returns:
            Memory usage statistics
        """

    def is_healthy(self) -> bool:
        """
        Overall health status check.

        Returns:
            True if application is healthy
        """

    def get_health_report(self) -> dict:
        """
        Generate comprehensive health report.

        Returns:
            Dictionary with health metrics and status
        """

    def alert_on_unhealthy_condition(self, condition: str, details: dict) -> None:
        """
        Trigger alert for unhealthy condition.

        Args:
            condition: Condition type (lag, errors, memory, etc.)
            details: Condition details for alert
        """
```

### Sensor Management

Utilities for managing multiple sensors, sensor registration, and coordinated metrics collection across different monitoring systems.

```python { .api }
class SensorDelegate:
    def __init__(self, *sensors: Sensor):
        """
        Delegate sensor that forwards events to multiple sensors.

        Args:
            *sensors: Sensor instances to delegate to
        """

    def add_sensor(self, sensor: Sensor) -> None:
        """
        Add sensor to delegation list.

        Args:
            sensor: Sensor instance to add
        """

    def remove_sensor(self, sensor: Sensor) -> None:
        """
        Remove sensor from delegation list.

        Args:
            sensor: Sensor instance to remove
        """

    def get_combined_metrics(self) -> dict:
        """
        Get combined metrics from all sensors.

        Returns:
            Merged metrics dictionary
        """

def setup_monitoring(
    app: App,
    *,
    prometheus: bool = False,
    statsd: bool = False,
    logging: bool = True,
    health_checks: bool = True,
    custom_sensors: list = None
) -> SensorDelegate:
    """
    Setup comprehensive monitoring for application.

    Args:
        app: Faust application
        prometheus: Enable Prometheus metrics
        statsd: Enable StatsD metrics
        logging: Enable logging sensor
        health_checks: Enable health monitoring
        custom_sensors: Additional custom sensors

    Returns:
        Configured sensor delegate
    """
```

## Usage Examples

### Basic Monitoring Setup

```python
import faust

app = faust.App('monitored-app', broker='kafka://localhost:9092')

# Enable built-in monitoring
monitor = faust.Monitor()
app.monitor = monitor

events_topic = app.topic('events', value_type=dict)

@app.agent(events_topic)
async def process_events(events):
    async for event in events:
        # Processing logic here
        print(f"Processing event: {event}")

@app.timer(interval=30.0)
async def print_metrics():
    """Print monitoring metrics every 30 seconds."""
    print(f"Messages received: {monitor.messages_received_total()}")
    print(f"Events per second: {monitor.events_per_second():.2f}")
    print(f"Table operations: {monitor.tables_get_total()}")
    print(f"Commit latency: {monitor.commit_latency_avg():.3f}s")
```

### Custom Sensor Implementation

```python
import time
from faust import Sensor

class BusinessMetricsSensor(Sensor):
    def __init__(self):
        super().__init__()
        self.order_count = 0
        self.revenue_total = 0.0
        self.processing_times = []
        self.start_time = time.time()

    def on_stream_event_in(self, tp, offset, stream, event):
        # Track event processing start
        event._processing_start = time.time()

    def on_stream_event_out(self, tp, offset, stream, event):
        # Calculate processing time
        if hasattr(event, '_processing_start'):
            processing_time = time.time() - event._processing_start
            self.processing_times.append(processing_time)

            # Keep only recent processing times
            if len(self.processing_times) > 1000:
                self.processing_times = self.processing_times[-1000:]

    def record_order(self, amount: float):
        """Record business order metrics."""
        self.order_count += 1
        self.revenue_total += amount

    def get_business_metrics(self):
        """Get business-specific metrics."""
        uptime = time.time() - self.start_time
        avg_processing_time = (
            sum(self.processing_times) / len(self.processing_times)
            if self.processing_times else 0
        )

        return {
            'orders_total': self.order_count,
            'revenue_total': self.revenue_total,
            'orders_per_hour': self.order_count / (uptime / 3600),
            'avg_processing_time': avg_processing_time,
            'uptime_seconds': uptime
        }

# Use custom sensor
business_sensor = BusinessMetricsSensor()
app.monitor = business_sensor

@app.agent()
async def process_orders(orders):
    async for order in orders:
        # Process order
        amount = order['amount']
        business_sensor.record_order(amount)
```

### Prometheus Integration

```python
import time

import faust
from prometheus_client import Counter, Histogram, Gauge, generate_latest

class PrometheusMonitor(faust.Monitor):
    def __init__(self):
        super().__init__()

        # Prometheus metrics
        self.messages_received = Counter('faust_messages_received_total', 'Total received messages')
        self.messages_sent = Counter('faust_messages_sent_total', 'Total sent messages')
        self.processing_duration = Histogram('faust_processing_duration_seconds', 'Processing duration')
        self.consumer_lag = Gauge('faust_consumer_lag', 'Consumer lag', ['topic', 'partition'])

    def on_message_in(self, tp, offset, message):
        super().on_message_in(tp, offset, message)
        self.messages_received.inc()

    def on_message_out(self, tp, offset, message):
        super().on_message_out(tp, offset, message)
        self.messages_sent.inc()

    def on_stream_event_in(self, tp, offset, stream, event):
        super().on_stream_event_in(tp, offset, stream, event)
        event._start_time = time.time()

    def on_stream_event_out(self, tp, offset, stream, event):
        super().on_stream_event_out(tp, offset, stream, event)
        if hasattr(event, '_start_time'):
            duration = time.time() - event._start_time
            self.processing_duration.observe(duration)

    def export_metrics(self):
        """Export metrics in Prometheus format."""
        return generate_latest()

# Setup Prometheus endpoint
prometheus_monitor = PrometheusMonitor()
app.monitor = prometheus_monitor

@app.page('/metrics')
async def metrics_endpoint(web, request):
    """Expose Prometheus metrics endpoint."""
    return web.Response(
        text=prometheus_monitor.export_metrics(),
        content_type='text/plain'
    )
```

### Health Monitoring

```python
import time

import psutil
from faust import Monitor

class HealthMonitor(Monitor):
    def __init__(self):
        super().__init__()
        self.error_count = 0
        self.last_health_check = time.time()
        self.health_status = {'healthy': True, 'issues': []}

    def on_send_error(self, producer, exc, state):
        super().on_send_error(producer, exc, state)
        self.error_count += 1

    def check_health(self):
        """Comprehensive health check."""
        issues = []

        # Check error rate
        if self.error_count > 10:  # More than 10 errors
            issues.append(f"High error count: {self.error_count}")

        # Check memory usage
        memory_percent = psutil.virtual_memory().percent
        if memory_percent > 90:
            issues.append(f"High memory usage: {memory_percent}%")

        # Check processing lag
        events_per_sec = self.events_per_second()
        if events_per_sec == 0:  # No events being processed
            issues.append("No events being processed")

        # Update health status
        self.health_status = {
            'healthy': len(issues) == 0,
            'issues': issues,
            'timestamp': time.time(),
            'uptime': time.time() - self.last_health_check,
            'memory_usage': memory_percent,
            'events_per_second': events_per_sec
        }

        return self.health_status

app.monitor = HealthMonitor()

@app.timer(interval=60.0)
async def health_check():
    """Periodic health monitoring."""
    health = app.monitor.check_health()

    if not health['healthy']:
        print(f"HEALTH WARNING: {health['issues']}")

        # Could send alerts to external systems here
        # await send_alert_to_slack(health)
        # await send_alert_to_pagerduty(health)

@app.page('/health')
async def health_endpoint(web, request):
    """Health check endpoint."""
    health = app.monitor.check_health()
    status_code = 200 if health['healthy'] else 503

    return web.json_response(health, status=status_code)
```

### Multi-Sensor Setup

```python
from faust import SensorDelegate

# Create multiple specialized sensors
prometheus_sensor = PrometheusMonitor()
business_sensor = BusinessMetricsSensor()
health_sensor = HealthMonitor()
logging_sensor = faust.LoggingSensor()

# Combine all sensors
multi_sensor = SensorDelegate(
    prometheus_sensor,
    business_sensor,
    health_sensor,
    logging_sensor
)

app.monitor = multi_sensor

@app.timer(interval=300.0)  # Every 5 minutes
async def comprehensive_monitoring():
    """Comprehensive monitoring report."""

    # Get metrics from all sensors
    prometheus_metrics = prometheus_sensor.export_metrics()
    business_metrics = business_sensor.get_business_metrics()
    health_status = health_sensor.check_health()

    print("=== Monitoring Report ===")
    print(f"Business: {business_metrics}")
    print(f"Health: {health_status}")

    # Could send to monitoring dashboard
    # await send_to_monitoring_dashboard({
    #     'business': business_metrics,
    #     'health': health_status,
    #     'timestamp': time.time()
    # })
```

## Type Interfaces

```python { .api }
from typing import Protocol, Dict, Any, Optional

class SensorT(Protocol):
    """Type interface for Sensor."""

    def on_message_in(self, tp: str, offset: int, message: Any) -> None: ...
    def on_message_out(self, tp: str, offset: int, message: Any) -> None: ...
    def on_table_get(self, table: Any, key: Any) -> None: ...
    def on_table_set(self, table: Any, key: Any, value: Any) -> None: ...
    def on_commit_completed(self, consumer: Any, state: Dict) -> None: ...
    def asdict(self) -> Dict[str, Any]: ...

class MonitorT(SensorT, Protocol):
    """Type interface for Monitor."""

    def messages_received_total(self) -> int: ...
    def messages_sent_total(self) -> int: ...
    def events_per_second(self) -> float: ...
    def commit_latency_avg(self) -> float: ...
```

```