CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-opencensus

A comprehensive observability framework providing distributed tracing, metrics collection, and statistics gathering capabilities for Python applications.

Pending
Overview
Eval results
Files

docs/common.md

Common Utilities

Shared utilities including configuration management, resource detection, scheduling, HTTP handling, and general helper functions used across the OpenCensus ecosystem.

Capabilities

Dynamic Configuration

Load and configure OpenCensus components dynamically using string expressions and namespace-based imports.

def load(expr):
    """
    Dynamically import OpenCensus components and evaluate configuration expressions.
    
    Supports dot-notation imports and component instantiation from string expressions.
    
    Parameters:
    - expr: str, import expression (e.g., 'opencensus.trace.samplers.ProbabilitySampler')
    
    Returns:
    object: Imported module, class, or function
    """

class Namespace:
    """
    Dynamic import helper for OpenCensus configuration.
    
    Parameters:
    - name: str, namespace name
    - parent: Namespace, parent namespace (optional)
    """
    def __init__(self, name, parent=None): ...
    
    def __getattr__(self, item):
        """
        Dynamic attribute access for importing submodules.
        
        Parameters:
        - item: str, attribute/module name
        
        Returns:
        Namespace: Child namespace for further traversal
        """
    
    def __str__(self):
        """String representation of namespace path."""
    
    def __call__(self, *args, **kwargs):
        """
        Instantiate the referenced class/function.
        
        Parameters:
        - *args: positional arguments
        - **kwargs: keyword arguments
        
        Returns:
        object: Instantiated object or function result
        """
    
    @classmethod
    def eval(cls, expr):
        """
        Evaluate configuration expression.
        
        Parameters:
        - expr: str, configuration expression
        
        Returns:
        object: Evaluated result
        """

String and Time Utilities

General-purpose utilities for string processing, time handling, and data manipulation.

# Constants
UTF8 = 'utf-8'
"""str: UTF-8 encoding constant"""

MAX_LENGTH = 128
"""int: Default maximum string length"""

ISO_DATETIME_REGEX = '%Y-%m-%dT%H:%M:%S.%fZ'
"""str: ISO 8601 datetime format string"""

def get_truncatable_str(str_to_convert):
    """
    Truncate string and record byte count for OpenCensus limits.
    
    Parameters:
    - str_to_convert: str, string to process
    
    Returns:
    dict: Dictionary with 'value' (truncated string) and 'truncated_byte_count'
    """

def check_str_length(str_to_check, limit=MAX_LENGTH):
    """
    Check a string against a length limit and truncate it if necessary.

    Parameters:
    - str_to_check: str, string to validate
    - limit: int, maximum allowed length (default: MAX_LENGTH, i.e. 128)

    Returns:
    str: Original string if within limit, truncated string otherwise

    NOTE(review): the upstream opencensus implementation appears to measure
    the limit in UTF-8 bytes and to return a ``(str, truncated_byte_count)``
    tuple rather than a bare string -- confirm against the installed version
    before relying on the return type documented here.
    """

def to_iso_str(ts=None):
    """
    Get ISO 8601 string for UTC datetime.
    
    Parameters:
    - ts: datetime, timestamp to convert (default: current UTC time)
    
    Returns:
    str: ISO 8601 formatted datetime string
    """

def timestamp_to_microseconds(timestamp):
    """
    Convert timestamp string to microseconds since epoch.
    
    Parameters:
    - timestamp: str, ISO 8601 timestamp string
    
    Returns:
    int: Microseconds since Unix epoch
    """

Collection Utilities

Helper functions for working with iterables, including uniqueness filtering and windowing operations.

def iuniq(ible):
    """
    Iterator over unique items while preserving order.
    
    Parameters:
    - ible: iterable, input sequence
    
    Yields:
    object: Unique items from input sequence
    """

def uniq(ible):
    """
    List of unique items while preserving order.
    
    Parameters:
    - ible: iterable, input sequence
    
    Returns:
    list: List containing unique items from input
    """

def window(ible, length):
    """
    Split iterable into lists of specified length.
    
    Parameters:
    - ible: iterable, input sequence
    - length: int, window size
    
    Returns:
    list: List of lists, each containing 'length' items
    """

def get_weakref(func):
    """
    Get weak reference to bound or unbound function.
    
    Handles both bound methods and regular functions, providing
    weak references that don't prevent garbage collection.
    
    Parameters:
    - func: callable, function or method to reference
    
    Returns:
    weakref: Weak reference to the function
    """

Resource Detection

Automatic detection of runtime environment and resource metadata for cloud platforms and containerized environments.

