CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-prometheus-client

Python client for the Prometheus monitoring system.

Pending
Overview
Eval results
Files

context-managers.mddocs/

Context Managers

Context managers and decorators for automatic instrumentation including timing operations, counting exceptions, and tracking in-progress work. These utilities provide convenient ways to instrument code without manual metric updates.

Capabilities

Timer

A context manager and decorator that automatically measures elapsed time and records it to a metric. Works with Histogram, Summary, and Gauge metrics.

class Timer:
    def __init__(self, metric, callback_name):
        """
        Create a Timer for the given metric.
        
        Parameters:
        - metric: Metric instance (Histogram, Summary, or Gauge)
        - callback_name: Method name to call on metric ('observe' for Histogram/Summary, 'set' for Gauge)
        """
    
    def __enter__(self):
        """Enter the timing context."""
        return self
    
    def __exit__(self, typ, value, traceback):
        """Exit the timing context and record elapsed time."""
        
    def labels(self, *args, **kw) -> None:
        """
        Update the Timer to use the labeled metric instance.
        
        Parameters:
        - args: Label values as positional arguments
        - kw: Label values as keyword arguments
        
        Note: Modifies the Timer instance in place
        """
    
    def __call__(self, f) -> Callable:
        """
        Use as a decorator.
        
        Parameters:
        - f: Function to decorate
        
        Returns:
        Decorated function that times execution
        """

Usage Example:

from prometheus_client import Histogram, Summary, Gauge
import time
import random

# Create metrics that support timing
request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

response_time = Summary(
    'http_response_time_seconds', 
    'HTTP response time summary'
)

last_request_duration = Gauge(
    'last_request_duration_seconds',
    'Duration of the last request'
)

# Context manager usage
with request_duration.labels('GET', '/api/users').time():
    # Simulate API call
    time.sleep(random.uniform(0.1, 0.5))

with response_time.time():
    # Simulate processing
    time.sleep(0.2)

# Gauge timing (records last duration)
with last_request_duration.time():
    time.sleep(0.3)

# Decorator usage
@request_duration.labels('POST', '/api/orders').time()
def create_order():
    time.sleep(random.uniform(0.2, 0.8))
    return {'order_id': 12345}

@response_time.time()
def process_request():
    time.sleep(random.uniform(0.1, 0.4))
    return "processed"

# Call decorated functions
order = create_order()
result = process_request()

# Multiple label combinations
endpoints = ['/users', '/orders', '/products']
methods = ['GET', 'POST', 'PUT']

for endpoint in endpoints:
    for method in methods:
        with request_duration.labels(method, endpoint).time():
            time.sleep(random.uniform(0.05, 0.2))

ExceptionCounter

A context manager and decorator that automatically counts exceptions of specified types. Useful for monitoring error rates and debugging application issues.

class ExceptionCounter:
    def __init__(self, counter: "Counter", exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]]) -> None:
        """
        Create an ExceptionCounter.
        
        Parameters:
        - counter: Counter metric instance
        - exception: Exception type(s) to count (single type or tuple of types)
        """
    
    def __enter__(self) -> None:
        """Enter the exception counting context."""
    
    def __exit__(self, typ, value, traceback) -> Literal[False]:
        """
        Exit the context and count exceptions if they occurred.
        
        Parameters:
        - typ: Exception type (None if no exception)
        - value: Exception instance
        - traceback: Exception traceback
        
        Returns:
        False (does not suppress exceptions)
        """
    
    def __call__(self, f) -> Callable:
        """
        Use as a decorator.
        
        Parameters:
        - f: Function to decorate
        
        Returns:
        Decorated function that counts exceptions
        """

Usage Example:

from prometheus_client import Counter
import random
import time

# Create counters for different exception types
http_errors = Counter(
    'http_errors_total',
    'Total HTTP errors',
    ['error_type', 'endpoint']
)

database_errors = Counter(
    'database_errors_total',
    'Database connection errors'  
)

general_errors = Counter(
    'application_errors_total',
    'General application errors',
    ['function']
)

# Context manager usage - count specific exceptions
def risky_api_call():
    if random.random() < 0.3:
        raise ConnectionError("API unavailable")
    elif random.random() < 0.2:
        raise ValueError("Invalid response") 
    return "success"

# Count ConnectionError exceptions
with http_errors.labels('connection_error', '/api/external').count_exceptions(ConnectionError):
    result = risky_api_call()

# Count multiple exception types
with http_errors.labels('client_error', '/api/users').count_exceptions((ValueError, TypeError)):
    result = risky_api_call()

# Database operation with exception counting
def connect_to_database():
    if random.random() < 0.1:
        raise ConnectionError("Database unavailable")
    return "connected"

with database_errors.count_exceptions():  # Counts all exceptions
    connection = connect_to_database()

# Decorator usage
@general_errors.labels('data_processing').count_exceptions()
def process_data(data):
    if not data:
        raise ValueError("Empty data")
    if len(data) > 1000:
        raise MemoryError("Data too large")
    return f"processed {len(data)} items"

