Python client for the Prometheus monitoring system.

This section covers advanced functionality: building custom metric collectors, multiprocess support, and specialized integrations including metric families, sample types, and bridge components. These features enable sophisticated monitoring scenarios and integration with complex deployment architectures — for example, custom collectors that generate metrics dynamically, integrate with external systems, or provide specialized monitoring capabilities.
class Collector:
    """Abstract base class for custom metric collectors."""

    def collect(self) -> Iterable[Metric]:
        """
        Collect and return metrics.

        This abstract method must be implemented by all collectors.

        Returns:
            Iterable of Metric objects
        """

    def describe(self) -> Iterable[Metric]:
        """
        Describe the metrics that this collector will provide.

        Default implementation returns collect() results.
        Override for better performance if metrics are expensive to collect.

        Returns:
            Iterable of Metric objects (without samples)
        """

# Usage Example:
from prometheus_client import Collector, Metric, CollectorRegistry, start_http_server
import psutil
import json
import requests
import time


class SystemResourceCollector(Collector):
    """Comprehensive system resource collector."""

    def collect(self):
        # CPU metrics: one gauge sample per CPU core
        cpu_metric = Metric('system_cpu_usage_percent', 'CPU usage by core', 'gauge')
        for i, percent in enumerate(psutil.cpu_percent(percpu=True)):
            cpu_metric.add_sample('system_cpu_usage_percent', {'core': str(i)}, percent)
        yield cpu_metric

        # Memory metrics
        memory = psutil.virtual_memory()
        memory_metric = Metric('system_memory_bytes', 'System memory usage', 'gauge')
        memory_metric.add_sample('system_memory_bytes', {'type': 'total'}, memory.total)
        memory_metric.add_sample('system_memory_bytes', {'type': 'available'}, memory.available)
        memory_metric.add_sample('system_memory_bytes', {'type': 'used'}, memory.used)
        # NOTE(review): psutil's `cached` field is platform-specific (Linux) —
        # guard with hasattr() if this example must run elsewhere.
        memory_metric.add_sample('system_memory_bytes', {'type': 'cached'}, memory.cached)
        yield memory_metric

        # Disk I/O (disk_io_counters() may return None on some systems)
        disk_io = psutil.disk_io_counters()
        if disk_io:
            io_metric = Metric('system_disk_io_bytes_total', 'Disk I/O bytes', 'counter')
            io_metric.add_sample('system_disk_io_bytes_total', {'direction': 'read'}, disk_io.read_bytes)
            io_metric.add_sample('system_disk_io_bytes_total', {'direction': 'write'}, disk_io.write_bytes)
            yield io_metric
class APIHealthCollector(Collector):
    """Collector that monitors external API health."""

    def __init__(self, services):
        # services: dict mapping service_name -> base URL
        self.services = services

    def collect(self):
        health_metric = Metric('external_service_up', 'External service availability', 'gauge')
        response_time_metric = Metric('external_service_response_time_seconds', 'Response time', 'gauge')
        for service_name, url in self.services.items():
            try:
                start_time = time.time()
                requests.get(f"{url}/health", timeout=5)
                response_time = time.time() - start_time
                # Service is up if we get any response
                health_metric.add_sample('external_service_up', {'service': service_name}, 1.0)
                response_time_metric.add_sample(
                    'external_service_response_time_seconds',
                    {'service': service_name},
                    response_time
                )
            except requests.RequestException:
                # Service is down. requests.Timeout is a RequestException
                # subclass, so one except clause covers both cases.
                health_metric.add_sample('external_service_up', {'service': service_name}, 0.0)
        yield health_metric
        yield response_time_metric
class DatabaseCollector(Collector):
    """Collector for database metrics."""

    def __init__(self, db_connection):
        # db_connection: any DB-API-style connection exposing cursor()
        self.db_connection = db_connection

    def collect(self):
        # Query database for metrics
        try:
            cursor = self.db_connection.cursor()

            # Table sizes
            cursor.execute("SELECT table_name, table_rows FROM information_schema.tables WHERE table_schema = 'myapp'")
            table_metric = Metric('database_table_rows', 'Rows in database tables', 'gauge')
            for table_name, row_count in cursor.fetchall():
                # table_rows may be NULL for some storage engines; report 0
                table_metric.add_sample('database_table_rows', {'table': table_name}, row_count or 0)
            yield table_metric

            # Connection pool status
            cursor.execute("SHOW STATUS LIKE 'Threads_connected'")
            result = cursor.fetchone()
            if result:
                conn_metric = Metric('database_connections_active', 'Active database connections', 'gauge')
                conn_metric.add_sample('database_connections_active', {}, float(result[1]))
                yield conn_metric
        except Exception as e:
            # Broad catch is deliberate: any DB failure becomes an error metric
            # instead of aborting the whole scrape.
            error_metric = Metric('database_collector_errors_total', 'Database collector errors', 'counter')
            error_metric.add_sample('database_collector_errors_total', {'error': str(type(e).__name__)}, 1.0)
            yield error_metric