class Resource:
    """
    Entity description for signal reporting with type and labels.
    
    Parameters:
    - type_: str, resource type identifier
    - labels: dict, key-value resource labels
    """
    def __init__(self, type_=None, labels=None): ...
    
    def get_type(self):
        """
        Get resource type.
        
        Returns:
        str: Resource type identifier
        """
    
    def get_labels(self):
        """
        Get resource labels.
        
        Returns:
        dict: Resource labels dictionary
        """
    
    def merge(self, other):
        """
        Merge with another resource, combining labels.
        
        Parameters:
        - other: Resource, resource to merge with
        
        Returns:
        Resource: New resource with merged labels
        """

def merge_resources(resource_list):
    """
    Merge multiple resources into a single resource.
    
    Parameters:
    - resource_list: list, list of Resource objects
    
    Returns:
    Resource: Merged resource with combined labels
    """

def check_ascii_256(string):
    """
    Validate ASCII printable characters and length constraints.
    
    Parameters:
    - string: str, string to validate
    
    Returns:
    bool: True if string contains only ASCII printable chars and is <= 256 chars
    """

def unquote(string):
    """
    Strip surrounding quotes from string.
    
    Parameters:
    - string: str, potentially quoted string
    
    Returns:
    str: String with surrounding quotes removed
    """

def parse_labels(labels_str):
    """
    Parse label key-value pairs from string format.
    
    Parses comma-separated key=value pairs, handling quoted values.
    
    Parameters:
    - labels_str: str, label string (e.g., "key1=value1,key2=value2")
    
    Returns:
    dict: Parsed labels dictionary
    """

def get_from_env():
    """
    Get resource from environment variables.
    
    Reads OC_RESOURCE_TYPE and OC_RESOURCE_LABELS environment variables
    to construct a Resource object.
    
    Returns:
    Resource: Resource constructed from environment variables
    """

# Environment variable constants
OC_RESOURCE_TYPE = 'OC_RESOURCE_TYPE'
"""str: Environment variable for resource type"""

OC_RESOURCE_LABELS = 'OC_RESOURCE_LABELS'
"""str: Environment variable for resource labels"""

def is_gce_environment():
    """
    Check if running on Google Compute Engine.
    
    Returns:
    bool: True if running on GCE
    """

def is_aws_environment():
    """
    Check if running on AWS EC2.
    
    Returns:
    bool: True if running on AWS EC2
    """

def get_instance():
    """
    Get resource for current cloud environment.
    
    Automatically detects cloud platform and returns appropriate
    resource with platform-specific metadata.
    
    Returns:
    Resource: Resource representing current environment
    """

Scheduling and Background Tasks

Thread-based scheduling and queue management for background processing and periodic tasks.

class PeriodicTask:
    """
    Periodic thread execution for background tasks.

    Calls ``function(*args, **kwargs)`` repeatedly, once per ``interval``
    seconds, until the task is cancelled.

    Parameters:
    - interval: float, execution interval in seconds
    - function: callable, function to execute periodically
    - args: tuple, function arguments (optional)
    - kwargs: dict, function keyword arguments (optional)
    - name: str, task name for identification (optional)
    """
    def __init__(self, interval, function, args=None, kwargs=None, name=None): ...
    
    def run(self):
        """
        Start periodic execution in background thread.

        NOTE(review): upstream opencensus defines PeriodicTask as a
        threading.Thread subclass, in which case run() is the thread body and
        blocks the caller when invoked directly; start() would be the call
        that launches the background thread -- confirm.
        """
    
    def cancel(self):
        """Cancel periodic execution and stop background thread."""

class QueueEvent:
    """
    Thread synchronization event for queue coordination.
    
    Parameters:
    - name: str, event name for identification
    """
    def __init__(self, name): ...
    
    def set(self):
        """Set the event, releasing waiting threads."""
    
    def wait(self, timeout=None):
        """
        Wait for event to be set.
        
        Parameters:
        - timeout: float, maximum wait time in seconds (optional)
        
        Returns:
        bool: True if event was set, False if timeout occurred
        """

class QueueExitEvent:
    """
    Special event for signaling queue shutdown.
    
    Parameters:
    - name: str, event name for identification
    """
    def __init__(self, name): ...

