A comprehensive observability framework providing distributed tracing, metrics collection, and statistics gathering capabilities for Python applications.
—
Shared utilities including configuration management, resource detection, scheduling, HTTP handling, and general helper functions used across the OpenCensus ecosystem.
Load and configure OpenCensus components dynamically using string expressions and namespace-based imports.
def load(expr):
"""
Dynamically import OpenCensus components and evaluate configuration expressions.
Supports dot-notation imports and component instantiation from string expressions.
Parameters:
- expr: str, import expression (e.g., 'opencensus.trace.samplers.ProbabilitySampler')
Returns:
object: Imported module, class, or function
"""
class Namespace:
"""
Dynamic import helper for OpenCensus configuration.
Parameters:
- name: str, namespace name
- parent: Namespace, parent namespace (optional)
"""
def __init__(self, name, parent=None): ...
def __getattr__(self, item):
"""
Dynamic attribute access for importing submodules.
Parameters:
- item: str, attribute/module name
Returns:
Namespace: Child namespace for further traversal
"""
def __str__(self):
"""String representation of namespace path."""
def __call__(self, *args, **kwargs):
"""
Instantiate the referenced class/function.
Parameters:
- *args: positional arguments
- **kwargs: keyword arguments
Returns:
object: Instantiated object or function result
"""
@classmethod
def eval(cls, expr):
"""
Evaluate configuration expression.
Parameters:
- expr: str, configuration expression
Returns:
object: Evaluated result
"""

General-purpose utilities for string processing, time handling, and data manipulation.
# Constants
UTF8 = 'utf-8'
"""str: UTF-8 encoding constant"""
MAX_LENGTH = 128
"""int: Default maximum string length"""
ISO_DATETIME_REGEX = '%Y-%m-%dT%H:%M:%S.%fZ'
"""str: ISO 8601 datetime format string"""
def get_truncatable_str(str_to_convert):
"""
Truncate string and record byte count for OpenCensus limits.
Parameters:
- str_to_convert: str, string to process
Returns:
dict: Dictionary with 'value' (truncated string) and 'truncated_byte_count'
"""
def check_str_length(str_to_check, limit=MAX_LENGTH):
"""
Check and truncate string length if necessary.
Parameters:
- str_to_check: str, string to validate
- limit: int, maximum allowed length (default: 128)
Returns:
str: Original string if within limit, truncated string otherwise
"""
def to_iso_str(ts=None):
"""
Get ISO 8601 string for UTC datetime.
Parameters:
- ts: datetime, timestamp to convert (default: current UTC time)
Returns:
str: ISO 8601 formatted datetime string
"""
def timestamp_to_microseconds(timestamp):
"""
Convert timestamp string to microseconds since epoch.
Parameters:
- timestamp: str, ISO 8601 timestamp string
Returns:
int: Microseconds since Unix epoch
"""

Helper functions for working with iterables, including uniqueness filtering and windowing operations.
def iuniq(ible):
"""
Iterator over unique items while preserving order.
Parameters:
- ible: iterable, input sequence
Yields:
object: Unique items from input sequence
"""
def uniq(ible):
"""
List of unique items while preserving order.
Parameters:
- ible: iterable, input sequence
Returns:
list: List containing unique items from input
"""
def window(ible, length):
"""
Split iterable into lists of specified length.
Parameters:
- ible: iterable, input sequence
- length: int, window size
Returns:
list: List of lists, each containing up to 'length' items (the final list may be shorter)
"""
def get_weakref(func):
"""
Get weak reference to bound or unbound function.
Handles both bound methods and regular functions, providing
weak references that don't prevent garbage collection.
Parameters:
- func: callable, function or method to reference
Returns:
weakref: Weak reference to the function
"""

