CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-prometheus-client

Python client for the Prometheus monitoring system.

Pending
Overview
Eval results
Files

advanced.mddocs/

Advanced Features

Advanced functionality for building custom metric collectors, multiprocess support, and specialized integrations including metric families, sample types, and bridge components. These features enable sophisticated monitoring scenarios and integration with complex deployment architectures.

Capabilities

Custom Metric Collectors

Building custom collectors that generate metrics dynamically, integrate with external systems, or provide specialized monitoring capabilities.

class Collector:
    """Abstract base for objects that produce metrics when a registry is scraped."""

    def collect(self) -> Iterable[Metric]:
        """
        Collect and return metrics.
        
        This abstract method must be implemented by all collectors.
        Called on every scrape, so implementations should be reasonably fast.
        
        Returns:
        Iterable of Metric objects
        """
    
    def describe(self) -> Iterable[Metric]:
        """
        Describe the metrics that this collector will provide.
        
        Default implementation returns collect() results.
        Override for better performance if metrics are expensive to collect.
        
        Returns:
        Iterable of Metric objects (without samples)
        """

Usage Example:

from prometheus_client import Collector, Metric, CollectorRegistry, start_http_server
import psutil
import json
import requests
import time

class SystemResourceCollector(Collector):
    """Comprehensive system resource collector.

    Yields CPU, memory, and disk-I/O metrics sampled from psutil at
    scrape time.
    """

    def collect(self):
        # CPU metrics -- one sample per logical core.
        # NOTE: psutil.cpu_percent() returns 0.0 on its first call in a
        # process; later scrapes report usage since the previous call.
        cpu_metric = Metric('system_cpu_usage_percent', 'CPU usage by core', 'gauge')
        for i, percent in enumerate(psutil.cpu_percent(percpu=True)):
            cpu_metric.add_sample('system_cpu_usage_percent', {'core': str(i)}, percent)
        yield cpu_metric

        # Memory metrics
        memory = psutil.virtual_memory()
        memory_metric = Metric('system_memory_bytes', 'System memory usage', 'gauge')
        memory_metric.add_sample('system_memory_bytes', {'type': 'total'}, memory.total)
        memory_metric.add_sample('system_memory_bytes', {'type': 'available'}, memory.available)
        memory_metric.add_sample('system_memory_bytes', {'type': 'used'}, memory.used)
        # 'cached' only exists on Linux/BSD; guard so the collector does
        # not raise AttributeError on Windows/macOS.
        cached = getattr(memory, 'cached', None)
        if cached is not None:
            memory_metric.add_sample('system_memory_bytes', {'type': 'cached'}, cached)
        yield memory_metric

        # Disk I/O -- disk_io_counters() can return None (e.g. no disks).
        disk_io = psutil.disk_io_counters()
        if disk_io:
            io_metric = Metric('system_disk_io_bytes_total', 'Disk I/O bytes', 'counter')
            io_metric.add_sample('system_disk_io_bytes_total', {'direction': 'read'}, disk_io.read_bytes)
            io_metric.add_sample('system_disk_io_bytes_total', {'direction': 'write'}, disk_io.write_bytes)
            yield io_metric

class APIHealthCollector(Collector):
    """Collector that monitors external API health."""

    def __init__(self, services):
        self.services = services  # Dict of service_name -> url

    def collect(self):
        health_metric = Metric('external_service_up', 'External service availability', 'gauge')
        response_time_metric = Metric('external_service_response_time_seconds', 'Response time', 'gauge')

        for service_name, url in self.services.items():
            try:
                start_time = time.time()
                # Any HTTP response (even 5xx) counts as "up"; only
                # transport-level failures mark the service down.
                requests.get(f"{url}/health", timeout=5)
                response_time = time.time() - start_time

                health_metric.add_sample('external_service_up', {'service': service_name}, 1.0)
                response_time_metric.add_sample(
                    'external_service_response_time_seconds', 
                    {'service': service_name}, 
                    response_time
                )

            except requests.RequestException:
                # requests.Timeout is a subclass of RequestException, so one
                # except clause covers timeouts, connection errors, etc.
                # Down services get no response-time sample.
                health_metric.add_sample('external_service_up', {'service': service_name}, 0.0)

        yield health_metric
        yield response_time_metric