class Queue:
    """
    Thread-safe queue with capacity limits and bulk operations.
    
    Parameters:
    - capacity: int, maximum queue capacity
    """
    def __init__(self, capacity): ...
    
    def gets(self, count, timeout):
        """
        Get multiple items from queue.
        
        Parameters:
        - count: int, maximum number of items to retrieve
        - timeout: float, maximum wait time in seconds
        
        Returns:
        list: Retrieved items (may be fewer than requested)
        """
    
    def is_empty(self):
        """
        Check if queue is empty.
        
        Returns:
        bool: True if queue contains no items
        """
    
    def flush(self, timeout=None):
        """
        Remove and return all items from queue.
        
        Parameters:
        - timeout: float, maximum wait time in seconds (optional)
        
        Returns:
        list: All items that were in the queue
        """
    
    def put(self, item, block=True, timeout=None):
        """
        Put item in queue.
        
        Parameters:
        - item: object, item to add to queue
        - block: bool, whether to block if queue is full
        - timeout: float, maximum wait time if blocking
        """
    
    def puts(self, items, block=True, timeout=None):
        """
        Put multiple items in queue.
        
        Parameters:
        - items: list, items to add to queue
        - block: bool, whether to block if queue is full
        - timeout: float, maximum wait time if blocking
        """

HTTP Utilities

Simple HTTP client functionality for internal OpenCensus communication and metadata retrieval.

def get_request(request_url, request_headers=dict()):
    """
    Execute HTTP GET request with timeout and error handling.

    Parameters:
    - request_url: str, URL to request
    - request_headers: dict, HTTP headers (default: empty dict). The default
      is a single dict created once, when the function is defined, and shared
      by every call that omits the argument, so it must never be mutated.

    Returns:
    requests.Response: HTTP response object

    NOTE(review): upstream opencensus appears to implement this with urllib
    and to return the raw response body (None on failure) instead of a
    requests.Response -- confirm before relying on .status_code / .json().
    """

# Constants
_REQUEST_TIMEOUT = 2
"""int: Default HTTP request timeout in seconds"""

Compatibility Utilities

Backports and compatibility layers for supporting different Python versions and environments.

class WeakMethod:
    """
    Weak reference to bound methods (Python 2.6 compatibility).
    
    Provides weak references to bound methods that don't prevent
    garbage collection of the target object.
    
    Parameters:
    - meth: method, bound method to reference
    - callback: callable, cleanup callback (optional)
    """
    def __init__(self, meth, callback=None): ...

Usage Examples

Dynamic Configuration

from opencensus.common.configuration import load, Namespace

# Load components dynamically from string expressions
sampler_class = load('opencensus.trace.samplers.ProbabilitySampler')
sampler = sampler_class(rate=0.1)

exporter_class = load('opencensus.trace.exporters.print_exporter.PrintExporter')
exporter = exporter_class()

# Use namespace for fluent configuration
oc = Namespace('opencensus')
tracer = oc.trace.tracer.Tracer(
    sampler=oc.trace.samplers.ProbabilitySampler(rate=0.5),
    exporter=oc.trace.exporters.print_exporter.PrintExporter()
)

# Configuration from strings (useful for config files)
config_expr = 'opencensus.trace.samplers.AlwaysOnSampler()'
sampler = Namespace.eval(config_expr)

Resource Detection

from opencensus.common.resource import get_from_env, get_instance
from opencensus.common.monitored_resource import is_gce_environment, is_aws_environment
import os

# Auto-detect current environment
if is_gce_environment():
    print("Running on Google Compute Engine")
    resource = get_instance()
    print(f"Resource type: {resource.get_type()}")
    print(f"Resource labels: {resource.get_labels()}")

elif is_aws_environment():
    print("Running on AWS EC2")
    resource = get_instance()
    print(f"Resource type: {resource.get_type()}")
    print(f"Resource labels: {resource.get_labels()}")

else:
    print("Running on unknown platform")

# Get resource from environment variables
os.environ['OC_RESOURCE_TYPE'] = 'k8s_container'
os.environ['OC_RESOURCE_LABELS'] = 'cluster_name=prod,namespace=default,pod_name=app-123'

env_resource = get_from_env()
print(f"Environment resource: {env_resource.get_type()}")
print(f"Environment labels: {env_resource.get_labels()}")

# Merge resources
from opencensus.common.resource import Resource, merge_resources

app_resource = Resource('application', {'name': 'my-app', 'version': '1.0'})
platform_resource = get_instance()

merged = merge_resources([app_resource, platform_resource])
print(f"Merged resource: {merged.get_labels()}")

Periodic Background Tasks

from opencensus.common.schedule import PeriodicTask, Queue, QueueEvent
import time
import threading

# Simple periodic task
def heartbeat():
    """Print one heartbeat line stamped with the current epoch time."""
    stamp = time.time()
    print(f"Heartbeat at {stamp}")

# Create the periodic task (calls heartbeat every 5 seconds)
task = PeriodicTask(
    interval=5.0,  # Every 5 seconds
    function=heartbeat,
    name="heartbeat_task",
)

