CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-datadog

The Datadog Python library provides tools for interacting with Datadog's monitoring platform through HTTP API client functionality, DogStatsD metrics client, and command-line tools.

Pending
Overview
Eval results
Files

dogstatsd-client.mddocs/

DogStatsD Client

High-performance StatsD client for submitting metrics, events, and service checks to DogStatsD. Supports multiple transport protocols (UDP, Unix Domain Sockets), buffering, aggregation, and automatic telemetry injection for efficient real-time monitoring.

Capabilities

Metrics Submission

Submit various metric types with tags, sampling, and timing information for comprehensive application monitoring.

class DogStatsd:
    def __init__(
        self,
        host="localhost",
        port=8125,
        max_buffer_size=None,
        namespace=None,
        constant_tags=None,
        use_ms=False,
        use_default_route=False,
        socket_path=None,
        default_sample_rate=1,
        disable_telemetry=False,
        telemetry_min_flush_interval=10,
        max_buffer_len=0,
        container_id=None,
        origin_detection_enabled=True,
        cardinality=None
    ):
        """
        Initialize DogStatsD client.
        
        Parameters:
        - host (str): StatsD server hostname
        - port (int): StatsD server port  
        - max_buffer_size (int): Maximum UDP packet size in bytes
        - namespace (str): Prefix for all metric names
        - constant_tags (list): Tags applied to all metrics
        - use_ms (bool): Use milliseconds for timing metrics
        - use_default_route (bool): Dynamically set host to default route
        - socket_path (str): Unix domain socket path (overrides host/port)
        - default_sample_rate (float): Default sampling rate (0.0-1.0)
        - disable_telemetry (bool): Disable client telemetry
        - telemetry_min_flush_interval (int): Minimum telemetry flush interval
        - max_buffer_len (int): Maximum buffer length before flush
        - container_id (str): Container ID for origin detection
        - origin_detection_enabled (bool): Enable origin detection
        - cardinality (str): Cardinality level for metrics
        """

    def gauge(self, metric, value, tags=None, sample_rate=None, cardinality=None):
        """
        Submit gauge metric (current value).
        
        Parameters:
        - metric (str): Metric name
        - value (float): Current value
        - tags (list): List of tags in "key:value" format
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """
        
    def gauge_with_timestamp(self, metric, value, timestamp, tags=None, sample_rate=None, cardinality=None):
        """
        Submit gauge metric with explicit timestamp.
        
        Parameters:
        - metric (str): Metric name
        - value (float): Current value
        - timestamp (int): Unix timestamp in seconds
        - tags (list): List of tags in "key:value" format
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def increment(self, metric, value=1, tags=None, sample_rate=None, cardinality=None):
        """
        Increment counter metric.
        
        Parameters:
        - metric (str): Metric name
        - value (int): Increment amount (default: 1)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def decrement(self, metric, value=1, tags=None, sample_rate=None, cardinality=None):
        """
        Decrement counter metric.
        
        Parameters:
        - metric (str): Metric name
        - value (int): Decrement amount (default: 1)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def count(self, metric, value, tags=None, sample_rate=None, cardinality=None):
        """
        Submit count metric (aggregated over flush interval).
        
        Parameters:
        - metric (str): Metric name
        - value (int): Count value
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def histogram(self, metric, value, tags=None, sample_rate=None, cardinality=None):
        """
        Submit histogram metric for statistical analysis.
        
        Parameters:
        - metric (str): Metric name
        - value (float): Value to add to histogram
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def distribution(self, metric, value, tags=None, sample_rate=None, cardinality=None):
        """
        Submit distribution metric for global statistical analysis.
        
        Parameters:
        - metric (str): Metric name
        - value (float): Value to add to distribution
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def timing(self, metric, value, tags=None, sample_rate=None, cardinality=None):
        """
        Submit timing metric in milliseconds.
        
        Parameters:
        - metric (str): Metric name
        - value (float): Time duration in milliseconds
        - tags (list): List of tags
        - sample_rate (float): Sampling rate (0.0-1.0, default: None)
        - cardinality (str): Cardinality level override
        """

    def set(self, metric, value, tags=None, sample_rate=None, cardinality=None):
        """
        Submit set metric (count unique values).
        
        Parameters:
        - metric (str): Metric name
        - value (str): Unique value to count
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

Events and Service Checks

Submit custom events and service health status for monitoring application state and significant occurrences.

class DogStatsd:
    def event(
        self,
        title,
        text,
        alert_type="info",
        aggregation_key=None,
        source_type_name=None,
        date_happened=None,
        priority="normal",
        tags=None,
        hostname=None
    ):
        """
        Submit custom event.
        
        Parameters:
        - title (str): Event title
        - text (str): Event description
        - alert_type (str): 'error', 'warning', 'info', or 'success'
        - aggregation_key (str): Key for grouping related events
        - source_type_name (str): Source type (e.g., 'my_app')
        - date_happened (int): Unix timestamp when event occurred
        - priority (str): 'normal' or 'low'
        - tags (list): List of tags
        - hostname (str): Host name for the event
        """

    def service_check(
        self,
        check_name,
        status,
        tags=None,
        timestamp=None,
        hostname=None,
        message=None
    ):
        """
        Submit service check status.
        
        Parameters:
        - check_name (str): Name of the service check
        - status (int): Check status (0=OK, 1=WARNING, 2=CRITICAL, 3=UNKNOWN)
        - tags (list): List of tags
        - timestamp (int): Unix timestamp for the check time
        - hostname (str): Host name for the check
        - message (str): Additional message for the check
        """

Decorators and Context Managers

Use decorators and context managers for automatic timing and distribution measurement.

class DogStatsd:
    def timed(self, metric=None, tags=None, sample_rate=1, use_ms=None):
        """
        Timing decorator for measuring function execution time.
        
        Parameters:
        - metric (str): Metric name (defaults to function name)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        - use_ms (bool): Use milliseconds (overrides client setting)
        
        Returns:
        Decorator function
        
        Usage:
        @statsd.timed('my_function.duration')
        def my_function():
            pass
        """

    def distributed(self, metric=None, tags=None, sample_rate=1):
        """
        Distribution decorator for measuring function execution time.
        
        Parameters:
        - metric (str): Metric name (defaults to function name)
        - tags (list): List of tags  
        - sample_rate (float): Sampling rate
        
        Returns:
        Decorator function
        
        Usage:
        @statsd.distributed('my_function.duration')
        def my_function():
            pass
        """

Connection and Buffer Management

Control client behavior, buffer management, and connection handling for optimal performance.

class DogStatsd:
    def flush(self):
        """
        Manually flush buffered metrics to StatsD server.
        """

    def close_socket(self):
        """
        Close the UDP socket connection.
        """

    def enable_aggregation(self, flush_interval=0.3, max_samples_per_context=0):
        """
        Enable client-side metric aggregation.
        
        Parameters:
        - flush_interval (float): Aggregation flush interval in seconds
        - max_samples_per_context (int): Max samples per metric context
        """

    def disable_aggregation(self):
        """
        Disable client-side metric aggregation.
        """

    def enable_background_sender(self):
        """
        Enable background thread for metric sending.
        """

    def disable_background_sender(self):
        """
        Disable background thread sending.
        """

    def wait_for_pending(self):
        """
        Wait for all pending metrics to be sent.
        """

Global StatsD Instance

Pre-configured global instance for immediate use without initialization.

# Global statsd instance
statsd = DogStatsd()

# Use directly without initialization
statsd.increment('web.requests')
statsd.gauge('system.memory.usage', 75.2)
statsd.timing('db.query.time', 142)

Usage Examples

Basic Metrics Submission

from datadog import initialize, statsd

# Initialize Datadog (configures global statsd instance)
initialize(
    statsd_host='localhost',
    statsd_port=8125,
    statsd_constant_tags=['env:production', 'service:web']
)

# Submit various metrics
statsd.increment('web.requests', tags=['endpoint:/api/users'])
statsd.gauge('system.cpu.usage', 75.5, tags=['host:web01'])
statsd.histogram('response.time', 245.7, tags=['endpoint:/api/users'])
statsd.timing('db.query.duration', 89, tags=['table:users', 'operation:select'])

# Submit event
statsd.event(
    'Deployment completed',
    'Version 1.2.3 deployed successfully',
    alert_type='success',
    tags=['version:1.2.3', 'env:production']
)

# Submit service check
statsd.service_check(
    'database.connection',
    0,  # OK status
    tags=['db:postgresql', 'host:db01'],
    message='Database connection healthy'
)

Custom Client Configuration

from datadog.dogstatsd import DogStatsd

# Create custom client with specific configuration
custom_statsd = DogStatsd(
    host='statsd.internal.com',
    port=8125,
    namespace='myapp',
    constant_tags=['service:api', 'version:1.0'],
    max_buffer_size=1024,
    default_sample_rate=0.1,  # Sample 10% of metrics
    use_ms=True  # Use milliseconds for timing
)

# All metrics will be prefixed with 'myapp.' and include constant tags
custom_statsd.increment('requests.count')  # Sends: myapp.requests.count
custom_statsd.timing('request.duration', 250)  # Value in milliseconds

Using Decorators for Automatic Timing

from datadog import statsd

# Time function execution automatically
@statsd.timed('function.process_data.duration', tags=['version:v2'])
def process_data(data):
    # Function implementation
    time.sleep(0.1)  # Simulated work
    return len(data)

# Use distribution for more detailed statistics
@statsd.distributed('function.calculate.time')
def expensive_calculation(x, y):
    # Complex calculation
    result = sum(i * x * y for i in range(1000))
    return result

# Function calls automatically submit timing metrics
result = process_data([1, 2, 3, 4, 5])
calc_result = expensive_calculation(10, 20)

Context Manager for Code Block Timing

import time
from datadog import statsd

# Time a code block
with statsd.timed('database.backup.duration', tags=['type:full']):
    # Backup operations
    time.sleep(5)  # Simulated backup time
    print("Backup completed")

# The timing metric is automatically submitted when exiting the context

Advanced Configuration with Aggregation

from datadog.dogstatsd import DogStatsd

# Create client with aggregation enabled
aggregated_statsd = DogStatsd(
    host='localhost',
    port=8125,
    max_buffer_len=50,  # Buffer up to 50 metrics
    disable_telemetry=False
)

# Enable aggregation for high-throughput scenarios
aggregated_statsd.enable_aggregation(
    flush_interval=1.0,  # Flush every second
    max_samples_per_context=100  # Max 100 samples per metric
)

# Submit many metrics rapidly - they'll be aggregated
for i in range(1000):
    aggregated_statsd.increment('high_volume.counter', tags=[f'iteration:{i%10}'])
    aggregated_statsd.gauge('random.value', random.random() * 100)

# Manually flush if needed
aggregated_statsd.flush()

# Clean shutdown
aggregated_statsd.wait_for_pending()
aggregated_statsd.close_socket()

Unix Domain Socket Configuration

from datadog.dogstatsd import DogStatsd

# Use Unix Domain Socket for better performance
socket_statsd = DogStatsd(
    socket_path='/var/run/datadog/dsd.socket',
    constant_tags=['transport:uds']
)

# Submit metrics via UDS
socket_statsd.increment('uds.test.counter')
socket_statsd.gauge('uds.test.gauge', 42.0)

Error Handling and Reliability

from datadog import statsd
import logging

# StatsD operations are fire-and-forget by design
# but you can add error handling for critical metrics

def safe_metric_submit():
    try:
        statsd.increment('critical.business.metric')
        statsd.gauge('critical.system.health', get_health_score())
    except Exception as e:
        # Log error but don't block application
        logging.warning(f"Failed to submit metrics: {e}")
        
    # Application continues regardless of metric submission success

# Handle sampling for high-volume metrics
def submit_high_volume_metric(value):
    # Only submit 1% of metrics to reduce load
    statsd.histogram('high_volume.metric', value, sample_rate=0.01)

# The sample_rate tells StatsD to multiply the received values
# to estimate the true volume

Best Practices

Metric Naming Conventions

# Good: Use hierarchical naming with dots
statsd.increment('web.requests.success')
statsd.increment('web.requests.error')
statsd.gauge('system.memory.usage')
statsd.timing('database.query.users.select')

# Avoid: Inconsistent or flat naming
statsd.increment('web_success')  # Inconsistent separator
statsd.increment('requests')     # Too generic

Effective Tagging Strategy

# Good: Use tags for dimensions, not metric names
statsd.increment('web.requests', tags=[
    'endpoint:/api/users',
    'method:GET', 
    'status:200',
    'region:us-east-1'
])

# Avoid: Encoding dimensions in metric names
statsd.increment('web.requests.api.users.GET.200.us_east_1')  # Creates many metrics

Sampling for High-Volume Metrics

# Use sampling for metrics that fire very frequently
statsd.increment('trace.span.created', sample_rate=0.1)  # Sample 10%
statsd.timing('cache.access.time', duration, sample_rate=0.05)  # Sample 5%

# Don't sample critical business metrics
statsd.increment('payment.processed')  # Always submit (sample_rate=1.0)
statsd.gauge('service.health.score', score)  # Always submit

Install with Tessl CLI

npx tessl i tessl/pypi-datadog

docs

command-line-tools.md

configuration.md

dogstatsd-client.md

error-handling.md

http-api-client.md

index.md

threadstats.md

tile.json