Python observability platform with structured logging, distributed tracing, metrics collection, and automatic instrumentation for popular frameworks and AI services.
—
Metrics creation and management for quantitative monitoring including counters, histograms, gauges, and callback-based metrics. Logfire provides comprehensive metrics capabilities built on OpenTelemetry standards for performance monitoring and system observability.
Counters track cumulative values that only increase over time, such as request counts, error counts, or bytes processed.
def metric_counter(name: str, *,
unit: str = '',
description: str = '') -> Counter:
"""
Create a counter metric for tracking cumulative increasing values.
Parameters:
- name: Unique metric name
- unit: Unit of measurement (e.g., 'requests', 'bytes', 'ms')
- description: Human-readable description of what the metric measures
Returns: Counter instance for recording values
"""
class Counter:
"""Counter metric for tracking cumulative increasing values."""
def add(self, amount: int | float, attributes: dict[str, str] | None = None) -> None:
"""
Add a value to the counter.
Parameters:
- amount: Positive number to add to the counter
- attributes: Optional attributes for labeling/filtering
"""Usage Examples:
import logfire
logfire.configure()
# Create counters for different metrics
request_counter = logfire.metric_counter(
'http_requests_total',
unit='requests',
description='Total number of HTTP requests'
)
error_counter = logfire.metric_counter(
'http_errors_total',
unit='errors',
description='Total number of HTTP errors'
)
bytes_processed = logfire.metric_counter(
'bytes_processed_total',
unit='bytes',
description='Total bytes processed by the application'
)
# Record counter values
request_counter.add(1, {'method': 'GET', 'endpoint': '/users'})
request_counter.add(1, {'method': 'POST', 'endpoint': '/users'})
# Record errors with context
error_counter.add(1, {'status_code': '404', 'endpoint': '/users/999'})
# Record data processing
bytes_processed.add(1024, {'operation': 'upload', 'file_type': 'json'})Histograms track distributions of values, automatically creating buckets for analyzing percentiles, averages, and value distributions.
def metric_histogram(name: str, *,
unit: str = '',
description: str = '') -> Histogram:
"""
Create a histogram metric for tracking value distributions.
Parameters:
- name: Unique metric name
- unit: Unit of measurement (e.g., 'ms', 'seconds', 'bytes')
- description: Human-readable description of what the metric measures
Returns: Histogram instance for recording values
"""
class Histogram:
"""Histogram metric for tracking value distributions."""
def record(self, amount: int | float, attributes: dict[str, str] | None = None) -> None:
"""
Record a value in the histogram.
Parameters:
- amount: Value to record
- attributes: Optional attributes for labeling/filtering
"""Usage Examples:
import logfire
import time
logfire.configure()
# Create histograms for performance monitoring
response_time = logfire.metric_histogram(
'http_request_duration_ms',
unit='ms',
description='HTTP request duration in milliseconds'
)
payload_size = logfire.metric_histogram(
'request_payload_size_bytes',
unit='bytes',
description='Size of request payloads'
)
query_duration = logfire.metric_histogram(
'database_query_duration_ms',
unit='ms',
description='Database query execution time'
)
# Record response times
def handle_request():
start_time = time.time()
# Process request
time.sleep(0.1) # Simulated work
duration_ms = (time.time() - start_time) * 1000
response_time.record(duration_ms, {
'method': 'GET',
'endpoint': '/api/users',
'status_code': '200'
})
# Record payload sizes
payload_size.record(2048, {'content_type': 'application/json'})
# Record query performance
query_duration.record(15.5, {'table': 'users', 'operation': 'SELECT'})Gauges track current values that can go up or down, such as memory usage, queue length, or temperature readings.
def metric_gauge(name: str, *,
unit: str = '',
description: str = '') -> Gauge:
"""
Create a gauge metric for tracking current values that can increase or decrease.
Parameters:
- name: Unique metric name
- unit: Unit of measurement (e.g., 'bytes', 'percent', 'count')
- description: Human-readable description of what the metric measures
Returns: Gauge instance for recording values
"""
class Gauge:
"""Gauge metric for tracking current values."""
def set(self, amount: int | float, attributes: dict[str, str] | None = None) -> None:
"""
Set the current value of the gauge.
Parameters:
- amount: Current value to set
- attributes: Optional attributes for labeling/filtering
"""Usage Examples:
import logfire
import psutil
logfire.configure()
# Create gauges for system monitoring
memory_usage = logfire.metric_gauge(
'memory_usage_bytes',
unit='bytes',
description='Current memory usage'
)
cpu_utilization = logfire.metric_gauge(
'cpu_utilization_percent',
unit='percent',
description='Current CPU utilization percentage'
)
queue_size = logfire.metric_gauge(
'task_queue_size',
unit='tasks',
description='Current number of tasks in queue'
)
# Update gauge values
def update_system_metrics():
# Memory usage
memory_usage.set(psutil.virtual_memory().used, {'type': 'virtual'})
# CPU utilization
cpu_percent = psutil.cpu_percent()
cpu_utilization.set(cpu_percent, {'core': 'all'})
# Application-specific metrics
queue_size.set(len(task_queue), {'queue_type': 'background_jobs'})
# Call periodically to update gauges
update_system_metrics()Up-down counters track values that can increase or decrease, similar to gauges but for cumulative values like active connections or items in inventory.
def metric_up_down_counter(name: str, *,
unit: str = '',
description: str = '') -> UpDownCounter:
"""
Create an up-down counter metric for values that can increase or decrease.
Parameters:
- name: Unique metric name
- unit: Unit of measurement (e.g., 'connections', 'items')
- description: Human-readable description of what the metric measures
Returns: UpDownCounter instance for recording changes
"""
class UpDownCounter:
"""Up-down counter metric for tracking values that can increase or decrease."""
def add(self, amount: int | float, attributes: dict[str, str] | None = None) -> None:
"""
Add a value to the up-down counter (can be positive or negative).
Parameters:
- amount: Value to add (positive to increase, negative to decrease)
- attributes: Optional attributes for labeling/filtering
"""Usage Examples:
import logfire
logfire.configure()
# Create up-down counters for resource tracking
active_connections = logfire.metric_up_down_counter(
'active_connections',
unit='connections',
description='Number of active database connections'
)
items_in_cart = logfire.metric_up_down_counter(
'shopping_cart_items',
unit='items',
description='Number of items in shopping carts'
)
# Track connection lifecycle
def on_connection_open():
active_connections.add(1, {'database': 'users', 'pool': 'primary'})
def on_connection_close():
active_connections.add(-1, {'database': 'users', 'pool': 'primary'})
# Track shopping cart changes
def add_item_to_cart(user_id, item_id):
items_in_cart.add(1, {'user_category': 'premium'})
def remove_item_from_cart(user_id, item_id):
items_in_cart.add(-1, {'user_category': 'premium'})Callback metrics allow you to define functions that are called periodically to collect metric values, useful for metrics that are expensive to calculate or need to be collected on a schedule.
def metric_counter_callback(name: str, *,
callbacks: Sequence[Callable[[], Iterable[Measurement]]],
unit: str = '',
description: str = '') -> None:
"""
Create a counter metric with callback-based collection.
Parameters:
- name: Unique metric name
- callbacks: Functions that return measurement values when called
- unit: Unit of measurement
- description: Human-readable description
"""
def metric_gauge_callback(name: str,
callbacks: Sequence[Callable[[], Iterable[Measurement]]], *,
unit: str = '',
description: str = '') -> None:
"""
Create a gauge metric with callback-based collection.
Parameters:
- name: Unique metric name
- callbacks: Functions that return measurement values when called
- unit: Unit of measurement
- description: Human-readable description
"""
def metric_up_down_counter_callback(name: str,
callbacks: Sequence[Callable[[], Iterable[Measurement]]], *,
unit: str = '',
description: str = '') -> None:
"""
Create an up-down counter metric with callback-based collection.
Parameters:
- name: Unique metric name
- callbacks: Functions that return measurement values when called
- unit: Unit of measurement
- description: Human-readable description
"""Usage Examples:
import logfire
import psutil
from typing import Iterable
from opentelemetry.metrics import Measurement
logfire.configure()
# Callback functions for system metrics
def get_memory_measurements() -> Iterable[Measurement]:
memory = psutil.virtual_memory()
return [
Measurement(value=memory.total, attributes={'type': 'total'}),
Measurement(value=memory.available, attributes={'type': 'available'}),
Measurement(value=memory.used, attributes={'type': 'used'})
]
def get_cpu_measurements() -> Iterable[Measurement]:
cpu_percentages = psutil.cpu_percent(percpu=True)
measurements = []
for i, percent in enumerate(cpu_percentages):
measurements.append(
Measurement(value=percent, attributes={'core': str(i)})
)
return measurements
def get_connection_count() -> Iterable[Measurement]:
# Example: count active connections from connection pool
active_count = connection_pool.active_connections()
return [Measurement(value=active_count, attributes={'pool': 'primary'})]
# Register callback metrics
logfire.metric_gauge_callback(
'system_memory_bytes',
callbacks=[get_memory_measurements],
unit='bytes',
description='System memory statistics'
)
logfire.metric_gauge_callback(
'cpu_usage_percent',
callbacks=[get_cpu_measurements],
unit='percent',
description='Per-core CPU usage percentage'
)
logfire.metric_up_down_counter_callback(
'database_connections_active',
callbacks=[get_connection_count],
unit='connections',
description='Active database connections'
)Configure how metrics are collected, processed, and exported.
class MetricsOptions:
"""Configuration options for metrics collection."""
additional_readers: Sequence[MetricReader] = ()
"""Additional metric readers for custom export destinations."""
collect_in_spans: bool = True
"""Whether to collect metrics data within span context."""Usage Example:
import logfire
from opentelemetry.exporter.prometheus import PrometheusMetricReader
# Configure metrics with Prometheus export
prometheus_reader = PrometheusMetricReader()
logfire.configure(
metrics=logfire.MetricsOptions(
additional_readers=[prometheus_reader],
collect_in_spans=True
)
)Metrics can be associated with spans to provide rich context about when and where measurements were taken.
Usage Examples:
import logfire
logfire.configure()
request_duration = logfire.metric_histogram(
'request_duration_ms',
unit='ms',
description='Request processing duration'
)
def handle_request():
with logfire.span('Handle HTTP request', endpoint='/api/users') as span:
start_time = time.time()
# Process request
result = process_request()
# Record metric within span context
duration_ms = (time.time() - start_time) * 1000
request_duration.record(duration_ms, {
'endpoint': '/api/users',
'status': '200'
})
span.set_attribute('duration_ms', duration_ms)
return result# OpenTelemetry metric types
from opentelemetry.metrics import Counter, Histogram, Gauge, UpDownCounter
from opentelemetry.metrics import Measurement, MetricReader
from typing import Callable, Sequence, Iterable
# Measurement creation for callbacks
class Measurement:
"""A single measurement value with optional attributes."""
def __init__(self, value: int | float, attributes: dict[str, str] | None = None): ...
# Callback function signature
MetricCallback = Callable[[], Iterable[Measurement]]Naming Conventions:
http_request_duration_secondsrequests_total, errors_totalAttribute Management:
Performance Considerations:
Usage Example with Best Practices:
import logfire
logfire.configure()
# Well-named metrics with appropriate units
http_requests_total = logfire.metric_counter(
'http_requests_total',
unit='requests',
description='Total number of HTTP requests received'
)
http_request_duration_seconds = logfire.metric_histogram(
'http_request_duration_seconds',
unit='seconds',
description='HTTP request duration in seconds'
)
# Consistent, low-cardinality attributes
def record_request_metrics(method, endpoint, status_code, duration):
# Good: low cardinality attributes
attributes = {
'method': method,
'endpoint': normalize_endpoint(endpoint), # /users/{id} not /users/123
'status_code': str(status_code)
}
http_requests_total.add(1, attributes)
http_request_duration_seconds.record(duration, attributes)Install with Tessl CLI
npx tessl i tessl/pypi-logfire