# Register custom collectors
registry = CollectorRegistry()

# System resources
system_collector = SystemResourceCollector()
registry.register(system_collector)

# External services
api_services = {
    'user_service': 'http://user-service:8080',
    'payment_service': 'http://payment-service:8080',
    'inventory_service': 'http://inventory-service:8080'
}
api_collector = APIHealthCollector(api_services)
registry.register(api_collector)


# Database (mock connection for example)
class MockDBConnection:
    """Stand-in for a real DB-API connection in this example."""

    def cursor(self):
        return MockCursor()


class MockCursor:
    """Stand-in cursor returning canned results."""

    def execute(self, query):
        pass

    def fetchall(self):
        return [('users', 1000), ('orders', 500), ('products', 200)]

    def fetchone(self):
        return ('Threads_connected', '25')


db_collector = DatabaseCollector(MockDBConnection())
registry.register(db_collector)

# Start server with custom collectors
start_http_server(8000, registry=registry)

# Pre-built metric family classes that simplify creating metrics in custom collectors.
class CounterMetricFamily(Metric):
    def __init__(self, name: str, documentation: str, value: Optional[float] = None,
                 labels: Optional[Sequence[str]] = None, created: Optional[float] = None,
                 unit: str = '', exemplar: Optional[Exemplar] = None) -> None:
        """Create a CounterMetricFamily for custom collectors."""

    def add_metric(self, labels: Sequence[str], value: float, created: Optional[float] = None,
                   timestamp: Optional[Union[Timestamp, float]] = None,
                   exemplar: Optional[Exemplar] = None) -> None:
        """Add a sample to this counter metric family."""


class GaugeMetricFamily(Metric):
    def __init__(self, name: str, documentation: str, value=None, labels=None, unit='') -> None:
        """Create a GaugeMetricFamily for custom collectors."""

    def add_metric(self, labels: Sequence[str], value: float, timestamp=None) -> None:
        """Add a sample to this gauge metric family."""


class HistogramMetricFamily(Metric):
    def __init__(self, name: str, documentation: str, buckets=None, sum_value=None, labels=None, unit='') -> None:
        """Create a HistogramMetricFamily for custom collectors."""

    def add_metric(self, labels: Sequence[str], buckets: Sequence, sum_value: Optional[float], timestamp=None) -> None:
        """Add a sample to this histogram metric family."""


class SummaryMetricFamily(Metric):
    def __init__(self, name: str, documentation: str, count_value=None, sum_value=None, labels=None, unit='') -> None:
        """Create a SummaryMetricFamily for custom collectors."""

    def add_metric(self, labels: Sequence[str], count_value: int, sum_value: float, timestamp=None) -> None:
        """Add a sample to this summary metric family."""


class InfoMetricFamily(Metric):
    def __init__(self, name: str, documentation: str, value=None, labels=None) -> None:
        """Create an InfoMetricFamily for custom collectors."""

    def add_metric(self, labels: Sequence[str], value: Dict[str, str], timestamp=None) -> None:
        """Add a sample to this info metric family."""


class StateSetMetricFamily(Metric):
    def __init__(self, name: str, documentation: str, value=None, labels=None) -> None:
        """Create a StateSetMetricFamily for custom collectors."""

    def add_metric(self, labels: Sequence[str], value: Dict[str, bool], timestamp=None) -> None:
        """Add a sample to this state set metric family."""


class GaugeHistogramMetricFamily(Metric):
    def __init__(self, name: str, documentation: str, buckets: Optional[Sequence[Tuple[str, float]]] = None,
                 gsum_value: Optional[float] = None, labels: Optional[Sequence[str]] = None,
                 unit: str = '') -> None:
        """Create a GaugeHistogramMetricFamily for custom collectors."""

    def add_metric(self, labels: Sequence[str], buckets: Sequence[Tuple[str, float]],
                   gsum_value: Optional[float], timestamp: Optional[Union[float, Timestamp]] = None) -> None:
        """Add a sample to this gauge histogram metric family."""

