CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-opencensus

A comprehensive observability framework providing distributed tracing, metrics collection, and statistics gathering capabilities for Python applications.

Pending
Overview
Eval results
Files

docs/exporters.md

Exporters and Transports

Export collected traces and metrics to various backends including console output, files, and cloud services. Includes both synchronous and asynchronous transport options with configurable batching and error handling.

Capabilities

Base Exporter Interface

Foundation interface for all trace exporters, providing consistent export and emission methods for sending telemetry data to various backends.

class Exporter:
    """Base class for trace exporters.

    Concrete exporters (console, file, logging, custom backends) subclass
    this and implement ``export`` and ``emit``; the methods here define
    the interface only.
    """

    def export(self, span_datas):
        """
        Export span data to configured destination.

        Parameters:
        - span_datas: list, list of SpanData objects to export
        """

    def emit(self, span_datas):
        """
        Emit span data immediately.

        Unlike ``export``, this is documented as immediate delivery
        (no transport buffering in between).

        Parameters:
        - span_datas: list, list of SpanData objects to emit
        """

Built-in Trace Exporters

Ready-to-use exporters for common output destinations including console, files, and Python logging system.

class PrintExporter(Exporter):
    """
    Console output exporter for debugging and development.

    Exports trace data to stdout in human-readable format.

    Parameters:
    - transport: Transport, transport mechanism (default: SyncTransport)

    NOTE(review): the default is the SyncTransport *class*, not an
    instance -- presumably the exporter instantiates it with itself;
    confirm before passing a pre-built transport instance.
    """
    def __init__(self, transport=SyncTransport): ...

    def export(self, span_datas):
        """Export spans to console output."""

    def emit(self, span_datas):
        """Emit spans to console immediately."""

class FileExporter(Exporter):
    """
    File output exporter for persistent trace storage.

    Parameters:
    - file_name: str, output file path (default: 'opencensus-traces.json')
    - transport: Transport, transport mechanism (default: SyncTransport;
      a Transport *class*, not an instance)
    - file_mode: str, file mode for writing (default: 'w+'; note that
      'w+' truncates an existing file when it is opened)
    """
    def __init__(self, file_name='opencensus-traces.json', transport=SyncTransport, file_mode='w+'): ...

    def export(self, span_datas):
        """Export spans to configured file."""

    def emit(self, span_datas):
        """Write spans to file immediately."""

class LoggingExporter(Exporter):
    """
    Python logging system exporter.

    Exports trace data through Python's logging system,
    allowing integration with existing log configuration.

    Parameters:
    - handler: logging.Handler, custom log handler (optional; when None,
      presumably the process's existing logging configuration is used --
      verify against the implementation)
    - transport: Transport, transport mechanism (default: SyncTransport;
      a Transport *class*, not an instance)
    """
    def __init__(self, handler=None, transport=SyncTransport): ...

    def export(self, span_datas):
        """Export spans through logging system."""

    def emit(self, span_datas):
        """Log spans immediately."""

Transport Mechanisms

Control how exported data is transmitted, with options for synchronous, asynchronous, and batched delivery.

class Transport:
    """Base transport class for data transmission.

    Transports decide *how* exported data reaches its destination:
    synchronously on the caller's thread, or queued for background
    delivery (see SyncTransport / AsyncTransport).
    """

    def export(self, datas):
        """
        Export data using this transport.

        Parameters:
        - datas: list, data objects to export
        """

    def flush(self):
        """Flush any pending data.

        Buffering transports (e.g. AsyncTransport) override this to
        drain their queue; unbuffered transports have nothing to flush.
        """

class SyncTransport(Transport):
    """
    Synchronous transport for immediate data export.

    Parameters:
    - exporter: Exporter, destination exporter
    """
    def __init__(self, exporter): ...

    def export(self, datas):
        """Export data synchronously through configured exporter.

        Blocks the caller until the export completes.
        """