class DatabaseCollector(Collector):
    """Collector for database metrics."""

    def __init__(self, db_connection):
        self.db_connection = db_connection

    def collect(self):
        """Yield table-size and connection-count gauges, or an error counter."""
        try:
            cur = self.db_connection.cursor()

            # Per-table row counts from the information schema.
            cur.execute("SELECT table_name, table_rows FROM information_schema.tables WHERE table_schema = 'myapp'")
            rows_metric = Metric('database_table_rows', 'Rows in database tables', 'gauge')
            for name, rows in cur.fetchall():
                rows_metric.add_sample('database_table_rows', {'table': name}, rows or 0)
            yield rows_metric

            # Active connection count from server status.
            cur.execute("SHOW STATUS LIKE 'Threads_connected'")
            status_row = cur.fetchone()
            if status_row:
                active_metric = Metric('database_connections_active', 'Active database connections', 'gauge')
                active_metric.add_sample('database_connections_active', {}, float(status_row[1]))
                yield active_metric

        except Exception as exc:
            # Surface failures as a metric rather than breaking the scrape.
            failure_metric = Metric('database_collector_errors_total', 'Database collector errors', 'counter')
            failure_metric.add_sample('database_collector_errors_total', {'error': str(type(exc).__name__)}, 1.0)
            yield failure_metric

# Register custom collectors on a dedicated registry (not the default REGISTRY)
registry = CollectorRegistry()

# System resources
system_collector = SystemResourceCollector()
registry.register(system_collector)

# External services: service name -> base URL, polled on every scrape
api_services = {
    'user_service': 'http://user-service:8080',
    'payment_service': 'http://payment-service:8080',
    'inventory_service': 'http://inventory-service:8080'
}
api_collector = APIHealthCollector(api_services)
registry.register(api_collector)
# Database (mock connection for example)
class MockDBConnection:
    """Stand-in for a DB-API connection; cursor() yields a MockCursor."""

    def cursor(self):
        return MockCursor()

class MockCursor:
    """Stand-in cursor returning canned results for the example queries."""

    def execute(self, query):
        # Queries are ignored; the results below are fixed.
        pass

    def fetchall(self):
        return [('users', 1000), ('orders', 500), ('products', 200)]

    def fetchone(self):
        return ('Threads_connected', '25')

db_collector = DatabaseCollector(MockDBConnection())
registry.register(db_collector)

# Start server with custom collectors; metrics exposed at :8000/metrics
start_http_server(8000, registry=registry)

Metric Families for Custom Collectors

Pre-built metric family classes that simplify creating metrics in custom collectors.

class CounterMetricFamily(Metric):
    """Metric family of type 'counter'.

    The constructor's ``labels`` lists label *names*; each ``add_metric``
    call then supplies the label *values* in the same order.
    """

    def __init__(self, name: str, documentation: str, value: Optional[float] = None, 
                 labels: Optional[Sequence[str]] = None, created: Optional[float] = None, 
                 unit: str = '', exemplar: Optional[Exemplar] = None) -> None:
        """Create a CounterMetricFamily for custom collectors."""
    
    def add_metric(self, labels: Sequence[str], value: float, created: Optional[float] = None, 
                   timestamp: Optional[Union[Timestamp, float]] = None, 
                   exemplar: Optional[Exemplar] = None) -> None:
        """Add a sample to this counter metric family."""

class GaugeMetricFamily(Metric):
    """Metric family of type 'gauge' (point-in-time values)."""

    def __init__(self, name: str, documentation: str, value=None, labels=None, unit='') -> None:
        """Create a GaugeMetricFamily for custom collectors."""
    
    def add_metric(self, labels: Sequence[str], value: float, timestamp=None) -> None:
        """Add a sample to this gauge metric family."""

class HistogramMetricFamily(Metric):
    """Metric family of type 'histogram' (cumulative buckets plus a sum)."""

    def __init__(self, name: str, documentation: str, buckets=None, sum_value=None, labels=None, unit='') -> None:
        """Create a HistogramMetricFamily for custom collectors."""
    
    def add_metric(self, labels: Sequence[str], buckets: Sequence, sum_value: Optional[float], timestamp=None) -> None:
        """Add a sample to this histogram metric family."""

