A comprehensive observability framework providing distributed tracing, metrics collection, and statistics gathering capabilities for Python applications.
—
Export collected traces and metrics to various backends including console output, files, and cloud services. Includes both synchronous and asynchronous transport options with configurable batching and error handling.
Foundation interface for all trace exporters, providing consistent export and emission methods for sending telemetry data to various backends.
class Exporter:
    """Base class for trace exporters."""

    def export(self, span_datas):
        """
        Export span data to configured destination.

        Parameters:
        - span_datas: list, list of SpanData objects to export
        """

    def emit(self, span_datas):
        """
        Emit span data immediately.

        Parameters:
        - span_datas: list, list of SpanData objects to emit
        """


# Ready-to-use exporters for common output destinations including console, files, and Python logging system.
class PrintExporter(Exporter):
    """
    Console output exporter for debugging and development.

    Exports trace data to stdout in human-readable format.

    Parameters:
    - transport: Transport, transport mechanism (default: SyncTransport)
    """

    # The default is the SyncTransport class itself, not an instance.
    def __init__(self, transport=SyncTransport): ...

    def export(self, span_datas):
        """
        Export spans to console output.

        Parameters:
        - span_datas: list, list of SpanData objects to export
        """

    def emit(self, span_datas):
        """
        Emit spans to console immediately.

        Parameters:
        - span_datas: list, list of SpanData objects to emit
        """
class FileExporter(Exporter):
    """
    File output exporter for persistent trace storage.

    Parameters:
    - file_name: str, output file path (default: 'opencensus-traces.json')
    - transport: Transport, transport mechanism (default: SyncTransport)
    - file_mode: str, file mode for writing (default: 'w+')
    """

    # The default transport is the SyncTransport class itself, not an instance.
    def __init__(self, file_name='opencensus-traces.json', transport=SyncTransport, file_mode='w+'): ...

    def export(self, span_datas):
        """
        Export spans to configured file.

        Parameters:
        - span_datas: list, list of SpanData objects to export
        """

    def emit(self, span_datas):
        """
        Write spans to file immediately.

        Parameters:
        - span_datas: list, list of SpanData objects to write
        """
class LoggingExporter(Exporter):
    """
    Python logging system exporter.

    Exports trace data through Python's logging system,
    allowing integration with existing log configuration.

    Parameters:
    - handler: logging.Handler, custom log handler (optional)
    - transport: Transport, transport mechanism (default: SyncTransport)
    """

    # The default transport is the SyncTransport class itself, not an instance.
    def __init__(self, handler=None, transport=SyncTransport): ...

    def export(self, span_datas):
        """
        Export spans through logging system.

        Parameters:
        - span_datas: list, list of SpanData objects to export
        """

    def emit(self, span_datas):
        """
        Log spans immediately.

        Parameters:
        - span_datas: list, list of SpanData objects to log
        """


# Control how exported data is transmitted, with options for synchronous, asynchronous, and batched delivery.
class Transport:
    """Base transport class for data transmission."""

    def export(self, datas):
        """
        Export data using this transport.

        Parameters:
        - datas: list, data objects to export
        """

    def flush(self):
        """Flush any pending data."""
class SyncTransport(Transport):
    """
    Synchronous transport for immediate data export.

    Parameters:
    - exporter: Exporter, destination exporter
    """

    def __init__(self, exporter): ...

    def export(self, datas):
        """
        Export data synchronously through configured exporter.

        Parameters:
        - datas: list, data objects to export
        """
class AsyncTransport(Transport):
    """
    Asynchronous transport with background thread processing.

    Provides batching, buffering, and automatic retry capabilities
    for high-throughput scenarios.

    Parameters:
    - exporter: Exporter, destination exporter
    - grace_period: float, shutdown grace period in seconds (default: 5.0)
    - max_batch_size: int, maximum batch size (default: 600)
    - wait_period: float, batch flush interval in seconds (default: 60.0)
    """

    def __init__(self, exporter, grace_period=5.0, max_batch_size=600, wait_period=60.0): ...

    def export(self, data):
        """Queue data for asynchronous export."""

    def flush(self):
        """
        Flush all pending data and wait for completion.

        Blocks until all queued data has been exported or
        grace period expires.
        """