# Usage Example:
from prometheus_client import (
    Collector, CounterMetricFamily, GaugeMetricFamily,
    HistogramMetricFamily, SummaryMetricFamily, InfoMetricFamily,
    CollectorRegistry
)
import time
import random


class ApplicationMetricsCollector(Collector):
    """Example collector using metric families."""

    def collect(self):
        # Counter family - request counts by endpoint
        request_counter = CounterMetricFamily(
            'app_requests_total',
            'Total application requests',
            labels=['endpoint', 'method']
        )
        # Simulate request data
        endpoints_data = {
            ('/api/users', 'GET'): 1500,
            ('/api/users', 'POST'): 300,
            ('/api/orders', 'GET'): 800,
            ('/api/orders', 'POST'): 200,
        }
        for (endpoint, method), count in endpoints_data.items():
            request_counter.add_metric([endpoint, method], count)
        yield request_counter

        # Gauge family - current resource usage
        resource_gauge = GaugeMetricFamily(
            'app_resource_usage',
            'Current resource usage',
            labels=['resource', 'unit']
        )
        resource_gauge.add_metric(['memory', 'bytes'], 1073741824)  # 1GB
        resource_gauge.add_metric(['cpu', 'percent'], 45.2)
        resource_gauge.add_metric(['connections', 'count'], 127)
        yield resource_gauge

        # Histogram family - response times
        response_histogram = HistogramMetricFamily(
            'app_response_time_seconds',
            'Response time distribution',
            labels=['endpoint']
        )
        # Simulate histogram data (buckets with cumulative counts)
        buckets = [
            ('0.1', 100),   # 100 requests <= 0.1s
            ('0.25', 200),  # 200 requests <= 0.25s
            ('0.5', 280),   # 280 requests <= 0.5s
            ('1.0', 300),   # 300 requests <= 1.0s
            ('+Inf', 305),  # 305 total requests
        ]
        response_histogram.add_metric(['/api/users'], buckets, 45.7)   # sum=45.7
        response_histogram.add_metric(['/api/orders'], buckets, 32.1)  # sum=32.1
        yield response_histogram

        # Summary family - request sizes
        size_summary = SummaryMetricFamily(
            'app_request_size_bytes',
            'Request size summary',
            labels=['endpoint']
        )
        size_summary.add_metric(['/api/users'], count_value=500, sum_value=250000)
        size_summary.add_metric(['/api/orders'], count_value=300, sum_value=150000)
        yield size_summary

        # Info family - application metadata
        app_info = InfoMetricFamily(
            'app_info',
            'Application information'
        )
        app_info.add_metric([], {
            'version': '2.1.0',
            'build_date': '2023-10-15',
            'git_commit': 'abc123def',
            'environment': 'production'
        })
        yield app_info
# Use the collector
registry = CollectorRegistry()
app_collector = ApplicationMetricsCollector()
registry.register(app_collector)

# Generate output
from prometheus_client import generate_latest

output = generate_latest(registry)
print(output.decode('utf-8'))

# Support for collecting metrics across multiple processes using shared memory files.
class MultiProcessCollector:
    def __init__(self, registry, path=None) -> None:
        """
        Create a MultiProcessCollector.

        Parameters:
        - registry: Registry to register with
        - path: Path to multiprocess metrics directory (defaults to
          PROMETHEUS_MULTIPROC_DIR env var)
        """

    @staticmethod
    def merge(files, accumulate=True):
        """
        Merge metrics from multiple process files.

        Parameters:
        - files: List of file paths containing metrics
        - accumulate: Whether to accumulate values across processes

        Returns:
            Dictionary of merged metrics
        """

# Usage Example:
import os
import multiprocessing
import time
from prometheus_client import Counter, Gauge, Histogram, CollectorRegistry, generate_latest
from prometheus_client.multiprocess import MultiProcessCollector

# Set up multiprocess directory (must exist before metrics are created)
os.environ['PROMETHEUS_MULTIPROC_DIR'] = '/tmp/prometheus_multiproc'
os.makedirs('/tmp/prometheus_multiproc', exist_ok=True)


