# ThreadStats

Thread-safe metrics collection system that aggregates metrics in background threads without hindering application performance. Ideal for high-throughput applications where metrics submission must not impact critical application threads.

## Capabilities

### Thread-Safe Metrics Collection

Collect metrics from multiple threads safely with automatic aggregation and background flushing to prevent performance degradation.

```python { .api }
class ThreadStats:
    def __init__(self, namespace="", constant_tags=None, compress_payload=False):
        """
        Initialize ThreadStats client.

        Parameters:
        - namespace (str): Namespace to prefix all metric names (default: "")
        - constant_tags (list): Tags to attach to every metric reported by this client (default: None)
        - compress_payload (bool): Compress the payload using zlib (default: False)
        """

    def start(
        self,
        flush_interval=10,
        roll_up_interval=10,
        device=None,
        flush_in_thread=True,
        flush_in_greenlet=False,
        disabled=False
    ):
        """
        Start background metrics collection thread.

        Parameters:
        - flush_interval (int): The number of seconds to wait between flushes (default: 10)
        - roll_up_interval (int): Roll-up interval for metrics aggregation, in seconds (default: 10)
        - device: Device parameter for metrics (default: None)
        - flush_in_thread (bool): True if you'd like to spawn a thread to flush metrics (default: True)
        - flush_in_greenlet (bool): True if you'd like to flush in a gevent greenlet (default: False)
        - disabled (bool): Disable metrics collection (default: False)
        """

    def stop(self):
        """
        Stop background collection thread and flush remaining metrics.
        """
```

### Metric Submission Methods

Submit various metric types with automatic thread-safe aggregation and batching.

```python { .api }
class ThreadStats:
    def gauge(self, metric, value, tags=None, sample_rate=1, timestamp=None):
        """
        Submit gauge metric (current value).

        Parameters:
        - metric (str): Metric name
        - value (float): Current gauge value
        - tags (list): List of tags in "key:value" format
        - sample_rate (float): Sampling rate (0.0-1.0)
        - timestamp (int): Unix timestamp (optional)
        """

    def increment(self, metric, value=1, tags=None, sample_rate=1):
        """
        Increment counter metric.

        Parameters:
        - metric (str): Metric name
        - value (int): Increment amount (default: 1)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def decrement(self, metric, value=1, tags=None, sample_rate=1):
        """
        Decrement counter metric.

        Parameters:
        - metric (str): Metric name
        - value (int): Decrement amount (default: 1)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def histogram(self, metric, value, tags=None, sample_rate=1):
        """
        Submit histogram metric for statistical analysis.

        Parameters:
        - metric (str): Metric name
        - value (float): Value to add to histogram
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def distribution(self, metric, value, tags=None, sample_rate=1):
        """
        Submit distribution metric for global statistical analysis.

        Parameters:
        - metric (str): Metric name
        - value (float): Value to add to distribution
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def timing(self, metric, value, tags=None, sample_rate=1):
        """
        Submit timing metric in milliseconds.

        Parameters:
        - metric (str): Metric name
        - value (float): Time duration in milliseconds
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def set(self, metric, value, tags=None, sample_rate=1):
        """
        Submit set metric (count unique values).

        Parameters:
        - metric (str): Metric name
        - value (str): Unique value to count
        - tags (list): List of tags
        - sample_rate (float): Sampling rate
        """

    def event(
        self,
        title,
        text,
        alert_type="info",
        aggregation_key=None,
        source_type_name=None,
        date_happened=None,
        priority="normal",
        tags=None
    ):
        """
        Submit custom event.

        Parameters:
        - title (str): Event title
        - text (str): Event description
        - alert_type (str): 'error', 'warning', 'info', or 'success'
        - aggregation_key (str): Key for grouping related events
        - source_type_name (str): Source type identifier
        - date_happened (int): Unix timestamp when event occurred
        - priority (str): 'normal' or 'low'
        - tags (list): List of tags
        """
```

### Timing Utilities

Context managers and decorators for automatic timing measurement without manual calculation.

```python { .api }
class ThreadStats:
    def timer(self, metric=None, tags=None, sample_rate=1):
        """
        Context manager for timing code blocks.

        Parameters:
        - metric (str): Metric name for timing
        - tags (list): List of tags
        - sample_rate (float): Sampling rate

        Returns:
        Context manager that submits timing metric on exit

        Usage:
        with stats.timer('operation.duration'):
            # Timed operation
            pass
        """

    def timed(self, metric=None, tags=None, sample_rate=1):
        """
        Timing decorator for measuring function execution time.

        Parameters:
        - metric (str): Metric name (defaults to function name)
        - tags (list): List of tags
        - sample_rate (float): Sampling rate

        Returns:
        Decorator function

        Usage:
        @stats.timed('function.process.duration')
        def process_data():
            pass
        """
```

