CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-datadog

The Datadog Python library provides tools for interacting with Datadog's monitoring platform through HTTP API client functionality, DogStatsD metrics client, and command-line tools.

Pending
Overview
Eval results
Files

docs/threadstats.md

ThreadStats

Thread-safe metrics collection system that aggregates metrics in background threads without hindering application performance. Ideal for high-throughput applications where metrics submission must not impact critical application threads.

Capabilities

Thread-Safe Metrics Collection

Collect metrics from multiple threads safely with automatic aggregation and background flushing to prevent performance degradation.

class ThreadStats:
    """Thread-safe Datadog metrics client — lifecycle methods (API reference stub).

    This stub documents how to construct the client and control its
    background collection thread; metric submission methods are
    documented in the following sections.
    """

    def __init__(self, namespace="", constant_tags=None, compress_payload=False):
        """
        Initialize ThreadStats client.

        Parameters:
        - namespace (str): Namespace to prefix all metric names (default: "")
        - constant_tags (list): Tags to attach to every metric reported by this client (default: None)
        - compress_payload (bool): Compress the payload using zlib (default: False)
        """

    def start(
        self,
        flush_interval=10,
        roll_up_interval=10,
        device=None,
        flush_in_thread=True,
        flush_in_greenlet=False,
        disabled=False
    ):
        """
        Start background metrics collection thread.

        Parameters:
        - flush_interval (int): The number of seconds to wait between flushes (default: 10)
        - roll_up_interval (int): Roll up interval for metrics aggregation (default: 10)
        - device: Device parameter for metrics (default: None)
        - flush_in_thread (bool): True if you'd like to spawn a thread to flush metrics (default: True)
        - flush_in_greenlet (bool): Set to true if you'd like to flush in a gevent greenlet (default: False)
        - disabled (bool): Disable metrics collection (default: False)
        """

    def stop(self):
        """
        Stop background collection thread and flush remaining metrics.
        """

Metric Submission Methods

Submit various metric types with automatic thread-safe aggregation and batching.

class ThreadStats:
    """Thread-safe Datadog metrics client — metric submission methods (API reference stub).

    All submission methods aggregate values in memory and are flushed by
    the background thread started via start(); they are safe to call from
    multiple application threads.
    """

    def gauge(self, metric, value, tags=None, sample_rate=1, timestamp=None):
        """
        Submit gauge metric (current value).

        Parameters:
        - metric (str): Metric name
        - value (float): Current gauge value
        - tags (list): List of tags in "key:value" format
        - sample_rate (float): Sampling rate (0.0-1.0)
        - timestamp (int): Unix timestamp (optional)
        """

    def increment(self, metric, value=1, tags=None, sample_rate=1):
        """
        Increment counter metric.

        Parameters:
        - metric (str): Metric name
        - value (int): Increment amount (default: 1)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def decrement(self, metric, value=1, tags=None, sample_rate=1):
        """
        Decrement counter metric.

        Parameters:
        - metric (str): Metric name
        - value (int): Decrement amount (default: 1)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def histogram(self, metric, value, tags=None, sample_rate=1):
        """
        Submit histogram metric for statistical analysis.

        Parameters:
        - metric (str): Metric name
        - value (float): Value to add to histogram
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def distribution(self, metric, value, tags=None, sample_rate=1):
        """
        Submit distribution metric for global statistical analysis.

        Parameters:
        - metric (str): Metric name
        - value (float): Value to add to distribution
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def timing(self, metric, value, tags=None, sample_rate=1):
        """
        Submit timing metric in milliseconds.

        Parameters:
        - metric (str): Metric name
        - value (float): Time duration in milliseconds
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def set(self, metric, value, tags=None, sample_rate=1):
        """
        Submit set metric (count unique values).

        Parameters:
        - metric (str): Metric name
        - value (str): Unique value to count
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def event(
        self,
        title,
        text,
        alert_type="info",
        aggregation_key=None,
        source_type_name=None,
        date_happened=None,
        priority="normal",
        tags=None
    ):
        """
        Submit custom event.

        Parameters:
        - title (str): Event title
        - text (str): Event description
        - alert_type (str): 'error', 'warning', 'info', or 'success'
        - aggregation_key (str): Key for grouping related events
        - source_type_name (str): Source type identifier
        - date_happened (int): Unix timestamp when event occurred
        - priority (str): 'normal' or 'low'
        - tags (list): List of tags
        """