class SummaryMetricFamily(Metric):
    """Metric family of type 'summary' (observation count plus sum)."""

    def __init__(self, name: str, documentation: str, count_value=None, sum_value=None, labels=None, unit='') -> None:
        """Create a SummaryMetricFamily for custom collectors."""
    
    def add_metric(self, labels: Sequence[str], count_value: int, sum_value: float, timestamp=None) -> None:
        """Add a sample to this summary metric family."""

class InfoMetricFamily(Metric):
    """Metric family of type 'info' (static key/value metadata)."""

    def __init__(self, name: str, documentation: str, value=None, labels=None) -> None:
        """Create an InfoMetricFamily for custom collectors."""
    
    def add_metric(self, labels: Sequence[str], value: Dict[str, str], timestamp=None) -> None:
        """Add a sample to this info metric family."""

class StateSetMetricFamily(Metric):
    """Metric family of type 'stateset' (named boolean states)."""

    def __init__(self, name: str, documentation: str, value=None, labels=None) -> None:
        """Create a StateSetMetricFamily for custom collectors."""
    
    def add_metric(self, labels: Sequence[str], value: Dict[str, bool], timestamp=None) -> None:
        """Add a sample to this state set metric family."""

class GaugeHistogramMetricFamily(Metric):
    """Metric family of type 'gaugehistogram' (buckets whose counts may go down)."""

    def __init__(self, name: str, documentation: str, buckets: Optional[Sequence[Tuple[str, float]]] = None, 
                 gsum_value: Optional[float] = None, labels: Optional[Sequence[str]] = None, 
                 unit: str = '') -> None:
        """Create a GaugeHistogramMetricFamily for custom collectors."""
    
    def add_metric(self, labels: Sequence[str], buckets: Sequence[Tuple[str, float]], 
                   gsum_value: Optional[float], timestamp: Optional[Union[float, Timestamp]] = None) -> None:
        """Add a sample to this gauge histogram metric family."""

Usage Example:

from prometheus_client import (
    Collector, CounterMetricFamily, GaugeMetricFamily, 
    HistogramMetricFamily, SummaryMetricFamily, InfoMetricFamily,
    CollectorRegistry
)
import time
import random

class ApplicationMetricsCollector(Collector):
    """Example collector using metric families."""

    def collect(self):
        """Yield one family of each major type, filled with sample data."""
        # Counter family - request totals keyed by (endpoint, method).
        requests_family = CounterMetricFamily(
            'app_requests_total',
            'Total application requests',
            labels=['endpoint', 'method']
        )
        simulated_requests = [
            ('/api/users', 'GET', 1500),
            ('/api/users', 'POST', 300),
            ('/api/orders', 'GET', 800),
            ('/api/orders', 'POST', 200),
        ]
        for path, verb, total in simulated_requests:
            requests_family.add_metric([path, verb], total)
        yield requests_family

        # Gauge family - current resource usage readings.
        usage_family = GaugeMetricFamily(
            'app_resource_usage',
            'Current resource usage',
            labels=['resource', 'unit']
        )
        for labelset, reading in (
            (['memory', 'bytes'], 1073741824),  # 1GB
            (['cpu', 'percent'], 45.2),
            (['connections', 'count'], 127),
        ):
            usage_family.add_metric(labelset, reading)
        yield usage_family

        # Histogram family - response-time distribution.
        latency_family = HistogramMetricFamily(
            'app_response_time_seconds',
            'Response time distribution',
            labels=['endpoint']
        )
        # Buckets are cumulative: each count includes everything at or
        # below the bound, and '+Inf' carries the grand total.
        cumulative_buckets = [
            ('0.1', 100),
            ('0.25', 200),
            ('0.5', 280),
            ('1.0', 300),
            ('+Inf', 305),
        ]
        latency_family.add_metric(['/api/users'], cumulative_buckets, 45.7)
        latency_family.add_metric(['/api/orders'], cumulative_buckets, 32.1)
        yield latency_family

        # Summary family - request sizes as count + sum per endpoint.
        sizes_family = SummaryMetricFamily(
            'app_request_size_bytes',
            'Request size summary',
            labels=['endpoint']
        )
        sizes_family.add_metric(['/api/users'], count_value=500, sum_value=250000)
        sizes_family.add_metric(['/api/orders'], count_value=300, sum_value=150000)
        yield sizes_family

        # Info family - static application metadata exposed as labels.
        metadata_family = InfoMetricFamily(
            'app_info',
            'Application information'
        )
        metadata_family.add_metric([], {
            'version': '2.1.0',
            'build_date': '2023-10-15',
            'git_commit': 'abc123def',
            'environment': 'production'
        })
        yield metadata_family