@http_errors.labels('timeout', '/api/slow').count_exceptions(TimeoutError)
def slow_api_call():
    time.sleep(2)
    if random.random() < 0.2:
        raise TimeoutError("Request timeout")
    return "completed"

# Call decorated functions
try:
    result = process_data([])  # Will raise ValueError and increment counter
except ValueError:
    pass

try:
    result = slow_api_call()  # May raise TimeoutError and increment counter
except TimeoutError:
    pass

# Labeled counters with exception counting
services = ['auth', 'payments', 'inventory']
error_types = ['connection', 'timeout', 'validation']

def simulate_service_call(service):
    if random.random() < 0.15:
        if random.random() < 0.5:
            raise ConnectionError(f"{service} unavailable")
        else:
            raise TimeoutError(f"{service} timeout")
    return f"{service} success"

for service in services:
    # Count different error types separately
    with http_errors.labels('connection', service).count_exceptions(ConnectionError):
        with http_errors.labels('timeout', service).count_exceptions(TimeoutError):
            try:
                result = simulate_service_call(service)
                print(f"{service}: {result}")
            except (ConnectionError, TimeoutError) as e:
                print(f"{service}: {e}")

InprogressTracker

A context manager and decorator that tracks the number of operations currently in progress. Useful for monitoring concurrent operations, queue depths, and system load.

class InprogressTracker:
    def __init__(self, gauge):
        """
        Create an InprogressTracker.
        
        Parameters:
        - gauge: Gauge metric instance to track in-progress operations
        """
    
    def __enter__(self) -> None:
        """Enter the tracking context and increment the gauge."""
    
    def __exit__(self, typ, value, traceback):
        """Exit the tracking context and decrement the gauge."""
    
    def __call__(self, f) -> Callable:
        """
        Use as a decorator.
        
        Parameters:
        - f: Function to decorate
        
        Returns:
        Decorated function that tracks execution
        """

Usage Example:

from prometheus_client import Gauge
import time
import random
import threading
import concurrent.futures

# Create gauges for tracking in-progress operations
active_requests = Gauge(
    'http_requests_inprogress',
    'Number of HTTP requests currently being processed',
    ['endpoint']
)

database_connections = Gauge(
    'database_connections_active',
    'Number of active database connections'
)

background_jobs = Gauge(
    'background_jobs_inprogress', 
    'Number of background jobs currently running',
    ['job_type']
)

# Context manager usage
def handle_request(endpoint):
    with active_requests.labels(endpoint).track_inprogress():
        # Simulate request processing
        time.sleep(random.uniform(0.1, 1.0))
        return f"Processed {endpoint}"

def database_query():
    with database_connections.track_inprogress():
        # Simulate database operation
        time.sleep(random.uniform(0.05, 0.3))
        return "query result"

# Decorator usage
@background_jobs.labels('email_sender').track_inprogress()
def send_email_batch():
    time.sleep(random.uniform(1.0, 3.0))
    return "emails sent"

@background_jobs.labels('data_sync').track_inprogress()
def sync_data():
    time.sleep(random.uniform(2.0, 5.0))
    return "data synced"

@active_requests.labels('/api/upload').track_inprogress()
def handle_file_upload():
    time.sleep(random.uniform(0.5, 2.0))
    return "file uploaded"

# Simulate concurrent operations
def simulate_concurrent_requests():
    endpoints = ['/api/users', '/api/orders', '/api/products']
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Submit multiple concurrent requests
        futures = []
        for _ in range(10):
            endpoint = random.choice(endpoints)
            future = executor.submit(handle_request, endpoint)
            futures.append(future)
        
        # Wait for completion
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            print(f"Request completed: {result}")

def simulate_database_load():
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # Submit database queries
        futures = [executor.submit(database_query) for _ in range(8)]
        
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            print(f"Query completed: {result}")

def simulate_background_jobs():
    job_functions = [send_email_batch, sync_data, handle_file_upload]
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # Submit background jobs
        futures = [executor.submit(random.choice(job_functions)) for _ in range(6)]
        
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            print(f"Job completed: {result}")

# Run simulations in separate threads
request_thread = threading.Thread(target=simulate_concurrent_requests)
db_thread = threading.Thread(target=simulate_database_load) 
job_thread = threading.Thread(target=simulate_background_jobs)

request_thread.start()
db_thread.start()
job_thread.start()

# Monitor progress
for i in range(10):
    time.sleep(1)
    print(f"Active requests: {active_requests._value._value}")
    print(f"DB connections: {database_connections._value._value}")
    print(f"Background jobs: {background_jobs._child_samples()}")

request_thread.join()
db_thread.join()
job_thread.join()

Combined Usage

Example showing how to use multiple context managers together for comprehensive instrumentation.

Usage Example:

from prometheus_client import Counter, Gauge, Histogram, start_http_server
import time
import random
import threading

