
tessl/pypi-logfire

Python observability platform with structured logging, distributed tracing, metrics collection, and automatic instrumentation for popular frameworks and AI services.


docs/integrations.md

Integrations and Handlers

Integration components for connecting Logfire with standard logging frameworks, structured logging libraries, and external systems. These handlers and processors let existing logging infrastructure and third-party tools feed their output into Logfire with minimal code changes.

Capabilities

Standard Library Logging Integration

Integration with Python's built-in logging module to capture existing log records and forward them to Logfire with proper formatting and context.

class LogfireLoggingHandler:
    """
    Handler for integrating Python standard library logging with Logfire.
    Captures log records and forwards them to Logfire with appropriate formatting.
    """
    
    def __init__(self, logfire: Logfire | None = None) -> None:
        """
        Initialize the logging handler.
        
        Parameters:
        - logfire: Specific Logfire instance to use (None for default instance)
        """
    
    def emit(self, record: logging.LogRecord) -> None:
        """
        Process a log record and send it to Logfire.
        
        Parameters:
        - record: Standard library LogRecord to process
        """

Usage Examples:

import logging
import logfire

# Configure Logfire
logfire.configure()

# Set up standard library logging with Logfire handler
logging.basicConfig(
    level=logging.INFO,
    handlers=[
        logfire.LogfireLoggingHandler(),
        logging.StreamHandler()  # Also log to console
    ]
)

# Use standard logging - automatically captured by Logfire
logger = logging.getLogger(__name__)
logger.info('Application started')
logger.error('Database connection failed', extra={'db_host': 'localhost'})

# Existing logging code works unchanged
def process_user_data(user_id):
    logger.debug('Processing user data', extra={'user_id': user_id})
    try:
        # Processing logic
        result = perform_processing()
        logger.info('User data processed successfully', extra={'user_id': user_id})
        return result
    except Exception as e:
        logger.exception('Failed to process user data', extra={'user_id': user_id})
        raise
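Under the hood, a handler like this follows the standard `logging.Handler` contract: subclass it and override `emit`. The following stdlib-only sketch illustrates that contract with a hypothetical `sink` callable standing in for the Logfire client; it is not Logfire's implementation.

```python
import logging

class ForwardingHandler(logging.Handler):
    """Toy handler that forwards each record to a callable sink."""

    def __init__(self, sink):
        super().__init__()
        self.sink = sink  # stand-in for a Logfire client

    def emit(self, record: logging.LogRecord) -> None:
        # Reduce the record to (level, message, one extra field) and hand it off
        self.sink((record.levelname, record.getMessage(), getattr(record, 'db_host', None)))

captured = []
logger = logging.getLogger('forward-sketch')
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the sketch from also printing via the root logger
logger.addHandler(ForwardingHandler(captured.append))

logger.error('Database connection failed', extra={'db_host': 'localhost'})
print(captured[0])  # ('ERROR', 'Database connection failed', 'localhost')
```

Note that `extra` keys become attributes on the `LogRecord`, which is how structured data rides along with stdlib log calls.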

Structlog Integration

Integration with structlog for structured logging with consistent key-value formatting and rich context preservation.

class StructlogProcessor:
    """
    Processor for integrating structlog with Logfire.
    Processes structlog events and forwards them to Logfire with structured data.
    
    Alias: LogfireProcessor (same class, different name for backwards compatibility)
    """
    
    def __init__(self, logfire: Logfire | None = None) -> None:
        """
        Initialize the structlog processor.
        
        Parameters:
        - logfire: Specific Logfire instance to use (None for default instance)
        """
    
    def __call__(self, logger, method_name: str, event_dict: dict) -> dict:
        """
        Process a structlog event dictionary.
        
        Parameters:
        - logger: Structlog logger instance
        - method_name: Log level method name
        - event_dict: Event dictionary with structured data
        
        Returns: Processed event dictionary
        """

# Alias for backwards compatibility
LogfireProcessor = StructlogProcessor

Usage Examples:

import structlog
import logfire

# Configure Logfire
logfire.configure()

# Configure structlog with Logfire processor
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        logfire.StructlogProcessor(),  # Send to Logfire
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

# Use structlog - automatically captured by Logfire
logger = structlog.get_logger()

logger.info("User login", user_id=123, ip_address="192.168.1.1")
logger.error("Payment failed", user_id=123, amount=99.99, error_code="CARD_DECLINED")

# Structured logging with context
logger = logger.bind(request_id="req-456", user_id=123)
logger.info("Starting request processing")
logger.info("Database query completed", query_time_ms=45)
logger.info("Request processing completed", total_time_ms=150)
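A structlog processor is simply a callable with the `(logger, method_name, event_dict)` signature documented above, returning the (possibly modified) event dict for the next processor in the chain. This stdlib-only sketch mimics that contract without importing structlog; the capture list is a stand-in for the Logfire exporter.

```python
events = []

class CaptureProcessor:
    """Toy processor: records each event, then passes it through unchanged."""

    def __call__(self, logger, method_name: str, event_dict: dict) -> dict:
        # Copy before later processors mutate the dict
        events.append((method_name, dict(event_dict)))
        return event_dict  # hand off to the next processor in the chain

proc = CaptureProcessor()
result = proc(None, 'info', {'event': 'User login', 'user_id': 123})
print(result)        # {'event': 'User login', 'user_id': 123}
print(events[0][0])  # 'info'
```

Because the processor returns the dict unchanged, it can sit anywhere in the chain before the final renderer, which is why `StructlogProcessor` is placed just before `JSONRenderer` in the example above.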

Loguru Integration

Integration with Loguru for enhanced logging capabilities with automatic formatting and context preservation.

def loguru_handler() -> dict[str, Any]:
    """
    Create a Logfire handler configuration for Loguru integration.
    
    Returns: Dictionary containing handler configuration for Loguru.add()
    """

Usage Examples:

import loguru
import logfire

# Configure Logfire
logfire.configure()

# Configure Loguru with Logfire integration
from loguru import logger

# Add Logfire as a handler
handler_config = logfire.loguru_handler()
logger.add(**handler_config)

# Use Loguru - automatically captured by Logfire
logger.info("Application started")
logger.info("Processing user {user_id}", user_id=123)
logger.error("Failed to connect to database: {error}", error="connection timeout")

# Loguru's rich formatting works with Logfire
logger.info("User data: {data}", data={"name": "John", "age": 30})

# Exception logging with tracebacks
try:
    result = 1 / 0
except ZeroDivisionError:
    logger.exception("Division error occurred")

Context and Baggage Management

Utilities for managing OpenTelemetry context and baggage for cross-service correlation and metadata propagation.

def get_baggage() -> dict[str, str]:
    """
    Get current OpenTelemetry baggage values.
    
    Returns: Dictionary of baggage key-value pairs currently in context
    """

def set_baggage(baggage: dict[str, str]) -> Token:
    """
    Set OpenTelemetry baggage for context propagation across service boundaries.
    
    Parameters:
    - baggage: Dictionary of key-value pairs to add to baggage
    
    Returns: Context token for restoration if needed
    """

def add_non_user_code_prefix(prefix: str) -> None:
    """
    Add a prefix to identify non-user code in stack traces.
    Helps filter out framework/library code from error reporting.
    
    Parameters:
    - prefix: Path prefix to mark as non-user code (e.g., '/usr/local/lib/python3.9/site-packages/')
    """

Usage Examples:

import logfire

# Set baggage for request correlation
request_baggage = {
    'user_id': '12345',
    'session_id': 'sess_abcd1234',
    'request_id': 'req_xyz789',
    'feature_flags': 'new_ui_enabled,beta_features'
}

# Set baggage - automatically propagated in HTTP headers
token = logfire.set_baggage(request_baggage)

# Make service calls - baggage is automatically included
with logfire.span('External service call'):
    response = requests.post('https://service-b.example.com/process')

# In the receiving service, get baggage
current_baggage = logfire.get_baggage()
user_id = current_baggage.get('user_id')
session_id = current_baggage.get('session_id')

# Use baggage for correlation
with logfire.span('Process user request', user_id=user_id, session_id=session_id):
    # Processing logic
    pass

# Configure non-user code filtering
logfire.add_non_user_code_prefix('/usr/local/lib/python3.9/site-packages/')
logfire.add_non_user_code_prefix('/opt/conda/lib/python3.9/site-packages/')

Instrumentation Control

Utilities for controlling when and how instrumentation operates, allowing fine-grained control over observability data collection.

def suppress_instrumentation() -> AbstractContextManager[None]:
    """
    Context manager to temporarily suppress all automatic instrumentation.
    Useful for avoiding recursive instrumentation or excluding specific operations.
    
    Returns: Context manager that disables instrumentation within its scope
    """

Usage Examples:

import logfire
import requests

# Configure instrumentation
logfire.configure()
logfire.instrument_requests()

# Normal requests are instrumented
response = requests.get('https://api.example.com/data')  # Creates span

# Suppress instrumentation for specific operations
with logfire.suppress_instrumentation():
    # This request won't create spans or metrics
    health_check = requests.get('https://api.example.com/health')
    
    # Internal operations you don't want to trace
    internal_response = requests.post('https://internal-metrics.company.com/report')

# Instrumentation resumes after context
response2 = requests.get('https://api.example.com/users')  # Creates span

# Useful for avoiding noise in monitoring systems
def send_telemetry_data(metrics_data):
    with logfire.suppress_instrumentation():
        # Don't trace the telemetry sending itself
        requests.post('https://telemetry-collector.example.com/', json=metrics_data)

CLI and Development Tools

Command-line utilities and development helpers for managing Logfire configuration and debugging observability setup.

def logfire_info() -> None:
    """
    Display current Logfire configuration information.
    Useful for debugging and verifying setup in development.
    """

Usage Examples:

import logfire

# Configure Logfire
logfire.configure(
    service_name='my-web-app',
    environment='development',
    send_to_logfire=True
)

# Display current configuration (useful for debugging)
logfire.logfire_info()
# Output shows:
# - Service name and version
# - Environment settings  
# - Export destinations
# - Console configuration
# - Instrumentation status

Custom Processors and Exporters

Support for extending Logfire with custom processing and export logic through OpenTelemetry processors.

# Configuration support for custom processors
class AdvancedOptions:
    """Advanced configuration options for custom integrations."""
    
    additional_span_processors: Sequence[SpanProcessor] = ()
    """Additional OpenTelemetry span processors for custom processing."""
    
    log_record_processors: Sequence[LogRecordProcessor] = ()
    """Additional OpenTelemetry log record processors."""

Usage Examples:

import logfire
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Custom span processor for additional export destinations
jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger-collector.example.com",
    agent_port=6831,
)
jaeger_processor = BatchSpanProcessor(jaeger_exporter)

# Configure Logfire with custom processors
logfire.configure(
    service_name='my-service',
    advanced=logfire.AdvancedOptions(
        additional_span_processors=[jaeger_processor]
    )
)

# Now spans are sent to both Logfire and Jaeger
with logfire.span('Custom processing'):
    # This span goes to both destinations
    logfire.info('Processing data')
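A custom span processor implements the OpenTelemetry `SpanProcessor` hooks: `on_start`, `on_end`, `shutdown`, and `force_flush`. This stdlib-only sketch shows the shape of such a processor without depending on the SDK; the `object()` spans are placeholders for real `Span` instances.

```python
class CountingProcessor:
    """Toy processor with the on_start/on_end shape of a SpanProcessor."""

    def __init__(self):
        self.started = 0
        self.ended = 0

    def on_start(self, span, parent_context=None):
        self.started += 1  # called when a span begins

    def on_end(self, span):
        self.ended += 1    # called when a span finishes

    def shutdown(self):
        pass               # release exporter resources here

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        return True        # flush any buffered spans; report success

proc = CountingProcessor()
proc.on_start(object())
proc.on_end(object())
print(proc.started, proc.ended)  # 1 1
```

A real processor passed via `additional_span_processors` would typically buffer spans in `on_end` and export them in batches, as `BatchSpanProcessor` does.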

Third-Party Integrations

Helper functions and utilities for integrating with external monitoring and alerting systems.

Usage Examples:

import logfire

# Integration with external monitoring systems
def setup_monitoring_integrations():
    # Configure Logfire
    logfire.configure(
        service_name='production-api',
        environment='production'
    )
    
    # Set up correlation with external systems
    logfire.set_baggage({
        'deployment_id': 'deploy-abc123',
        'datacenter': 'us-west-2',
        'cluster': 'production-cluster-1'
    })
    
    # Use tags for filtering and alerting
    production_logger = logfire.with_tags('production', 'api', 'critical')
    
    return production_logger

# Use throughout application
logger = setup_monitoring_integrations()

def handle_critical_operation():
    with logger.span('Critical operation', operation_type='payment_processing') as span:
        try:
            # Critical business logic
            result = process_payment()
            span.set_attribute('success', True)
            return result
        except Exception as e:
            # This will be tagged for alerting
            span.record_exception(e)
            logger.error('Critical operation failed', 
                        operation_type='payment_processing',
                        error_type=type(e).__name__)
            raise

Type Definitions

# Standard library imports for integrations
import logging
from contextvars import Token
from contextlib import AbstractContextManager
from typing import Any, Callable, Sequence

# OpenTelemetry processor types
from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.sdk._logs import LogRecordProcessor

# Logfire types
from logfire import Logfire

Best Practices for Integrations

Logging Integration:

  • Use consistent log levels across different logging systems
  • Preserve structured data when converting between formats
  • Maintain correlation IDs and request context

Performance Considerations:

  • Use async handlers for high-volume logging
  • Configure appropriate batching and export intervals
  • Monitor handler performance to avoid logging bottlenecks
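For the async-handler advice above, the stdlib already provides the building blocks: `logging.handlers.QueueHandler` puts records on a queue and `QueueListener` drains it on a background thread, keeping slow exporters off the hot path. A minimal sketch (the `CaptureHandler` stands in for any slow downstream handler):

```python
import logging
import logging.handlers
import queue

q: queue.Queue = queue.Queue()
captured = []

class CaptureHandler(logging.Handler):
    """Stand-in for a slow exporter; just records messages."""
    def emit(self, record):
        captured.append(record.getMessage())

listener = logging.handlers.QueueListener(q, CaptureHandler())
listener.start()  # background thread drains the queue

logger = logging.getLogger('async-sketch')
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(logging.handlers.QueueHandler(q))

logger.info('high-volume message')  # returns immediately; handled off-thread
listener.stop()                     # flushes remaining records, then stops
print(captured)  # ['high-volume message']
```

The same idea applies to any exporter-backed handler: the application thread only pays the cost of an enqueue.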

Security:

  • Be careful with baggage content (avoid sensitive data)
  • Use scrubbing options to redact sensitive information
  • Consider the security implications of cross-service context propagation
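The scrubbing advice can be sketched with plain regexes: redact any attribute whose key matches a sensitive pattern before it leaves the process. This is an illustration of the idea, not Logfire's built-in scrubber:

```python
import re

SENSITIVE = re.compile(r'password|api_key|token', re.IGNORECASE)

def scrub(attributes: dict) -> dict:
    """Return a copy with values of sensitive-looking keys redacted."""
    return {
        key: '[REDACTED]' if SENSITIVE.search(key) else value
        for key, value in attributes.items()
    }

print(scrub({'user_id': 42, 'api_key': 'sk-123', 'Password': 'hunter2'}))
# {'user_id': 42, 'api_key': '[REDACTED]', 'Password': '[REDACTED]'}
```

In practice, prefer configuring patterns through `ScrubbingOptions` so scrubbing applies uniformly to spans, logs, and metrics.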

Example Production Setup:

import logging
import structlog
import logfire

def setup_production_logging():
    # Configure Logfire
    logfire.configure(
        service_name='production-api',
        environment='production',
        send_to_logfire=True,
        console=False,  # Disable console in production
        scrubbing=logfire.ScrubbingOptions(
            extra_patterns=[r'password', r'api_key', r'token']
        )
    )
    
    # Configure standard library logging
    logging.basicConfig(
        level=logging.INFO,
        handlers=[logfire.LogfireLoggingHandler()],
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # Configure structlog  
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            logfire.StructlogProcessor(),
            structlog.processors.JSONRenderer()
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
    
    # Set up correlation context
    logfire.set_baggage({
        'service': 'production-api',
        'version': '1.2.3',
        'environment': 'production'
    })
    
    return logfire.with_tags('production', 'api')

# Use in application
production_logger = setup_production_logging()

Install with Tessl CLI

npx tessl i tessl/pypi-logfire