Automatic detection of runtime environment and resource metadata for cloud platforms and containerized environments.
class Resource:
"""
Entity description for signal reporting with type and labels.
Parameters:
- type_: str, resource type identifier
- labels: dict, key-value resource labels
"""
def __init__(self, type_=None, labels=None): ...
def get_type(self):
"""
Get resource type.
Returns:
str: Resource type identifier
"""
def get_labels(self):
"""
Get resource labels.
Returns:
dict: Resource labels dictionary
"""
def merge(self, other):
"""
Merge with another resource, combining labels.
Parameters:
- other: Resource, resource to merge with
Returns:
Resource: New resource with merged labels
"""
def merge_resources(resource_list):
"""
Merge multiple resources into a single resource.
Parameters:
- resource_list: list, list of Resource objects
Returns:
Resource: Merged resource with combined labels
"""
def check_ascii_256(string):
"""
Validate ASCII printable characters and length constraints.
Parameters:
- string: str, string to validate
Returns:
bool: True if string contains only ASCII printable chars and is <= 256 chars
"""
def unquote(string):
"""
Strip surrounding quotes from string.
Parameters:
- string: str, potentially quoted string
Returns:
str: String with surrounding quotes removed
"""
def parse_labels(labels_str):
"""
Parse label key-value pairs from string format.
Parses comma-separated key=value pairs, handling quoted values.
Parameters:
- labels_str: str, label string (e.g., "key1=value1,key2=value2")
Returns:
dict: Parsed labels dictionary
"""
def get_from_env():
"""
Get resource from environment variables.
Reads OC_RESOURCE_TYPE and OC_RESOURCE_LABELS environment variables
to construct a Resource object.
Returns:
Resource: Resource constructed from environment variables
"""
# Environment variable constants
OC_RESOURCE_TYPE = 'OC_RESOURCE_TYPE'
"""str: Environment variable for resource type"""
OC_RESOURCE_LABELS = 'OC_RESOURCE_LABELS'
"""str: Environment variable for resource labels"""
def is_gce_environment():
"""
Check if running on Google Compute Engine.
Returns:
bool: True if running on GCE
"""
def is_aws_environment():
"""
Check if running on AWS EC2.
Returns:
bool: True if running on AWS EC2
"""
def get_instance():
"""
Get resource for current cloud environment.
Automatically detects cloud platform and returns appropriate
resource with platform-specific metadata.
Returns:
Resource: Resource representing current environment
"""

Thread-based scheduling and queue management for background processing and periodic tasks.
class PeriodicTask:
"""
Periodic thread execution for background tasks.
Parameters:
- interval: float, execution interval in seconds
- function: callable, function to execute periodically
- args: tuple, function arguments (optional)
- kwargs: dict, function keyword arguments (optional)
- name: str, task name for identification (optional)
"""
def __init__(self, interval, function, args=None, kwargs=None, name=None): ...
def run(self):
"""Start periodic execution in background thread."""
def cancel(self):
"""Cancel periodic execution and stop background thread."""
class QueueEvent:
"""
Thread synchronization event for queue coordination.
Parameters:
- name: str, event name for identification
"""
def __init__(self, name): ...
def set(self):
"""Set the event, releasing waiting threads."""
def wait(self, timeout=None):
"""
Wait for event to be set.
Parameters:
- timeout: float, maximum wait time in seconds (optional)
Returns:
bool: True if event was set, False if timeout occurred
"""
class QueueExitEvent:
"""
Special event for signaling queue shutdown.
Parameters:
- name: str, event name for identification
"""
def __init__(self, name): ...
class Queue:
"""
Thread-safe queue with capacity limits and bulk operations.
Parameters:
- capacity: int, maximum queue capacity
"""
def __init__(self, capacity): ...
def gets(self, count, timeout):
"""
Get multiple items from queue.
Parameters:
- count: int, maximum number of items to retrieve
- timeout: float, maximum wait time in seconds
Returns:
list: Retrieved items (may be fewer than requested)
"""
def is_empty(self):
"""
Check if queue is empty.
Returns:
bool: True if queue contains no items
"""
def flush(self, timeout=None):
"""
Remove and return all items from queue.
Parameters:
- timeout: float, maximum wait time in seconds (optional)
Returns:
list: All items that were in the queue
"""
def put(self, item, block=True, timeout=None):
"""
Put item in queue.
Parameters:
- item: object, item to add to queue
- block: bool, whether to block if queue is full
- timeout: float, maximum wait time if blocking
"""
def puts(self, items, block=True, timeout=None):
"""
Put multiple items in queue.
Parameters:
- items: list, items to add to queue
- block: bool, whether to block if queue is full
- timeout: float, maximum wait time if blocking
"""

