The Datadog Python library provides tools for interacting with Datadog's monitoring platform through HTTP API client functionality, DogStatsD metrics client, and command-line tools.
—
High-performance StatsD client for submitting metrics, events, and service checks to DogStatsD. Supports multiple transport protocols (UDP, Unix Domain Sockets), buffering, aggregation, and automatic telemetry injection for efficient real-time monitoring.
Submit various metric types with tags, sampling, and timing information for comprehensive application monitoring.
class DogStatsd:
    def __init__(
        self,
        host="localhost",
        port=8125,
        max_buffer_size=None,
        namespace=None,
        constant_tags=None,
        use_ms=False,
        use_default_route=False,
        socket_path=None,
        default_sample_rate=1,
        disable_telemetry=False,
        telemetry_min_flush_interval=10,
        max_buffer_len=0,
        container_id=None,
        origin_detection_enabled=True,
        cardinality=None
    ):
        """
        Initialize DogStatsD client.
        Parameters:
        - host (str): StatsD server hostname
        - port (int): StatsD server port
        - max_buffer_size (int): Maximum UDP packet size in bytes
        - namespace (str): Prefix for all metric names
        - constant_tags (list): Tags applied to all metrics
        - use_ms (bool): Use milliseconds for timing metrics
        - use_default_route (bool): Dynamically set host to default route
        - socket_path (str): Unix domain socket path (overrides host/port)
        - default_sample_rate (float): Default sampling rate (0.0-1.0)
        - disable_telemetry (bool): Disable client telemetry
        - telemetry_min_flush_interval (int): Minimum telemetry flush interval
        - max_buffer_len (int): Maximum buffer length before flush
        - container_id (str): Container ID for origin detection
        - origin_detection_enabled (bool): Enable origin detection
        - cardinality (str): Cardinality level for metrics
        """

    def gauge(self, metric, value, tags=None, sample_rate=None, cardinality=None):
        """
        Submit gauge metric (current value).
        Parameters:
        - metric (str): Metric name
        - value (float): Current value
        - tags (list): List of tags in "key:value" format
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def gauge_with_timestamp(self, metric, value, timestamp, tags=None, sample_rate=None, cardinality=None):
        """
        Submit gauge metric with explicit timestamp.
        Parameters:
        - metric (str): Metric name
        - value (float): Current value
        - timestamp (int): Unix timestamp in seconds
        - tags (list): List of tags in "key:value" format
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def increment(self, metric, value=1, tags=None, sample_rate=None, cardinality=None):
        """
        Increment counter metric.
        Parameters:
        - metric (str): Metric name
        - value (int): Increment amount (default: 1)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def decrement(self, metric, value=1, tags=None, sample_rate=None, cardinality=None):
        """
        Decrement counter metric.
        Parameters:
        - metric (str): Metric name
        - value (int): Decrement amount (default: 1)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def count(self, metric, value, tags=None, sample_rate=None, cardinality=None):
        """
        Submit count metric (aggregated over flush interval).
        Parameters:
        - metric (str): Metric name
        - value (int): Count value
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def histogram(self, metric, value, tags=None, sample_rate=None, cardinality=None):
        """
        Submit histogram metric for statistical analysis.
        Parameters:
        - metric (str): Metric name
        - value (float): Value to add to histogram
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def distribution(self, metric, value, tags=None, sample_rate=None, cardinality=None):
        """
        Submit distribution metric for global statistical analysis.
        Parameters:
        - metric (str): Metric name
        - value (float): Value to add to distribution
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def timing(self, metric, value, tags=None, sample_rate=None, cardinality=None):
        """
        Submit timing metric in milliseconds.
        Parameters:
        - metric (str): Metric name
        - value (float): Time duration in milliseconds
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def set(self, metric, value, tags=None, sample_rate=None, cardinality=None):
        """
        Submit set metric (count unique values).
        Parameters:
        - metric (str): Metric name
        - value (str): Unique value to count
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
"""Submit custom events and service health status for monitoring application state and significant occurrences.
class DogStatsd:
    def event(
        self,
        title,
        text,
        alert_type="info",
        aggregation_key=None,
        source_type_name=None,
        date_happened=None,
        priority="normal",
        tags=None,
        hostname=None
    ):
        """
        Submit custom event.
        Parameters:
        - title (str): Event title
        - text (str): Event description
        - alert_type (str): 'error', 'warning', 'info', or 'success'
        - aggregation_key (str): Key for grouping related events
        - source_type_name (str): Source type (e.g., 'my_app')
        - date_happened (int): Unix timestamp when event occurred
        - priority (str): 'normal' or 'low'
        - tags (list): List of tags
        - hostname (str): Host name for the event
        """

    def service_check(
        self,
        check_name,
        status,
        tags=None,
        timestamp=None,
        hostname=None,
        message=None
    ):
        """
        Submit service check status.
        Parameters:
        - check_name (str): Name of the service check
        - status (int): Check status (0=OK, 1=WARNING, 2=CRITICAL, 3=UNKNOWN)
        - tags (list): List of tags
        - timestamp (int): Unix timestamp for the check time
        - hostname (str): Host name for the check
        - message (str): Additional message for the check
"""Use decorators and context managers for automatic timing and distribution measurement.
class DogStatsd:
    def timed(self, metric=None, tags=None, sample_rate=1, use_ms=None):
        """
        Timing decorator for measuring function execution time.
        Parameters:
        - metric (str): Metric name (defaults to function name)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        - use_ms (bool): Use milliseconds (overrides client setting)
        Returns:
        Decorator function
        Usage:
            @statsd.timed('my_function.duration')
            def my_function():
                pass
        """

    def distributed(self, metric=None, tags=None, sample_rate=1):
        """
        Distribution decorator for measuring function execution time.
        Parameters:
        - metric (str): Metric name (defaults to function name)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        Returns:
        Decorator function
        Usage:
            @statsd.distributed('my_function.duration')
            def my_function():
                pass
"""Control client behavior, buffer management, and connection handling for optimal performance.
class DogStatsd:
    def flush(self):
        """
        Manually flush buffered metrics to StatsD server.
        """

    def close_socket(self):
        """
        Close the UDP socket connection.
        """

    def enable_aggregation(self, flush_interval=0.3, max_samples_per_context=0):
        """
        Enable client-side metric aggregation.
        Parameters:
        - flush_interval (float): Aggregation flush interval in seconds
        - max_samples_per_context (int): Max samples per metric context
        """

    def disable_aggregation(self):
        """
        Disable client-side metric aggregation.
        """

    def enable_background_sender(self):
        """
        Enable background thread for metric sending.
        """

    def disable_background_sender(self):
        """
        Disable background thread sending.
        """

    def wait_for_pending(self):
        """
        Wait for all pending metrics to be sent.
"""Pre-configured global instance for immediate use without initialization.
# Global statsd instance, shared across the application
statsd = DogStatsd()

