The Datadog Python library provides tools for interacting with Datadog's monitoring platform through HTTP API client functionality, DogStatsD metrics client, and command-line tools.
—
Thread-safe metrics collection system that aggregates metrics in background threads without hindering application performance. Ideal for high-throughput applications where metrics submission must not impact critical application threads.
Collect metrics from multiple threads safely with automatic aggregation and background flushing to prevent performance degradation.
class ThreadStats:
def __init__(self, namespace="", constant_tags=None, compress_payload=False):
"""
Initialize ThreadStats client.
Parameters:
- namespace (str): Namespace to prefix all metric names (default: "")
- constant_tags (list): Tags to attach to every metric reported by this client (default: None)
- compress_payload (bool): Compress the payload using zlib (default: False)
"""
def start(
self,
flush_interval=10,
roll_up_interval=10,
device=None,
flush_in_thread=True,
flush_in_greenlet=False,
disabled=False
):
"""
Start background metrics collection thread.
Parameters:
- flush_interval (int): The number of seconds to wait between flushes (default: 10)
- roll_up_interval (int): Roll up interval for metrics aggregation (default: 10)
- device: Device parameter for metrics (default: None)
- flush_in_thread (bool): True if you'd like to spawn a thread to flush metrics (default: True)
- flush_in_greenlet (bool): Set to true if you'd like to flush in a gevent greenlet (default: False)
- disabled (bool): Disable metrics collection (default: False)
"""
def stop(self):
"""
Stop background collection thread and flush remaining metrics.
"""Submit various metric types with automatic thread-safe aggregation and batching.
class ThreadStats:
def gauge(self, metric, value, tags=None, sample_rate=1, timestamp=None):
"""
Submit gauge metric (current value).
Parameters:
- metric (str): Metric name
- value (float): Current gauge value
- tags (list): List of tags in "key:value" format
- sample_rate (float): Sampling rate (0.0-1.0)
- timestamp (int): Unix timestamp (optional)
"""
def increment(self, metric, value=1, tags=None, sample_rate=1):
"""
Increment counter metric.
Parameters:
- metric (str): Metric name
- value (int): Increment amount (default: 1)
- tags (list): List of tags
- sample_rate (float): Sampling rate
"""
def decrement(self, metric, value=1, tags=None, sample_rate=1):
"""
Decrement counter metric.
Parameters:
- metric (str): Metric name
- value (int): Decrement amount (default: 1)
- tags (list): List of tags
- sample_rate (float): Sampling rate
"""
def histogram(self, metric, value, tags=None, sample_rate=1):
"""
Submit histogram metric for statistical analysis.
Parameters:
- metric (str): Metric name
- value (float): Value to add to histogram
- tags (list): List of tags
- sample_rate (float): Sampling rate
"""
def distribution(self, metric, value, tags=None, sample_rate=1):
"""
Submit distribution metric for global statistical analysis.
Parameters:
- metric (str): Metric name
- value (float): Value to add to distribution
- tags (list): List of tags
- sample_rate (float): Sampling rate
"""
def timing(self, metric, value, tags=None, sample_rate=1):
"""
Submit timing metric in milliseconds.
Parameters:
- metric (str): Metric name
- value (float): Time duration in milliseconds
- tags (list): List of tags
- sample_rate (float): Sampling rate
"""
def set(self, metric, value, tags=None, sample_rate=1):
"""
Submit set metric (count unique values).
Parameters:
- metric (str): Metric name
- value (str): Unique value to count
- tags (list): List of tags
- sample_rate (float): Sampling rate
"""
def event(
self,
title,
text,
alert_type="info",
aggregation_key=None,
source_type_name=None,
date_happened=None,
priority="normal",
tags=None
):
"""
Submit custom event.
Parameters:
- title (str): Event title
- text (str): Event description
- alert_type (str): 'error', 'warning', 'info', or 'success'
- aggregation_key (str): Key for grouping related events
- source_type_name (str): Source type identifier
- date_happened (int): Unix timestamp when event occurred
- priority (str): 'normal' or 'low'
- tags (list): List of tags
"""Context managers and decorators for automatic timing measurement without manual calculation.
class ThreadStats:
def timer(self, metric=None, tags=None, sample_rate=1):
"""
Context manager for timing code blocks.
Parameters:
- metric (str): Metric name for timing
- tags (list): List of tags
- sample_rate (float): Sampling rate
Returns:
Context manager that submits timing metric on exit
Usage:
with stats.timer('operation.duration'):
# Timed operation
pass
"""
def timed(self, metric=None, tags=None, sample_rate=1):
"""
Timing decorator for measuring function execution time.
Parameters:
- metric (str): Metric name (defaults to function name)
- tags (list): List of tags
- sample_rate (float): Sampling rate
Returns:
Decorator function
Usage:
@stats.timed('function.process.duration')
def process_data():
pass
"""Control metric aggregation, flushing behavior, and memory management for optimal performance.
class ThreadStats:
def flush(self, timestamp=None):
"""
Manually flush aggregated metrics to Datadog API.
Parameters:
- timestamp (int): Timestamp for the flush operation
"""Specialized functions for AWS Lambda environments with automatic context handling and optimized flushing.
def datadog_lambda_wrapper(lambda_func):
"""
Decorator for AWS Lambda functions to enable metrics collection.
Parameters:
- lambda_func (function): Lambda handler function
Returns:
Wrapped function with automatic metrics setup and flushing
Usage:
@datadog_lambda_wrapper
def lambda_handler(event, context):
lambda_metric('custom.metric', 1)
return {'statusCode': 200}
"""
def lambda_metric(metric_name, value, tags=None, timestamp=None):
"""
Submit metric from AWS Lambda function.
Parameters:
- metric_name (str): Metric name
- value (float): Metric value
- tags (list): List of tags
- timestamp (int): Unix timestamp
Note: Use with datadog_lambda_wrapper for automatic flushing
"""from datadog.threadstats import ThreadStats
import time
import threading
# Initialize ThreadStats client
stats = ThreadStats()
# Start background collection thread
stats.start(flush_interval=5, roll_up_interval=10)
# Submit metrics from multiple threads safely
def worker_thread(thread_id):
for i in range(100):
stats.increment('worker.task.completed', tags=[f'thread:{thread_id}'])
stats.gauge('worker.queue.size', 50 - i, tags=[f'thread:{thread_id}'])
stats.timing('worker.task.duration', 25 + thread_id * 5)
time.sleep(0.1)
# Create multiple worker threads
threads = []
for i in range(5):
t = threading.Thread(target=worker_thread, args=(i,))
threads.append(t)
t.start()
# Wait for all threads to complete
for t in threads:
t.join()
# Stop collection and flush remaining metrics
stats.stop()from datadog.threadstats import ThreadStats
import requests
stats = ThreadStats()
stats.start()
# Time different operations
with stats.timer('api.request.duration', tags=['endpoint:users']):
response = requests.get('https://api.example.com/users')
with stats.timer('database.query.time', tags=['table:orders', 'operation:select']):
# Simulated database query
time.sleep(0.05)
with stats.timer('file.processing.time', tags=['type:csv']):
# File processing operation
process_large_file('data.csv')
stats.stop()from datadog.threadstats import ThreadStats
stats = ThreadStats()
stats.start()
@stats.timed('data.processing.duration', tags=['version:v2'])
def process_user_data(user_data):
# Complex data processing
processed = []
for item in user_data:
# Processing logic
processed.append(transform_data(item))
return processed
@stats.timed('cache.operation.time')
def update_cache(key, value):
# Cache update operation
cache_client.set(key, value, ttl=3600)
return True
# Function calls automatically submit timing metrics
users = get_user_data()
result = process_user_data(users)
update_cache('processed_users', result)
stats.stop()from datadog.threadstats import ThreadStats
import concurrent.futures
import random
# Configure for high-throughput scenario
stats = ThreadStats(
flush_interval=2, # Flush every 2 seconds
roll_up_interval=5, # Aggregate over 5-second windows
flush_in_thread=True # Use background thread
)
stats.start()
def simulate_high_traffic():
"""Simulate high-volume application metrics."""
for _ in range(10000):
# Application metrics
stats.increment('app.requests.total')
stats.gauge('app.memory.usage', random.uniform(50, 90))
stats.histogram('app.response.time', random.uniform(10, 500))
# Business metrics
if random.random() < 0.1: # 10% chance
stats.increment('business.conversion.event')
if random.random() < 0.05: # 5% chance
stats.event(
'User signup',
'New user registration completed',
alert_type='info',
tags=['source:web', 'funnel:signup']
)
# Run simulation in multiple threads
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(simulate_high_traffic) for _ in range(10)]
concurrent.futures.wait(futures)
# Graceful shutdown with final flush
stats.stop()from datadog.threadstats.aws_lambda import datadog_lambda_wrapper, lambda_metric
import json
@datadog_lambda_wrapper
def lambda_handler(event, context):
"""AWS Lambda handler with automatic Datadog metrics."""
# Submit custom metrics
lambda_metric('lambda.invocation.count', 1, tags=['function:user-processor'])
lambda_metric('lambda.event.size', len(json.dumps(event)))
try:
# Process the event
result = process_event(event)
# Success metrics
lambda_metric('lambda.processing.success', 1)
lambda_metric('lambda.result.size', len(str(result)))
return {
'statusCode': 200,
'body': json.dumps(result)
}
except Exception as e:
# Error metrics
lambda_metric('lambda.processing.error', 1, tags=['error_type:processing'])
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def process_event(event):
"""Business logic for processing Lambda events."""
# Simulate processing time
lambda_metric('lambda.processing.duration', 250)
    return {'processed': True, 'items': len(event.get('items', []))}