class AsyncTransport(Transport):
    """
    Asynchronous transport with background thread processing.

    Provides batching, buffering, and automatic retry capabilities
    for high-throughput scenarios.

    Parameters:
    - exporter: Exporter, destination exporter
    - grace_period: float, shutdown grace period in seconds (default: 5.0)
    - max_batch_size: int, maximum batch size (default: 600)
    - wait_period: float, batch flush interval in seconds (default: 60.0)
    """
    def __init__(self, exporter, grace_period=5.0, max_batch_size=600, wait_period=60.0): ...

    def export(self, data):
        """Queue data for asynchronous export.

        NOTE(review): the parameter is named ``data`` here but ``datas``
        on the base Transport -- the call contract (a list of items) is
        presumably the same; confirm.
        """

    def flush(self):
        """
        Flush all pending data and wait for completion.

        Blocks until all queued data has been exported or
        grace period expires.
        """

Metrics Export Infrastructure

Periodic export of metrics with background processing, aggregation, and error handling for monitoring systems.

class PeriodicMetricTask:
    """
    Periodic metric export task with background thread.

    Parameters:
    - interval: float, export interval in seconds
    - function: callable, export function to call
    - args: tuple, function arguments
    - kwargs: dict, function keyword arguments
    - name: str, task name for identification
    """
    def __init__(self, interval=None, function=None, args=None, kwargs=None, name=None): ...

    def run(self):
        """Start periodic metric export in background thread.

        NOTE(review): despite the Thread-like name, this is documented
        as *starting* the background loop -- confirm whether callers
        should invoke run() or a separate start().
        """

    def close(self):
        """
        Stop periodic export and cleanup resources.

        Stops background thread and waits for completion.
        """

def get_exporter_thread(metric_producers, exporter, interval=None):
    """
    Create and start metric export task.

    Parameters:
    - metric_producers: list, MetricProducer instances to export from
    - exporter: Exporter, destination for exported metrics
    - interval: float, export interval in seconds; when None, the
      documented default of 60 seconds (DEFAULT_INTERVAL) applies

    Returns:
    PeriodicMetricTask: Running export task
    """

class TransportError(Exception):
    """Exception raised for transport-related errors."""
    # No body needed: the docstring alone is a valid class body, so the
    # trailing ``pass`` was redundant and has been removed.

# Constants shared by the export machinery.  They mirror the documented
# keyword defaults: 60s matches get_exporter_thread's default interval,
# 5s matches AsyncTransport's shutdown grace period.
DEFAULT_INTERVAL = 60
"""int: Default export interval in seconds"""

GRACE_PERIOD = 5
"""int: Default grace period for shutdown in seconds"""

Usage Examples

Console Export for Development

from opencensus.trace.tracer import Tracer
from opencensus.trace import print_exporter
from opencensus.trace.samplers import AlwaysOnSampler

# Wire a tracer that samples every request and prints spans to stdout --
# convenient during development, since no backend is required.
tracer = Tracer(
    exporter=print_exporter.PrintExporter(),
    sampler=AlwaysOnSampler(),
)

# Every span recorded inside this block is written to the console.
with tracer.span('development_operation') as span:
    span.add_attribute('debug_info', 'testing console export')
    span.add_annotation('Starting development test')

    # Your application logic goes here.
    result = process_data()

    span.add_attribute('result_size', len(result))
    span.add_annotation('Development test completed')

# Example of the resulting console output:
# Span: development_operation
# Start: 2024-01-15T10:30:15.123456Z
# End: 2024-01-15T10:30:15.234567Z
# Attributes: {'debug_info': 'testing console export', 'result_size': 42}
# Annotations: [{'description': 'Starting development test', ...}, ...]

File Export for Persistent Storage

from opencensus.trace.tracer import Tracer
from opencensus.trace import file_exporter
from opencensus.trace.samplers import ProbabilitySampler
from opencensus.common.transports import AsyncTransport

# FileExporter's ``transport`` argument expects a Transport *class*
# (note the ``transport=SyncTransport`` default in its signature), so to
# configure the async transport's batching, build the exporter first and
# attach a transport bound to that same instance.  Wrapping a *second*
# FileExporter for the same file inside the transport would write every
# span twice.
exporter = file_exporter.FileExporter(file_name='./traces.json')
exporter.transport = AsyncTransport(
    exporter=exporter,
    max_batch_size=100,
    wait_period=30.0
)

