Python client for the Prometheus monitoring system.
—
Context managers and decorators for automatic instrumentation including timing operations, counting exceptions, and tracking in-progress work. These utilities provide convenient ways to instrument code without manual metric updates.
A context manager and decorator that automatically measures elapsed time and records it to a metric. Works with Histogram, Summary, and Gauge metrics.
class Timer:
def __init__(self, metric, callback_name):
"""
Create a Timer for the given metric.
Parameters:
- metric: Metric instance (Histogram, Summary, or Gauge)
- callback_name: Method name to call on metric ('observe' for Histogram/Summary, 'set' for Gauge)
"""
def __enter__(self):
"""Enter the timing context."""
return self
def __exit__(self, typ, value, traceback):
"""Exit the timing context and record elapsed time."""
def labels(self, *args, **kw) -> None:
"""
Update the Timer to use the labeled metric instance.
Parameters:
- args: Label values as positional arguments
- kw: Label values as keyword arguments
Note: Modifies the Timer instance in place
"""
def __call__(self, f) -> Callable:
"""
Use as a decorator.
Parameters:
- f: Function to decorate
Returns:
Decorated function that times execution
"""

Usage Example:
from prometheus_client import Histogram, Summary, Gauge
import time
import random
# Create metrics that support timing
request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint']
)
response_time = Summary(
'http_response_time_seconds',
'HTTP response time summary'
)
last_request_duration = Gauge(
'last_request_duration_seconds',
'Duration of the last request'
)
# Context manager usage
with request_duration.labels('GET', '/api/users').time():
# Simulate API call
time.sleep(random.uniform(0.1, 0.5))
with response_time.time():
# Simulate processing
time.sleep(0.2)
# Gauge timing (records last duration)
with last_request_duration.time():
time.sleep(0.3)
# Decorator usage
@request_duration.labels('POST', '/api/orders').time()
def create_order():
time.sleep(random.uniform(0.2, 0.8))
return {'order_id': 12345}
@response_time.time()
def process_request():
time.sleep(random.uniform(0.1, 0.4))
return "processed"
# Call decorated functions
order = create_order()
result = process_request()
# Multiple label combinations
endpoints = ['/users', '/orders', '/products']
methods = ['GET', 'POST', 'PUT']
for endpoint in endpoints:
for method in methods:
with request_duration.labels(method, endpoint).time():
time.sleep(random.uniform(0.05, 0.2))

A context manager and decorator that automatically counts exceptions of specified types. Useful for monitoring error rates and debugging application issues.
class ExceptionCounter:
def __init__(self, counter: "Counter", exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]]) -> None:
"""
Create an ExceptionCounter.
Parameters:
- counter: Counter metric instance
- exception: Exception type(s) to count (single type or tuple of types)
"""
def __enter__(self) -> None:
"""Enter the exception counting context."""
def __exit__(self, typ, value, traceback) -> Literal[False]:
"""
Exit the context and count exceptions if they occurred.
Parameters:
- typ: Exception type (None if no exception)
- value: Exception instance
- traceback: Exception traceback
Returns:
False (does not suppress exceptions)
"""
def __call__(self, f) -> Callable:
"""
Use as a decorator.
Parameters:
- f: Function to decorate
Returns:
Decorated function that counts exceptions
"""

Usage Example:
from prometheus_client import Counter
import random
import time
# Create counters for different exception types
http_errors = Counter(
'http_errors_total',
'Total HTTP errors',
['error_type', 'endpoint']
)
database_errors = Counter(
'database_errors_total',
'Database connection errors'
)
general_errors = Counter(
'application_errors_total',
'General application errors',
['function']
)
# Context manager usage - count specific exceptions
def risky_api_call():
if random.random() < 0.3:
raise ConnectionError("API unavailable")
elif random.random() < 0.2:
raise ValueError("Invalid response")
return "success"
# Count ConnectionError exceptions
with http_errors.labels('connection_error', '/api/external').count_exceptions(ConnectionError):
result = risky_api_call()
# Count multiple exception types
with http_errors.labels('client_error', '/api/users').count_exceptions((ValueError, TypeError)):
result = risky_api_call()
# Database operation with exception counting
def connect_to_database():
if random.random() < 0.1:
raise ConnectionError("Database unavailable")
return "connected"
with database_errors.count_exceptions(): # Counts all exceptions
connection = connect_to_database()
# Decorator usage
@general_errors.labels('data_processing').count_exceptions()
def process_data(data):
if not data:
raise ValueError("Empty data")
if len(data) > 1000:
raise MemoryError("Data too large")
return f"processed {len(data)} items"
@http_errors.labels('timeout', '/api/slow').count_exceptions(TimeoutError)
def slow_api_call():
time.sleep(2)
if random.random() < 0.2:
raise TimeoutError("Request timeout")
return "completed"
# Call decorated functions
try:
result = process_data([]) # Will raise ValueError and increment counter
except ValueError:
pass
try:
result = slow_api_call() # May raise TimeoutError and increment counter
except TimeoutError:
pass
# Labeled counters with exception counting
services = ['auth', 'payments', 'inventory']
error_types = ['connection', 'timeout', 'validation']
def simulate_service_call(service):
if random.random() < 0.15:
if random.random() < 0.5:
raise ConnectionError(f"{service} unavailable")
else:
raise TimeoutError(f"{service} timeout")
return f"{service} success"
for service in services:
# Count different error types separately
with http_errors.labels('connection', service).count_exceptions(ConnectionError):
with http_errors.labels('timeout', service).count_exceptions(TimeoutError):
try:
result = simulate_service_call(service)
print(f"{service}: {result}")
except (ConnectionError, TimeoutError) as e:
print(f"{service}: {e}")

