# Advanced Features

Advanced functionality for building custom metric collectors, multiprocess support, and specialized integrations, including metric families, sample types, and bridge components. Use these features for monitoring scenarios that the standard metric classes do not cover, such as exposing data from external systems or aggregating metrics across worker processes.
3
4
## Capabilities
5
6
### Custom Metric Collectors
7
8
Building custom collectors that generate metrics dynamically, integrate with external systems, or provide specialized monitoring capabilities.
9
10
```python { .api }
11
class Collector:
    """Base class for custom collectors.

    Subclass it, implement collect(), and pass an instance to
    CollectorRegistry.register(). The metrics it yields are then included in
    that registry's output (generate_latest / start_http_server).
    """

    def collect(self) -> Iterable[Metric]:
        """
        Collect and return metrics.

        This abstract method must be implemented by all collectors. It is
        commonly written as a generator that yields Metric objects.

        Returns:
            Iterable of Metric objects
        """

    def describe(self) -> Iterable[Metric]:
        """
        Describe the metrics that this collector will provide.

        Default implementation returns collect() results.
        Override for better performance if metrics are expensive to collect.

        Returns:
            Iterable of Metric objects (without samples)
        """
```
33
34
**Usage Example:**
35
36
```python
37
from prometheus_client import Collector, Metric, CollectorRegistry, start_http_server
38
import psutil
39
import json
40
import requests
41
import time
42
43
class SystemResourceCollector(Collector):
    """Collect host CPU, memory and disk I/O figures via psutil on each scrape."""

    def collect(self):
        # CPU: one sample per logical core. psutil measures usage since the
        # previous call, so the very first scrape after start-up reports 0.0.
        cpu_metric = Metric('system_cpu_usage_percent', 'CPU usage by core', 'gauge')
        for core, percent in enumerate(psutil.cpu_percent(percpu=True)):
            cpu_metric.add_sample('system_cpu_usage_percent', {'core': str(core)}, percent)
        yield cpu_metric

        # Memory
        memory = psutil.virtual_memory()
        memory_metric = Metric('system_memory_bytes', 'System memory usage', 'gauge')
        for kind in ('total', 'available', 'used', 'cached'):
            # 'cached' is platform-specific (psutil only reports it on Linux/BSD),
            # so skip any field this platform does not provide instead of
            # failing the whole scrape with AttributeError.
            value = getattr(memory, kind, None)
            if value is not None:
                memory_metric.add_sample('system_memory_bytes', {'type': kind}, value)
        yield memory_metric

        # Disk I/O (guard against psutil returning no data, e.g. no disks).
        disk_io = psutil.disk_io_counters()
        if disk_io:
            # Counter families are named WITHOUT the `_total` suffix; only the
            # samples carry it. A family named `..._total` is exposed as
            # `# TYPE system_disk_io_bytes_total_total counter`.
            io_metric = Metric('system_disk_io_bytes', 'Disk I/O bytes', 'counter')
            io_metric.add_sample('system_disk_io_bytes_total', {'direction': 'read'}, disk_io.read_bytes)
            io_metric.add_sample('system_disk_io_bytes_total', {'direction': 'write'}, disk_io.write_bytes)
            yield io_metric
69
70
class APIHealthCollector(Collector):
    """Probe each external service's ``/health`` endpoint on every scrape.

    Args:
        services: Mapping of service name -> base URL; ``/health`` is appended.
        timeout: Per-request timeout in seconds (default 5). Probes run one
            after another, so a scrape can take up to ``len(services) * timeout``.
    """

    def __init__(self, services, timeout=5):
        self.services = services  # Dict of service_name -> url
        self.timeout = timeout

    def collect(self):
        health_metric = Metric('external_service_up', 'External service availability', 'gauge')
        response_time_metric = Metric('external_service_response_time_seconds', 'Response time', 'gauge')

        for service_name, url in self.services.items():
            labels = {'service': service_name}
            # perf_counter() is monotonic; time.time() can jump (NTP sync,
            # manual clock changes) and produce negative or inflated durations.
            started = time.perf_counter()
            try:
                requests.get(f"{url}/health", timeout=self.timeout)
            except requests.RequestException:
                # Connection errors and timeouts (requests.Timeout is a
                # subclass of RequestException): the service is down.
                health_metric.add_sample('external_service_up', labels, 0.0)
                continue
            elapsed = time.perf_counter() - started

            # Service is up if we get any response (HTTP status is not checked).
            health_metric.add_sample('external_service_up', labels, 1.0)
            response_time_metric.add_sample('external_service_response_time_seconds', labels, elapsed)

        yield health_metric
        yield response_time_metric
100
101
class DatabaseCollector(Collector):
    """Expose table row counts and the connected-thread count from a database.

    The queries are MySQL-flavoured (information_schema.tables.table_rows,
    SHOW STATUS) and read the schema named 'myapp'.

    Args:
        db_connection: Object whose ``cursor()`` returns a DB-API style cursor.
    """

    def __init__(self, db_connection):
        self.db_connection = db_connection
        # Cumulative failures per exception type, so the errors metric is a
        # real (monotonically increasing) counter across scrapes.
        self._error_counts = {}

    def collect(self):
        cursor = None
        try:
            cursor = self.db_connection.cursor()

            # Table sizes
            cursor.execute("SELECT table_name, table_rows FROM information_schema.tables WHERE table_schema = 'myapp'")
            table_metric = Metric('database_table_rows', 'Rows in database tables', 'gauge')
            for table_name, row_count in cursor.fetchall():
                # table_rows may be NULL; report 0 rather than None.
                table_metric.add_sample('database_table_rows', {'table': table_name}, row_count or 0)
            yield table_metric

            # Connection pool status
            cursor.execute("SHOW STATUS LIKE 'Threads_connected'")
            result = cursor.fetchone()
            if result:
                conn_metric = Metric('database_connections_active', 'Active database connections', 'gauge')
                conn_metric.add_sample('database_connections_active', {}, float(result[1]))
                yield conn_metric

        except Exception as e:
            # Keep scrapes working while the database is unavailable: record
            # the failure instead of propagating it out of the registry.
            error_type = type(e).__name__
            self._error_counts[error_type] = self._error_counts.get(error_type, 0) + 1
        finally:
            # Release the cursor even when a query fails. close() is part of
            # DB-API 2.0, but minimal stand-ins may not implement it.
            close = getattr(cursor, 'close', None)
            if close is not None:
                close()

        if self._error_counts:
            # Family name omits `_total`; only the counter samples carry it.
            error_metric = Metric('database_collector_errors', 'Database collector errors', 'counter')
            for error_type, count in self._error_counts.items():
                error_metric.add_sample('database_collector_errors_total', {'error': error_type}, float(count))
            yield error_metric
134
135
# Register custom collectors
registry = CollectorRegistry()

# System resources
system_collector = SystemResourceCollector()
registry.register(system_collector)

# External services
api_services = {
    'user_service': 'http://user-service:8080',
    'payment_service': 'http://payment-service:8080',
    'inventory_service': 'http://inventory-service:8080'
}
api_collector = APIHealthCollector(api_services)
registry.register(api_collector)


# Database (mock connection for example)
class MockDBConnection:
    """Stand-in for a DB-API connection; cursor() returns canned results."""

    def cursor(self):
        return MockCursor()


class MockCursor:
    """Stand-in DB-API cursor that returns fixed rows whatever the query."""

    def execute(self, query):
        pass

    def fetchall(self):
        return [('users', 1000), ('orders', 500), ('products', 200)]

    def fetchone(self):
        return ('Threads_connected', '25')

    def close(self):
        # Real cursors may hold resources; nothing to release in the mock.
        pass


db_collector = DatabaseCollector(MockDBConnection())
registry.register(db_collector)

# Start server with custom collectors. start_http_server() serves from a
# daemon thread and returns immediately, so keep the main thread alive;
# otherwise the process (and the /metrics endpoint) exits right away.
start_http_server(8000, registry=registry)
while True:
    time.sleep(1)
169
```
170
171
### Metric Families for Custom Collectors
172
173
Pre-built metric family classes that simplify creating metrics in custom collectors.
174
175
```python { .api }
176
class CounterMetricFamily(Metric):
    """Counter family for building samples inside a custom collector's collect()."""

    def __init__(self, name: str, documentation: str, value: Optional[float] = None,
                 labels: Optional[Sequence[str]] = None, created: Optional[float] = None,
                 unit: str = '', exemplar: Optional[Exemplar] = None) -> None:
        """Create a CounterMetricFamily for custom collectors.

        ``labels`` lists the label *names* (e.g. ``['endpoint', 'method']``);
        every add_metric() call then supplies values for them in that order.
        """

    def add_metric(self, labels: Sequence[str], value: float, created: Optional[float] = None,
                   timestamp: Optional[Union[Timestamp, float]] = None,
                   exemplar: Optional[Exemplar] = None) -> None:
        """Add a sample to this counter metric family.

        ``labels`` are label values matching, by position, the names given to
        the constructor (e.g. ``['/api/users', 'GET']``).
        """
186
187
class GaugeMetricFamily(Metric):
    """Gauge family for building samples inside a custom collector's collect()."""

    def __init__(self, name: str, documentation: str, value=None, labels=None, unit='') -> None:
        """Create a GaugeMetricFamily for custom collectors.

        ``labels`` lists the label names; add_metric() supplies their values.
        """

    def add_metric(self, labels: Sequence[str], value: float, timestamp=None) -> None:
        """Add a sample to this gauge metric family.

        ``labels`` are label values in the order of the constructor's names,
        e.g. ``add_metric(['memory', 'bytes'], 1073741824)``.
        """
193
194
class HistogramMetricFamily(Metric):
    """Histogram family for building samples inside a custom collector's collect()."""

    def __init__(self, name: str, documentation: str, buckets=None, sum_value=None, labels=None, unit='') -> None:
        """Create a HistogramMetricFamily for custom collectors."""

    def add_metric(self, labels: Sequence[str], buckets: Sequence, sum_value: Optional[float], timestamp=None) -> None:
        """Add a sample to this histogram metric family.

        ``buckets`` is a sequence of ``(upper_bound, cumulative_count)`` pairs
        with the bound as a string and a final ``'+Inf'`` entry, e.g.
        ``[('0.1', 100), ('0.25', 200), ('+Inf', 305)]``. ``sum_value`` is the
        sum of all observed values.
        """
200
201
class SummaryMetricFamily(Metric):
    """Summary family for building samples inside a custom collector's collect()."""

    def __init__(self, name: str, documentation: str, count_value=None, sum_value=None, labels=None, unit='') -> None:
        """Create a SummaryMetricFamily for custom collectors."""

    def add_metric(self, labels: Sequence[str], count_value: int, sum_value: float, timestamp=None) -> None:
        """Add a sample to this summary metric family.

        ``count_value`` is the number of observations and ``sum_value`` their
        total, e.g. ``add_metric(['/api/users'], count_value=500, sum_value=250000)``.
        """
207
208
class InfoMetricFamily(Metric):
    """Info family: exposes a set of string key/value pairs as one metric."""

    def __init__(self, name: str, documentation: str, value=None, labels=None) -> None:
        """Create an InfoMetricFamily for custom collectors."""

    def add_metric(self, labels: Sequence[str], value: Dict[str, str], timestamp=None) -> None:
        """Add a sample to this info metric family.

        ``value`` maps info keys to string values; pass ``[]`` for ``labels``
        when the family has no label names.
        """
214
215
class StateSetMetricFamily(Metric):
    """State-set family: reports which of a fixed set of states are active."""

    def __init__(self, name: str, documentation: str, value=None, labels=None) -> None:
        """Create a StateSetMetricFamily for custom collectors."""

    def add_metric(self, labels: Sequence[str], value: Dict[str, bool], timestamp=None) -> None:
        """Add a sample to this state set metric family.

        ``value`` maps each state name to whether that state is currently set.
        """
221
222
class GaugeHistogramMetricFamily(Metric):
    """Gauge-histogram family for building samples inside a custom collector."""

    def __init__(self, name: str, documentation: str, buckets: Optional[Sequence[Tuple[str, float]]] = None,
                 gsum_value: Optional[float] = None, labels: Optional[Sequence[str]] = None,
                 unit: str = '') -> None:
        """Create a GaugeHistogramMetricFamily for custom collectors."""

    def add_metric(self, labels: Sequence[str], buckets: Sequence[Tuple[str, float]],
                   gsum_value: Optional[float], timestamp: Optional[Union[float, Timestamp]] = None) -> None:
        """Add a sample to this gauge histogram metric family.

        ``buckets`` are ``(upper_bound, value)`` pairs with the bound as a
        string; ``gsum_value`` is the gauge-histogram sum.
        """
231
```
232
233
**Usage Example:**
234
235
```python
236
from prometheus_client import (
237
Collector, CounterMetricFamily, GaugeMetricFamily,
238
HistogramMetricFamily, SummaryMetricFamily, InfoMetricFamily,
239
CollectorRegistry
240
)
241
import time
242
import random
243
244
class ApplicationMetricsCollector(Collector):
    """Example collector built entirely from the *MetricFamily helpers.

    Values are hard-coded to keep the example self-contained; a real
    collector would read them from the application on every scrape.
    """

    def collect(self):
        # Counter family - request counts by endpoint. `labels` fixes the
        # label names; each add_metric() call passes the values in that order.
        request_counter = CounterMetricFamily(
            'app_requests_total',
            'Total application requests',
            labels=['endpoint', 'method']
        )

        # Simulate request data
        endpoints_data = {
            ('/api/users', 'GET'): 1500,
            ('/api/users', 'POST'): 300,
            ('/api/orders', 'GET'): 800,
            ('/api/orders', 'POST'): 200,
        }

        for (endpoint, method), count in endpoints_data.items():
            request_counter.add_metric([endpoint, method], count)

        yield request_counter

        # Gauge family - current resource usage
        resource_gauge = GaugeMetricFamily(
            'app_resource_usage',
            'Current resource usage',
            labels=['resource', 'unit']
        )

        resource_gauge.add_metric(['memory', 'bytes'], 1073741824)  # 1GB
        resource_gauge.add_metric(['cpu', 'percent'], 45.2)
        resource_gauge.add_metric(['connections', 'count'], 127)

        yield resource_gauge

        # Histogram family - response times
        response_histogram = HistogramMetricFamily(
            'app_response_time_seconds',
            'Response time distribution',
            labels=['endpoint']
        )

        # Simulate histogram data: (upper bound, cumulative count) pairs ending
        # with '+Inf', whose count is the total number of observations.
        buckets = [
            ('0.1', 100),   # 100 requests <= 0.1s
            ('0.25', 200),  # 200 requests <= 0.25s
            ('0.5', 280),   # 280 requests <= 0.5s
            ('1.0', 300),   # 300 requests <= 1.0s
            ('+Inf', 305),  # 305 total requests
        ]

        response_histogram.add_metric(['/api/users'], buckets, 45.7)  # sum=45.7
        response_histogram.add_metric(['/api/orders'], buckets, 32.1)  # sum=32.1

        yield response_histogram

        # Summary family - request sizes
        size_summary = SummaryMetricFamily(
            'app_request_size_bytes',
            'Request size summary',
            labels=['endpoint']
        )

        size_summary.add_metric(['/api/users'], count_value=500, sum_value=250000)
        size_summary.add_metric(['/api/orders'], count_value=300, sum_value=150000)

        yield size_summary

        # Info family - application metadata. Info families append `_info` to
        # the family name themselves, so pass the base name: 'app' is exposed
        # as `app_info`, whereas 'app_info' would become `app_info_info`.
        app_info = InfoMetricFamily(
            'app',
            'Application information'
        )

        app_info.add_metric([], {
            'version': '2.1.0',
            'build_date': '2023-10-15',
            'git_commit': 'abc123def',
            'environment': 'production'
        })

        yield app_info
328
329
from prometheus_client import generate_latest

# Use the collector: give it a dedicated registry...
registry = CollectorRegistry()
app_collector = ApplicationMetricsCollector()
registry.register(app_collector)

# ...then render that registry in the Prometheus text format and show it.
output = generate_latest(registry)
print(output.decode('utf-8'))
338
```
339
340
### Multiprocess Support
341
342
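Support for aggregating metrics across multiple processes (for example `multiprocessing` workers or a pre-forking web server). Each process writes its values to files in the directory named by the `PROMETHEUS_MULTIPROC_DIR` environment variable, and `MultiProcessCollector` merges them at collection time. Set the variable before `prometheus_client` is first imported, because the library decides at import time whether to use multiprocess storage. Start each run with an empty directory.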
343
344
```python { .api }
345
class MultiProcessCollector:
    """Collector that merges the metric files written by all processes sharing a directory."""

    def __init__(self, registry, path=None) -> None:
        """
        Create a MultiProcessCollector.

        Parameters:
        - registry: Registry to register with
        - path: Path to multiprocess metrics directory (defaults to PROMETHEUS_MULTIPROC_DIR env var)

        Raises:
        - ValueError: if no path is available or it is not an existing directory
        """

    @staticmethod
    def merge(files, accumulate=True):
        """
        Merge metrics from multiple process files.

        Parameters:
        - files: List of per-process metric file paths to read
        - accumulate: When True (the default), histogram buckets are made
          cumulative, as the exposition format requires. Pass False when the
          merged result will be written back to multiprocess files, so buckets
          are not accumulated twice.

        Returns:
            Iterable of merged Metric objects
        """
367
```
368
369
**Usage Example:**
370
371
```python
372
import multiprocessing
import os
import time

# prometheus_client decides when it is first imported whether metric values
# live in per-process files, so PROMETHEUS_MULTIPROC_DIR must be set BEFORE
# `import prometheus_client`. Setting it afterwards silently leaves every
# metric in single-process mode, and worker values are never merged.
MULTIPROC_DIR = '/tmp/prometheus_multiproc'
os.environ['PROMETHEUS_MULTIPROC_DIR'] = MULTIPROC_DIR
os.makedirs(MULTIPROC_DIR, exist_ok=True)

from prometheus_client import Counter, Gauge, Histogram, CollectorRegistry, generate_latest  # noqa: E402
from prometheus_client.multiprocess import MultiProcessCollector  # noqa: E402
381
382
def worker_function(worker_id, duration, tasks=10):
    """Run simulated tasks, recording a counter and a duration histogram.

    Args:
        worker_id: Identifier used as the ``worker_id`` label value.
        duration: Seconds each simulated task sleeps.
        tasks: Number of tasks to run (default 10).
    """
    # Each worker process creates its own metric objects; in multiprocess
    # mode their values are written under PROMETHEUS_MULTIPROC_DIR.
    work_counter = Counter('worker_tasks_completed_total', 'Tasks completed by worker', ['worker_id'])
    work_duration = Histogram('worker_task_duration_seconds', 'Task duration', ['worker_id'])

    label = str(worker_id)
    for i in range(tasks):
        # perf_counter() is monotonic, unlike time.time(), so a wall-clock
        # adjustment cannot produce negative or inflated durations.
        start_time = time.perf_counter()

        # Simulate work
        time.sleep(duration)

        # Record metrics
        work_counter.labels(worker_id=label).inc()
        work_duration.labels(worker_id=label).observe(time.perf_counter() - start_time)

        print(f"Worker {worker_id} completed task {i+1}")
400
401
def main_multiprocess_example():
    """Run three worker processes and print the metrics merged from all of them."""
    import shutil

    multiproc_dir = os.environ['PROMETHEUS_MULTIPROC_DIR']
    # Start from an empty directory: files left by an earlier (possibly
    # crashed) run would otherwise be merged into this run's output.
    shutil.rmtree(multiproc_dir, ignore_errors=True)
    os.makedirs(multiproc_dir, exist_ok=True)

    try:
        # Create main process registry with multiprocess collector
        registry = CollectorRegistry()
        MultiProcessCollector(registry)

        # Create some metrics in main process
        main_counter = Counter('main_process_operations_total', 'Main process operations')
        main_counter.inc(5)

        # Start worker processes
        processes = []
        for worker_id in range(3):
            p = multiprocessing.Process(
                target=worker_function,
                args=(worker_id, 0.1)
            )
            p.start()
            processes.append(p)

        # Wait for workers to complete
        for p in processes:
            p.join()

        # Collect all metrics from all processes
        print("Multiprocess metrics:")
        output = generate_latest(registry)
        print(output.decode('utf-8'))
    finally:
        # Clean up even if a worker or the exposition step fails.
        shutil.rmtree(multiproc_dir, ignore_errors=True)


if __name__ == '__main__':
    main_multiprocess_example()
437
```
438
439
### Sample and Data Types
440
441
Advanced data types for representing metric samples, timestamps, and exemplars.
442
443
```python { .api }
444
class Sample(NamedTuple):
    """One exposed value; the fields mirror the arguments of Metric.add_sample()."""
    name: str  # sample name, which may carry a suffix such as '_total'
    labels: Dict[str, str]  # label name -> label value
    value: float
    timestamp: Optional[Union[float, Timestamp]] = None  # plain float or a Timestamp
    exemplar: Optional[Exemplar] = None  # optional example observation, e.g. a trace link
    native_histogram: Optional[NativeHistogram] = None
451
452
class Exemplar(NamedTuple):
    """Example observation attached to a sample, e.g. carrying trace/span IDs."""
    labels: Dict[str, str]  # e.g. {'trace_id': 'abc123', 'span_id': 'def456'}
    value: float
    timestamp: Optional[Union[float, Timestamp]] = None
456
457
class Timestamp:
    """Point in time split into whole seconds and a nanosecond part.

    For ``now = time.time()``, build one with
    ``Timestamp(int(now), int((now % 1) * 1e9))``.
    """

    def __init__(self, sec: float, nsec: float) -> None:
        """
        Create a timestamp with nanosecond precision.

        Parameters:
        - sec: Seconds since Unix epoch
        - nsec: Nanoseconds component
        """

    def __str__(self) -> str:
        """String representation of timestamp."""

    def __float__(self) -> float:
        """Convert to float (seconds since epoch)."""
472
473
class BucketSpan(NamedTuple):
    """Span entry used by NativeHistogram.pos_spans / neg_spans."""
    # NOTE(review): presumably offset = bucket-index gap before this span and
    # length = number of consecutive buckets in it — confirm against the spec.
    offset: int
    length: int
476
477
class NativeHistogram(NamedTuple):
    """Native histogram payload, carried by Sample.native_histogram."""
    count_value: float  # total number of observations
    sum_value: float  # sum of observed values
    schema: int
    zero_threshold: float
    zero_count: float
    # NOTE(review): presumably spans/deltas describe the positive (pos_*) and
    # negative (neg_*) bucket ranges respectively — confirm.
    pos_spans: Optional[Sequence[BucketSpan]] = None
    neg_spans: Optional[Sequence[BucketSpan]] = None
    pos_deltas: Optional[Sequence[int]] = None
    neg_deltas: Optional[Sequence[int]] = None
487
```
488
489
**Usage Example:**
490
491
```python
492
from prometheus_client import Metric, Sample, Exemplar, Timestamp, CollectorRegistry
493
import time
494
495
class CustomSampleCollector:
    """Collector demonstrating samples that carry a timestamp or an exemplar."""

    def collect(self):
        # Create metric with custom samples
        custom_metric = Metric('custom_samples', 'Custom sample demonstration', 'gauge')

        # Add sample with an explicit timestamp: whole seconds plus the
        # nanosecond remainder of the current time.
        now = time.time()
        timestamp = Timestamp(int(now), int((now % 1) * 1e9))

        custom_metric.add_sample(
            'custom_samples',
            {'instance': 'server1', 'job': 'app'},
            42.0,
            timestamp=timestamp
        )

        # Add sample with exemplar (for tracing). Note: the Prometheus text
        # format has no exemplar syntax, and OpenMetrics only permits exemplars
        # on counter and histogram-bucket samples. Use one of those types if
        # the exemplar must actually be exposed.
        exemplar = Exemplar(
            labels={'trace_id': 'abc123', 'span_id': 'def456'},
            value=42.0,
            timestamp=timestamp
        )

        custom_metric.add_sample(
            'custom_samples',
            {'instance': 'server2', 'job': 'app'},
            38.5,
            exemplar=exemplar
        )

        yield custom_metric

        # Access sample properties (runs once the consumer resumes the generator)
        for sample in custom_metric.samples:
            print(f"Sample: {sample.name}")
            print(f"Labels: {sample.labels}")
            print(f"Value: {sample.value}")
            # Compare with None explicitly: a float timestamp of 0 is falsy
            # but is still a real timestamp.
            if sample.timestamp is not None:
                print(f"Timestamp: {sample.timestamp}")
            if sample.exemplar is not None:
                print(f"Exemplar: {sample.exemplar}")
538
539
from prometheus_client import generate_latest

# Use the collector: register it on its own registry...
registry = CollectorRegistry()
collector = CustomSampleCollector()
registry.register(collector)

# ...and print that registry in the Prometheus text format.
output = generate_latest(registry)
print(output.decode('utf-8'))
547
```
548
549
### Validation and Configuration
550
551
Utilities for metric validation and library configuration.
552
553
```python { .api }
554
def get_legacy_validation() -> bool:
    """Return True when legacy metric name validation is currently enabled.

    The setting is changed with enable_legacy_validation() and
    disable_legacy_validation().
    """
556
557
def disable_legacy_validation() -> None:
    """Disable legacy metric name validation.

    Afterwards, metrics can be created with names that legacy validation
    rejects, e.g. 'my-app:request_count'.
    """
559
560
def enable_legacy_validation() -> None:
    """Enable legacy (strict) metric name validation for subsequently created metrics."""
562
563
# Utility constants
564
INF: float = float("inf")
565
MINUS_INF: float = float("-inf")
566
NaN: float = float("NaN")
567
568
def floatToGoString(d) -> str:
    """Convert float to Go-style string representation.

    Accepts special values such as INF as well as ordinary floats
    (e.g. floatToGoString(1.23e10)).
    """
570
```
571
572
**Usage Example:**
573
574
```python
575
from prometheus_client import Counter
from prometheus_client.utils import INF, floatToGoString
from prometheus_client.validation import (
    disable_legacy_validation,
    enable_legacy_validation,
    get_legacy_validation,
)

# Configuration. The setting is global, so remember it in order to restore it.
was_legacy = get_legacy_validation()
print(f"Legacy validation enabled: {was_legacy}")

# Allow more flexible metric names
disable_legacy_validation()
try:
    # Create metric with name that would fail legacy validation
    flexible_counter = Counter('my-app:request_count', 'Requests with flexible naming')
    flexible_counter.inc()
finally:
    # Restore the previous setting even if metric creation raised, so the
    # relaxed validation does not leak into unrelated code.
    if was_legacy:
        enable_legacy_validation()

# Utility functions
print(f"Infinity as Go string: {floatToGoString(INF)}")
print(f"Large number as Go string: {floatToGoString(1.23e10)}")

# Constants usage
histogram_buckets = [0.1, 0.5, 1.0, 2.5, 5.0, 10.0, INF]
597
```
598
599
### Bridge Integrations
600
601
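Integration components for connecting with other monitoring systems. The Graphite bridge reads a registry and sends its samples using the Graphite plaintext protocol. It sends nothing until you call `push()` (a single send) or `start()` (periodic sends from a background daemon thread).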
602
603
```python { .api }
604
# Graphite Bridge
class GraphiteBridge:
    """Send samples from a registry to Graphite using its plaintext protocol."""

    def __init__(
        self,
        address: Tuple[str, int],
        registry: CollectorRegistry = REGISTRY,
        timeout_seconds: float = 30,
        _timer: Callable[[], float] = time.time,
        tags: bool = False
    ) -> None:
        """
        Create a bridge to Graphite. Nothing is sent until push() or start() is called.

        Parameters:
        - address: (host, port) tuple for Graphite server
        - registry: Registry to read metrics from
        - timeout_seconds: Socket connection timeout in seconds
        - _timer: Clock used for sample timestamps (testing hook)
        - tags: Whether to use Graphite tags format (name;label=value) instead of dotted paths
        """

    def push(self, prefix: str = '') -> None:
        """Send the registry's current samples once, optionally under a dotted prefix."""

    def start(self, interval: float = 60.0, prefix: str = '') -> None:
        """Call push() every ``interval`` seconds from a background daemon thread."""
622
```
623
624
**Usage Example:**
625
626
```python
627
from prometheus_client import Counter, Gauge, CollectorRegistry
628
from prometheus_client.bridge.graphite import GraphiteBridge
629
import time
630
import threading
631
632
# Create metrics
registry = CollectorRegistry()
requests = Counter('app_requests_total', 'Total requests', ['endpoint'], registry=registry)
memory_usage = Gauge('app_memory_bytes', 'Memory usage', registry=registry)

# Generate some data
requests.labels('/api/users').inc(100)
requests.labels('/api/orders').inc(50)
memory_usage.set(1024 * 1024 * 512)  # 512MB

# Bridge to Graphite (example with mock address)
bridge = GraphiteBridge(
    address=('graphite.example.com', 2003),
    registry=registry,
    tags=True  # Use Graphite tags format
)

# The bridge sends nothing on its own. start() pushes every 10 seconds from a
# daemon thread (bridge.push() sends once). That thread dies with the process,
# so a real program must keep its main thread running.
bridge.start(10.0)

# Each push sends one line per sample. Label values are sanitised (characters
# other than letters, digits, '_' and '-' become '_'), and values are sent as
# floats:
#   app_requests_total;endpoint=_api_users 100.0 <unix_timestamp>
#   app_requests_total;endpoint=_api_orders 50.0 <unix_timestamp>
#   app_memory_bytes 536870912.0 <unix_timestamp>
# By default the counter also emits matching `app_requests_created;...` lines.

print("Bridge configured to push metrics to Graphite")
657
```
658
659
### OpenMetrics Support
660
661
Support for the OpenMetrics exposition format.
662
663
```python { .api }
664
# From prometheus_client.openmetrics.exposition
665
CONTENT_TYPE_LATEST: str = 'application/openmetrics-text; version=1.0.0; charset=utf-8'
666
667
def generate_latest(registry) -> bytes:
    """Generate OpenMetrics format output.

    Returns the registry's metrics as encoded bytes; call
    ``.decode('utf-8')`` to get text.
    """
669
670
def escape_metric_name(s: str) -> str:
    """Escape metric name for OpenMetrics format; returns the escaped string."""
672
673
def escape_label_name(s: str) -> str:
    """Escape label name for OpenMetrics format; returns the escaped string."""
675
```
676
677
**Usage Example:**
678
679
```python
680
from prometheus_client import Counter, Gauge, CollectorRegistry
681
from prometheus_client.openmetrics.exposition import generate_latest as openmetrics_generate
682
683
# Create metrics on a private registry so both renderings see identical data.
registry = CollectorRegistry()
counter = Counter('test_counter', 'Test counter', registry=registry)
gauge = Gauge('test_gauge', 'Test gauge', ['label'], registry=registry)
counter.inc(42)
gauge.labels('value1').set(3.14)

# Render the registry as OpenMetrics text and print it under a heading...
openmetrics_output = openmetrics_generate(registry)
print("OpenMetrics format:", openmetrics_output.decode('utf-8'), sep='\n')

# ...then render the same registry in the classic Prometheus text format.
from prometheus_client import generate_latest as prometheus_generate
prometheus_output = prometheus_generate(registry)
print("\nPrometheus format:", prometheus_output.decode('utf-8'), sep='\n')
701
```