tracer = Tracer(
    sampler=ProbabilitySampler(rate=0.1),
    exporter=exporter
)

# High-throughput scenario
for i in range(1000):
    with tracer.span(f'batch_operation_{i}') as span:
        span.add_attribute('batch_id', i)
        span.add_attribute('operation_type', 'batch_process')

        # Process item
        process_batch_item(i)

        if i % 100 == 0:
            span.add_annotation(f'Completed batch {i//100}')

# Flush remaining data before shutdown
exporter.transport.flush()

Logging System Integration

import logging
from opencensus.trace.tracer import Tracer
from opencensus.trace import logging_exporter
from opencensus.common.transports import SyncTransport

# Configure logging system
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('application.log'),
        logging.StreamHandler()
    ]
)

# Create the logging exporter.  LoggingExporter's signature takes only
# ``handler`` (optional) and ``transport`` (a Transport *class*); there
# is no ``logger_name`` parameter, and ``transport`` should not be a
# pre-built transport wrapping a second exporter instance.
exporter = logging_exporter.LoggingExporter(transport=SyncTransport)

tracer = Tracer(exporter=exporter)

# Traces will appear in logs
with tracer.span('user_authentication') as span:
    span.add_attribute('user_id', 'user123')
    span.add_attribute('auth_method', 'oauth2')

    try:
        authenticate_user('user123')
        span.add_annotation('Authentication successful')

    except AuthenticationError as e:
        span.set_status(Status.from_exception(e))
        span.add_annotation('Authentication failed')
        raise

# Log entries will include trace information, e.g.:
# 2024-01-15 10:30:15 - <exporter logger> - INFO - Span: user_authentication ...

Custom Exporter Implementation

import json
import logging

import requests

from opencensus.trace.base_exporter import Exporter

class HTTPExporter(Exporter):
    """Custom exporter that sends traces to an HTTP endpoint.

    Parameters:
    - endpoint_url: str, URL the JSON trace payload is POSTed to
    - api_key: str, optional bearer token sent on every request
    - timeout: float, per-request timeout in seconds (default: 10)
    """

    def __init__(self, endpoint_url, api_key=None, timeout=10):
        self.endpoint_url = endpoint_url
        self.api_key = api_key
        self.timeout = timeout
        # One Session so connections and auth headers are reused.
        self.session = requests.Session()

        if api_key:
            self.session.headers.update({'Authorization': f'Bearer {api_key}'})

    def _span_to_dict(self, span_data):
        """Convert a single SpanData object to the wire JSON format."""
        return {
            'traceId': span_data.context.trace_id,
            'spanId': span_data.span_id,
            'name': span_data.name,
            'startTime': span_data.start_time.isoformat(),
            'endTime': span_data.end_time.isoformat() if span_data.end_time else None,
            'attributes': dict(span_data.attributes or {}),
            'status': {
                'code': span_data.status.canonical_code if span_data.status else 0,
                'message': span_data.status.message if span_data.status else None
            }
        }

    def export(self, span_datas):
        """Export spans to the HTTP endpoint.

        Delivery is best-effort: any failure is logged (with traceback)
        and swallowed so that tracing never crashes the application.
        """
        try:
            trace_data = [self._span_to_dict(sd) for sd in span_datas]

            # Send to endpoint
            response = self.session.post(
                self.endpoint_url,
                json={'traces': trace_data},
                timeout=self.timeout
            )
            response.raise_for_status()

        except Exception:
            # Deliberately broad, but log the full traceback instead of
            # print()-ing just the message, so failures are diagnosable.
            logging.getLogger(__name__).exception("Export failed")

    def emit(self, span_datas):
        """Emit spans immediately (same code path as export)."""
        self.export(span_datas)

# Usage: point the custom exporter at the monitoring endpoint.
http_exporter = HTTPExporter(
    endpoint_url='https://monitoring.example.com/traces',
    api_key='your-api-key'
)

tracer = Tracer(exporter=http_exporter)

