OpenCensus Azure Monitor Exporter: exports telemetry data (logs, metrics, and traces) to Azure Monitor.
Performance counters and custom metrics collection with automatic standard metrics for comprehensive system monitoring. The metrics exporter provides both custom metric tracking and built-in standard metrics for CPU, memory, and request performance.
Core metrics exporter that sends OpenCensus metrics to Azure Monitor as performance counters and custom metrics.
class MetricsExporter(TransportMixin, ProcessorMixin):
"""
Metrics exporter for Microsoft Azure Monitor.
Exports OpenCensus metrics as Azure Monitor performance counters,
supporting both custom metrics and standard system metrics.
"""
def __init__(self, is_stats=False, **options):
"""
Initialize the metrics exporter.
Args:
is_stats (bool): Whether this is a statsbeat exporter (internal use)
**options: Configuration options including connection_string,
instrumentation_key, export_interval, etc.
"""
def export_metrics(self, metrics):
"""
Export a batch of metrics to Azure Monitor.
Args:
metrics (list): List of Metric objects to export
"""
def metric_to_envelopes(self, metric):
"""
Convert a metric to Azure Monitor telemetry envelopes.
Args:
metric (Metric): OpenCensus metric object
Returns:
list: List of Azure Monitor metric envelopes
"""
def shutdown(self):
"""
Shutdown the exporter and clean up resources.
Stops background threads and flushes any pending metrics.
"""
def add_telemetry_processor(self, processor):
"""
Add a telemetry processor for filtering/modifying telemetry.
Args:
processor (callable): Function that takes and returns envelope
"""Convenient factory function that creates a fully configured metrics exporter with background collection and standard metrics.
def new_metrics_exporter(**options):
"""
Create a new metrics exporter with background collection thread.
This factory function creates a MetricsExporter instance and configures
it with a background thread for automatic metric collection and export.
Standard system metrics are enabled by default.
Args:
**options: Configuration options passed to MetricsExporter
Returns:
MetricsExporter: Configured exporter with active background thread
"""from opencensus.ext.azure.metrics_exporter import new_metrics_exporter
# Create exporter with standard metrics enabled
exporter = new_metrics_exporter(
connection_string="InstrumentationKey=your-instrumentation-key",
export_interval=30.0, # Export every 30 seconds
enable_standard_metrics=True
)
# Standard metrics are automatically collected and exported
# No additional code needed for CPU, memory, request metrics
Example — recording custom metrics:
from opencensus.ext.azure.metrics_exporter import new_metrics_exporter
from opencensus.stats import aggregation as aggregation_module
from opencensus.stats import measure as measure_module
from opencensus.stats import stats as stats_module
from opencensus.stats import view as view_module
from opencensus.tags import tag_map as tag_map_module
# Create exporter
exporter = new_metrics_exporter(
connection_string="InstrumentationKey=your-instrumentation-key"
)
# Define custom measures
request_count_measure = measure_module.MeasureInt(
"request_count", "Number of requests", "1")
request_latency_measure = measure_module.MeasureFloat(
"request_latency", "Request latency", "ms")
# Define views (how metrics are aggregated)
request_count_view = view_module.View(
"request_count_view",
"Number of requests by endpoint",
["endpoint", "method"],
request_count_measure,
aggregation_module.CountAggregation()
)
request_latency_view = view_module.View(
"request_latency_view",
"Request latency distribution",
["endpoint"],
request_latency_measure,
aggregation_module.DistributionAggregation([10, 50, 100, 500, 1000])
)
# Register views
stats_recorder = stats_module.stats.stats_recorder
view_manager = stats_module.stats.view_manager
view_manager.register_view(request_count_view)
view_manager.register_view(request_latency_view)
# Record metrics in your application
def handle_request(endpoint, method):
# Create tag map for dimensions
tag_map = tag_map_module.TagMap()
tag_map.insert("endpoint", endpoint)
tag_map.insert("method", method)
# Record request count
stats_recorder.new_measurement_map().measure_int_put(
request_count_measure, 1).record(tag_map)
# Measure request latency
start_time = time.time()
# ... handle request ...
latency = (time.time() - start_time) * 1000
tag_map_latency = tag_map_module.TagMap()
tag_map_latency.insert("endpoint", endpoint)
stats_recorder.new_measurement_map().measure_float_put(
request_latency_measure, latency).record(tag_map_latency)
When enable_standard_metrics=True, the following system metrics are automatically collected:
class AzureStandardMetricsProducer(MetricProducer):
"""
Producer for Azure standard metrics.
Automatically collects standard system performance metrics
including CPU usage, memory consumption, and request statistics.
"""
def get_metrics(self):
"""
Get current standard metrics.
Returns:
list: List of standard metric objects
"""
def register_metrics():
"""
Register all standard metrics with OpenCensus.
Returns:
Registry: Registry instance with standard metrics registered
"""class ProcessorTimeMetric:
"""Processor time percentage metric."""
class RequestsAvgExecutionMetric:
"""Average request execution time metric."""
class RequestsRateMetric:
"""Request rate (requests per second) metric."""
class AvailableMemoryMetric:
"""Available system memory metric."""
class ProcessCPUMetric:
"""Process CPU usage percentage metric."""
class ProcessMemoryMetric:
"""Process memory usage metric."""These metrics are automatically collected at the configured export interval and provide:
Metrics exporter supports these specific options in addition to common options:
enable_standard_metrics (bool): Enable automatic standard metrics collection (default: True)
is_stats (bool): Internal flag for statsbeat metrics (default: False)
The exporter supports these OpenCensus metric types:
Note: Histogram/Distribution aggregations are not currently supported and will be skipped.
def custom_metric_processor(envelope):
"""Filter out noisy metrics or add custom properties."""
if envelope.data.baseData.metrics[0].name == "noisy_metric":
return None # Drop this metric
# Add custom properties
envelope.data.baseData.properties["environment"] = "production"
return envelope
exporter = new_metrics_exporter(
connection_string="InstrumentationKey=your-key-here"
)
exporter.add_telemetry_processor(custom_metric_processor)
# Flask integration example
from flask import Flask, g
import time
app = Flask(__name__)
# Set up metrics
exporter = new_metrics_exporter(
connection_string="InstrumentationKey=your-key-here"
)
@app.before_request
def before_request():
g.start_time = time.time()
@app.after_request
def after_request(response):
# Record request metrics
duration = (time.time() - g.start_time) * 1000
tag_map = tag_map_module.TagMap()
tag_map.insert("endpoint", request.endpoint or "unknown")
tag_map.insert("method", request.method)
tag_map.insert("status_code", str(response.status_code))
# Record count and latency
stats_recorder.new_measurement_map().measure_int_put(
request_count_measure, 1).record(tag_map)
stats_recorder.new_measurement_map().measure_float_put(
request_latency_measure, duration).record(tag_map)
return response
Example — ensuring a clean shutdown on exit:
import atexit
exporter = new_metrics_exporter(
connection_string="InstrumentationKey=your-key-here"
)
# Ensure clean shutdown
def cleanup():
exporter.shutdown()
atexit.register(cleanup)
For high-volume scenarios, consider tuning max_batch_size.
Install with Tessl CLI
npx tessl i tessl/pypi-opencensus-ext-azure