Timing Utilities

Context managers and decorators for automatic timing measurement without manual calculation.

class ThreadStats:
    """Thread-safe Datadog metrics client — timing utilities (API reference stub).

    Both helpers submit a timing metric automatically, so callers never
    compute durations by hand.
    """

    def timer(self, metric=None, tags=None, sample_rate=1):
        """
        Context manager for timing code blocks.

        Parameters:
        - metric (str): Metric name for timing
        - tags (list): List of tags
        - sample_rate (float): Sampling rate

        Returns:
        Context manager that submits timing metric on exit

        Usage:
        with stats.timer('operation.duration'):
            # Timed operation
            pass
        """

    def timed(self, metric=None, tags=None, sample_rate=1):
        """
        Timing decorator for measuring function execution time.

        Parameters:
        - metric (str): Metric name (defaults to function name)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate

        Returns:
        Decorator function

        Usage:
        @stats.timed('function.process.duration')
        def process_data():
            pass
        """

Buffer Management

Control metric aggregation, flushing behavior, and memory management for optimal performance.

class ThreadStats:
    """Thread-safe Datadog metrics client — buffer management (API reference stub)."""

    def flush(self, timestamp=None):
        """
        Manually flush aggregated metrics to Datadog API.

        Useful to force a send before the automatic flush_interval elapses.

        Parameters:
        - timestamp (int): Timestamp for the flush operation
        """

AWS Lambda Integration

Specialized functions for AWS Lambda environments with automatic context handling and optimized flushing.

def datadog_lambda_wrapper(lambda_func):
    """
    Decorator for AWS Lambda functions to enable metrics collection.

    Parameters:
    - lambda_func (function): Lambda handler function

    Returns:
    Wrapped function with automatic metrics setup and flushing

    Usage:
    @datadog_lambda_wrapper
    def lambda_handler(event, context):
        lambda_metric('custom.metric', 1)
        return {'statusCode': 200}

    Note: Metrics submitted with lambda_metric inside the wrapped handler
    are flushed automatically; no manual start()/stop() is required.
    """
    # API reference stub — the implementation lives in
    # datadog.threadstats.aws_lambda.

def lambda_metric(metric_name, value, tags=None, timestamp=None):
    """
    Submit metric from AWS Lambda function.

    Parameters:
    - metric_name (str): Metric name
    - value (float): Metric value
    - tags (list): List of tags
    - timestamp (int): Unix timestamp

    Note: Use with datadog_lambda_wrapper for automatic flushing
    """
    # API reference stub — the implementation lives in
    # datadog.threadstats.aws_lambda.

Usage Examples

Basic ThreadStats Usage

from datadog.threadstats import ThreadStats
import time
import threading

# One shared, thread-safe client for the whole process.
stats = ThreadStats()

# Begin background aggregation: flush every 5s, roll up over 10s windows.
stats.start(flush_interval=5, roll_up_interval=10)

def worker_thread(thread_id):
    """Report task metrics for one simulated worker; safe to run concurrently."""
    for step in range(100):
        stats.increment('worker.task.completed', tags=[f'thread:{thread_id}'])
        stats.gauge('worker.queue.size', 50 - step, tags=[f'thread:{thread_id}'])
        stats.timing('worker.task.duration', 25 + thread_id * 5)
        time.sleep(0.1)

# Spawn five workers, all reporting through the single shared client.
threads = []
for worker_id in range(5):
    worker = threading.Thread(target=worker_thread, args=(worker_id,))
    threads.append(worker)
    worker.start()

# Block until every worker has finished.
for worker in threads:
    worker.join()

# Shut down the background thread and flush anything still buffered.
stats.stop()

Using Timer Context Manager

from datadog.threadstats import ThreadStats
import time  # required for time.sleep below (was missing in the original example)
import requests

stats = ThreadStats()
stats.start()

# Time different operations; each block submits a timing metric on exit.
with stats.timer('api.request.duration', tags=['endpoint:users']):
    response = requests.get('https://api.example.com/users')