Simple HTTP client functionality for internal OpenCensus communication and metadata retrieval.
def get_request(request_url, request_headers=dict()):
"""
Execute HTTP GET request with timeout and error handling.
Parameters:
- request_url: str, URL to request
- request_headers: dict, HTTP headers (default: empty dict)
Returns:
requests.Response: HTTP response object
"""
# Constants
_REQUEST_TIMEOUT = 2
"""int: Default HTTP request timeout in seconds"""

Backports and compatibility layers for supporting different Python versions and environments.
class WeakMethod:
"""
Weak reference to bound methods (Python 2.6 compatibility).
Provides weak references to bound methods that don't prevent
garbage collection of the target object.
Parameters:
- meth: method, bound method to reference
- callback: callable, cleanup callback (optional)
"""
def __init__(self, meth, callback=None): ...

from opencensus.common.configuration import load, Namespace
# Load components dynamically from string expressions
sampler_class = load('opencensus.trace.samplers.ProbabilitySampler')
sampler = sampler_class(rate=0.1)
exporter_class = load('opencensus.trace.exporters.print_exporter.PrintExporter')
exporter = exporter_class()
# Use namespace for fluent configuration
oc = Namespace('opencensus')
tracer = oc.trace.tracer.Tracer(
sampler=oc.trace.samplers.ProbabilitySampler(rate=0.5),
exporter=oc.trace.exporters.print_exporter.PrintExporter()
)
# Configuration from strings (useful for config files)
config_expr = 'opencensus.trace.samplers.AlwaysOnSampler()'
sampler = Namespace.eval(config_expr)

from opencensus.common.resource import get_from_env, get_instance
from opencensus.common.monitored_resource import is_gce_environment, is_aws_environment
import os
# Auto-detect current environment
if is_gce_environment():
    print("Running on Google Compute Engine")
    resource = get_instance()
    print(f"Resource type: {resource.get_type()}")
    print(f"Resource labels: {resource.get_labels()}")
elif is_aws_environment():
    print("Running on AWS EC2")
    resource = get_instance()
    print(f"Resource type: {resource.get_type()}")
    print(f"Resource labels: {resource.get_labels()}")
else:
    print("Running on unknown platform")
# Get resource from environment variables
# (set here only so the example is self-contained; normally the deployment
# environment provides these two variables)
os.environ['OC_RESOURCE_TYPE'] = 'k8s_container'
os.environ['OC_RESOURCE_LABELS'] = 'cluster_name=prod,namespace=default,pod_name=app-123'
env_resource = get_from_env()
print(f"Environment resource: {env_resource.get_type()}")
print(f"Environment labels: {env_resource.get_labels()}")
# Merge resources
from opencensus.common.resource import Resource, merge_resources
app_resource = Resource('application', {'name': 'my-app', 'version': '1.0'})
platform_resource = get_instance()
merged = merge_resources([app_resource, platform_resource])
print(f"Merged resource: {merged.get_labels()}")

from opencensus.common.schedule import PeriodicTask, Queue, QueueEvent
import time
import threading
# Simple periodic task
def heartbeat():
    """Print a heartbeat line stamped with the current epoch time."""
    print(f"Heartbeat at {time.time()}")