def worker_function(worker_id, duration):
    """Worker function that generates metrics in a child process."""
    # Create metrics in worker process
    work_counter = Counter('worker_tasks_completed_total', 'Tasks completed by worker', ['worker_id'])
    work_duration = Histogram('worker_task_duration_seconds', 'Task duration', ['worker_id'])
    for i in range(10):
        start_time = time.time()
        # Simulate work
        time.sleep(duration)
        # Record metrics
        work_counter.labels(worker_id=str(worker_id)).inc()
        work_duration.labels(worker_id=str(worker_id)).observe(time.time() - start_time)
        print(f"Worker {worker_id} completed task {i+1}")
def main_multiprocess_example():
    """Example of multiprocess metrics collection."""
    # Create main process registry with multiprocess collector
    registry = CollectorRegistry()
    MultiProcessCollector(registry)

    # Create some metrics in main process
    main_counter = Counter('main_process_operations_total', 'Main process operations')
    main_counter.inc(5)

    # Start worker processes
    processes = []
    for worker_id in range(3):
        p = multiprocessing.Process(
            target=worker_function,
            args=(worker_id, 0.1)
        )
        p.start()
        processes.append(p)

    # Wait for workers to complete
    for p in processes:
        p.join()

    # Collect all metrics from all processes
    print("Multiprocess metrics:")
    output = generate_latest(registry)
    print(output.decode('utf-8'))

    # Clean up the shared-memory metric files
    import shutil
    shutil.rmtree('/tmp/prometheus_multiproc')


if __name__ == '__main__':
    main_multiprocess_example()

# Advanced data types for representing metrics samples, timestamps, and exemplars.
class Timestamp:
    def __init__(self, sec: float, nsec: float) -> None:
        """
        Create a timestamp with nanosecond precision.

        Parameters:
        - sec: Seconds since Unix epoch
        - nsec: Nanoseconds component
        """

    def __str__(self) -> str:
        """String representation of timestamp."""

    def __float__(self) -> float:
        """Convert to float (seconds since epoch)."""


class Exemplar(NamedTuple):
    labels: Dict[str, str]
    value: float
    timestamp: Optional[Union[float, Timestamp]] = None


class BucketSpan(NamedTuple):
    offset: int
    length: int


class NativeHistogram(NamedTuple):
    count_value: float
    sum_value: float
    schema: int
    zero_threshold: float
    zero_count: float
    pos_spans: Optional[Sequence[BucketSpan]] = None
    neg_spans: Optional[Sequence[BucketSpan]] = None
    pos_deltas: Optional[Sequence[int]] = None
    neg_deltas: Optional[Sequence[int]] = None


# Sample is declared last: NamedTuple annotations are evaluated at class
# creation, so its field types (Timestamp, Exemplar, NativeHistogram)
# must already be defined.
class Sample(NamedTuple):
    name: str
    labels: Dict[str, str]
    value: float
    timestamp: Optional[Union[float, Timestamp]] = None
    exemplar: Optional[Exemplar] = None
    native_histogram: Optional[NativeHistogram] = None

# Usage Example:
from prometheus_client import Metric, Sample, Exemplar, Timestamp, CollectorRegistry
import time


class CustomSampleCollector:
    """Collector that demonstrates advanced sample types."""

    def collect(self):
        # Create metric with custom samples
        custom_metric = Metric('custom_samples', 'Custom sample demonstration', 'gauge')

        # Add sample with timestamp (split seconds / nanoseconds)
        now = time.time()
        timestamp = Timestamp(int(now), int((now % 1) * 1e9))
        custom_metric.add_sample(
            'custom_samples',
            {'instance': 'server1', 'job': 'app'},
            42.0,
            timestamp=timestamp
        )

        # Add sample with exemplar (for tracing)
        exemplar = Exemplar(
            labels={'trace_id': 'abc123', 'span_id': 'def456'},
            value=42.0,
            timestamp=timestamp
        )
        custom_metric.add_sample(
            'custom_samples',
            {'instance': 'server2', 'job': 'app'},
            38.5,
            exemplar=exemplar
        )
        yield custom_metric

        # Access sample properties
        for sample in custom_metric.samples:
            print(f"Sample: {sample.name}")
            print(f"Labels: {sample.labels}")
            print(f"Value: {sample.value}")
            if sample.timestamp:
                print(f"Timestamp: {sample.timestamp}")
            if sample.exemplar:
                print(f"Exemplar: {sample.exemplar}")


# Use the collector
registry = CollectorRegistry()
collector = CustomSampleCollector()
registry.register(collector)

from prometheus_client import generate_latest