with stats.timer('database.query.time', tags=['table:orders', 'operation:select']):
    # Simulated database query
    time.sleep(0.05)

with stats.timer('file.processing.time', tags=['type:csv']):
    # File processing operation
    process_large_file('data.csv')

stats.stop()

Function Timing with Decorators

from datadog.threadstats import ThreadStats

stats = ThreadStats()
stats.start()

@stats.timed('data.processing.duration', tags=['version:v2'])
def process_user_data(user_data):
    """Transform every user record; execution time is reported by the decorator."""
    return [transform_data(record) for record in user_data]

@stats.timed('cache.operation.time')
def update_cache(key, value):
    """Write one cache entry; the decorator times the operation."""
    cache_client.set(key, value, ttl=3600)
    return True

# Each call below submits its timing metric with no manual measurement.
users = get_user_data()
result = process_user_data(users)
update_cache('processed_users', result)

stats.stop()

High-Throughput Metrics Collection

from datadog.threadstats import ThreadStats
import concurrent.futures
import random

# The ThreadStats constructor only accepts namespace/constant_tags/
# compress_payload; flush and roll-up tuning belongs to start().
stats = ThreadStats()

# Configure for a high-throughput scenario.
stats.start(
    flush_interval=2,      # Flush every 2 seconds
    roll_up_interval=5,    # Aggregate over 5-second windows
    flush_in_thread=True   # Use background thread
)

def simulate_high_traffic():
    """Simulate high-volume application metrics."""
    for _ in range(10000):
        # Application metrics
        stats.increment('app.requests.total')
        stats.gauge('app.memory.usage', random.uniform(50, 90))
        stats.histogram('app.response.time', random.uniform(10, 500))

        # Business metrics
        if random.random() < 0.1:  # 10% chance
            stats.increment('business.conversion.event')

        if random.random() < 0.05:  # 5% chance
            stats.event(
                'User signup',
                'New user registration completed',
                alert_type='info',
                tags=['source:web', 'funnel:signup']
            )

# Run simulation in multiple threads
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(simulate_high_traffic) for _ in range(10)]
    concurrent.futures.wait(futures)

# Graceful shutdown with final flush
stats.stop()

AWS Lambda Integration

from datadog.threadstats.aws_lambda import datadog_lambda_wrapper, lambda_metric
import json

# The wrapper takes care of metric lifecycle: everything submitted via
# lambda_metric while the handler runs is flushed automatically.
@datadog_lambda_wrapper
def lambda_handler(event, context):
    """AWS Lambda handler with automatic Datadog metrics."""
    
    # Submit custom metrics
    lambda_metric('lambda.invocation.count', 1, tags=['function:user-processor'])
    lambda_metric('lambda.event.size', len(json.dumps(event)))
    
    try:
        # Process the event
        result = process_event(event)
        
        # Success metrics
        lambda_metric('lambda.processing.success', 1)
        lambda_metric('lambda.result.size', len(str(result)))
        
        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }
        
    # Broad catch is deliberate here: the handler converts any failure into
    # a 500 response instead of letting Lambda report an unhandled error.
    except Exception as e:
        # Error metrics
        lambda_metric('lambda.processing.error', 1, tags=['error_type:processing'])
        
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def process_event(event):
    """Business logic for processing Lambda events."""
    # Simulate processing time (fixed 250ms value for illustration).
    lambda_metric('lambda.processing.duration', 250)
    return {'processed': True, 'items': len(event.get('items', []))}

Integration with Web Frameworks

from datadog.threadstats import ThreadStats
from flask import Flask, request
import time

app = Flask(__name__)
stats = ThreadStats()
stats.start()

@app.before_request
def before_request():
    # Stash the start time on the request object so after_request can
    # compute the duration for this same request.
    request.start_time = time.time()
    stats.increment('web.request.started', tags=[
        f'endpoint:{request.endpoint}',
        f'method:{request.method}'
    ])