A context manager and decorator that tracks the number of operations currently in progress. Useful for monitoring concurrent operations, queue depths, and system load.
class InprogressTracker:
def __init__(self, gauge):
"""
Create an InprogressTracker.
Parameters:
- gauge: Gauge metric instance to track in-progress operations
"""
def __enter__(self) -> None:
"""Enter the tracking context and increment the gauge."""
def __exit__(self, typ, value, traceback):
"""Exit the tracking context and decrement the gauge."""
def __call__(self, f) -> Callable:
"""
Use as a decorator.
Parameters:
- f: Function to decorate
Returns:
Decorated function that tracks execution
"""

Usage Example:
from prometheus_client import Gauge
import time
import random
import threading
import concurrent.futures
# Create gauges for tracking in-progress operations
active_requests = Gauge(
'http_requests_inprogress',
'Number of HTTP requests currently being processed',
['endpoint']
)
database_connections = Gauge(
'database_connections_active',
'Number of active database connections'
)
background_jobs = Gauge(
'background_jobs_inprogress',
'Number of background jobs currently running',
['job_type']
)
# Context manager usage
def handle_request(endpoint):
with active_requests.labels(endpoint).track_inprogress():
# Simulate request processing
time.sleep(random.uniform(0.1, 1.0))
return f"Processed {endpoint}"
def database_query():
with database_connections.track_inprogress():
# Simulate database operation
time.sleep(random.uniform(0.05, 0.3))
return "query result"
# Decorator usage
@background_jobs.labels('email_sender').track_inprogress()
def send_email_batch():
time.sleep(random.uniform(1.0, 3.0))
return "emails sent"
@background_jobs.labels('data_sync').track_inprogress()
def sync_data():
time.sleep(random.uniform(2.0, 5.0))
return "data synced"
@active_requests.labels('/api/upload').track_inprogress()
def handle_file_upload():
time.sleep(random.uniform(0.5, 2.0))
return "file uploaded"
# Simulate concurrent operations
def simulate_concurrent_requests():
endpoints = ['/api/users', '/api/orders', '/api/products']
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Submit multiple concurrent requests
futures = []
for _ in range(10):
endpoint = random.choice(endpoints)
future = executor.submit(handle_request, endpoint)
futures.append(future)
# Wait for completion
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(f"Request completed: {result}")
def simulate_database_load():
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Submit database queries
futures = [executor.submit(database_query) for _ in range(8)]
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(f"Query completed: {result}")
def simulate_background_jobs():
job_functions = [send_email_batch, sync_data, handle_file_upload]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Submit background jobs
futures = [executor.submit(random.choice(job_functions)) for _ in range(6)]
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(f"Job completed: {result}")
# Run simulations in separate threads
request_thread = threading.Thread(target=simulate_concurrent_requests)
db_thread = threading.Thread(target=simulate_database_load)
job_thread = threading.Thread(target=simulate_background_jobs)
request_thread.start()
db_thread.start()
job_thread.start()
# Monitor progress
for i in range(10):
time.sleep(1)
print(f"Active requests: {active_requests._value._value}")
print(f"DB connections: {database_connections._value._value}")
print(f"Background jobs: {background_jobs._child_samples()}")
request_thread.join()
db_thread.join()
job_thread.join()