# Periodic export of metrics with background processing, aggregation, and error handling for monitoring systems.
class PeriodicMetricTask:
    """
    Periodic metric export task with background thread.

    Parameters:
    - interval: float, export interval in seconds
    - function: callable, export function to call
    - args: tuple, function arguments
    - kwargs: dict, function keyword arguments
    - name: str, task name for identification
    """

    def __init__(self, interval=None, function=None, args=None, kwargs=None, name=None): ...

    def run(self):
        """Start periodic metric export in background thread."""

    def close(self):
        """
        Stop periodic export and cleanup resources.

        Stops background thread and waits for completion.
        """
def get_exporter_thread(metric_producers, exporter, interval=None):
    """
    Create and start metric export task.

    Parameters:
    - metric_producers: list, MetricProducer instances to export from
    - exporter: Exporter, destination for exported metrics
    - interval: float, export interval in seconds (default: None, meaning
      the documented default of 60 seconds)

    Returns:
    PeriodicMetricTask: Running export task
    """
class TransportError(Exception):
    """Exception raised for transport-related errors."""
# Constants

#: int: Default export interval in seconds
DEFAULT_INTERVAL = 60

#: int: Default grace period for shutdown in seconds
GRACE_PERIOD = 5

from opencensus.trace.tracer import Tracer
from opencensus.trace import print_exporter
from opencensus.trace.samplers import AlwaysOnSampler
# Create tracer with console export
tracer = Tracer(
    sampler=AlwaysOnSampler(),
    exporter=print_exporter.PrintExporter()
)

# Traced operations will print to console
with tracer.span('development_operation') as span:
    span.add_attribute('debug_info', 'testing console export')
    span.add_annotation('Starting development test')

    # Your application logic (process_data is a placeholder for application code)
    result = process_data()

    span.add_attribute('result_size', len(result))
    span.add_annotation('Development test completed')

# Console output will show span details:
# Span: development_operation
# Start: 2024-01-15T10:30:15.123456Z
# End: 2024-01-15T10:30:15.234567Z
# Attributes: {'debug_info': 'testing console export', 'result_size': 42}
# Annotations: [{'description': 'Starting development test', ...}, ...]
from opencensus.trace.tracer import Tracer
from opencensus.trace import file_exporter
from opencensus.trace.samplers import ProbabilitySampler
from opencensus.common.transports import AsyncTransport
# Create file exporter with async transport.
# FileExporter's documented default for `transport` is the SyncTransport class,
# and every transport takes the exporter as its first constructor argument, so
# the exporter is handed a transport factory rather than a pre-built transport
# wrapping a second FileExporter on the same file.
# NOTE(review): presumably FileExporter calls transport(self) - confirm
# against the installed opencensus version.
from functools import partial

exporter = file_exporter.FileExporter(
    file_name='./traces.json',
    transport=partial(AsyncTransport, max_batch_size=100, wait_period=30.0)
)

tracer = Tracer(
    sampler=ProbabilitySampler(rate=0.1),
    exporter=exporter
)

# High-throughput scenario
for i in range(1000):
    with tracer.span(f'batch_operation_{i}') as span:
        span.add_attribute('batch_id', i)
        span.add_attribute('operation_type', 'batch_process')

        # Process item (process_batch_item is a placeholder for application code)
        process_batch_item(i)

        if i % 100 == 0:
            span.add_annotation(f'Completed batch {i//100}')
# Flush remaining data before shutdown
exporter.transport.flush()
import logging
from opencensus.trace.tracer import Tracer
from opencensus.trace import logging_exporter
from opencensus.common.transports import SyncTransport
# Configure logging system
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('application.log'),
        logging.StreamHandler()
    ]
)