# Create and start periodic task
task = PeriodicTask(
    interval=5.0,  # Every 5 seconds
    function=heartbeat,
    name="heartbeat_task",
)
# NOTE(review): in opencensus, PeriodicTask is understood to subclass
# threading.Thread, so start() launches the loop in a background thread,
# whereas calling run() directly would execute the loop in this thread and
# never reach cancel() below. Confirm against the installed version.
task.start()
# Let it run for 30 seconds
time.sleep(30)
# Stop the task
task.cancel()
# Advanced example with queue processing
work_queue = Queue(capacity=1000)
stop_event = QueueEvent("stop_processing")
def worker():
    """Background worker that processes queue items.

    Loops until ``stop_event`` is set, checking it with a 1-second timeout
    on each pass and pulling up to 10 items at a time from ``work_queue``.
    Any exception raised while fetching or processing a batch is printed
    and the loop continues with the next pass.
    """
    while not stop_event.wait(timeout=1.0):
        try:
            # Get up to 10 items at once
            items = work_queue.gets(count=10, timeout=1.0)
            if items:
                print(f"Processing {len(items)} items")
                for item in items:
                    process_item(item)
        except Exception as e:
            # Keep the worker alive: report the failure and try again.
            print(f"Worker error: {e}")
def process_item(item):
    """Process individual work item.

    Parameters:
    - item: object, work item to handle; it is only printed here
    """
    print(f"Processing: {item}")
    time.sleep(0.1)  # Simulate work
# Start background worker
worker_thread = threading.Thread(target=worker)
worker_thread.start()
# Add work items
for i in range(50):
    work_queue.put(f"work_item_{i}")
# Let worker process items
time.sleep(10)
# Signal shutdown and wait
stop_event.set()
worker_thread.join()
print("All work completed")

from opencensus.common.utils import (
get_truncatable_str, check_str_length, to_iso_str,
timestamp_to_microseconds, uniq, window
)
from datetime import datetime
# String truncation with byte counting
long_string = "A" * 200
result = get_truncatable_str(long_string)
print(f"Truncated: {len(result['value'])} chars")
print(f"Bytes truncated: {result['truncated_byte_count']}")
# Length checking
test_strings = ["short", "a" * 100, "a" * 150]
for s in test_strings:
    checked = check_str_length(s, limit=128)
    print(f"'{s[:10]}...' -> {len(checked)} chars")
# Time utilities
now = datetime.utcnow()
iso_string = to_iso_str(now)
print(f"ISO format: {iso_string}")
microseconds = timestamp_to_microseconds(iso_string)
print(f"Microseconds: {microseconds}")
# Collection utilities
data = [1, 2, 2, 3, 1, 4, 5, 3]
unique_data = uniq(data)
print(f"Original: {data}")
print(f"Unique: {unique_data}")
# Window data for batch processing
# NOTE(review): list() guarantees the printed value is the nested list shown
# in the expected output even if window() yields its chunks lazily; it is a
# harmless copy if window() already returns a list. Confirm which applies.
windowed = list(window(range(15), length=4))
print(f"Windowed: {windowed}")
# Output: [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14]]

from opencensus.common.http_handler import get_request
# Make HTTP request with timeout
try:
    response = get_request(
        'https://api.example.com/health',
        {'User-Agent': 'OpenCensus/1.0', 'Accept': 'application/json'}
    )
    if response.status_code == 200:
        print("Service is healthy")
        print(f"Response: {response.json()}")
    else:
        print(f"Service check failed: {response.status_code}")
except Exception as e:
    # Top-level boundary of the example: report any failure and carry on.
    print(f"Request failed: {e}")
# Fetch metadata (common in cloud environments)
def get_instance_metadata():
    """Get instance metadata from cloud provider.

    Best-effort lookup: any error raised while issuing the request or
    reading the response yields ``None`` instead of propagating.

    Returns:
    str or None: Response text of the metadata request, or None on failure
    """
    try:
        # Example: GCE metadata
        response = get_request(
            'http://metadata.google.internal/computeMetadata/v1/instance/id',
            {'Metadata-Flavor': 'Google'}
        )
        return response.text
    except Exception:
        # Narrower than a bare ``except:`` so KeyboardInterrupt and
        # SystemExit still propagate.
        return None
instance_id = get_instance_metadata()
if instance_id:
    print(f"Instance ID: {instance_id}")

Install with Tessl CLI
npx tessl i tessl/pypi-opencensus