# PeriodicTask is a threading.Thread subclass upstream: start() launches the
# background thread, whereas calling run() directly would execute the periodic
# loop in this thread and block forever, so cancel() would never be reached.
task.start()

# Let it run for 30 seconds
time.sleep(30)

# Stop the task
task.cancel()

# Advanced example with queue processing
work_queue = Queue(capacity=1000)
stop_event = QueueEvent("stop_processing")

def worker():
    """Drain the work queue in batches until the stop event is signalled.

    Each pass waits up to one second for the stop event, then pulls at most
    ten items from the queue and hands them to process_item one by one.
    Any exception raised while fetching or processing is reported and the
    loop carries on with the next pass.
    """
    while True:
        # wait() returns True once the stop event has been set
        if stop_event.wait(timeout=1.0):
            break
        try:
            # Get up to 10 items at once
            batch = work_queue.gets(count=10, timeout=1.0)
            if not batch:
                continue
            print(f"Processing {len(batch)} items")
            for entry in batch:
                process_item(entry)
        except Exception as e:
            print(f"Worker error: {e}")

def process_item(item):
    """Handle a single work item: log it, then pause briefly to mimic work."""
    message = f"Processing: {item}"
    print(message)
    simulated_work_seconds = 0.1
    time.sleep(simulated_work_seconds)

# Start background worker
worker_thread = threading.Thread(target=worker)
worker_thread.start()

# Add work items
for i in range(50):
    work_queue.put(f"work_item_{i}")

# Let worker process items
time.sleep(10)

# Signal shutdown and wait
stop_event.set()
worker_thread.join()

print("All work completed")

String and Data Utilities

from opencensus.common.utils import (
    get_truncatable_str, check_str_length, to_iso_str, 
    timestamp_to_microseconds, uniq, window
)
from datetime import datetime

# String truncation with byte counting
long_string = "A" * 200
result = get_truncatable_str(long_string)
print(f"Truncated: {len(result['value'])} chars")
print(f"Bytes truncated: {result['truncated_byte_count']}")

# Length checking
# check_str_length returns a (string, truncated_byte_count) tuple upstream,
# so unpack it rather than calling len() on the tuple itself.
test_strings = ["short", "a" * 100, "a" * 150]
for s in test_strings:
    checked, dropped_bytes = check_str_length(s, limit=128)
    print(f"'{s[:10]}...' -> {len(checked)} chars ({dropped_bytes} bytes dropped)")

# Time utilities
now = datetime.utcnow()
iso_string = to_iso_str(now)
print(f"ISO format: {iso_string}")

microseconds = timestamp_to_microseconds(iso_string)
print(f"Microseconds: {microseconds}")

# Collection utilities
data = [1, 2, 2, 3, 1, 4, 5, 3]
unique_data = uniq(data)
print(f"Original: {data}")
print(f"Unique: {unique_data}")

# Window data for batch processing
# Wrap the call in list() so the batches are materialised before printing;
# this gives the output below whether window() returns a list or a lazy
# iterator.
windowed = list(window(range(15), length=4))
print(f"Windowed: {windowed}")
# Output: Windowed: [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14]]

HTTP Requests

from opencensus.common.http_handler import get_request

# Make HTTP request with timeout
try:
    response = get_request(
        'https://api.example.com/health',
        {'User-Agent': 'OpenCensus/1.0', 'Accept': 'application/json'}
    )
    
    if response.status_code == 200:
        print("Service is healthy")
        print(f"Response: {response.json()}")
    else:
        print(f"Service check failed: {response.status_code}")
        
except Exception as e:
    print(f"Request failed: {e}")

# Fetch metadata (common in cloud environments)
def get_instance_metadata():
    """
    Get the instance ID from the cloud provider's metadata service.

    Returns:
    str or None: Text of the metadata response, or None when the lookup
    fails for any reason (for example when not running on GCE).
    """
    try:
        # Example: GCE metadata
        response = get_request(
            'http://metadata.google.internal/computeMetadata/v1/instance/id',
            {'Metadata-Flavor': 'Google'}
        )
        return response.text
    except Exception:
        # Best-effort lookup: any request or parsing failure means "no
        # metadata". Catch Exception rather than using a bare `except:` so
        # KeyboardInterrupt and SystemExit still propagate.
        return None

instance_id = get_instance_metadata()
if instance_id:
    print(f"Instance ID: {instance_id}")

Install with Tessl CLI

npx tessl i tessl/pypi-opencensus

docs

common.md

exporters.md

index.md

logging.md

metrics-stats.md

tags-context.md

tracing.md

tile.json