# Use directly without initialization
statsd.increment('web.requests')
statsd.gauge('system.memory.usage', 75.2)
statsd.timing('db.query.time', 142)

from datadog import initialize, statsd
# Initialize Datadog (configures global statsd instance)
initialize(
    statsd_host='localhost',
    statsd_port=8125,
    statsd_constant_tags=['env:production', 'service:web']
)

# Submit various metrics
statsd.increment('web.requests', tags=['endpoint:/api/users'])
statsd.gauge('system.cpu.usage', 75.5, tags=['host:web01'])
statsd.histogram('response.time', 245.7, tags=['endpoint:/api/users'])
statsd.timing('db.query.duration', 89, tags=['table:users', 'operation:select'])

# Submit event
statsd.event(
    'Deployment completed',
    'Version 1.2.3 deployed successfully',
    alert_type='success',
    tags=['version:1.2.3', 'env:production']
)

# Submit service check
statsd.service_check(
    'database.connection',
    0,  # OK status
    tags=['db:postgresql', 'host:db01'],
    message='Database connection healthy'
)

from datadog.dogstatsd import DogStatsd
# Create custom client with specific configuration
custom_statsd = DogStatsd(
    host='statsd.internal.com',
    port=8125,
    namespace='myapp',
    constant_tags=['service:api', 'version:1.0'],
    max_buffer_size=1024,
    default_sample_rate=0.1,  # Sample 10% of metrics
    use_ms=True  # Use milliseconds for timing
)