# Use the collector
registry = CollectorRegistry()
app_collector = ApplicationMetricsCollector()
registry.register(app_collector)

# Generate output in the Prometheus text exposition format
from prometheus_client import generate_latest
output = generate_latest(registry)
print(output.decode('utf-8'))

Multiprocess Support

Support for collecting metrics across multiple processes using shared memory files.

class MultiProcessCollector:
    """Collector that aggregates metrics written by multiple processes
    into shared mmap'd files under one directory."""

    def __init__(self, registry, path=None) -> None:
        """
        Create a MultiProcessCollector.
        
        Parameters:
        - registry: Registry to register with
        - path: Path to multiprocess metrics directory (defaults to PROMETHEUS_MULTIPROC_DIR env var)
        """
    
    @staticmethod
    def merge(files, accumulate=True):
        """
        Merge metrics from multiple process files.
        
        Parameters:
        - files: List of file paths containing metrics
        - accumulate: Whether to accumulate values across processes
        
        Returns:
        Dictionary of merged metrics
        """

Usage Example:

import os
import multiprocessing
import time
from prometheus_client import Counter, Gauge, Histogram, CollectorRegistry, generate_latest
from prometheus_client.multiprocess import MultiProcessCollector

# Set up multiprocess directory -- must happen BEFORE any metrics are created,
# since metric objects pick up PROMETHEUS_MULTIPROC_DIR at construction time
os.environ['PROMETHEUS_MULTIPROC_DIR'] = '/tmp/prometheus_multiproc'
os.makedirs('/tmp/prometheus_multiproc', exist_ok=True)

def worker_function(worker_id, duration):
    """Worker function that generates metrics.

    Metrics created here land in the shared multiprocess directory, so
    the parent process can aggregate them after the workers exit.
    """
    work_counter = Counter('worker_tasks_completed_total', 'Tasks completed by worker', ['worker_id'])
    work_duration = Histogram('worker_task_duration_seconds', 'Task duration', ['worker_id'])

    label = str(worker_id)
    for i in range(10):
        start_time = time.time()
        time.sleep(duration)  # simulate work

        # Record the completed task and how long it took.
        work_counter.labels(worker_id=label).inc()
        work_duration.labels(worker_id=label).observe(time.time() - start_time)

        print(f"Worker {worker_id} completed task {i+1}")

def main_multiprocess_example():
    """Example of multiprocess metrics collection.

    Spawns three workers that record metrics via the shared multiprocess
    directory, then aggregates everything in the parent registry.
    """
    
    # Create main process registry with multiprocess collector
    registry = CollectorRegistry()
    MultiProcessCollector(registry)  # registers itself; no handle needed
    
    # Create some metrics in main process
    main_counter = Counter('main_process_operations_total', 'Main process operations')
    main_counter.inc(5)
    
    # Start worker processes
    processes = []
    for worker_id in range(3):
        p = multiprocessing.Process(
            target=worker_function, 
            args=(worker_id, 0.1)
        )
        p.start()
        processes.append(p)
    
    # Wait for workers to complete
    for p in processes:
        p.join()
    
    # Collect all metrics from all processes (merged by the collector)
    print("Multiprocess metrics:")
    output = generate_latest(registry)
    print(output.decode('utf-8'))
    
    # Clean up the shared metrics directory once done
    import shutil
    shutil.rmtree('/tmp/prometheus_multiproc')

if __name__ == '__main__':
    main_multiprocess_example()

Sample and Data Types

Advanced data types for representing metrics samples, timestamps, and exemplars.