# Create comprehensive metrics
request_count = Counter(
    'api_requests_total',
    'Total API requests', 
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'api_request_duration_seconds',
    'API request duration',
    ['method', 'endpoint']
)

active_requests = Gauge(
    'api_requests_inprogress',
    'Active API requests',
    ['endpoint']
)

request_errors = Counter(
    'api_request_errors_total',
    'API request errors',
    ['endpoint', 'error_type']
)

def api_endpoint(method, endpoint):
    """Fully instrumented API endpoint handler."""
    
    # Track active requests
    with active_requests.labels(endpoint).track_inprogress():
        # Time the request
        with request_duration.labels(method, endpoint).time():
            # Count exceptions
            with request_errors.labels(endpoint, 'connection').count_exceptions(ConnectionError):
                with request_errors.labels(endpoint, 'timeout').count_exceptions(TimeoutError):
                    with request_errors.labels(endpoint, 'validation').count_exceptions(ValueError):
                        try:
                            # Simulate API processing
                            processing_time = random.uniform(0.1, 2.0)
                            time.sleep(processing_time)
                            
                            # Simulate occasional errors
                            error_chance = random.random()
                            if error_chance < 0.05:
                                raise ConnectionError("External service unavailable")
                            elif error_chance < 0.08:
                                raise TimeoutError("Request timeout")
                            elif error_chance < 0.1:
                                raise ValueError("Invalid request data")
                            
                            # Success
                            request_count.labels(method, endpoint, '200').inc()
                            return f"Success: {method} {endpoint}"
                            
                        except (ConnectionError, TimeoutError, ValueError):
                            # Error cases
                            request_count.labels(method, endpoint, '500').inc()
                            return f"Error: {method} {endpoint}"

# Start metrics server
start_http_server(8000)

# Simulate API traffic
def generate_traffic():
    endpoints = ['/users', '/orders', '/products', '/health']
    methods = ['GET', 'POST', 'PUT', 'DELETE']
    
    while True:
        endpoint = random.choice(endpoints)
        method = random.choice(methods)
        
        # Process request in separate thread to allow concurrency
        thread = threading.Thread(
            target=lambda: api_endpoint(method, endpoint),
            daemon=True
        )
        thread.start()
        
        # Variable request rate
        time.sleep(random.uniform(0.1, 0.5))

# Run traffic generator
traffic_thread = threading.Thread(target=generate_traffic, daemon=True)
traffic_thread.start()

print("API simulation running with comprehensive instrumentation:")
print("- Request counting by method, endpoint, and status")
print("- Request duration timing")  
print("- Active request tracking")
print("- Exception counting by type")
print("- Metrics available at http://localhost:8000")

try:
    while True:
        time.sleep(5)
        # Print current status
        print(f"Active requests: {sum(g._value._value for g in active_requests._metrics.values())}")
except KeyboardInterrupt:
    print("Shutting down...")

Advanced Context Manager Patterns

Custom context managers that combine multiple instrumentation types.

Usage Example:

from prometheus_client import Counter, Gauge, Histogram
import contextlib
import time

# Create metrics
operation_count = Counter('operations_total', 'Operations', ['type', 'status'])
operation_duration = Histogram('operation_duration_seconds', 'Operation duration', ['type'])
active_operations = Gauge('operations_active', 'Active operations', ['type'])
operation_errors = Counter('operation_errors_total', 'Operation errors', ['type', 'error'])

@contextlib.contextmanager
def instrument_operation(operation_type):
    """Custom context manager that applies multiple instrumentations."""
    
    # Start tracking
    active_operations.labels(operation_type).inc()
    start_time = time.time()
    
    try:
        yield
        # Success
        operation_count.labels(operation_type, 'success').inc()
        
    except Exception as e:
        # Error
        operation_count.labels(operation_type, 'error').inc()
        operation_errors.labels(operation_type, type(e).__name__).inc()
        raise
        
    finally:
        # Always record duration and decrement active count
        duration = time.time() - start_time
        operation_duration.labels(operation_type).observe(duration)
        active_operations.labels(operation_type).dec()

# Usage
with instrument_operation('database_query'):
    time.sleep(0.1)  # Simulate database query

with instrument_operation('api_call'):
    if random.random() < 0.2:
        raise ConnectionError("API failed")
    time.sleep(0.2)  # Simulate API call

# Decorator version
def instrumented_operation(operation_type):
    def decorator(func):
        def wrapper(*args, **kwargs):
            with instrument_operation(operation_type):
                return func(*args, **kwargs)
        return wrapper
    return decorator

@instrumented_operation('file_processing')
def process_file(filename):
    time.sleep(random.uniform(0.5, 2.0))
    if random.random() < 0.1:
        raise IOError(f"Cannot read {filename}")
    return f"Processed {filename}"

# Test the decorated function
for i in range(10):
    try:
        result = process_file(f"file_{i}.txt")
        print(result)
    except IOError as e:
        print(f"Error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-prometheus-client

docs

advanced.md

collectors.md

context-managers.md

core-metrics.md

exposition.md

index.md

registry.md

tile.json