OpenCensus Azure Monitor Exporter for sending telemetry data (logs, metrics, and traces) to Azure Monitor.
Shared configuration options, utilities, and protocol objects used across all exporters. The configuration system provides centralized options management, connection string parsing, and common utilities for Azure Monitor integration.
Central configuration class that manages all exporter options with validation and defaults.
class Options(BaseObject):
"""
Configuration options for Azure exporters.
Provides centralized configuration with validation, defaults,
and Azure Monitor-specific option processing.
"""
def __init__(self, **options):
"""
Initialize configuration options.
Args:
**options: Configuration key-value pairs
Common Options:
connection_string (str): Azure Monitor connection string (recommended)
instrumentation_key (str): Instrumentation key (deprecated)
endpoint (str): Custom endpoint URL
export_interval (float): Export frequency in seconds (default: 15.0)
max_batch_size (int): Maximum batch size (default: 100)
enable_local_storage (bool): Enable local storage (default: True)
storage_path (str): Custom storage path
timeout (float): Network timeout in seconds (default: 10.0)
logging_sampling_rate (float): Log sampling rate (default: 1.0)
enable_standard_metrics (bool): Enable standard metrics (default: True)
credential: Azure credential for authentication
proxies (dict): Proxy configuration
"""
Utility functions for processing and validating configuration options.
def process_options(options):
"""
Process and validate configuration options.
Args:
options (dict): Raw configuration options
Returns:
dict: Processed and validated options
"""
def parse_connection_string(connection_string):
"""
Parse Azure Monitor connection string.
Args:
connection_string (str): Azure Monitor connection string
Returns:
dict: Parsed connection components including instrumentation_key
and ingestion_endpoint
Raises:
ValueError: If connection string format is invalid
"""
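The connection string format can be illustrated with a minimal standalone parser. This is a sketch of the documented behavior, not the library's internal implementation; the function name is hypothetical:

```python
def parse_connection_string_sketch(connection_string):
    """Split 'Key=Value;Key=Value' pairs into a dict with lowercase keys."""
    if not connection_string:
        raise ValueError("Connection string cannot be none or empty.")
    result = {}
    for pair in connection_string.split(";"):
        if not pair:
            continue
        key, sep, value = pair.partition("=")
        if not sep:
            raise ValueError("Invalid connection string format.")
        result[key.strip().lower()] = value.strip()
    return result

parsed = parse_connection_string_sketch(
    "InstrumentationKey=12345678-1234-1234-1234-123456789012;"
    "IngestionEndpoint=https://westus.in.applicationinsights.azure.com/"
)
```

With lowercased keys, the parsed components come back as `instrumentationkey` and `ingestionendpoint`.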
def validate_instrumentation_key(instrumentation_key):
"""
Validate instrumentation key format.
Args:
instrumentation_key (str): Instrumentation key to validate
Raises:
ValueError: If instrumentation key format is invalid
"""
INGESTION_ENDPOINT = "IngestionEndpoint"
INSTRUMENTATION_KEY = "InstrumentationKey"
TEMPDIR_PREFIX = "opencensus-python-"

from opencensus.ext.azure.log_exporter import AzureLogHandler
# Using connection string (recommended)
handler = AzureLogHandler(
connection_string="InstrumentationKey=your-instrumentation-key;IngestionEndpoint=https://your-region.in.applicationinsights.azure.com/"
)
# Using instrumentation key directly (deprecated)
handler = AzureLogHandler(
instrumentation_key="your-instrumentation-key"
)

from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.ext.azure.common.storage import LocalFileStorage
exporter = AzureExporter(
connection_string="InstrumentationKey=your-key-here",
export_interval=30.0, # Export every 30 seconds
max_batch_size=50, # Batch up to 50 items
timeout=15.0, # 15 second network timeout
enable_local_storage=True, # Enable persistence
storage_path="/tmp/azure_monitor", # Custom storage location
storage_max_size=50 * 1024 * 1024, # 50MB max storage
storage_retention_period=7 * 24 * 3600, # 7 days retention
proxies={ # Proxy configuration
'http': 'http://proxy.company.com:8080',
'https': 'https://proxy.company.com:8080'
}
)

from azure.identity import DefaultAzureCredential
from opencensus.ext.azure.metrics_exporter import new_metrics_exporter
# Use Azure Identity for authentication
credential = DefaultAzureCredential()
exporter = new_metrics_exporter(
connection_string="InstrumentationKey=your-key-here",
credential=credential
)

import os
from opencensus.ext.azure.log_exporter import AzureLogHandler
# Configuration from environment variables
handler = AzureLogHandler(
connection_string=os.environ.get('AZURE_MONITOR_CONNECTION_STRING'),
export_interval=float(os.environ.get('EXPORT_INTERVAL', '15.0')),
max_batch_size=int(os.environ.get('MAX_BATCH_SIZE', '100')),
enable_local_storage=os.environ.get('ENABLE_STORAGE', 'true').lower() == 'true'
)

connection_string (str): Complete Azure Monitor connection string containing instrumentation key and endpoint
    Format: "InstrumentationKey=<key>;IngestionEndpoint=<endpoint>"
instrumentation_key (str): Azure Application Insights instrumentation key
    Deprecated: use connection_string instead
endpoint (str): Custom ingestion endpoint URL
credential: Azure credential object for authentication
export_interval (float): How often to export telemetry in seconds
max_batch_size (int): Maximum number of telemetry items per batch
timeout (float): Network request timeout in seconds
enable_local_storage (bool): Enable local file persistence
storage_path (str): Directory for local storage files
storage_max_size (int): Maximum storage directory size in bytes
storage_maintenance_period (int): Storage cleanup frequency in seconds
storage_retention_period (int): Storage file retention time in seconds
logging_sampling_rate (float): Sampling rate for log records
enable_standard_metrics (bool): Enable automatic standard metrics
proxies (dict): HTTP proxy configuration
    Format: {'http': 'http://proxy:port', 'https': 'https://proxy:port'}
    Authenticated proxy format: 'http://user:pass@proxy:port'
grace_period (float): Shutdown grace period in seconds
queue_capacity (int): Maximum queued telemetry items
minimum_retry_interval (int): Minimum retry delay in seconds
Azure Monitor connection strings contain multiple components:
InstrumentationKey=12345678-1234-1234-1234-123456789012;IngestionEndpoint=https://your-region.in.applicationinsights.azure.com/;LiveEndpoint=https://your-region.livediagnostics.monitor.azure.com/
The configuration system performs validation:
Common configuration errors:
# Null or empty instrumentation key
ValueError: "Instrumentation key cannot be none or empty."
# Invalid instrumentation key format
ValueError: "Invalid instrumentation key."
# Invalid sampling rate
ValueError: "Sampling must be in the range: [0,1]"
# Invalid batch size
ValueError: "Max batch size must be at least 1."
Azure Monitor protocol objects used for telemetry data structures.
class BaseObject(dict):
"""
Base class for all Azure Monitor protocol objects.
Provides attribute-style access to dictionary data with defaults.
"""
def __init__(self, *args, **kwargs):
"""
Initialize the base object.
Args:
*args: Positional arguments passed to dict
**kwargs: Keyword arguments set as attributes
"""
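The attribute-style access pattern can be sketched with a plain dict subclass. This illustrates the idea, not the library's exact implementation; the class name `AttrDict` is hypothetical:

```python
class AttrDict(dict):
    """Dict subclass exposing keys as attributes, mirroring BaseObject's style."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args)
        # Keyword arguments become both keys and attributes.
        for key, value in kwargs.items():
            self[key] = value

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self[name] = value

obj = AttrDict(name="example", ver=1)
obj.time = "2024-01-01T00:00:00Z"  # attribute write lands in the dict
```

Because the object is still a dict, it serializes to JSON directly, which is convenient for protocol payloads.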
class Envelope(BaseObject):
"""
Telemetry envelope containing metadata and data payload.
Standard wrapper for all telemetry items sent to Azure Monitor.
"""
def __init__(self, *args, **kwargs):
"""
Initialize telemetry envelope.
Args:
ver (int): Schema version (default: 1)
name (str): Telemetry item name
time (str): Timestamp in ISO format
iKey (str): Instrumentation key
tags (dict): Context tags
data (Data): Telemetry data payload
"""
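A complete envelope can be pictured as nested dictionaries following the argument list above. The telemetry item name, instrumentation key, and tag values below are illustrative placeholders, not values the library prescribes:

```python
import datetime

# Envelope wrapping a single log message (field names per the Args above).
envelope = {
    "ver": 1,
    "name": "Microsoft.ApplicationInsights.Message",  # placeholder item name
    "time": datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc).isoformat(),
    "iKey": "12345678-1234-1234-1234-123456789012",
    "tags": {"ai.cloud.role": "my-service"},
    "data": {
        "baseType": "MessageData",
        "baseData": {"message": "Hello from OpenCensus", "severityLevel": 1},
    },
}
```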
class Data(BaseObject):
"""
Telemetry data container holding base data and type information.
"""
def __init__(self, *args, **kwargs):
"""
Initialize data container.
Args:
baseData: The actual telemetry data object
baseType (str): Type identifier for the base data
"""
class DataPoint(BaseObject):
"""
Metric data point containing value and metadata.
"""
def __init__(self, *args, **kwargs):
"""
Initialize metric data point.
Args:
ns (str): Namespace
name (str): Metric name
value (float): Metric value
kind (str, optional): Data point kind
count (int, optional): Sample count
min (float, optional): Minimum value
max (float, optional): Maximum value
stdDev (float, optional): Standard deviation
"""
class Event(BaseObject):
"""
Custom event telemetry data.
"""
def __init__(self, *args, **kwargs):
"""
Initialize event data.
Args:
name (str): Event name
properties (dict): Custom properties
measurements (dict): Custom measurements
"""
class ExceptionData(BaseObject):
"""
Exception telemetry data with stack trace information.
"""
def __init__(self, *args, **kwargs):
"""
Initialize exception data.
Args:
exceptions (list): List of exception details
severityLevel (int): Exception severity level
properties (dict): Custom properties
"""
class Message(BaseObject):
"""
Log message telemetry data.
"""
def __init__(self, *args, **kwargs):
"""
Initialize message data.
Args:
message (str): Log message text
severityLevel (int): Message severity level
properties (dict): Custom properties
"""
class MetricData(BaseObject):
"""
Metric telemetry data container.
"""
def __init__(self, *args, **kwargs):
"""
Initialize metric data.
Args:
metrics (list): List of DataPoint objects
properties (dict): Custom properties
"""
class Request(BaseObject):
"""
HTTP request telemetry data.
"""
def __init__(self, *args, **kwargs):
"""
Initialize request data.
Args:
id (str): Request identifier
name (str): Request name
url (str): Request URL
duration (str): Request duration
responseCode (str): HTTP response code
success (bool): Request success status
properties (dict): Custom properties
"""
class RemoteDependency(BaseObject):
"""
Remote dependency call telemetry data.
"""
def __init__(self, *args, **kwargs):
"""
Initialize dependency data.
Args:
id (str): Dependency call identifier
name (str): Dependency name
type (str): Dependency type (HTTP, SQL, etc.)
target (str): Dependency target
data (str): Dependency data (URL, command, etc.)
duration (str): Call duration
resultCode (str): Result code
success (bool): Call success status
properties (dict): Custom properties
"""
Local file storage system for reliable telemetry persistence.
class LocalFileStorage:
"""
Local file storage for telemetry persistence during network issues.
Provides reliable storage with automatic maintenance, retention,
and retry capabilities for Azure Monitor telemetry.
"""
def __init__(self, path, max_size=50*1024*1024, maintenance_period=60,
retention_period=7*24*60*60, write_timeout=60, source=None):
"""
Initialize local file storage.
Args:
path (str): Storage directory path
max_size (int): Maximum storage size in bytes (default: 50MB)
maintenance_period (int): Cleanup interval in seconds (default: 60)
retention_period (int): File retention time in seconds (default: 7 days)
write_timeout (int): Write operation timeout in seconds (default: 60)
source (str, optional): Source identifier for maintenance task naming
"""
def put(self, data, lease_period=0):
"""
Store telemetry data to local file.
Args:
data (list): List of telemetry items to store
lease_period (int): Lease time in seconds (default: 0)
Returns:
LocalFileBlob: Blob object for stored data, or None if storage full
"""
def get(self):
"""
Retrieve next available telemetry blob.
Returns:
LocalFileBlob: Next available blob, or None if none available
"""
def gets(self):
"""
Generator for all available telemetry blobs.
Yields:
LocalFileBlob: Available blob objects in chronological order
"""
def close(self):
"""
Close storage and stop maintenance task.
"""
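The put/get lifecycle can be sketched with a minimal file-backed store. This is a simplified illustration using timestamped JSON files; the real class adds leasing, size limits, retention, and a background maintenance task. The class name `MiniFileStorage` is hypothetical:

```python
import json
import os
import tempfile
import time

class MiniFileStorage:
    """Toy version of the put/gets pattern: one JSON file per telemetry batch."""

    def __init__(self, path):
        self.path = path
        self._counter = 0
        os.makedirs(path, exist_ok=True)

    def put(self, data):
        # Zero-padded timestamp + counter keeps filenames in chronological order.
        self._counter += 1
        name = "{:020d}-{:06d}.blob".format(time.time_ns(), self._counter)
        fullpath = os.path.join(self.path, name)
        with open(fullpath, "w") as f:
            json.dump(data, f)
        return fullpath

    def gets(self):
        # Yield stored batches oldest-first, as the real gets() does.
        for name in sorted(os.listdir(self.path)):
            with open(os.path.join(self.path, name)) as f:
                yield json.load(f)

storage = MiniFileStorage(tempfile.mkdtemp(prefix="opencensus-python-"))
storage.put([{"name": "batch-1"}])
storage.put([{"name": "batch-2"}])
batches = list(storage.gets())
```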
class LocalFileBlob:
"""
Individual telemetry data file with lease capabilities.
Represents a single stored telemetry batch with operations
for reading, leasing, and cleanup.
"""
def __init__(self, fullpath):
"""
Initialize file blob.
Args:
fullpath (str): Full path to the blob file
"""
def get(self):
"""
Read telemetry data from blob file.
Returns:
tuple: Tuple of JSON-decoded telemetry items, or None on error
"""
def put(self, data, lease_period=0):
"""
Write telemetry data to blob file.
Args:
data (list): List of telemetry items to write
lease_period (int): Initial lease time in seconds
Returns:
LocalFileBlob: Self reference, or None on error
"""
def lease(self, period):
"""
Acquire or extend lease on blob file.
Args:
period (int): Lease duration in seconds
Returns:
LocalFileBlob: Self reference if successful, None if failed
"""
def delete(self):
"""
Delete the blob file.
"""
Common utilities for Azure Monitor integration.
def validate_instrumentation_key(instrumentation_key):
"""
Validate instrumentation key format.
Ensures the instrumentation key is not null/empty and matches
the required UUID format for Azure Monitor.
Args:
instrumentation_key (str): Instrumentation key to validate
Raises:
ValueError: If key is null, empty, or invalid format
"""
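The UUID check can be sketched with a regular expression. This illustrates the documented contract; the library's exact pattern may differ, and the function name is hypothetical:

```python
import re

# Canonical 8-4-4-4-12 hex UUID shape.
_UUID_PATTERN = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
    re.IGNORECASE,
)

def validate_instrumentation_key_sketch(instrumentation_key):
    if not instrumentation_key:
        raise ValueError("Instrumentation key cannot be none or empty.")
    if not _UUID_PATTERN.match(instrumentation_key):
        raise ValueError("Invalid instrumentation key.")

validate_instrumentation_key_sketch("12345678-1234-1234-1234-123456789012")  # passes
```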
def parse_connection_string(connection_string):
"""
Parse Azure Monitor connection string into components.
Args:
connection_string (str): Connection string to parse
Returns:
dict: Parsed connection components with lowercase keys
Raises:
ValueError: If connection string format is invalid
"""
def timestamp_to_duration(start_time, end_time):
"""
Convert start and end timestamps to Azure Monitor duration format.
Args:
start_time: Start timestamp
end_time: End timestamp
Returns:
str: Duration in format 'd.hh:mm:ss.fff'
"""
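The 'd.hh:mm:ss.fff' format can be illustrated with a small standalone converter. This is a sketch matching the format described above, not the library's implementation:

```python
def to_duration_sketch(start_time, end_time):
    """Format the difference between two Unix timestamps as 'd.hh:mm:ss.fff'."""
    total_ms = round((end_time - start_time) * 1000)
    seconds, ms = divmod(total_ms, 1000)
    minutes, sec = divmod(seconds, 60)
    hours, minute = divmod(minutes, 60)
    days, hour = divmod(hours, 24)
    return "{:d}.{:02d}:{:02d}:{:02d}.{:03d}".format(days, hour, minute, sec, ms)
```

For example, a span of 90061.5 seconds (one day, one hour, one minute, 1.5 seconds) formats as "1.01:01:01.500".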
def timestamp_to_iso_str(timestamp):
"""
Convert timestamp to ISO format string.
Args:
timestamp (float): Unix timestamp
Returns:
str: ISO formatted timestamp string
"""
def microseconds_to_duration(microseconds):
"""
Convert microseconds to Azure Monitor duration format.
Args:
microseconds (int): Duration in microseconds
Returns:
str: Duration in format 'd.hh:mm:ss.fff'
"""
# Azure Monitor context information
azure_monitor_context = {
'ai.cloud.role': 'Application role name',
'ai.cloud.roleInstance': 'Role instance identifier',
'ai.device.id': 'Device identifier',
'ai.device.locale': 'Device locale',
'ai.device.osVersion': 'Operating system version',
'ai.device.type': 'Device type',
'ai.internal.sdkVersion': 'SDK version information'
}
HTTP transport functionality with retry logic and status handling.
class TransportStatusCode:
"""
Status codes for telemetry transmission results.
"""
SUCCESS = 0 # All telemetry successfully transmitted
RETRY = 1 # Transmission failed, should retry later
DROP = 2 # Transmission failed, should not retry
STATSBEAT_SHUTDOWN = 3 # Statsbeat shutdown signal
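How a transmission result might map onto these codes can be sketched with a small decision function, using the retryable/throttle groupings documented later in this section. This is an illustrative sketch, not the mixin's actual code:

```python
# Transport status codes as documented above.
SUCCESS, RETRY, DROP = 0, 1, 2

RETRYABLE_STATUS_CODES = (401, 403, 408, 429, 500, 502, 503, 504)
THROTTLE_STATUS_CODES = (402, 439)

def classify_response_sketch(http_status):
    """Map an HTTP response status code to a transport status code."""
    if http_status == 200:
        return SUCCESS
    if http_status in RETRYABLE_STATUS_CODES:
        return RETRY   # transient failure: persist locally and retry later
    if http_status in THROTTLE_STATUS_CODES:
        return DROP    # quota exceeded: do not retry
    return DROP        # unexpected codes are not retried
```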
class TransportMixin:
"""
Mixin providing HTTP transport functionality for exporters.
Handles telemetry transmission to Azure Monitor with retry logic,
local storage integration, and error handling.
"""
def _transmit(self, envelopes):
"""
Transmit telemetry envelopes to Azure Monitor.
Args:
envelopes (list): List of telemetry envelopes to transmit
Returns:
int: TransportStatusCode indicating transmission result
"""
def _transmit_from_storage(self):
"""
Transmit telemetry from local storage.
Processes stored telemetry files and attempts retransmission,
cleaning up successfully sent items and re-leasing failed items.
"""
def _check_stats_collection(self):
"""
Check if statsbeat collection should be enabled.
Returns:
bool: True if statsbeat collection should be enabled
"""
# HTTP status code constants
RETRYABLE_STATUS_CODES = (401, 403, 408, 429, 500, 502, 503, 504)
THROTTLE_STATUS_CODES = (402, 439)
REDIRECT_STATUS_CODES = (307, 308)
Telemetry processing pipeline for filtering and enrichment.
class ProcessorMixin:
"""
Mixin providing telemetry processor functionality.
Enables registration and application of custom telemetry processors
for filtering, enrichment, and modification of telemetry data.
"""
def add_telemetry_processor(self, processor):
"""
Add a telemetry processor to the processing pipeline.
Args:
processor (callable): Function that takes an envelope and returns
modified envelope or None to drop
"""
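A processor and the apply step can be sketched together, following the contract above where a processor returns the modified envelope or None to drop it. This is a simplified standalone pipeline with hypothetical processor names:

```python
def add_cloud_role(envelope):
    """Enrich: attach a role name to every envelope's context tags."""
    envelope.setdefault("tags", {})["ai.cloud.role"] = "my-service"
    return envelope

def drop_health_checks(envelope):
    """Filter: return None to drop health-check telemetry."""
    if envelope.get("name") == "health-check":
        return None
    return envelope

def apply_processors_sketch(processors, envelopes):
    """Run each processor in order; dropped envelopes leave the pipeline."""
    result = []
    for envelope in envelopes:
        for processor in processors:
            envelope = processor(envelope)
            if envelope is None:
                break
        if envelope is not None:
            result.append(envelope)
    return result

processed = apply_processors_sketch(
    [add_cloud_role, drop_health_checks],
    [{"name": "request-1"}, {"name": "health-check"}],
)
```

The health-check envelope is dropped; the surviving envelope carries the added role tag.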
def apply_telemetry_processors(self, envelopes):
"""
Apply all registered processors to telemetry envelopes.
Args:
envelopes (list): List of telemetry envelopes to process
Returns:
list: Processed envelopes (may be filtered/modified)
"""Install with Tessl CLI
npx tessl i tessl/pypi-opencensus-ext-azure