### Buffer Management

Control metric aggregation, flushing behavior, and memory management for optimal performance.

```python { .api }
class ThreadStats:
    def flush(self, timestamp=None):
        """
        Manually flush aggregated metrics to Datadog API.

        Parameters:
        - timestamp (int): Timestamp for the flush operation
        """
```

### AWS Lambda Integration

Specialized functions for AWS Lambda environments with automatic context handling and optimized flushing.

```python { .api }
def datadog_lambda_wrapper(lambda_func):
    """
    Decorator for AWS Lambda functions to enable metrics collection.

    Parameters:
    - lambda_func (function): Lambda handler function

    Returns:
    Wrapped function with automatic metrics setup and flushing

    Usage:
    @datadog_lambda_wrapper
    def lambda_handler(event, context):
        lambda_metric('custom.metric', 1)
        return {'statusCode': 200}
    """

def lambda_metric(metric_name, value, tags=None, timestamp=None):
    """
    Submit metric from AWS Lambda function.

    Parameters:
    - metric_name (str): Metric name
    - value (float): Metric value
    - tags (list): List of tags
    - timestamp (int): Unix timestamp

    Note: Use with datadog_lambda_wrapper for automatic flushing
    """
```

## Usage Examples

### Basic ThreadStats Usage

```python
from datadog.threadstats import ThreadStats
import time
import threading

# Initialize ThreadStats client
stats = ThreadStats()

# Start background collection thread
stats.start(flush_interval=5, roll_up_interval=10)

# Submit metrics from multiple threads safely
def worker_thread(thread_id):
    for i in range(100):
        stats.increment('worker.task.completed', tags=[f'thread:{thread_id}'])
        stats.gauge('worker.queue.size', 50 - i, tags=[f'thread:{thread_id}'])
        stats.timing('worker.task.duration', 25 + thread_id * 5)
        time.sleep(0.1)

# Create multiple worker threads
threads = []
for i in range(5):
    t = threading.Thread(target=worker_thread, args=(i,))
    threads.append(t)
    t.start()

# Wait for all threads to complete
for t in threads:
    t.join()

# Stop collection and flush remaining metrics
stats.stop()
```

### Using Timer Context Manager

```python
from datadog.threadstats import ThreadStats
import requests
import time

stats = ThreadStats()
stats.start()

# Time different operations
with stats.timer('api.request.duration', tags=['endpoint:users']):
    response = requests.get('https://api.example.com/users')

with stats.timer('database.query.time', tags=['table:orders', 'operation:select']):
    # Simulated database query
    time.sleep(0.05)

with stats.timer('file.processing.time', tags=['type:csv']):
    # File processing operation
    process_large_file('data.csv')

stats.stop()
```

### Function Timing with Decorators

```python
from datadog.threadstats import ThreadStats

stats = ThreadStats()
stats.start()

@stats.timed('data.processing.duration', tags=['version:v2'])
def process_user_data(user_data):
    # Complex data processing
    processed = []
    for item in user_data:
        # Processing logic
        processed.append(transform_data(item))
    return processed

@stats.timed('cache.operation.time')
def update_cache(key, value):
    # Cache update operation
    cache_client.set(key, value, ttl=3600)
    return True

# Function calls automatically submit timing metrics
users = get_user_data()
result = process_user_data(users)
update_cache('processed_users', result)

stats.stop()
```

### High-Throughput Metrics Collection

```python
from datadog.threadstats import ThreadStats
import concurrent.futures
import random

# Configure for high-throughput scenario; the tuning options
# belong to start(), not the constructor
stats = ThreadStats()
stats.start(
    flush_interval=2,     # Flush every 2 seconds
    roll_up_interval=5,   # Aggregate over 5-second windows
    flush_in_thread=True  # Use background thread
)

def simulate_high_traffic():
    """Simulate high-volume application metrics."""
    for _ in range(10000):
        # Application metrics
        stats.increment('app.requests.total')
        stats.gauge('app.memory.usage', random.uniform(50, 90))
        stats.histogram('app.response.time', random.uniform(10, 500))

        # Business metrics
        if random.random() < 0.1:  # 10% chance
            stats.increment('business.conversion.event')

        if random.random() < 0.05:  # 5% chance
            stats.event(
                'User signup',
                'New user registration completed',
                alert_type='info',
                tags=['source:web', 'funnel:signup']
            )

# Run simulation in multiple threads
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(simulate_high_traffic) for _ in range(10)]
    concurrent.futures.wait(futures)

# Graceful shutdown with final flush
stats.stop()
```

### AWS Lambda Integration