class Sample(NamedTuple):
    """A single exposed sample within a Metric."""
    name: str
    labels: Dict[str, str]
    value: float
    timestamp: Optional[Union[float, Timestamp]] = None
    exemplar: Optional[Exemplar] = None
    native_histogram: Optional[NativeHistogram] = None

class Exemplar(NamedTuple):
    """A reference (e.g. trace ID) attached to a sample value."""
    labels: Dict[str, str]
    value: float
    timestamp: Optional[Union[float, Timestamp]] = None

class Timestamp:
    """Timestamp split into seconds + nanoseconds to avoid float precision loss."""

    def __init__(self, sec: float, nsec: float) -> None:
        """
        Create a timestamp with nanosecond precision.
        
        Parameters:
        - sec: Seconds since Unix epoch
        - nsec: Nanoseconds component
        """
    
    def __str__(self) -> str:
        """String representation of timestamp."""
    
    def __float__(self) -> float:
        """Convert to float (seconds since epoch)."""

class BucketSpan(NamedTuple):
    """A run of consecutive native-histogram buckets (offset + length)."""
    offset: int
    length: int

class NativeHistogram(NamedTuple):
    """Native (sparse) histogram payload for a Sample."""
    count_value: float
    sum_value: float
    schema: int
    zero_threshold: float
    zero_count: float
    pos_spans: Optional[Sequence[BucketSpan]] = None
    neg_spans: Optional[Sequence[BucketSpan]] = None
    pos_deltas: Optional[Sequence[int]] = None
    neg_deltas: Optional[Sequence[int]] = None

Usage Example:

from prometheus_client import Metric, Sample, Exemplar, Timestamp, CollectorRegistry
import time

class CustomSampleCollector:
    """Collector that demonstrates advanced sample types."""

    def collect(self):
        """Yield one gauge metric carrying timestamped and exemplared samples."""
        demo_metric = Metric('custom_samples', 'Custom sample demonstration', 'gauge')

        # Split wall-clock time into whole seconds + nanosecond remainder.
        now = time.time()
        ts = Timestamp(int(now), int((now % 1) * 1e9))

        # Sample carrying an explicit timestamp.
        demo_metric.add_sample(
            'custom_samples',
            {'instance': 'server1', 'job': 'app'},
            42.0,
            timestamp=ts
        )

        # Sample carrying an exemplar that links the value to a trace.
        trace_exemplar = Exemplar(
            labels={'trace_id': 'abc123', 'span_id': 'def456'},
            value=42.0,
            timestamp=ts
        )
        demo_metric.add_sample(
            'custom_samples',
            {'instance': 'server2', 'job': 'app'},
            38.5,
            exemplar=trace_exemplar
        )

        yield demo_metric

        # Show how the individual Sample fields can be inspected.
        for sample in demo_metric.samples:
            print(f"Sample: {sample.name}")
            print(f"Labels: {sample.labels}")
            print(f"Value: {sample.value}")
            if sample.timestamp:
                print(f"Timestamp: {sample.timestamp}")
            if sample.exemplar:
                print(f"Exemplar: {sample.exemplar}")

# Use the collector
registry = CollectorRegistry()
collector = CustomSampleCollector()
registry.register(collector)

# Render the registry in the Prometheus text format
from prometheus_client import generate_latest
output = generate_latest(registry)
print(output.decode('utf-8'))

Validation and Configuration

Utilities for metric validation and library configuration.

def get_legacy_validation() -> bool:
    """Get current legacy validation setting."""

def disable_legacy_validation() -> None:
    """Disable legacy metric name validation."""

def enable_legacy_validation() -> None:
    """Enable legacy metric name validation."""

# Utility constants (from prometheus_client.utils)
INF: float = float("inf")
MINUS_INF: float = float("-inf")
NaN: float = float("NaN")

# camelCase matches the actual library function name; kept as-is.
def floatToGoString(d) -> str:
    """Convert float to Go-style string representation."""

Usage Example:

from prometheus_client import (
    Counter,
    disable_legacy_validation,
    enable_legacy_validation,
    get_legacy_validation,  # was used below but missing from the import -> NameError
)
from prometheus_client.utils import INF, floatToGoString

# Configuration
print(f"Legacy validation enabled: {get_legacy_validation()}")

# Allow more flexible metric names
disable_legacy_validation()