output = generate_latest(registry)
print(output.decode('utf-8'))

# Utilities for metric validation and library configuration.
def get_legacy_validation() -> bool:
    """Get current legacy validation setting."""


def disable_legacy_validation() -> None:
    """Disable legacy metric name validation."""


def enable_legacy_validation() -> None:
    """Enable legacy metric name validation."""


# Utility constants
INF: float = float("inf")
MINUS_INF: float = float("-inf")
NaN: float = float("NaN")


def floatToGoString(d) -> str:
    """Convert float to Go-style string representation."""

# Usage Example:
from prometheus_client import (
    Counter, disable_legacy_validation, enable_legacy_validation,
    get_legacy_validation,  # required by the print below; missing in the original example
)
from prometheus_client.utils import INF, floatToGoString

# Configuration
print(f"Legacy validation enabled: {get_legacy_validation()}")

# Allow more flexible metric names
disable_legacy_validation()

# Create metric with name that would fail legacy validation
flexible_counter = Counter('my-app:request_count', 'Requests with flexible naming')
flexible_counter.inc()

# Re-enable strict validation
enable_legacy_validation()

# Utility functions
print(f"Infinity as Go string: {floatToGoString(INF)}")
print(f"Large number as Go string: {floatToGoString(1.23e10)}")

# Constants usage
histogram_buckets = [0.1, 0.5, 1.0, 2.5, 5.0, 10.0, INF]

# Integration components for connecting with other monitoring systems.
# Graphite Bridge
class GraphiteBridge:
    def __init__(
        self,
        address: Tuple[str, int],
        registry: CollectorRegistry = REGISTRY,
        _push_interval: float = 30,
        tags: bool = False
    ) -> None:
        """
        Create a bridge to Graphite.

        Parameters:
        - address: (host, port) tuple for Graphite server
        - registry: Registry to read metrics from
        - _push_interval: Interval between pushes in seconds
        - tags: Whether to use Graphite tags format
        """

# Usage Example:
from prometheus_client import Counter, Gauge, CollectorRegistry
from prometheus_client.bridge.graphite import GraphiteBridge
import time
import threading

# Create metrics
registry = CollectorRegistry()
requests = Counter('app_requests_total', 'Total requests', ['endpoint'], registry=registry)
memory_usage = Gauge('app_memory_bytes', 'Memory usage', registry=registry)

# Generate some data
requests.labels('/api/users').inc(100)
requests.labels('/api/orders').inc(50)
memory_usage.set(1024 * 1024 * 512)  # 512MB

# Bridge to Graphite (example with mock address)
bridge = GraphiteBridge(
    address=('graphite.example.com', 2003),
    registry=registry,
    _push_interval=10,  # Push every 10 seconds
    tags=True           # Use Graphite tags format
)

# The bridge will automatically push metrics to Graphite.
# Metrics will be formatted as:
#   app_requests_total;endpoint=/api/users 100 timestamp
#   app_requests_total;endpoint=/api/orders 50 timestamp
#   app_memory_bytes 536870912 timestamp
print("Bridge configured to push metrics to Graphite")

# Support for the OpenMetrics exposition format.
# From prometheus_client.openmetrics.exposition
CONTENT_TYPE_LATEST: str = 'application/openmetrics-text; version=1.0.0; charset=utf-8'


def generate_latest(registry) -> bytes:
    """Generate OpenMetrics format output."""


def escape_metric_name(s: str) -> str:
    """Escape metric name for OpenMetrics format."""


def escape_label_name(s: str) -> str:
    """Escape label name for OpenMetrics format."""

# Usage Example:
from prometheus_client import Counter, Gauge, CollectorRegistry
from prometheus_client.openmetrics.exposition import generate_latest as openmetrics_generate

# Create metrics
registry = CollectorRegistry()
counter = Counter('test_counter', 'Test counter', registry=registry)
gauge = Gauge('test_gauge', 'Test gauge', ['label'], registry=registry)
counter.inc(42)
gauge.labels('value1').set(3.14)

# Generate OpenMetrics format
openmetrics_output = openmetrics_generate(registry)
print("OpenMetrics format:")
print(openmetrics_output.decode('utf-8'))

# Compare with Prometheus format
from prometheus_client import generate_latest as prometheus_generate

prometheus_output = prometheus_generate(registry)
print("\nPrometheus format:")
print(prometheus_output.decode('utf-8'))

# Install with Tessl CLI
npx tessl i tessl/pypi-prometheus-client