with tracer.span('custom_export_test') as span:
    span.add_attribute('custom_field', 'test_value')
    # On exit, the finished span is POSTed to the HTTP endpoint.

Periodic Metrics Export

from opencensus.metrics.transport import get_exporter_thread, PeriodicMetricTask
from opencensus.metrics.export import MetricProducer, Metric, MetricDescriptor
from opencensus.stats import Stats
import time

class CustomMetricProducer(MetricProducer):
    """Custom metric producer for application metrics."""

    def __init__(self):
        self.stats = Stats()

    def get_metrics(self):
        """Get current metrics for export."""
        # Pull the currently exported view data and keep only the views
        # that convert to a metric successfully (falsy results dropped).
        exported = self.stats.view_manager.get_all_exported_data()
        converted = (self._convert_view_to_metric(vd) for vd in exported)
        return [metric for metric in converted if metric]

    def _convert_view_to_metric(self, view_data):
        """Convert view data to metric format."""
        # Implementation depends on your specific metric format
        pass

class ConsoleMetricExporter:
    """Simple console exporter for metrics."""

    def export(self, metrics):
        """Print a short summary of each metric to stdout."""
        print(f"=== Metrics Export ({len(metrics)} metrics) ===")
        for entry in metrics:
            descriptor = entry.descriptor
            print(f"Metric: {descriptor.name}")
            print(f"  Description: {descriptor.description}")
            print(f"  Time Series: {len(entry.time_series)}")
        print("=" * 50)

# Wire a producer/exporter pair into a periodic export task.
metric_producer = CustomMetricProducer()
console_exporter = ConsoleMetricExporter()

# Kick off a background task that exports every 30 seconds.
export_task = get_exporter_thread(
    metric_producers=[metric_producer],
    exporter=console_exporter,
    interval=30
)

try:
    # Let the task run for two minutes.
    time.sleep(120)
finally:
    # Always stop the background thread, even on interrupt.
    export_task.close()

Advanced Transport Configuration

from opencensus.trace.tracer import Tracer
from opencensus.trace import file_exporter
from opencensus.common.transports import AsyncTransport
import signal
import sys

class GracefulFileExporter:
    """File exporter with graceful shutdown handling.

    Wraps a FileExporter behind an AsyncTransport and installs SIGINT /
    SIGTERM handlers that flush buffered spans before the process exits.

    Parameters:
    - file_path: str, destination trace file
    - batch_size: int, maximum spans per async batch (default: 500)
    - flush_interval: float, seconds between background flushes (default: 60)
    """

    def __init__(self, file_path, batch_size=500, flush_interval=60):
        self.exporter = file_exporter.FileExporter(file_path)
        self.transport = AsyncTransport(
            exporter=self.exporter,
            max_batch_size=batch_size,
            wait_period=flush_interval,
            grace_period=10.0
        )

        # Flush pending spans when the process is asked to stop.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals gracefully."""
        print(f"Received signal {signum}, flushing traces...")
        self.transport.flush()
        sys.exit(0)

    def export(self, span_datas):
        """Export spans through the async transport.

        Transport.export is documented to take the whole *list* of data
        objects, so the batch is handed over in a single call rather than
        span-by-span (which would also defeat the transport's batching).
        """
        self.transport.export(span_datas)

# Usage in production application
exporter = GracefulFileExporter('./production_traces.jsonl')
tracer = Tracer(exporter=exporter)

# Application will gracefully flush traces on shutdown
# NOTE(review): GracefulFileExporter installs its own SIGINT handler that
# flushes and calls sys.exit(0), so Ctrl-C raises SystemExit, not
# KeyboardInterrupt -- the except branch below is likely unreachable in
# practice; verify the intended shutdown path.
try:
    while True:  # Main application loop
        with tracer.span('main_operation') as span:
            # Your application logic (``span`` is bound but not used here)
            process_requests()

except KeyboardInterrupt:
    print("Shutting down gracefully...")
    # Signal handler will flush remaining traces

Install with Tessl CLI

npx tessl i tessl/pypi-opencensus

docs

common.md

exporters.md

index.md

logging.md

metrics-stats.md

tags-context.md

tracing.md

tile.json