# Create metric with name that would fail legacy validation
flexible_counter = Counter('my-app:request_count', 'Requests with flexible naming')
flexible_counter.inc()

# Re-enable strict validation
enable_legacy_validation()

# Utility functions
print(f"Infinity as Go string: {floatToGoString(INF)}")
print(f"Large number as Go string: {floatToGoString(1.23e10)}")

# Constants usage
histogram_buckets = [0.1, 0.5, 1.0, 2.5, 5.0, 10.0, INF]

Bridge Integrations

Integration components for connecting with other monitoring systems.

# Graphite Bridge
class GraphiteBridge:
    """Periodically pushes metrics from a registry to a Graphite server.

    NOTE(review): only the constructor is shown here; in the upstream
    library periodic pushing begins after calling start() -- confirm
    against this package's version.
    """

    def __init__(
        self, 
        address: Tuple[str, int], 
        registry: CollectorRegistry = REGISTRY, 
        _push_interval: float = 30, 
        tags: bool = False
    ) -> None:
        """
        Create a bridge to Graphite.
        
        Parameters:
        - address: (host, port) tuple for Graphite server
        - registry: Registry to read metrics from
        - _push_interval: Interval between pushes in seconds (underscore prefix marks it internal)
        - tags: Whether to use Graphite tags format
        """

Usage Example:

from prometheus_client import Counter, Gauge, CollectorRegistry
from prometheus_client.bridge.graphite import GraphiteBridge
import time
import threading

# Create metrics
registry = CollectorRegistry()
requests = Counter('app_requests_total', 'Total requests', ['endpoint'], registry=registry)
memory_usage = Gauge('app_memory_bytes', 'Memory usage', registry=registry)

# Generate some data
requests.labels('/api/users').inc(100)
requests.labels('/api/orders').inc(50)
memory_usage.set(1024 * 1024 * 512)  # 512MB

# Bridge to Graphite (example with mock address)
bridge = GraphiteBridge(
    address=('graphite.example.com', 2003),
    registry=registry,
    _push_interval=10,  # Push every 10 seconds
    tags=True  # Use Graphite tags format
)

# The bridge will automatically push metrics to Graphite
# NOTE(review): upstream prometheus_client requires bridge.start() before
# any pushing happens -- verify for this package's version.
# Metrics will be formatted as:
# app_requests_total;endpoint=/api/users 100 timestamp
# app_requests_total;endpoint=/api/orders 50 timestamp  
# app_memory_bytes 536870912 timestamp

print("Bridge configured to push metrics to Graphite")

OpenMetrics Support

Support for the OpenMetrics exposition format.

# From prometheus_client.openmetrics.exposition
# (same names as the top-level package, so import with an alias in practice)
CONTENT_TYPE_LATEST: str = 'application/openmetrics-text; version=1.0.0; charset=utf-8'

def generate_latest(registry) -> bytes:
    """Generate OpenMetrics format output."""

def escape_metric_name(s: str) -> str:
    """Escape metric name for OpenMetrics format."""

def escape_label_name(s: str) -> str:
    """Escape label name for OpenMetrics format."""

Usage Example:

from prometheus_client import Counter, Gauge, CollectorRegistry
# Alias avoids clashing with prometheus_client.generate_latest below
from prometheus_client.openmetrics.exposition import generate_latest as openmetrics_generate

# Create metrics
registry = CollectorRegistry()
counter = Counter('test_counter', 'Test counter', registry=registry)
gauge = Gauge('test_gauge', 'Test gauge', ['label'], registry=registry)

counter.inc(42)
gauge.labels('value1').set(3.14)

# Generate OpenMetrics format
openmetrics_output = openmetrics_generate(registry)
print("OpenMetrics format:")
print(openmetrics_output.decode('utf-8'))

# Compare with Prometheus format
from prometheus_client import generate_latest as prometheus_generate
prometheus_output = prometheus_generate(registry)
print("\nPrometheus format:")
print(prometheus_output.decode('utf-8'))

Install with Tessl CLI

npx tessl i tessl/pypi-prometheus-client

docs

advanced.md

collectors.md

context-managers.md

core-metrics.md

exposition.md

index.md

registry.md

tile.json