Python observability platform with structured logging, distributed tracing, metrics collection, and automatic instrumentation for popular frameworks and AI services.
—
Integration components for connecting Logfire with standard logging frameworks, structured logging libraries, and external systems. These handlers and processors enable seamless integration with existing logging infrastructure and third-party tools.
Integration with Python's built-in logging module to capture existing log records and forward them to Logfire with proper formatting and context.
class LogfireLoggingHandler:
"""
Handler for integrating Python standard library logging with Logfire.
Captures log records and forwards them to Logfire with appropriate formatting.
"""
def __init__(self, logfire: Logfire | None = None) -> None:
"""
Initialize the logging handler.
Parameters:
- logfire: Specific Logfire instance to use (None for default instance)
"""
def emit(self, record: logging.LogRecord) -> None:
"""
Process a log record and send it to Logfire.
Parameters:
- record: Standard library LogRecord to process
"""Usage Examples:
import logging
import logfire
# Configure Logfire
logfire.configure()
# Set up standard library logging with Logfire handler
logging.basicConfig(
level=logging.INFO,
handlers=[
logfire.LogfireLoggingHandler(),
logging.StreamHandler() # Also log to console
]
)
# Use standard logging - automatically captured by Logfire
logger = logging.getLogger(__name__)
logger.info('Application started')
logger.error('Database connection failed', extra={'db_host': 'localhost'})
# Existing logging code works unchanged
def process_user_data(user_id):
logger.debug('Processing user data', extra={'user_id': user_id})
try:
# Processing logic
result = perform_processing()
logger.info('User data processed successfully', extra={'user_id': user_id})
return result
except Exception as e:
logger.exception('Failed to process user data', extra={'user_id': user_id})
raiseIntegration with structlog for structured logging with consistent key-value formatting and rich context preservation.
class StructlogProcessor:
"""
Processor for integrating structlog with Logfire.
Processes structlog events and forwards them to Logfire with structured data.
Alias: LogfireProcessor (same class, different name for backwards compatibility)
"""
def __init__(self, logfire: Logfire | None = None) -> None:
"""
Initialize the structlog processor.
Parameters:
- logfire: Specific Logfire instance to use (None for default instance)
"""
def __call__(self, logger, method_name: str, event_dict: dict) -> dict:
"""
Process a structlog event dictionary.
Parameters:
- logger: Structlog logger instance
- method_name: Log level method name
- event_dict: Event dictionary with structured data
Returns: Processed event dictionary
"""
# Alias for backwards compatibility
LogfireProcessor = StructlogProcessorUsage Examples:
import structlog
import logfire
# Configure Logfire
logfire.configure()
# Configure structlog with Logfire processor
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
logfire.StructlogProcessor(), # Send to Logfire
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
# Use structlog - automatically captured by Logfire
logger = structlog.get_logger()
logger.info("User login", user_id=123, ip_address="192.168.1.1")
logger.error("Payment failed", user_id=123, amount=99.99, error_code="CARD_DECLINED")
# Structured logging with context
logger = logger.bind(request_id="req-456", user_id=123)
logger.info("Starting request processing")
logger.info("Database query completed", query_time_ms=45)
logger.info("Request processing completed", total_time_ms=150)Integration with Loguru for enhanced logging capabilities with automatic formatting and context preservation.
def loguru_handler() -> dict[str, Any]:
"""
Create a Logfire handler configuration for Loguru integration.
Returns: Dictionary containing handler configuration for Loguru.add()
"""Usage Examples:
import loguru
import logfire
# Configure Logfire
logfire.configure()
# Configure Loguru with Logfire integration
from loguru import logger
# Add Logfire as a handler
handler_config = logfire.loguru_handler()
logger.add(**handler_config)
# Use Loguru - automatically captured by Logfire
logger.info("Application started")
logger.info("Processing user {user_id}", user_id=123)
logger.error("Failed to connect to database: {error}", error="connection timeout")
# Loguru's rich formatting works with Logfire
logger.info("User data: {data}", data={"name": "John", "age": 30})
# Exception logging with tracebacks
try:
result = 1 / 0
except Exception as e:
logger.exception("Division error occurred")Utilities for managing OpenTelemetry context and baggage for cross-service correlation and metadata propagation.
def get_baggage() -> dict[str, str]:
"""
Get current OpenTelemetry baggage values.
Returns: Dictionary of baggage key-value pairs currently in context
"""
def set_baggage(baggage: dict[str, str]) -> Token:
"""
Set OpenTelemetry baggage for context propagation across service boundaries.
Parameters:
- baggage: Dictionary of key-value pairs to add to baggage
Returns: Context token for restoration if needed
"""
def add_non_user_code_prefix(prefix: str) -> None:
"""
Add a prefix to identify non-user code in stack traces.
Helps filter out framework/library code from error reporting.
Parameters:
- prefix: Path prefix to mark as non-user code (e.g., '/usr/local/lib/python3.9/site-packages/')
"""Usage Examples:
import logfire
# Set baggage for request correlation
request_baggage = {
'user_id': '12345',
'session_id': 'sess_abcd1234',
'request_id': 'req_xyz789',
'feature_flags': 'new_ui_enabled,beta_features'
)
# Set baggage - automatically propagated in HTTP headers
token = logfire.set_baggage(request_baggage)
# Make service calls - baggage is automatically included
with logfire.span('External service call'):
response = requests.post('https://service-b.example.com/process')
# In the receiving service, get baggage
current_baggage = logfire.get_baggage()
user_id = current_baggage.get('user_id')
session_id = current_baggage.get('session_id')
# Use baggage for correlation
with logfire.span('Process user request', user_id=user_id, session_id=session_id):
# Processing logic
pass
# Configure non-user code filtering
logfire.add_non_user_code_prefix('/usr/local/lib/python3.9/site-packages/')
logfire.add_non_user_code_prefix('/opt/conda/lib/python3.9/site-packages/')Utilities for controlling when and how instrumentation operates, allowing fine-grained control over observability data collection.
def suppress_instrumentation() -> AbstractContextManager[None]:
"""
Context manager to temporarily suppress all automatic instrumentation.
Useful for avoiding recursive instrumentation or excluding specific operations.
Returns: Context manager that disables instrumentation within its scope
"""Usage Examples:
import logfire
import requests
# Configure instrumentation
logfire.configure()
logfire.instrument_requests()
# Normal requests are instrumented
response = requests.get('https://api.example.com/data') # Creates span
# Suppress instrumentation for specific operations
with logfire.suppress_instrumentation():
# This request won't create spans or metrics
health_check = requests.get('https://api.example.com/health')
# Internal operations you don't want to trace
internal_response = requests.post('https://internal-metrics.company.com/report')
# Instrumentation resumes after context
response2 = requests.get('https://api.example.com/users') # Creates span
# Useful for avoiding noise in monitoring systems
def send_telemetry_data():
with logfire.suppress_instrumentation():
# Don't trace the telemetry sending itself
requests.post('https://telemetry-collector.example.com/', json=metrics_data)Command-line utilities and development helpers for managing Logfire configuration and debugging observability setup.
def logfire_info() -> None:
"""
Display current Logfire configuration information.
Useful for debugging and verifying setup in development.
"""Usage Examples:
import logfire
# Configure Logfire
logfire.configure(
service_name='my-web-app',
environment='development',
send_to_logfire=True
)
# Display current configuration (useful for debugging)
logfire.logfire_info()
# Output shows:
# - Service name and version
# - Environment settings
# - Export destinations
# - Console configuration
# - Instrumentation statusSupport for extending Logfire with custom processing and export logic through OpenTelemetry processors.
# Configuration support for custom processors
class AdvancedOptions:
"""Advanced configuration options for custom integrations."""
additional_span_processors: Sequence[SpanProcessor] = ()
"""Additional OpenTelemetry span processors for custom processing."""
log_record_processors: Sequence[LogRecordProcessor] = ()
"""Additional OpenTelemetry log record processors."""Usage Examples:
import logfire
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
# Custom span processor for additional export destinations
jaeger_exporter = JaegerExporter(
agent_host_name="jaeger-collector.example.com",
agent_port=6831,
)
jaeger_processor = BatchSpanProcessor(jaeger_exporter)
# Configure Logfire with custom processors
logfire.configure(
service_name='my-service',
advanced=logfire.AdvancedOptions(
additional_span_processors=[jaeger_processor]
)
)
# Now spans are sent to both Logfire and Jaeger
with logfire.span('Custom processing'):
# This span goes to both destinations
logfire.info('Processing data')Helper functions and utilities for integrating with external monitoring and alerting systems.
Usage Examples:
import logfire
# Integration with external monitoring systems
def setup_monitoring_integrations():
# Configure Logfire
logfire.configure(
service_name='production-api',
environment='production'
)
# Set up correlation with external systems
logfire.set_baggage({
'deployment_id': 'deploy-abc123',
'datacenter': 'us-west-2',
'cluster': 'production-cluster-1'
})
# Use tags for filtering and alerting
production_logger = logfire.with_tags('production', 'api', 'critical')
return production_logger
# Use throughout application
logger = setup_monitoring_integrations()
def handle_critical_operation():
with logger.span('Critical operation', operation_type='payment_processing') as span:
try:
# Critical business logic
result = process_payment()
span.set_attribute('success', True)
return result
except Exception as e:
# This will be tagged for alerting
span.record_exception(e)
logger.error('Critical operation failed',
operation_type='payment_processing',
error_type=type(e).__name__)
raise# Standard library imports for integrations
import logging
from contextvars import Token
from typing import AbstractContextManager, Any, Callable, Sequence
# OpenTelemetry processor types
from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.sdk._logs import LogRecordProcessor
# Logfire types
from logfire import LogfireLogging Integration:
Performance Considerations:
Security:
Example Production Setup:
import logging
import structlog
import logfire
def setup_production_logging():
# Configure Logfire
logfire.configure(
service_name='production-api',
environment='production',
send_to_logfire=True,
console=False, # Disable console in production
scrubbing=logfire.ScrubbingOptions(
extra_patterns=[r'password', r'api_key', r'token']
)
)
# Configure standard library logging
logging.basicConfig(
level=logging.INFO,
handlers=[logfire.LogfireLoggingHandler()],
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Configure structlog
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
logfire.StructlogProcessor(),
structlog.processors.JSONRenderer()
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
# Set up correlation context
logfire.set_baggage({
'service': 'production-api',
'version': '1.2.3',
'environment': 'production'
})
return logfire.with_tags('production', 'api')
# Use in application
production_logger = setup_production_logging()Install with Tessl CLI
npx tessl i tessl/pypi-logfire