
tessl/pypi-logfire

Python observability platform with structured logging, distributed tracing, metrics collection, and automatic instrumentation for popular frameworks and AI services.


Metrics

Metrics creation and management for quantitative monitoring, including counters, histograms, gauges, and callback-based metrics. Logfire provides comprehensive metrics capabilities built on OpenTelemetry standards for performance monitoring and system observability.

Capabilities

Counter Metrics

Counters track cumulative values that only increase over time, such as request counts, error counts, or bytes processed.

def metric_counter(name: str, *, 
                  unit: str = '', 
                  description: str = '') -> Counter:
    """
    Create a counter metric for tracking cumulative increasing values.
    
    Parameters:
    - name: Unique metric name
    - unit: Unit of measurement (e.g., 'requests', 'bytes', 'ms')
    - description: Human-readable description of what the metric measures
    
    Returns: Counter instance for recording values
    """

class Counter:
    """Counter metric for tracking cumulative increasing values."""
    
    def add(self, amount: int | float, attributes: dict[str, str] | None = None) -> None:
        """
        Add a value to the counter.
        
        Parameters:
        - amount: Positive number to add to the counter
        - attributes: Optional attributes for labeling/filtering
        """

Usage Examples:

import logfire

logfire.configure()

# Create counters for different metrics
request_counter = logfire.metric_counter(
    'http_requests_total',
    unit='requests',
    description='Total number of HTTP requests'
)

error_counter = logfire.metric_counter(
    'http_errors_total', 
    unit='errors',
    description='Total number of HTTP errors'
)

bytes_processed = logfire.metric_counter(
    'bytes_processed_total',
    unit='bytes',
    description='Total bytes processed by the application'
)

# Record counter values
request_counter.add(1, {'method': 'GET', 'endpoint': '/users'})
request_counter.add(1, {'method': 'POST', 'endpoint': '/users'})

# Record errors with context
error_counter.add(1, {'status_code': '404', 'endpoint': '/users/999'})

# Record data processing
bytes_processed.add(1024, {'operation': 'upload', 'file_type': 'json'})

Histogram Metrics

Histograms track how values are distributed, automatically bucketing recorded values so that percentiles, averages, and the overall shape of the distribution can be analyzed.

def metric_histogram(name: str, *,
                    unit: str = '',
                    description: str = '') -> Histogram:
    """
    Create a histogram metric for tracking value distributions.
    
    Parameters:
    - name: Unique metric name
    - unit: Unit of measurement (e.g., 'ms', 'seconds', 'bytes')
    - description: Human-readable description of what the metric measures
    
    Returns: Histogram instance for recording values
    """

class Histogram:
    """Histogram metric for tracking value distributions."""
    
    def record(self, amount: int | float, attributes: dict[str, str] | None = None) -> None:
        """
        Record a value in the histogram.
        
        Parameters:
        - amount: Value to record
        - attributes: Optional attributes for labeling/filtering
        """

Usage Examples:

import logfire
import time

logfire.configure()

# Create histograms for performance monitoring
response_time = logfire.metric_histogram(
    'http_request_duration_ms',
    unit='ms', 
    description='HTTP request duration in milliseconds'
)

payload_size = logfire.metric_histogram(
    'request_payload_size_bytes',
    unit='bytes',
    description='Size of request payloads'
)

query_duration = logfire.metric_histogram(
    'database_query_duration_ms',
    unit='ms',
    description='Database query execution time'
)

# Record response times
def handle_request():
    start_time = time.time()
    
    # Process request
    time.sleep(0.1)  # Simulated work
    
    duration_ms = (time.time() - start_time) * 1000
    response_time.record(duration_ms, {
        'method': 'GET',
        'endpoint': '/api/users',
        'status_code': '200'
    })

# Record payload sizes
payload_size.record(2048, {'content_type': 'application/json'})

# Record query performance
query_duration.record(15.5, {'table': 'users', 'operation': 'SELECT'})

Gauge Metrics

Gauges track current values that can go up or down, such as memory usage, queue length, or temperature readings.

def metric_gauge(name: str, *,
                unit: str = '',
                description: str = '') -> Gauge:
    """
    Create a gauge metric for tracking current values that can increase or decrease.
    
    Parameters:
    - name: Unique metric name
    - unit: Unit of measurement (e.g., 'bytes', 'percent', 'count')
    - description: Human-readable description of what the metric measures
    
    Returns: Gauge instance for recording values
    """

class Gauge:
    """Gauge metric for tracking current values."""
    
    def set(self, amount: int | float, attributes: dict[str, str] | None = None) -> None:
        """
        Set the current value of the gauge.
        
        Parameters:
        - amount: Current value to set
        - attributes: Optional attributes for labeling/filtering
        """

Usage Examples:

import logfire
import psutil

logfire.configure()

# Create gauges for system monitoring
memory_usage = logfire.metric_gauge(
    'memory_usage_bytes',
    unit='bytes',
    description='Current memory usage'
)

cpu_utilization = logfire.metric_gauge(
    'cpu_utilization_percent',
    unit='percent',
    description='Current CPU utilization percentage'
)

queue_size = logfire.metric_gauge(
    'task_queue_size',
    unit='tasks',
    description='Current number of tasks in queue'
)

# Update gauge values
def update_system_metrics():
    # Memory usage
    memory_usage.set(psutil.virtual_memory().used, {'type': 'virtual'})
    
    # CPU utilization
    cpu_percent = psutil.cpu_percent()
    cpu_utilization.set(cpu_percent, {'core': 'all'})
    
    # Application-specific metrics (task_queue here is a hypothetical in-memory queue)
    queue_size.set(len(task_queue), {'queue_type': 'background_jobs'})

# Call periodically to update gauges
update_system_metrics()
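
The "call periodically" step needs a scheduler. A minimal, dependency-free sketch (not part of logfire; for production, the callback-based metrics described later in this document are usually a better fit, since the exporter pulls values on its own schedule):

```python
import threading

def schedule_every(interval_s: float, fn) -> threading.Timer:
    """Run fn immediately, then re-arm a daemon timer so it runs every interval_s seconds."""
    fn()
    timer = threading.Timer(interval_s, schedule_every, args=(interval_s, fn))
    timer.daemon = True  # don't keep the process alive just for metrics
    timer.start()
    return timer

# e.g. timer = schedule_every(15.0, update_system_metrics)
```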

Up-Down Counter Metrics

Up-down counters track values that can increase or decrease. Unlike gauges, which are set to a current value, up-down counters accumulate deltas, which makes them a natural fit for counts such as active connections or items in inventory.

def metric_up_down_counter(name: str, *,
                          unit: str = '',
                          description: str = '') -> UpDownCounter:
    """
    Create an up-down counter metric for values that can increase or decrease.
    
    Parameters:
    - name: Unique metric name
    - unit: Unit of measurement (e.g., 'connections', 'items')
    - description: Human-readable description of what the metric measures
    
    Returns: UpDownCounter instance for recording changes
    """

class UpDownCounter:
    """Up-down counter metric for tracking values that can increase or decrease."""
    
    def add(self, amount: int | float, attributes: dict[str, str] | None = None) -> None:
        """
        Add a value to the up-down counter (can be positive or negative).
        
        Parameters:
        - amount: Value to add (positive to increase, negative to decrease)
        - attributes: Optional attributes for labeling/filtering
        """

Usage Examples:

import logfire

logfire.configure()

# Create up-down counters for resource tracking
active_connections = logfire.metric_up_down_counter(
    'active_connections',
    unit='connections',
    description='Number of active database connections'
)

items_in_cart = logfire.metric_up_down_counter(
    'shopping_cart_items',
    unit='items',
    description='Number of items in shopping carts'
)

# Track connection lifecycle
def on_connection_open():
    active_connections.add(1, {'database': 'users', 'pool': 'primary'})

def on_connection_close():
    active_connections.add(-1, {'database': 'users', 'pool': 'primary'})

# Track shopping cart changes
def add_item_to_cart(user_id, item_id):
    items_in_cart.add(1, {'user_category': 'premium'})

def remove_item_from_cart(user_id, item_id):
    items_in_cart.add(-1, {'user_category': 'premium'})

Callback-Based Metrics

Callback metrics let you register functions that are invoked at collection time to report metric values. They are useful for values that are expensive to compute or that only need to be sampled on the exporter's schedule rather than on every event.

def metric_counter_callback(name: str, *,
                           callbacks: Sequence[Callable[[CallbackOptions], Iterable[Observation]]],
                           unit: str = '',
                           description: str = '') -> None:
    """
    Create a counter metric with callback-based collection.
    
    Parameters:
    - name: Unique metric name
    - callbacks: Functions invoked at collection time; each receives a
      CallbackOptions and returns an iterable of Observation values
    - unit: Unit of measurement
    - description: Human-readable description
    """

def metric_gauge_callback(name: str, 
                         callbacks: Sequence[Callable[[CallbackOptions], Iterable[Observation]]], *,
                         unit: str = '',
                         description: str = '') -> None:
    """
    Create a gauge metric with callback-based collection.
    
    Parameters:
    - name: Unique metric name  
    - callbacks: Functions invoked at collection time; each receives a
      CallbackOptions and returns an iterable of Observation values
    - unit: Unit of measurement
    - description: Human-readable description
    """

def metric_up_down_counter_callback(name: str,
                                   callbacks: Sequence[Callable[[CallbackOptions], Iterable[Observation]]], *,
                                   unit: str = '',
                                   description: str = '') -> None:
    """
    Create an up-down counter metric with callback-based collection.
    
    Parameters:
    - name: Unique metric name
    - callbacks: Functions invoked at collection time; each receives a
      CallbackOptions and returns an iterable of Observation values
    - unit: Unit of measurement
    - description: Human-readable description
    """

Usage Examples:

import logfire
import psutil
from typing import Iterable
from opentelemetry.metrics import CallbackOptions, Observation

logfire.configure()

# Callback functions receive CallbackOptions and return Observation values
def get_memory_measurements(options: CallbackOptions) -> Iterable[Observation]:
    memory = psutil.virtual_memory()
    return [
        Observation(value=memory.total, attributes={'type': 'total'}),
        Observation(value=memory.available, attributes={'type': 'available'}),
        Observation(value=memory.used, attributes={'type': 'used'})
    ]

def get_cpu_measurements(options: CallbackOptions) -> Iterable[Observation]:
    cpu_percentages = psutil.cpu_percent(percpu=True)
    return [
        Observation(value=percent, attributes={'core': str(i)})
        for i, percent in enumerate(cpu_percentages)
    ]

def get_connection_count(options: CallbackOptions) -> Iterable[Observation]:
    # Example: count active connections from a hypothetical connection pool
    active_count = connection_pool.active_connections()
    return [Observation(value=active_count, attributes={'pool': 'primary'})]

# Register callback metrics
logfire.metric_gauge_callback(
    'system_memory_bytes',
    callbacks=[get_memory_measurements],
    unit='bytes',
    description='System memory statistics'
)

logfire.metric_gauge_callback(
    'cpu_usage_percent',
    callbacks=[get_cpu_measurements],
    unit='percent',
    description='Per-core CPU usage percentage'
)

logfire.metric_up_down_counter_callback(
    'database_connections_active',
    callbacks=[get_connection_count],
    unit='connections',
    description='Active database connections'
)

Metrics Configuration

Configure how metrics are collected, processed, and exported.

class MetricsOptions:
    """Configuration options for metrics collection."""
    
    additional_readers: Sequence[MetricReader] = ()
    """Additional metric readers for custom export destinations."""
    
    collect_in_spans: bool = True
    """Whether to collect metrics data within span context."""

Usage Example:

import logfire
from opentelemetry.exporter.prometheus import PrometheusMetricReader

# Configure metrics with Prometheus export
prometheus_reader = PrometheusMetricReader()

logfire.configure(
    metrics=logfire.MetricsOptions(
        additional_readers=[prometheus_reader],
        collect_in_spans=True
    )
)

Metrics Integration with Spans

Metrics can be associated with spans to provide rich context about when and where measurements were taken.

Usage Examples:

import logfire
import time

logfire.configure()

request_duration = logfire.metric_histogram(
    'request_duration_ms',
    unit='ms',
    description='Request processing duration'
)

def handle_request():
    with logfire.span('Handle HTTP request', endpoint='/api/users') as span:
        start_time = time.time()
        
        # Process request
        result = process_request()
        
        # Record metric within span context
        duration_ms = (time.time() - start_time) * 1000
        request_duration.record(duration_ms, {
            'endpoint': '/api/users',
            'status': '200'
        })
        
        span.set_attribute('duration_ms', duration_ms)
        return result

Type Definitions

# OpenTelemetry metric types
from opentelemetry.metrics import Counter, Histogram, Gauge, UpDownCounter
from opentelemetry.metrics import CallbackOptions, Observation
from opentelemetry.sdk.metrics.export import MetricReader
from typing import Callable, Sequence, Iterable

# Observation creation for callbacks
class Observation:
    """A single observed value with optional attributes."""
    def __init__(self, value: int | float, attributes: dict[str, str] | None = None): ...

# Callback function signature
MetricCallback = Callable[[CallbackOptions], Iterable[Observation]]

Best Practices

Naming Conventions:

  • Use descriptive names with units: http_request_duration_seconds
  • Include totals for counters: requests_total, errors_total
  • Use consistent naming across related metrics

Attribute Management:

  • Keep attribute cardinality low (< 1000 unique combinations)
  • Use consistent attribute names across metrics
  • Avoid high-cardinality attributes like user IDs or request IDs

Performance Considerations:

  • Use callback metrics for expensive calculations
  • Batch metric updates when possible
  • Consider sampling for high-frequency metrics
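
The sampling suggestion can be as simple as forwarding one in every N observations to the underlying instrument; uniform 1-in-N sampling preserves the shape of a high-volume distribution at a fraction of the cost. A sketch (`record_fn` is a stand-in for, e.g., a histogram's record method; nothing here is logfire API):

```python
class SampledRecorder:
    """Forward every Nth observation to record_fn, dropping the rest."""

    def __init__(self, record_fn, every_n: int):
        self.record_fn = record_fn
        self.every_n = every_n
        self._seen = 0

    def record(self, value, attributes=None):
        self._seen += 1
        if self._seen % self.every_n == 0:
            self.record_fn(value, attributes)
```

Note that this approach suits histograms; for counters, sampled amounts would need to be scaled back up by N to stay accurate.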

Usage Example with Best Practices:

import logfire

logfire.configure()

# Well-named metrics with appropriate units
http_requests_total = logfire.metric_counter(
    'http_requests_total',
    unit='requests',
    description='Total number of HTTP requests received'
)

http_request_duration_seconds = logfire.metric_histogram(
    'http_request_duration_seconds', 
    unit='seconds',
    description='HTTP request duration in seconds'
)

# Consistent, low-cardinality attributes
def record_request_metrics(method, endpoint, status_code, duration):
    # Good: low cardinality attributes
    attributes = {
        'method': method,
        'endpoint': normalize_endpoint(endpoint),  # /users/{id} not /users/123
        'status_code': str(status_code)
    }
    
    http_requests_total.add(1, attributes)
    http_request_duration_seconds.record(duration, attributes)
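
`normalize_endpoint` above is a hypothetical helper, not part of logfire. A minimal regex-based sketch that collapses purely numeric path segments into a placeholder:

```python
import re

def normalize_endpoint(path: str) -> str:
    """Replace numeric path segments so distinct IDs map to one low-cardinality label."""
    return re.sub(r'/\d+(?=/|$)', '/{id}', path)
```

Real routes may also contain UUIDs or slugs; when your web framework exposes the matched route template (e.g. `/users/{id}`), prefer that over string munging.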

Install with Tessl CLI

npx tessl i tessl/pypi-logfire