from datadog.threadstats import ThreadStats
from flask import Flask, request
import time
app = Flask(__name__)
stats = ThreadStats()
stats.start()
@app.before_request
def before_request():
request.start_time = time.time()
stats.increment('web.request.started', tags=[
f'endpoint:{request.endpoint}',
f'method:{request.method}'
])
@app.after_request
def after_request(response):
# Calculate request duration
duration = (time.time() - request.start_time) * 1000 # Convert to ms
# Submit request metrics
stats.timing('web.request.duration', duration, tags=[
f'endpoint:{request.endpoint}',
f'method:{request.method}',
f'status:{response.status_code}'
])
stats.gauge('web.response.size', len(response.get_data()))
return response
@app.route('/api/users')
def get_users():
# Business logic metrics
stats.increment('api.users.request')
with stats.timer('database.query.users'):
users = fetch_users_from_db()
stats.gauge('api.users.returned', len(users))
return {'users': users}
if __name__ == '__main__':
try:
app.run()
finally:
stats.stop() # Ensure metrics are flushed on shutdownfrom datadog.threadstats import ThreadStats
# Custom configuration for specific use case
stats = ThreadStats(
api_key='your-api-key',
app_key='your-app-key',
host_name='custom-host',
flush_interval=30, # Flush every 30 seconds
roll_up_interval=60, # Aggregate over 1-minute windows
flush_in_thread=True, # Background flushing
device='container-1' # Custom device identifier
)
stats.start()
# Submit metrics with custom aggregation
for i in range(1000):
stats.increment('custom.counter', tags=['batch:hourly'])
stats.gauge('custom.processing.rate', i * 0.5)
# These will be aggregated over the roll_up_interval
if i % 100 == 0:
stats.event(
f'Batch checkpoint {i}',
f'Processed {i} items in current batch',
tags=['batch:hourly', f'checkpoint:{i}']
)
# Manual flush if needed before automatic interval
stats.flush()
stats.stop()# Good: One ThreadStats instance shared across threads
stats = ThreadStats()
stats.start()
def worker_function(worker_id):
# Safe to call from multiple threads
stats.increment('worker.processed', tags=[f'worker:{worker_id}'])
# Avoid: Creating multiple ThreadStats instances
# This wastes resources and can cause metric duplicationfrom datadog.threadstats import ThreadStats
class MetricsManager:
def __init__(self):
self.stats = ThreadStats()
def start(self):
self.stats.start()
def stop(self):
"""Ensure clean shutdown with metric flushing."""
self.stats.stop() # This flushes remaining metrics
def __enter__(self):
self.start()
return self.stats
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
# Use context manager for automatic cleanup
with MetricsManager() as stats:
stats.increment('app.started')
# Do application work
stats.increment('app.finished')
# Metrics are automatically flushed on exit# For AWS Lambda, use the wrapper and lambda_metric functions
from datadog.threadstats.aws_lambda import datadog_lambda_wrapper, lambda_metric
@datadog_lambda_wrapper
def lambda_handler(event, context):
# Don't create ThreadStats instances in Lambda
# Use lambda_metric instead for automatic lifecycle management
lambda_metric('lambda.execution', 1)
# Business logic
result = process_lambda_event(event)
lambda_metric('lambda.success', 1)
return resultInstall with Tessl CLI
npx tessl i tessl/pypi-datadog