# ThreadStats

Thread-safe metrics collection system that aggregates metrics in background threads without impacting application performance. Ideal for high-throughput applications where metrics submission must not block critical application threads.

## Capabilities

### Thread-Safe Metrics Collection

Collect metrics from multiple threads safely, with automatic aggregation and background flushing to prevent performance degradation.

```python { .api }
class ThreadStats:
    def __init__(self, namespace="", constant_tags=None, compress_payload=False):
        """
        Initialize ThreadStats client.

        Parameters:
        - namespace (str): Namespace to prefix all metric names (default: "")
        - constant_tags (list): Tags to attach to every metric reported by this client (default: None)
        - compress_payload (bool): Compress the payload using zlib (default: False)
        """

    def start(
        self,
        flush_interval=10,
        roll_up_interval=10,
        device=None,
        flush_in_thread=True,
        flush_in_greenlet=False,
        disabled=False
    ):
        """
        Start background metrics collection.

        Parameters:
        - flush_interval (int): Number of seconds to wait between flushes (default: 10)
        - roll_up_interval (int): Aggregation window, in seconds, over which metrics are rolled up before flushing (default: 10)
        - device (str): Device name to associate with flushed metrics (default: None)
        - flush_in_thread (bool): Spawn a background thread to flush metrics (default: True)
        - flush_in_greenlet (bool): Flush in a gevent greenlet instead of a thread (default: False)
        - disabled (bool): Disable metrics collection (default: False)
        """

    def stop(self):
        """
        Stop the background collection thread and flush remaining metrics.
        """
```
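A minimal lifecycle sketch tying the three calls above together; the namespace, tags, and metric name are illustrative, and the API key placeholder must be replaced with a real key passed to `datadog.initialize`:

```python
from datadog import initialize
from datadog.threadstats import ThreadStats

# ThreadStats flushes through the Datadog API, so credentials are
# configured once via datadog.initialize (placeholder key shown here).
initialize(api_key="<YOUR_API_KEY>")

stats = ThreadStats(namespace="myapp", constant_tags=["env:staging"])
stats.start(flush_interval=10, roll_up_interval=10)

# Safe to call from any thread once started.
stats.increment("requests.handled")

stats.stop()  # stops the background thread and flushes remaining metrics
```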
### Metric Submission Methods

Submit various metric types with automatic thread-safe aggregation and batching.

```python { .api }
class ThreadStats:
    def gauge(self, metric, value, tags=None, sample_rate=1, timestamp=None):
        """
        Submit gauge metric (current value).

        Parameters:
        - metric (str): Metric name
        - value (float): Current gauge value
        - tags (list): List of tags in "key:value" format
        - sample_rate (float): Sampling rate (0.0-1.0)
        - timestamp (int): Unix timestamp (optional)
        """

    def increment(self, metric, value=1, tags=None, sample_rate=1):
        """
        Increment counter metric.

        Parameters:
        - metric (str): Metric name
        - value (int): Increment amount (default: 1)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def decrement(self, metric, value=1, tags=None, sample_rate=1):
        """
        Decrement counter metric.

        Parameters:
        - metric (str): Metric name
        - value (int): Decrement amount (default: 1)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def histogram(self, metric, value, tags=None, sample_rate=1):
        """
        Submit histogram metric for statistical analysis.

        Parameters:
        - metric (str): Metric name
        - value (float): Value to add to histogram
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def distribution(self, metric, value, tags=None, sample_rate=1):
        """
        Submit distribution metric for global statistical analysis.

        Parameters:
        - metric (str): Metric name
        - value (float): Value to add to distribution
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def timing(self, metric, value, tags=None, sample_rate=1):
        """
        Submit timing metric in milliseconds.

        Parameters:
        - metric (str): Metric name
        - value (float): Time duration in milliseconds
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def set(self, metric, value, tags=None, sample_rate=1):
        """
        Submit set metric (count of unique values).

        Parameters:
        - metric (str): Metric name
        - value (str): Unique value to count
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def event(
        self,
        title,
        text,
        alert_type="info",
        aggregation_key=None,
        source_type_name=None,
        date_happened=None,
        priority="normal",
        tags=None
    ):
        """
        Submit custom event.

        Parameters:
        - title (str): Event title
        - text (str): Event description
        - alert_type (str): 'error', 'warning', 'info', or 'success'
        - aggregation_key (str): Key for grouping related events
        - source_type_name (str): Source type identifier
        - date_happened (int): Unix timestamp when event occurred
        - priority (str): 'normal' or 'low'
        - tags (list): List of tags
        """
```
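A short sketch exercising the counter, distribution, and set helpers that the later usage examples don't cover; the metric names and values are illustrative:

```python
from datadog.threadstats import ThreadStats

stats = ThreadStats()
stats.start()

# Counters: increment and decrement an in-flight request count.
stats.increment("app.requests.in_flight")
stats.decrement("app.requests.in_flight")

# Distribution: aggregated server-side across all hosts.
stats.distribution("app.upload.size_bytes", 1528)

# Set: counts unique values seen during the roll-up interval.
stats.set("app.users.unique", "user-1234")

stats.stop()
```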
### Timing Utilities

Context managers and decorators for automatic timing measurement without manual calculation.

```python { .api }
class ThreadStats:
    def timer(self, metric=None, tags=None, sample_rate=1):
        """
        Context manager for timing code blocks.

        Parameters:
        - metric (str): Metric name for timing
        - tags (list): List of tags
        - sample_rate (float): Sampling rate

        Returns:
        Context manager that submits a timing metric on exit

        Usage:
        with stats.timer('operation.duration'):
            # Timed operation
            pass
        """

    def timed(self, metric=None, tags=None, sample_rate=1):
        """
        Timing decorator for measuring function execution time.

        Parameters:
        - metric (str): Metric name (defaults to the function name)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate

        Returns:
        Decorator function

        Usage:
        @stats.timed('function.process.duration')
        def process_data():
            pass
        """
```
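A small sketch combining both helpers; as noted in the docstrings, `timed` can fall back to a name derived from the decorated function when no metric is passed (the exact default name may vary by library version), and the metric names below are illustrative:

```python
from datadog.threadstats import ThreadStats

stats = ThreadStats()
stats.start()

@stats.timed()  # metric name falls back to one derived from the function
def resize_image(path):
    # Placeholder for real work.
    return path

# Time the whole batch while each call is also timed by the decorator.
with stats.timer("batch.resize.duration", tags=["source:upload"]):
    for path in ["a.png", "b.png"]:
        resize_image(path)

stats.stop()
```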
### Buffer Management

Flush aggregated metrics on demand when you need direct control over when buffered data leaves the process, for example before a shutdown or at the end of a batch job.

```python { .api }
class ThreadStats:
    def flush(self, timestamp=None):
        """
        Manually flush aggregated metrics to the Datadog API.

        Parameters:
        - timestamp (int): Timestamp for the flush operation (optional)
        """
```
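A sketch of a batch-style job that skips the background flush thread and flushes explicitly before exiting; `load_records` is a placeholder for whatever produces the work items:

```python
from datadog.threadstats import ThreadStats

stats = ThreadStats()
# Skip the background flush thread; this job flushes explicitly instead.
stats.start(flush_in_thread=False)

for record in load_records():  # placeholder data source
    stats.increment("batch.records.processed")

# Push everything aggregated so far before the process exits.
stats.flush()
stats.stop()
```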
### AWS Lambda Integration

Specialized functions for AWS Lambda environments with automatic context handling and optimized flushing.

```python { .api }
def datadog_lambda_wrapper(lambda_func):
    """
    Decorator for AWS Lambda functions to enable metrics collection.

    Parameters:
    - lambda_func (function): Lambda handler function

    Returns:
    Wrapped function with automatic metrics setup and flushing

    Usage:
    @datadog_lambda_wrapper
    def lambda_handler(event, context):
        lambda_metric('custom.metric', 1)
        return {'statusCode': 200}
    """

def lambda_metric(metric_name, value, tags=None, timestamp=None):
    """
    Submit metric from an AWS Lambda function.

    Parameters:
    - metric_name (str): Metric name
    - value (float): Metric value
    - tags (list): List of tags
    - timestamp (int): Unix timestamp

    Note: Use with datadog_lambda_wrapper for automatic flushing
    """
```
## Usage Examples

### Basic ThreadStats Usage

```python
from datadog.threadstats import ThreadStats
import time
import threading

# Initialize ThreadStats client
# (assumes datadog.initialize() has already been called with a valid API key)
stats = ThreadStats()

# Start background collection thread
stats.start(flush_interval=5, roll_up_interval=10)

# Submit metrics from multiple threads safely
def worker_thread(thread_id):
    for i in range(100):
        stats.increment('worker.task.completed', tags=[f'thread:{thread_id}'])
        stats.gauge('worker.queue.size', 50 - i, tags=[f'thread:{thread_id}'])
        stats.timing('worker.task.duration', 25 + thread_id * 5)
        time.sleep(0.1)

# Create multiple worker threads
threads = []
for i in range(5):
    t = threading.Thread(target=worker_thread, args=(i,))
    threads.append(t)
    t.start()

# Wait for all threads to complete
for t in threads:
    t.join()

# Stop collection and flush remaining metrics
stats.stop()
```
### Using Timer Context Manager

```python
from datadog.threadstats import ThreadStats
import requests
import time

stats = ThreadStats()
stats.start()

# Time different operations
with stats.timer('api.request.duration', tags=['endpoint:users']):
    response = requests.get('https://api.example.com/users')

with stats.timer('database.query.time', tags=['table:orders', 'operation:select']):
    # Simulated database query
    time.sleep(0.05)

with stats.timer('file.processing.time', tags=['type:csv']):
    # File processing operation (process_large_file is a placeholder)
    process_large_file('data.csv')

stats.stop()
```
### Function Timing with Decorators

```python
from datadog.threadstats import ThreadStats

stats = ThreadStats()
stats.start()

@stats.timed('data.processing.duration', tags=['version:v2'])
def process_user_data(user_data):
    # Complex data processing
    processed = []
    for item in user_data:
        # Processing logic
        processed.append(transform_data(item))
    return processed

@stats.timed('cache.operation.time')
def update_cache(key, value):
    # Cache update operation
    cache_client.set(key, value, ttl=3600)
    return True

# Function calls automatically submit timing metrics
users = get_user_data()
result = process_user_data(users)
update_cache('processed_users', result)

stats.stop()
```
### High-Throughput Metrics Collection

```python
from datadog.threadstats import ThreadStats
import concurrent.futures
import random

# Configure for a high-throughput scenario: the aggregation and flushing
# options belong to start(), not the ThreadStats constructor.
stats = ThreadStats()
stats.start(
    flush_interval=2,      # Flush every 2 seconds
    roll_up_interval=5,    # Aggregate over 5-second windows
    flush_in_thread=True   # Use a background flush thread
)

def simulate_high_traffic():
    """Simulate high-volume application metrics."""
    for _ in range(10000):
        # Application metrics
        stats.increment('app.requests.total')
        stats.gauge('app.memory.usage', random.uniform(50, 90))
        stats.histogram('app.response.time', random.uniform(10, 500))

        # Business metrics
        if random.random() < 0.1:  # 10% chance
            stats.increment('business.conversion.event')

        if random.random() < 0.05:  # 5% chance
            stats.event(
                'User signup',
                'New user registration completed',
                alert_type='info',
                tags=['source:web', 'funnel:signup']
            )

# Run simulation in multiple threads
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(simulate_high_traffic) for _ in range(10)]
    concurrent.futures.wait(futures)

# Graceful shutdown with final flush
stats.stop()
```
### AWS Lambda Integration

```python
from datadog.threadstats.aws_lambda import datadog_lambda_wrapper, lambda_metric
import json

@datadog_lambda_wrapper
def lambda_handler(event, context):
    """AWS Lambda handler with automatic Datadog metrics."""

    # Submit custom metrics
    lambda_metric('lambda.invocation.count', 1, tags=['function:user-processor'])
    lambda_metric('lambda.event.size', len(json.dumps(event)))

    try:
        # Process the event
        result = process_event(event)

        # Success metrics
        lambda_metric('lambda.processing.success', 1)
        lambda_metric('lambda.result.size', len(str(result)))

        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }

    except Exception as e:
        # Error metrics
        lambda_metric('lambda.processing.error', 1, tags=['error_type:processing'])

        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def process_event(event):
    """Business logic for processing Lambda events."""
    # Simulate processing time
    lambda_metric('lambda.processing.duration', 250)
    return {'processed': True, 'items': len(event.get('items', []))}
```
### Integration with Web Frameworks

```python
from datadog.threadstats import ThreadStats
from flask import Flask, request
import time

app = Flask(__name__)
stats = ThreadStats()
stats.start()

@app.before_request
def before_request():
    request.start_time = time.time()
    stats.increment('web.request.started', tags=[
        f'endpoint:{request.endpoint}',
        f'method:{request.method}'
    ])

@app.after_request
def after_request(response):
    # Calculate request duration
    duration = (time.time() - request.start_time) * 1000  # Convert to ms

    # Submit request metrics
    stats.timing('web.request.duration', duration, tags=[
        f'endpoint:{request.endpoint}',
        f'method:{request.method}',
        f'status:{response.status_code}'
    ])

    stats.gauge('web.response.size', len(response.get_data()))

    return response

@app.route('/api/users')
def get_users():
    # Business logic metrics
    stats.increment('api.users.request')

    with stats.timer('database.query.users'):
        users = fetch_users_from_db()

    stats.gauge('api.users.returned', len(users))
    return {'users': users}

if __name__ == '__main__':
    try:
        app.run()
    finally:
        stats.stop()  # Ensure metrics are flushed on shutdown
```
### Custom Aggregation Configuration

```python
from datadog import initialize
from datadog.threadstats import ThreadStats

# Credentials and host settings are supplied through datadog.initialize();
# aggregation and flushing options are supplied through ThreadStats.start().
initialize(
    api_key='your-api-key',
    app_key='your-app-key',
    host_name='custom-host'
)

stats = ThreadStats()
stats.start(
    flush_interval=30,     # Flush every 30 seconds
    roll_up_interval=60,   # Aggregate over 1-minute windows
    flush_in_thread=True,  # Background flushing
    device='container-1'   # Custom device identifier
)

# Submit metrics with custom aggregation
for i in range(1000):
    stats.increment('custom.counter', tags=['batch:hourly'])
    stats.gauge('custom.processing.rate', i * 0.5)

    # These will be aggregated over the roll_up_interval
    if i % 100 == 0:
        stats.event(
            f'Batch checkpoint {i}',
            f'Processed {i} items in current batch',
            tags=['batch:hourly', f'checkpoint:{i}']
        )

# Manual flush if needed before the automatic interval
stats.flush()

stats.stop()
```
## Best Practices

### Thread Safety and Performance

```python
from datadog.threadstats import ThreadStats

# Good: One ThreadStats instance shared across threads
stats = ThreadStats()
stats.start()

def worker_function(worker_id):
    # Safe to call from multiple threads
    stats.increment('worker.processed', tags=[f'worker:{worker_id}'])

# Avoid: Creating multiple ThreadStats instances.
# This wastes resources and can cause metric duplication.
```
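One common way to honor this guideline is to create the shared instance in a small metrics module and import it everywhere else; a sketch under that assumption (module and function names are illustrative):

```python
# metrics.py -- one shared ThreadStats instance for the whole process
from datadog.threadstats import ThreadStats

stats = ThreadStats(constant_tags=["service:orders"])

def setup_metrics():
    # Call once at application startup (after datadog.initialize()).
    stats.start()

def teardown_metrics():
    # Call once at shutdown to flush anything still buffered.
    stats.stop()
```

Other modules then do `from metrics import stats` and submit metrics directly, so every thread shares the same aggregation buffers.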
### Proper Lifecycle Management

```python
from datadog.threadstats import ThreadStats

class MetricsManager:
    def __init__(self):
        self.stats = ThreadStats()

    def start(self):
        self.stats.start()

    def stop(self):
        """Ensure clean shutdown with metric flushing."""
        self.stats.stop()  # This flushes remaining metrics

    def __enter__(self):
        self.start()
        return self.stats

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

# Use context manager for automatic cleanup
with MetricsManager() as stats:
    stats.increment('app.started')
    # Do application work
    stats.increment('app.finished')
# Metrics are automatically flushed on exit
```
### Lambda-Specific Considerations

```python
# For AWS Lambda, use the wrapper and lambda_metric functions
from datadog.threadstats.aws_lambda import datadog_lambda_wrapper, lambda_metric

@datadog_lambda_wrapper
def lambda_handler(event, context):
    # Don't create ThreadStats instances in Lambda;
    # use lambda_metric instead for automatic lifecycle management
    lambda_metric('lambda.execution', 1)

    # Business logic
    result = process_lambda_event(event)

    lambda_metric('lambda.success', 1)
    return result
```