CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-prometheus-client

Python client for the Prometheus monitoring system.

Pending
Overview
Eval results
Files

docs/exposition.md

Exposition

Functions and servers for outputting metrics in Prometheus format, including HTTP servers, WSGI/ASGI applications, file output, and push gateway integration. These components enable metrics to be scraped by Prometheus or pushed to centralized gateways.

Capabilities

Content Generation

Core functions for generating Prometheus format output from collected metrics.

CONTENT_TYPE_LATEST: str = 'text/plain; version=0.0.4; charset=utf-8'

def generate_latest(registry: CollectorRegistry = REGISTRY) -> bytes:
    """
    Generate the latest Prometheus text format output.
    
    Parameters:
    - registry: Registry to collect metrics from
    
    Returns:
    Metrics in Prometheus text format as bytes
    """

def choose_encoder(accept_header: str) -> Tuple[Callable, str]:
    """
    Choose appropriate encoder based on Accept header.
    
    Parameters:
    - accept_header: HTTP Accept header string
    
    Returns:
    Tuple of (encoder_function, content_type)
    """

Usage Example:

from prometheus_client import Counter, generate_latest, CONTENT_TYPE_LATEST

# Create some metrics
requests = Counter('http_requests_total', 'Total HTTP requests')
requests.inc(42)

# Generate Prometheus format output
metrics_data = generate_latest()
print(metrics_data.decode('utf-8'))

# Use in HTTP response
content_type = CONTENT_TYPE_LATEST
# Return metrics_data with content_type header

HTTP Servers

Built-in HTTP servers for exposing metrics endpoints that Prometheus can scrape.

def start_http_server(
    port: int, 
    addr: str = '0.0.0.0', 
    registry: CollectorRegistry = REGISTRY,
    certfile: Optional[str] = None,
    keyfile: Optional[str] = None,
    client_cafile: Optional[str] = None,
    client_capath: Optional[str] = None,
    protocol: int = ssl.PROTOCOL_TLS_SERVER,
    client_auth_required: bool = False
) -> Tuple[WSGIServer, threading.Thread]:
    """
    Start an HTTP (or HTTPS) server that exposes metrics for scraping.

    The server is plain HTTP unless TLS material is supplied; pass
    certfile and keyfile together to serve over HTTPS.

    Parameters:
    - port: Port to listen on
    - addr: Address to bind to (the default '0.0.0.0' listens on all
      IPv4 interfaces; use '127.0.0.1' to restrict to localhost)
    - registry: Registry to serve metrics from
    - certfile: Path to the SSL certificate file for HTTPS
    - keyfile: Path to the SSL private key file for HTTPS
    - client_cafile: Path to a CA file for client certificate validation
    - client_capath: Path to a CA directory for client certificate validation
    - protocol: SSL protocol version (an ssl.PROTOCOL_* constant)
    - client_auth_required: Whether to require client certificates
    
    Returns:
    Tuple of (server, thread) - call server.shutdown() to stop serving
    """

start_wsgi_server = start_http_server  # Alias for backward compatibility

Usage Example:

from prometheus_client import Counter, start_http_server
import time
import threading

# Create metrics
requests = Counter('app_requests_total', 'Total application requests')

# Start metrics server
server, thread = start_http_server(8000)
print("Metrics server started on http://localhost:8000")

# Application logic
def app_work():
    while True:
        requests.inc()
        time.sleep(1)

# Run application
app_thread = threading.Thread(target=app_work, daemon=True)
app_thread.start()

# Keep main thread alive until the user presses Ctrl+C
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Shutting down...")
    server.shutdown()      # stop the server's serve_forever() loop
    server.server_close()  # release the listening socket
    thread.join()          # wait for the server thread to finish

# HTTPS example
server, thread = start_http_server(
    8443,
    certfile='/path/to/cert.pem',
    keyfile='/path/to/key.pem'
)

WSGI Integration

WSGI application factory for integrating metrics into existing web applications.

def make_wsgi_app(
    registry: CollectorRegistry = REGISTRY, 
    disable_compression: bool = False
) -> Callable:
    """
    Create a WSGI application that serves metrics.
    
    Parameters:
    - registry: Registry to serve metrics from
    - disable_compression: Whether to disable gzip compression
    
    Returns:
    WSGI application callable
    """

Usage Example:

from prometheus_client import Counter, make_wsgi_app
from wsgiref.simple_server import make_server
from urllib.parse import parse_qs

# Create metrics
requests = Counter('wsgi_requests_total', 'Total WSGI requests', ['path'])

# Create metrics WSGI app
metrics_app = make_wsgi_app()

# Main application
def application(environ, start_response):
    """
    WSGI entry point: serve metrics on /metrics, the app on /, 404 otherwise.

    Parameters:
    - environ: WSGI environment dictionary
    - start_response: WSGI start_response callable

    Returns:
    Iterable of byte strings forming the response body
    """
    # PEP 3333 allows PATH_INFO to be omitted when it is empty.
    path = environ.get('PATH_INFO', '')

    if path == '/metrics':
        # Serve metrics
        return metrics_app(environ, start_response)

    if path == '/':
        # Main application logic
        requests.labels(path='/').inc()
        status, body = '200 OK', b'Hello World!'
    else:
        # 404 - never use the raw request path as a label value: every
        # distinct URL a client sends would create a new time series
        # (unbounded label cardinality). Bucket unknown paths instead.
        requests.labels(path='other').inc()
        status, body = '404 Not Found', b'Not Found'

    start_response(status, [('Content-type', 'text/plain')])
    return [body]

# Run server
with make_server('', 8000, application) as httpd:
    print("Server running on http://localhost:8000")
    print("Metrics available at http://localhost:8000/metrics")
    httpd.serve_forever()

ASGI Integration

ASGI application factory for integrating metrics into modern async web applications.

def make_asgi_app(
    registry: CollectorRegistry = REGISTRY, 
    disable_compression: bool = False
) -> Callable:
    """
    Create an ASGI application that serves metrics.
    
    Parameters:
    - registry: Registry to serve metrics from
    - disable_compression: Whether to disable gzip compression
    
    Returns:
    ASGI application callable
    """

Usage Example:

from prometheus_client import Counter, make_asgi_app
import uvicorn

# Create metrics
requests = Counter('asgi_requests_total', 'Total ASGI requests', ['path'])

# Create metrics ASGI app
metrics_app = make_asgi_app()

# Main ASGI application
async def application(scope, receive, send):
    """
    ASGI entry point: serve metrics on /metrics, the app on /, 404 otherwise.

    Parameters:
    - scope: ASGI connection scope
    - receive: ASGI receive callable
    - send: ASGI send callable

    Non-HTTP scopes are ignored.
    """
    if scope['type'] != 'http':
        return

    path = scope['path']

    if path == '/metrics':
        # Serve metrics
        await metrics_app(scope, receive, send)
        return

    if path == '/':
        # Main application logic
        requests.labels(path='/').inc()
        status, body = 200, b'Hello World!'
    else:
        # 404 - never use the raw request path as a label value: every
        # distinct URL a client sends would create a new time series
        # (unbounded label cardinality). Bucket unknown paths instead.
        requests.labels(path='other').inc()
        status, body = 404, b'Not Found'

    await send({
        'type': 'http.response.start',
        'status': status,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': body,
    })

# Run with uvicorn
if __name__ == "__main__":
    uvicorn.run(application, host="0.0.0.0", port=8000)

MetricsHandler

HTTP request handler class for serving metrics, useful for custom HTTP server implementations.

class MetricsHandler(BaseHTTPRequestHandler):
    registry: CollectorRegistry = REGISTRY
    
    def do_GET(self) -> None:
        """Handle GET requests to serve metrics."""
    
    @classmethod
    def factory(cls, registry: CollectorRegistry) -> type:
        """
        Create a MetricsHandler class with custom registry.
        
        Parameters:
        - registry: Registry to serve metrics from
        
        Returns:
        MetricsHandler class configured with the registry
        """

Usage Example:

from prometheus_client import Counter, MetricsHandler, CollectorRegistry
from http.server import HTTPServer
import threading

# Create custom registry
custom_registry = CollectorRegistry()
requests = Counter('custom_requests_total', 'Custom requests', registry=custom_registry)
requests.inc(10)

# Create handler with custom registry
CustomHandler = MetricsHandler.factory(custom_registry)

# Start server
server = HTTPServer(('', 8000), CustomHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()

print("Custom metrics server running on http://localhost:8000")

# The server thread is a daemon, so it is killed as soon as the main thread
# exits. When this is the whole program, block here until Ctrl+C; when the
# server is embedded in a larger application, drop this loop and call
# server.shutdown() / server.server_close() during application teardown.
try:
    while thread.is_alive():
        # A timed join keeps the main thread responsive to KeyboardInterrupt
        thread.join(timeout=1)
except KeyboardInterrupt:
    server.shutdown()      # stop the serve_forever() loop
    server.server_close()  # release the listening socket

File Output

Write metrics to text files for consumption by node exporter or other file-based collectors.

def write_to_textfile(path: str, registry: CollectorRegistry) -> None:
    """
    Write metrics to a text file in Prometheus format.
    
    Parameters:
    - path: File path to write to
    - registry: Registry to collect metrics from
    
    Note: This function is atomic - it writes to a temporary file and then renames it.
    """

Usage Example:

from prometheus_client import Counter, CollectorRegistry, write_to_textfile
import os
import time

# Create registry with metrics
registry = CollectorRegistry()
batch_jobs = Counter('batch_jobs_completed_total', 'Completed batch jobs', registry=registry)
batch_duration = Counter('batch_processing_seconds_total', 'Total batch processing time', registry=registry)

# Simulate batch processing
start_time = time.time()
# ... do batch work ...
batch_jobs.inc()
batch_duration.inc(time.time() - start_time)

# Write metrics to file for node exporter
# (metrics_dir must be the directory the node exporter's textfile
# collector is configured to read)
metrics_dir = '/var/lib/node_exporter/textfile_collector'
os.makedirs(metrics_dir, exist_ok=True)
write_to_textfile(f'{metrics_dir}/batch_metrics.prom', registry)

# File will contain something like this (the duration value is
# illustrative; the real value is the measured elapsed time):
# # HELP batch_jobs_completed_total Completed batch jobs
# # TYPE batch_jobs_completed_total counter
# batch_jobs_completed_total 1.0
# # HELP batch_processing_seconds_total Total batch processing time
# # TYPE batch_processing_seconds_total counter
# batch_processing_seconds_total 0.123
# NOTE(review): the Python client normally also writes a `*_created`
# timestamp sample for each counter unless created-series output is
# disabled - confirm against the installed client version.

Push Gateway Integration

Functions for pushing metrics to Prometheus push gateway for batch jobs and short-lived processes.

def push_to_gateway(
    gateway: str, 
    job: str, 
    registry: CollectorRegistry, 
    grouping_key: Optional[Dict[str, Any]] = None, 
    timeout: Optional[float] = 30, 
    handler: Callable = default_handler
) -> None:
    """
    Push metrics to the push gateway using the HTTP PUT method.

    PUT replaces every metric previously pushed for the same job and
    grouping key. Metrics pushed under a different grouping key for the
    same job are not affected.
    
    Parameters:
    - gateway: Push gateway URL (e.g., 'localhost:9091')
    - job: Job name for grouping
    - registry: Registry to push metrics from
    - grouping_key: Additional labels that, together with job, identify
      the group being replaced
    - timeout: Request timeout in seconds (defaults to 30)
    - handler: Custom authentication/request handler; a callable with the
      same parameters as default_handler
    """

def pushadd_to_gateway(
    gateway: str, 
    job: str, 
    registry: Optional[CollectorRegistry], 
    grouping_key: Optional[Dict[str, Any]] = None, 
    timeout: Optional[float] = 30, 
    handler: Callable = default_handler
) -> None:
    """
    Push-add metrics to the push gateway using the HTTP POST method.

    POST replaces only previously pushed metrics that have the same name,
    job and grouping key as the metrics being pushed. Other metrics in
    that group are kept. Values are overwritten, not summed.
    
    Parameters:
    - gateway: Push gateway URL
    - job: Job name for grouping
    - registry: Registry to push metrics from
    - grouping_key: Additional labels that, together with job, identify
      the group being updated
    - timeout: Request timeout in seconds (defaults to 30)
    - handler: Custom authentication/request handler; a callable with the
      same parameters as default_handler
    """

def delete_from_gateway(
    gateway: str, 
    job: str, 
    grouping_key: Optional[Dict[str, Any]] = None, 
    timeout: Optional[float] = 30, 
    handler: Callable = default_handler
) -> None:
    """
    Delete metrics from the push gateway using the HTTP DELETE method.

    Removes the metrics pushed for the given job and grouping key. To
    delete a group that was pushed with a grouping key, pass the same
    grouping key here.
    
    Parameters:
    - gateway: Push gateway URL
    - job: Job name to delete
    - grouping_key: Additional labels that, together with job, identify
      the group to delete
    - timeout: Request timeout in seconds (defaults to 30)
    - handler: Custom authentication/request handler; a callable with the
      same parameters as default_handler
    """

def instance_ip_grouping_key() -> Dict[str, Any]:
    """
    Get grouping key with instance IP address.
    
    Returns:
    Dictionary with 'instance' key containing local IP address
    """

Usage Example:

from prometheus_client import (
    CollectorRegistry,
    Counter,
    delete_from_gateway,  # used for the clean-up step at the end
    instance_ip_grouping_key,
    push_to_gateway,
)
import time

# Create registry for batch job
registry = CollectorRegistry()
job_duration = Counter('batch_job_duration_seconds', 'Batch job duration', registry=registry)
records_processed = Counter('batch_records_processed_total', 'Records processed', registry=registry)

# Simulate batch job
start_time = time.time()

# Process records
for i in range(1000):
    # Process record
    records_processed.inc()
    time.sleep(0.001)  # Simulate work

# Record total duration  
job_duration.inc(time.time() - start_time)

# Push metrics to gateway
push_to_gateway(
    'pushgateway.example.com:9091', 
    job='batch_processor',
    registry=registry,
    grouping_key={'batch_id': '20231015_001'}
)

# Use instance IP for grouping
push_to_gateway(
    'localhost:9091',
    job='worker',
    registry=registry,
    grouping_key=instance_ip_grouping_key()
)

# Clean up when job is done (job and grouping key must match the group
# that was pushed)
delete_from_gateway(
    'localhost:9091',
    job='batch_processor',
    grouping_key={'batch_id': '20231015_001'}
)

Authentication Handlers

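Request handlers that add authentication or redirect handling to push gateway requests. Pass one as the `handler` argument of `push_to_gateway`, `pushadd_to_gateway`, or `delete_from_gateway`.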

def default_handler(
    url: str, 
    method: str, 
    timeout: Optional[float], 
    headers: List, 
    data: bytes
) -> Callable:
    """Default HTTP handler with no authentication."""

def basic_auth_handler(
    url: str, 
    method: str, 
    timeout: Optional[float], 
    headers: List, 
    data: bytes, 
    username: Optional[str] = None, 
    password: Optional[str] = None
) -> Callable:
    """
    HTTP handler with basic authentication.

    The first five parameters are the same as default_handler's. The push
    functions call their handler with only those five, so username and
    password must be bound by the caller, for example with a small
    wrapper function or lambda.
    
    Parameters:
    - url: Request URL
    - method: HTTP method
    - timeout: Request timeout in seconds
    - headers: Request headers
    - data: Request body
    - username: Username for basic auth
    - password: Password for basic auth
    """

def tls_auth_handler(
    url: str, 
    method: str, 
    timeout: Optional[float], 
    headers: List, 
    data: bytes, 
    certfile: str, 
    keyfile: str, 
    cafile: Optional[str] = None, 
    protocol: int = ssl.PROTOCOL_TLS_CLIENT, 
    insecure_skip_verify: bool = False
) -> Callable:
    """
    HTTP handler with TLS client certificate authentication.

    The first five parameters are the same as default_handler's. The push
    functions call their handler with only those five, so the TLS
    parameters must be bound by the caller, for example with a small
    wrapper function or lambda.
    
    Parameters:
    - url: Request URL
    - method: HTTP method
    - timeout: Request timeout in seconds
    - headers: Request headers
    - data: Request body
    - certfile: Client certificate file
    - keyfile: Client private key file
    - cafile: CA certificate file for server verification
    - protocol: SSL protocol version (an ssl.PROTOCOL_* constant)
    - insecure_skip_verify: Skip server certificate verification. This
      removes protection against impersonated servers; avoid it outside
      of testing.
    """

def passthrough_redirect_handler(
    url: str, 
    method: str, 
    timeout: Optional[float], 
    headers: List, 
    data: bytes
) -> Callable:
    """
    HTTP handler that follows redirects.

    Takes the same parameters as default_handler.

    NOTE(review): upstream documentation indicates the original method,
    headers and body are re-sent to the redirect target, so this handler
    should only be used with gateways you control or trust - confirm
    against the installed client version.
    """

Usage Example:

from prometheus_client import CollectorRegistry, Counter, push_to_gateway
# The auth handlers are not re-exported from the package root; import them
# from the exposition module.
from prometheus_client.exposition import basic_auth_handler, tls_auth_handler

registry = CollectorRegistry()
counter = Counter('authenticated_pushes', 'Pushes with auth', registry=registry)
counter.inc()

# Push with basic authentication
# (the wrapper binds the credentials; push_to_gateway supplies the
# url, method, timeout, headers and data arguments)
push_to_gateway(
    'secure-gateway.example.com:9091',
    job='authenticated_job',
    registry=registry,
    handler=lambda url, method, timeout, headers, data: basic_auth_handler(
        url, method, timeout, headers, data,
        username='user',
        password='pass'
    )
)

# Push with TLS client certificate
push_to_gateway(
    'tls-gateway.example.com:9091',
    job='tls_job',
    registry=registry,
    handler=lambda url, method, timeout, headers, data: tls_auth_handler(
        url, method, timeout, headers, data,
        certfile='/path/to/client.crt',
        keyfile='/path/to/client.key',
        cafile='/path/to/ca.crt'
    )
)

Install with Tessl CLI

npx tessl i tessl/pypi-prometheus-client

docs

advanced.md

collectors.md

context-managers.md

core-metrics.md

exposition.md

index.md

registry.md

tile.json