@app.after_request  
def after_request(response):
    # Calculate request duration
    # NOTE(review): assumes before_request always ran and set start_time;
    # confirm for error paths where before_request may be skipped.
    duration = (time.time() - request.start_time) * 1000  # Convert to ms
    
    # Submit request metrics
    stats.timing('web.request.duration', duration, tags=[
        f'endpoint:{request.endpoint}',
        f'method:{request.method}',
        f'status:{response.status_code}'
    ])
    
    stats.gauge('web.response.size', len(response.get_data()))
    
    return response

@app.route('/api/users')
def get_users():
    # Business logic metrics
    stats.increment('api.users.request')
    
    with stats.timer('database.query.users'):
        users = fetch_users_from_db()
    
    stats.gauge('api.users.returned', len(users))
    return {'users': users}

if __name__ == '__main__':
    try:
        app.run()
    finally:
        stats.stop()  # Ensure metrics are flushed on shutdown

Custom Aggregation Configuration

from datadog import initialize
from datadog.threadstats import ThreadStats

# Credentials and host identity are configured globally via initialize(),
# not through the ThreadStats constructor (which only accepts
# namespace/constant_tags/compress_payload).
initialize(
    api_key='your-api-key',
    app_key='your-app-key',
    host_name='custom-host'
)

stats = ThreadStats()

# Flush/roll-up tuning and the device identifier are start() parameters.
stats.start(
    flush_interval=30,     # Flush every 30 seconds
    roll_up_interval=60,   # Aggregate over 1-minute windows
    flush_in_thread=True,  # Background flushing
    device='container-1'   # Custom device identifier
)

# Submit metrics with custom aggregation
for i in range(1000):
    stats.increment('custom.counter', tags=['batch:hourly'])
    stats.gauge('custom.processing.rate', i * 0.5)

    # These will be aggregated over the roll_up_interval
    if i % 100 == 0:
        stats.event(
            f'Batch checkpoint {i}',
            f'Processed {i} items in current batch',
            tags=['batch:hourly', f'checkpoint:{i}']
        )

# Manual flush if needed before automatic interval
stats.flush()

stats.stop()

Best Practices

Thread Safety and Performance

# Good: One ThreadStats instance shared across threads
# (assumes `from datadog.threadstats import ThreadStats` as in the earlier examples)
stats = ThreadStats()
stats.start()

def worker_function(worker_id):
    """Record one processed unit; ThreadStats methods are safe to call concurrently."""
    # Safe to call from multiple threads
    stats.increment('worker.processed', tags=[f'worker:{worker_id}'])

# Avoid: Creating multiple ThreadStats instances
# This wastes resources and can cause metric duplication

Proper Lifecycle Management

from datadog.threadstats import ThreadStats

class MetricsManager:
    """Owns a ThreadStats client and ties its lifecycle to a context manager.

    Entering the context starts background collection and yields the
    client; exiting stops it, which also flushes buffered metrics.
    """

    def __init__(self):
        # Client is created eagerly but collection only starts in start().
        self.stats = ThreadStats()
        
    def start(self):
        """Begin background metric collection."""
        self.stats.start()
        
    def stop(self):
        """Ensure clean shutdown with metric flushing."""
        self.stats.stop()  # This flushes remaining metrics
        
    def __enter__(self):
        # Yield the underlying client so callers submit metrics directly.
        self.start()
        return self.stats
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Runs even if the body raised, so metrics are always flushed.
        self.stop()

# Use context manager for automatic cleanup
with MetricsManager() as stats:
    stats.increment('app.started')
    # Do application work
    stats.increment('app.finished')
# Metrics are automatically flushed on exit

Lambda-Specific Considerations

# For AWS Lambda, use the wrapper and lambda_metric functions
from datadog.threadstats.aws_lambda import datadog_lambda_wrapper, lambda_metric

@datadog_lambda_wrapper
def lambda_handler(event, context):
    """Handler relying on the wrapper for metric setup and flushing."""
    # Don't create ThreadStats instances in Lambda
    # Use lambda_metric instead for automatic lifecycle management
    lambda_metric('lambda.execution', 1)
    
    # Business logic
    result = process_lambda_event(event)
    
    lambda_metric('lambda.success', 1)
    return result

Install with Tessl CLI

npx tessl i tessl/pypi-datadog

docs

command-line-tools.md

configuration.md

dogstatsd-client.md

error-handling.md

http-api-client.md

index.md

threadstats.md

tile.json