```python
from datadog.threadstats.aws_lambda import datadog_lambda_wrapper, lambda_metric
import json

@datadog_lambda_wrapper
def lambda_handler(event, context):
    """AWS Lambda handler with automatic Datadog metrics."""

    # Submit custom metrics
    lambda_metric('lambda.invocation.count', 1, tags=['function:user-processor'])
    lambda_metric('lambda.event.size', len(json.dumps(event)))

    try:
        # Process the event
        result = process_event(event)

        # Success metrics
        lambda_metric('lambda.processing.success', 1)
        lambda_metric('lambda.result.size', len(str(result)))

        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }

    except Exception as e:
        # Error metrics
        lambda_metric('lambda.processing.error', 1, tags=['error_type:processing'])

        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def process_event(event):
    """Business logic for processing Lambda events."""
    # Simulate processing time
    lambda_metric('lambda.processing.duration', 250)
    return {'processed': True, 'items': len(event.get('items', []))}
```

### Integration with Web Frameworks

```python
from datadog.threadstats import ThreadStats
from flask import Flask, request
import time

app = Flask(__name__)
stats = ThreadStats()
stats.start()

@app.before_request
def before_request():
    request.start_time = time.time()
    stats.increment('web.request.started', tags=[
        f'endpoint:{request.endpoint}',
        f'method:{request.method}'
    ])

@app.after_request
def after_request(response):
    # Calculate request duration
    duration = (time.time() - request.start_time) * 1000  # Convert to ms

    # Submit request metrics
    stats.timing('web.request.duration', duration, tags=[
        f'endpoint:{request.endpoint}',
        f'method:{request.method}',
        f'status:{response.status_code}'
    ])

    stats.gauge('web.response.size', len(response.get_data()))

    return response

@app.route('/api/users')
def get_users():
    # Business logic metrics
    stats.increment('api.users.request')

    with stats.timer('database.query.users'):
        users = fetch_users_from_db()

    stats.gauge('api.users.returned', len(users))
    return {'users': users}

if __name__ == '__main__':
    try:
        app.run()
    finally:
        stats.stop()  # Ensure metrics are flushed on shutdown
```

### Custom Aggregation Configuration

```python
from datadog import initialize
from datadog.threadstats import ThreadStats

# Credentials and host are configured via initialize(),
# not the ThreadStats constructor
initialize(api_key='your-api-key', app_key='your-app-key', host_name='custom-host')

stats = ThreadStats()
stats.start(
    flush_interval=30,     # Flush every 30 seconds
    roll_up_interval=60,   # Aggregate over 1-minute windows
    flush_in_thread=True,  # Background flushing
    device='container-1'   # Custom device identifier
)

# Submit metrics with custom aggregation
for i in range(1000):
    stats.increment('custom.counter', tags=['batch:hourly'])
    stats.gauge('custom.processing.rate', i * 0.5)

    # These will be aggregated over the roll_up_interval
    if i % 100 == 0:
        stats.event(
            f'Batch checkpoint {i}',
            f'Processed {i} items in current batch',
            tags=['batch:hourly', f'checkpoint:{i}']
        )

# Manual flush if needed before automatic interval
stats.flush()

stats.stop()
```

## Best Practices

### Thread Safety and Performance

```python
# Good: One ThreadStats instance shared across threads
stats = ThreadStats()
stats.start()

def worker_function(worker_id):
    # Safe to call from multiple threads
    stats.increment('worker.processed', tags=[f'worker:{worker_id}'])

# Avoid: Creating multiple ThreadStats instances.
# This wastes resources and can cause metric duplication.
```

### Proper Lifecycle Management

```python
from datadog.threadstats import ThreadStats

class MetricsManager:
    def __init__(self):
        self.stats = ThreadStats()

    def start(self):
        self.stats.start()

    def stop(self):
        """Ensure clean shutdown with metric flushing."""
        self.stats.stop()  # This flushes remaining metrics

    def __enter__(self):
        self.start()
        return self.stats

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

# Use context manager for automatic cleanup
with MetricsManager() as stats:
    stats.increment('app.started')
    # Do application work
    stats.increment('app.finished')
# Metrics are automatically flushed on exit
```

### Lambda-Specific Considerations

```python
# For AWS Lambda, use the wrapper and lambda_metric functions
from datadog.threadstats.aws_lambda import datadog_lambda_wrapper, lambda_metric

@datadog_lambda_wrapper
def lambda_handler(event, context):
    # Don't create ThreadStats instances in Lambda;
    # use lambda_metric instead for automatic lifecycle management
    lambda_metric('lambda.execution', 1)

    # Business logic
    result = process_lambda_event(event)

    lambda_metric('lambda.success', 1)
    return result
```