# Status is used by the error path below to record the failure on the span.
from opencensus.trace.status import Status

# Create logging exporter.
# The documented LoggingExporter signature is (handler=None, transport=SyncTransport):
# there is no logger_name parameter, and the transport is passed as a class
# (transports take the exporter as a constructor argument), not as an
# instance wrapping a second exporter.
exporter = logging_exporter.LoggingExporter(transport=SyncTransport)

tracer = Tracer(exporter=exporter)

# Traces will appear in logs
with tracer.span('user_authentication') as span:
    span.add_attribute('user_id', 'user123')
    span.add_attribute('auth_method', 'oauth2')

    # authenticate_user / AuthenticationError are placeholders for application code
    try:
        authenticate_user('user123')
        span.add_annotation('Authentication successful')
    except AuthenticationError as e:
        span.set_status(Status.from_exception(e))
        span.add_annotation('Authentication failed')
        raise

# Log entries will include trace information:
# 2024-01-15 10:30:15 - opencensus.traces - INFO - Span: user_authentication ...
from opencensus.trace.base_exporter import Exporter
import json
import requests
class HTTPExporter(Exporter):
    """
    Custom exporter that sends traces to HTTP endpoint.

    Parameters:
    - endpoint_url: str, URL that receives the POSTed trace payload
    - api_key: str, bearer token sent in the Authorization header (optional)
    - timeout: int or float, timeout passed to each POST request (default: 10)
    """

    def __init__(self, endpoint_url, api_key=None, timeout=10):
        self.endpoint_url = endpoint_url
        self.api_key = api_key
        self.timeout = timeout
        # A single session carries the auth header for every request.
        self.session = requests.Session()
        if api_key:
            self.session.headers.update({'Authorization': f'Bearer {api_key}'})

    @staticmethod
    def _format_time(value):
        """Return an ISO-format string for a timestamp, or None when unset."""
        if not value:
            return None
        # Datetime-like objects are formatted; values that are already
        # strings are passed through unchanged.
        if hasattr(value, 'isoformat'):
            return value.isoformat()
        return value

    def _span_to_dict(self, span_data):
        """Convert one span into the JSON-serialisable dict sent to the endpoint."""
        status = span_data.status
        return {
            'traceId': span_data.context.trace_id,
            'spanId': span_data.span_id,
            'name': span_data.name,
            'startTime': self._format_time(span_data.start_time),
            'endTime': self._format_time(span_data.end_time),
            'attributes': dict(span_data.attributes or {}),
            'status': {
                'code': status.canonical_code if status else 0,
                'message': status.message if status else None
            }
        }

    def export(self, span_datas):
        """
        Export spans to HTTP endpoint.

        Failures are reported on stdout and never raised.

        Parameters:
        - span_datas: list, list of SpanData objects to export
        """
        try:
            # Convert spans to JSON format
            trace_data = [self._span_to_dict(span_data) for span_data in span_datas]

            # Send to endpoint
            response = self.session.post(
                self.endpoint_url,
                json={'traces': trace_data},
                timeout=self.timeout
            )
            response.raise_for_status()
        except Exception as e:
            # Deliberately best-effort: a telemetry failure must not take
            # down the traced application.
            print(f"Export failed: {e}")

    def emit(self, span_datas):
        """Emit spans immediately."""
        self.export(span_datas)
# Usage
custom_exporter = HTTPExporter(
    endpoint_url='https://monitoring.example.com/traces',
    api_key='your-api-key'
)

tracer = Tracer(exporter=custom_exporter)

with tracer.span('custom_export_test') as span:
    span.add_attribute('custom_field', 'test_value')