# All metrics will be prefixed with 'myapp.' and include constant tags
custom_statsd.increment('requests.count')  # Sends: myapp.requests.count
custom_statsd.timing('request.duration', 250)  # Value in milliseconds

from datadog import statsd
import time

# Time function execution automatically
@statsd.timed('function.process_data.duration', tags=['version:v2'])
def process_data(data):
    # Function implementation
    time.sleep(0.1)  # Simulated work
    return len(data)

# Use distribution for more detailed statistics
@statsd.distributed('function.calculate.time')
def expensive_calculation(x, y):
    # Complex calculation
    result = sum(i * x * y for i in range(1000))
    return result

# Function calls automatically submit timing metrics
result = process_data([1, 2, 3, 4, 5])
calc_result = expensive_calculation(10, 20)

import time
from datadog import statsd

# Time a code block
with statsd.timed('database.backup.duration', tags=['type:full']):
    # Backup operations
    time.sleep(5)  # Simulated backup time
    print("Backup completed")
# The timing metric is automatically submitted when exiting the context

from datadog.dogstatsd import DogStatsd
import random

# Create client with aggregation enabled
aggregated_statsd = DogStatsd(
    host='localhost',
    port=8125,
    max_buffer_len=50,  # Buffer up to 50 metrics
    disable_telemetry=False
)

# Enable aggregation for high-throughput scenarios
aggregated_statsd.enable_aggregation(
    flush_interval=1.0,  # Flush every second
    max_samples_per_context=100  # Max 100 samples per metric
)

# Submit many metrics rapidly - they'll be aggregated
for i in range(1000):
    aggregated_statsd.increment('high_volume.counter', tags=[f'iteration:{i%10}'])
    aggregated_statsd.gauge('random.value', random.random() * 100)

# Manually flush if needed
aggregated_statsd.flush()

# Clean shutdown
aggregated_statsd.wait_for_pending()
aggregated_statsd.close_socket()

from datadog.dogstatsd import DogStatsd
# Use Unix Domain Socket for better performance
socket_statsd = DogStatsd(
    socket_path='/var/run/datadog/dsd.socket',
    constant_tags=['transport:uds']
)

# Submit metrics via UDS
socket_statsd.increment('uds.test.counter')
socket_statsd.gauge('uds.test.gauge', 42.0)

from datadog import statsd
import logging

# StatsD operations are fire-and-forget by design
# but you can add error handling for critical metrics
def safe_metric_submit():
    try:
        statsd.increment('critical.business.metric')
        statsd.gauge('critical.system.health', get_health_score())
    except Exception as e:
        # Log error but don't block application.
        # Lazy %-style args defer message formatting until the record is emitted.
        logging.warning("Failed to submit metrics: %s", e)

# Application continues regardless of metric submission success

# Handle sampling for high-volume metrics
def submit_high_volume_metric(value):
    # Only submit 1% of metrics to reduce load
    statsd.histogram('high_volume.metric', value, sample_rate=0.01)

# The sample_rate tells StatsD to multiply the received values
# to estimate the true volume

# Good: Use hierarchical naming with dots
statsd.increment('web.requests.success')
statsd.increment('web.requests.error')
statsd.gauge('system.memory.usage', 68.3)  # gauge requires a value argument
statsd.timing('database.query.users.select', 12.5)  # timing requires a duration value
# Avoid: Inconsistent or flat naming
statsd.increment('web_success')  # Inconsistent separator
statsd.increment('requests')  # Too generic

# Good: Use tags for dimensions, not metric names
statsd.increment('web.requests', tags=[
    'endpoint:/api/users',
    'method:GET',
    'status:200',
    'region:us-east-1'
])
# Avoid: Encoding dimensions in metric names
statsd.increment('web.requests.api.users.GET.200.us_east_1')  # Creates many metrics

# Use sampling for metrics that fire very frequently
statsd.increment('trace.span.created', sample_rate=0.1)  # Sample 10%
statsd.timing('cache.access.time', duration, sample_rate=0.05)  # Sample 5%; `duration` measured by caller
# Don't sample critical business metrics
statsd.increment('payment.processed')  # Always submit (sample_rate=1.0)
statsd.gauge('service.health.score', score)  # Always submit

Install with Tessl CLI
npx tessl i tessl/pypi-datadog