Example showing how to use multiple context managers together for comprehensive instrumentation.
Usage Example:
from prometheus_client import Counter, Gauge, Histogram, start_http_server
import time
import random
import threading
# Create comprehensive metrics
request_count = Counter(
'api_requests_total',
'Total API requests',
['method', 'endpoint', 'status']
)
request_duration = Histogram(
'api_request_duration_seconds',
'API request duration',
['method', 'endpoint']
)
active_requests = Gauge(
'api_requests_inprogress',
'Active API requests',
['endpoint']
)
request_errors = Counter(
'api_request_errors_total',
'API request errors',
['endpoint', 'error_type']
)
def api_endpoint(method, endpoint):
"""Fully instrumented API endpoint handler."""
# Track active requests
with active_requests.labels(endpoint).track_inprogress():
# Time the request
with request_duration.labels(method, endpoint).time():
# Count exceptions
with request_errors.labels(endpoint, 'connection').count_exceptions(ConnectionError):
with request_errors.labels(endpoint, 'timeout').count_exceptions(TimeoutError):
with request_errors.labels(endpoint, 'validation').count_exceptions(ValueError):
try:
# Simulate API processing
processing_time = random.uniform(0.1, 2.0)
time.sleep(processing_time)
# Simulate occasional errors
error_chance = random.random()
if error_chance < 0.05:
raise ConnectionError("External service unavailable")
elif error_chance < 0.08:
raise TimeoutError("Request timeout")
elif error_chance < 0.1:
raise ValueError("Invalid request data")
# Success
request_count.labels(method, endpoint, '200').inc()
return f"Success: {method} {endpoint}"
except (ConnectionError, TimeoutError, ValueError):
# Error cases
request_count.labels(method, endpoint, '500').inc()
return f"Error: {method} {endpoint}"
# Start metrics server
start_http_server(8000)
# Simulate API traffic
def generate_traffic():
endpoints = ['/users', '/orders', '/products', '/health']
methods = ['GET', 'POST', 'PUT', 'DELETE']
while True:
endpoint = random.choice(endpoints)
method = random.choice(methods)
# Process request in separate thread to allow concurrency
thread = threading.Thread(
target=lambda: api_endpoint(method, endpoint),
daemon=True
)
thread.start()
# Variable request rate
time.sleep(random.uniform(0.1, 0.5))
# Run traffic generator
traffic_thread = threading.Thread(target=generate_traffic, daemon=True)
traffic_thread.start()
print("API simulation running with comprehensive instrumentation:")
print("- Request counting by method, endpoint, and status")
print("- Request duration timing")
print("- Active request tracking")
print("- Exception counting by type")
print("- Metrics available at http://localhost:8000")
try:
while True:
time.sleep(5)
# Print current status
print(f"Active requests: {sum(g._value._value for g in active_requests._metrics.values())}")
except KeyboardInterrupt:
print("Shutting down...")

Custom context managers that combine multiple instrumentation types.
Usage Example:
from prometheus_client import Counter, Gauge, Histogram
import contextlib
import time
import random
# Create metrics
operation_count = Counter('operations_total', 'Operations', ['type', 'status'])
operation_duration = Histogram('operation_duration_seconds', 'Operation duration', ['type'])
active_operations = Gauge('operations_active', 'Active operations', ['type'])
operation_errors = Counter('operation_errors_total', 'Operation errors', ['type', 'error'])
@contextlib.contextmanager
def instrument_operation(operation_type):
"""Custom context manager that applies multiple instrumentations."""
# Start tracking
active_operations.labels(operation_type).inc()
start_time = time.time()
try:
yield
# Success
operation_count.labels(operation_type, 'success').inc()
except Exception as e:
# Error
operation_count.labels(operation_type, 'error').inc()
operation_errors.labels(operation_type, type(e).__name__).inc()
raise
finally:
# Always record duration and decrement active count
duration = time.time() - start_time
operation_duration.labels(operation_type).observe(duration)
active_operations.labels(operation_type).dec()
# Usage
with instrument_operation('database_query'):
time.sleep(0.1) # Simulate database query
with instrument_operation('api_call'):
if random.random() < 0.2:
raise ConnectionError("API failed")
time.sleep(0.2) # Simulate API call
# Decorator version
def instrumented_operation(operation_type):
def decorator(func):
def wrapper(*args, **kwargs):
with instrument_operation(operation_type):
return func(*args, **kwargs)
return wrapper
return decorator
@instrumented_operation('file_processing')
def process_file(filename):
time.sleep(random.uniform(0.5, 2.0))
if random.random() < 0.1:
raise IOError(f"Cannot read {filename}")
return f"Processed {filename}"
# Test the decorated function
for i in range(10):
try:
result = process_file(f"file_{i}.txt")
print(result)
except IOError as e:
print(f"Error: {e}")

Install with Tessl CLI
npx tessl i tessl/pypi-prometheus-client