# Span will be sent to HTTP endpoint
from opencensus.metrics.transport import get_exporter_thread, PeriodicMetricTask
from opencensus.metrics.export import MetricProducer, Metric, MetricDescriptor
from opencensus.stats import Stats
import time
class CustomMetricProducer(MetricProducer):
    """Custom metric producer for application metrics."""

    def __init__(self):
        self.stats = Stats()

    def get_metrics(self):
        """
        Get current metrics for export.

        Returns:
        list: converted metrics; views whose conversion yields a falsy
        value are skipped
        """
        # Collect current view data
        view_datas = self.stats.view_manager.get_all_exported_data()

        # Convert view data to metrics, dropping views that produce nothing
        converted = (self._convert_view_to_metric(view_data) for view_data in view_datas)
        return [metric for metric in converted if metric]

    def _convert_view_to_metric(self, view_data):
        """
        Convert view data to metric format.

        Placeholder: returns None until implemented, so get_metrics()
        yields an empty list.
        """
        # Implementation depends on your specific metric format
        return None
class ConsoleMetricExporter:
    """Simple console exporter for metrics."""

    def export(self, metrics):
        """
        Export metrics to console.

        Prints a header with the metric count, then the name, description
        and number of time series of each metric, then a separator line.

        Parameters:
        - metrics: list, objects exposing descriptor.name,
          descriptor.description and time_series
        """
        print(f"=== Metrics Export ({len(metrics)} metrics) ===")
        for metric in metrics:
            print(f"Metric: {metric.descriptor.name}")
            print(f" Description: {metric.descriptor.description}")
            print(f" Time Series: {len(metric.time_series)}")
        print("=" * 50)
# Set up periodic export
producer = CustomMetricProducer()
exporter = ConsoleMetricExporter()

# Start periodic export every 30 seconds
export_task = get_exporter_thread(
    metric_producers=[producer],
    exporter=exporter,
    interval=30
)

# Let it run for a while
try:
    time.sleep(120)  # Run for 2 minutes
finally:
    # Clean shutdown: runs even if the sleep is interrupted
    export_task.close()

from opencensus.trace.tracer import Tracer
from opencensus.trace import file_exporter
from opencensus.common.transports import AsyncTransport
import signal
import sys
class GracefulFileExporter:
    """
    File exporter with graceful shutdown handling.

    Parameters:
    - file_path: str, output file path handed to FileExporter
    - batch_size: int, maximum batch size for the async transport (default: 500)
    - flush_interval: int or float, wait period of the async transport in seconds (default: 60)
    """

    def __init__(self, file_path, batch_size=500, flush_interval=60):
        self.exporter = file_exporter.FileExporter(file_path)
        self.transport = AsyncTransport(
            exporter=self.exporter,
            max_batch_size=batch_size,
            wait_period=flush_interval,
            grace_period=10.0
        )

        # Set up signal handlers for graceful shutdown.
        # signal.signal() may only be called from the main thread.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals gracefully: flush pending traces, then exit with status 0."""
        print(f"Received signal {signum}, flushing traces...")
        self.transport.flush()
        sys.exit(0)

    def export(self, span_datas):
        """
        Export spans through async transport.

        Parameters:
        - span_datas: list, list of SpanData objects to export
        """
        # Hand the transport the whole batch: Transport.export() is documented
        # as taking a list of data objects, not a single span.
        self.transport.export(span_datas)
# Usage in production application
exporter = GracefulFileExporter('./production_traces.jsonl')
tracer = Tracer(exporter=exporter)

# Application will gracefully flush traces on shutdown
try:
    while True:  # Main application loop
        with tracer.span('main_operation') as span:
            # Your application logic (process_requests is a placeholder)
            process_requests()
except KeyboardInterrupt:
    # NOTE(review): GracefulFileExporter installs its own SIGINT handler that
    # calls sys.exit(0), so Ctrl-C surfaces as SystemExit and this branch is
    # only reached if that handler is not in place.
    print("Shutting down gracefully...")
# Signal handler will flush remaining traces
Install with Tessl CLI
npx tessl i tessl/pypi-opencensus