or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced.mdcollectors.mdcontext-managers.mdcore-metrics.mdexposition.mdindex.mdregistry.md

context-managers.mddocs/

0

# Context Managers

1

2

Context managers and decorators for automatic instrumentation including timing operations, counting exceptions, and tracking in-progress work. These utilities provide convenient ways to instrument code without manual metric updates.

3

4

## Capabilities

5

6

### Timer

7

8

A context manager and decorator that automatically measures elapsed time and records it to a metric. Works with Histogram, Summary, and Gauge metrics.

9

10

```python { .api }

11

class Timer:

12

def __init__(self, metric, callback_name):

13

"""

14

Create a Timer for the given metric.

15

16

Parameters:

17

- metric: Metric instance (Histogram, Summary, or Gauge)

18

- callback_name: Method name to call on metric ('observe' for Histogram/Summary, 'set' for Gauge)

19

"""

20

21

def __enter__(self):

22

"""Enter the timing context."""

23

return self

24

25

def __exit__(self, typ, value, traceback):

26

"""Exit the timing context and record elapsed time."""

27

28

def labels(self, *args, **kw) -> None:

29

"""

30

Update the Timer to use the labeled metric instance.

31

32

Parameters:

33

- args: Label values as positional arguments

34

- kw: Label values as keyword arguments

35

36

Note: Modifies the Timer instance in place

37

"""

38

39

def __call__(self, f) -> Callable:

40

"""

41

Use as a decorator.

42

43

Parameters:

44

- f: Function to decorate

45

46

Returns:

47

Decorated function that times execution

48

"""

49

```

50

51

**Usage Example:**

52

53

```python

54

from prometheus_client import Histogram, Summary, Gauge

55

import time

56

import random

57

58

# Create metrics that support timing

59

request_duration = Histogram(

60

'http_request_duration_seconds',

61

'HTTP request duration',

62

['method', 'endpoint']

63

)

64

65

response_time = Summary(

66

'http_response_time_seconds',

67

'HTTP response time summary'

68

)

69

70

last_request_duration = Gauge(

71

'last_request_duration_seconds',

72

'Duration of the last request'

73

)

74

75

# Context manager usage

76

with request_duration.labels('GET', '/api/users').time():

77

# Simulate API call

78

time.sleep(random.uniform(0.1, 0.5))

79

80

with response_time.time():

81

# Simulate processing

82

time.sleep(0.2)

83

84

# Gauge timing (records last duration)

85

with last_request_duration.time():

86

time.sleep(0.3)

87

88

# Decorator usage

89

@request_duration.labels('POST', '/api/orders').time()

90

def create_order():

91

time.sleep(random.uniform(0.2, 0.8))

92

return {'order_id': 12345}

93

94

@response_time.time()

95

def process_request():

96

time.sleep(random.uniform(0.1, 0.4))

97

return "processed"

98

99

# Call decorated functions

100

order = create_order()

101

result = process_request()

102

103

# Multiple label combinations

104

endpoints = ['/users', '/orders', '/products']

105

methods = ['GET', 'POST', 'PUT']

106

107

for endpoint in endpoints:

108

for method in methods:

109

with request_duration.labels(method, endpoint).time():

110

time.sleep(random.uniform(0.05, 0.2))

111

```

112

113

### ExceptionCounter

114

115

A context manager and decorator that automatically counts exceptions of specified types. Useful for monitoring error rates and debugging application issues.

116

117

```python { .api }

118

class ExceptionCounter:

119

def __init__(self, counter: "Counter", exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]]) -> None:

120

"""

121

Create an ExceptionCounter.

122

123

Parameters:

124

- counter: Counter metric instance

125

- exception: Exception type(s) to count (single type or tuple of types)

126

"""

127

128

def __enter__(self) -> None:

129

"""Enter the exception counting context."""

130

131

def __exit__(self, typ, value, traceback) -> Literal[False]:

132

"""

133

Exit the context and count exceptions if they occurred.

134

135

Parameters:

136

- typ: Exception type (None if no exception)

137

- value: Exception instance

138

- traceback: Exception traceback

139

140

Returns:

141

False (does not suppress exceptions)

142

"""

143

144

def __call__(self, f) -> Callable:

145

"""

146

Use as a decorator.

147

148

Parameters:

149

- f: Function to decorate

150

151

Returns:

152

Decorated function that counts exceptions

153

"""

154

```

155

156

**Usage Example:**

157

158

```python

159

from prometheus_client import Counter

160

import random

161

import time

162

163

# Create counters for different exception types

164

http_errors = Counter(

165

'http_errors_total',

166

'Total HTTP errors',

167

['error_type', 'endpoint']

168

)

169

170

database_errors = Counter(

171

'database_errors_total',

172

'Database connection errors'

173

)

174

175

general_errors = Counter(

176

'application_errors_total',

177

'General application errors',

178

['function']

179

)

180

181

# Context manager usage - count specific exceptions

182

def risky_api_call():

183

if random.random() < 0.3:

184

raise ConnectionError("API unavailable")

185

elif random.random() < 0.2:

186

raise ValueError("Invalid response")

187

return "success"

188

189

# Count ConnectionError exceptions

190

with http_errors.labels('connection_error', '/api/external').count_exceptions(ConnectionError):

191

result = risky_api_call()

192

193

# Count multiple exception types

194

with http_errors.labels('client_error', '/api/users').count_exceptions((ValueError, TypeError)):

195

result = risky_api_call()

196

197

# Database operation with exception counting

198

def connect_to_database():

199

if random.random() < 0.1:

200

raise ConnectionError("Database unavailable")

201

return "connected"

202

203

with database_errors.count_exceptions(): # Counts all exceptions

204

connection = connect_to_database()

205

206

# Decorator usage

207

@general_errors.labels('data_processing').count_exceptions()

208

def process_data(data):

209

if not data:

210

raise ValueError("Empty data")

211

if len(data) > 1000:

212

raise MemoryError("Data too large")

213

return f"processed {len(data)} items"

214

215

@http_errors.labels('timeout', '/api/slow').count_exceptions(TimeoutError)

216

def slow_api_call():

217

time.sleep(2)

218

if random.random() < 0.2:

219

raise TimeoutError("Request timeout")

220

return "completed"

221

222

# Call decorated functions

223

try:

224

result = process_data([]) # Will raise ValueError and increment counter

225

except ValueError:

226

pass

227

228

try:

229

result = slow_api_call() # May raise TimeoutError and increment counter

230

except TimeoutError:

231

pass

232

233

# Labeled counters with exception counting

234

services = ['auth', 'payments', 'inventory']

235

error_types = ['connection', 'timeout', 'validation']

236

237

def simulate_service_call(service):

238

if random.random() < 0.15:

239

if random.random() < 0.5:

240

raise ConnectionError(f"{service} unavailable")

241

else:

242

raise TimeoutError(f"{service} timeout")

243

return f"{service} success"

244

245

for service in services:

246

# Count different error types separately

247

with http_errors.labels('connection', service).count_exceptions(ConnectionError):

248

with http_errors.labels('timeout', service).count_exceptions(TimeoutError):

249

try:

250

result = simulate_service_call(service)

251

print(f"{service}: {result}")

252

except (ConnectionError, TimeoutError) as e:

253

print(f"{service}: {e}")

254

```

255

256

### InprogressTracker

257

258

A context manager and decorator that tracks the number of operations currently in progress. Useful for monitoring concurrent operations, queue depths, and system load.

259

260

```python { .api }

261

class InprogressTracker:

262

def __init__(self, gauge):

263

"""

264

Create an InprogressTracker.

265

266

Parameters:

267

- gauge: Gauge metric instance to track in-progress operations

268

"""

269

270

def __enter__(self) -> None:

271

"""Enter the tracking context and increment the gauge."""

272

273

def __exit__(self, typ, value, traceback):

274

"""Exit the tracking context and decrement the gauge."""

275

276

def __call__(self, f) -> Callable:

277

"""

278

Use as a decorator.

279

280

Parameters:

281

- f: Function to decorate

282

283

Returns:

284

Decorated function that tracks execution

285

"""

286

```

287

288

**Usage Example:**

289

290

```python

291

from prometheus_client import Gauge

292

import time

293

import random

294

import threading

295

import concurrent.futures

296

297

# Create gauges for tracking in-progress operations

298

active_requests = Gauge(

299

'http_requests_inprogress',

300

'Number of HTTP requests currently being processed',

301

['endpoint']

302

)

303

304

database_connections = Gauge(

305

'database_connections_active',

306

'Number of active database connections'

307

)

308

309

background_jobs = Gauge(

310

'background_jobs_inprogress',

311

'Number of background jobs currently running',

312

['job_type']

313

)

314

315

# Context manager usage

316

def handle_request(endpoint):

317

with active_requests.labels(endpoint).track_inprogress():

318

# Simulate request processing

319

time.sleep(random.uniform(0.1, 1.0))

320

return f"Processed {endpoint}"

321

322

def database_query():

323

with database_connections.track_inprogress():

324

# Simulate database operation

325

time.sleep(random.uniform(0.05, 0.3))

326

return "query result"

327

328

# Decorator usage

329

@background_jobs.labels('email_sender').track_inprogress()

330

def send_email_batch():

331

time.sleep(random.uniform(1.0, 3.0))

332

return "emails sent"

333

334

@background_jobs.labels('data_sync').track_inprogress()

335

def sync_data():

336

time.sleep(random.uniform(2.0, 5.0))

337

return "data synced"

338

339

@active_requests.labels('/api/upload').track_inprogress()

340

def handle_file_upload():

341

time.sleep(random.uniform(0.5, 2.0))

342

return "file uploaded"

343

344

# Simulate concurrent operations

345

def simulate_concurrent_requests():

346

endpoints = ['/api/users', '/api/orders', '/api/products']

347

348

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

349

# Submit multiple concurrent requests

350

futures = []

351

for _ in range(10):

352

endpoint = random.choice(endpoints)

353

future = executor.submit(handle_request, endpoint)

354

futures.append(future)

355

356

# Wait for completion

357

for future in concurrent.futures.as_completed(futures):

358

result = future.result()

359

print(f"Request completed: {result}")

360

361

def simulate_database_load():

362

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:

363

# Submit database queries

364

futures = [executor.submit(database_query) for _ in range(8)]

365

366

for future in concurrent.futures.as_completed(futures):

367

result = future.result()

368

print(f"Query completed: {result}")

369

370

def simulate_background_jobs():

371

job_functions = [send_email_batch, sync_data, handle_file_upload]

372

373

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:

374

# Submit background jobs

375

futures = [executor.submit(random.choice(job_functions)) for _ in range(6)]

376

377

for future in concurrent.futures.as_completed(futures):

378

result = future.result()

379

print(f"Job completed: {result}")

380

381

# Run simulations in separate threads

382

request_thread = threading.Thread(target=simulate_concurrent_requests)

383

db_thread = threading.Thread(target=simulate_database_load)

384

job_thread = threading.Thread(target=simulate_background_jobs)

385

386

request_thread.start()

387

db_thread.start()

388

job_thread.start()

389

390

# Monitor progress

391

for i in range(10):

392

time.sleep(1)

393

    print(f"Active requests: {sum(g._value._value for g in active_requests._metrics.values())}")

394

print(f"DB connections: {database_connections._value._value}")

395

    print(f"Background jobs: {sum(g._value._value for g in background_jobs._metrics.values())}")

396

397

request_thread.join()

398

db_thread.join()

399

job_thread.join()

400

```

401

402

### Combined Usage

403

404

Example showing how to use multiple context managers together for comprehensive instrumentation.

405

406

**Usage Example:**

407

408

```python

409

from prometheus_client import Counter, Gauge, Histogram, start_http_server

410

import time

411

import random

412

import threading

413

414

# Create comprehensive metrics

415

request_count = Counter(

416

'api_requests_total',

417

'Total API requests',

418

['method', 'endpoint', 'status']

419

)

420

421

request_duration = Histogram(

422

'api_request_duration_seconds',

423

'API request duration',

424

['method', 'endpoint']

425

)

426

427

active_requests = Gauge(

428

'api_requests_inprogress',

429

'Active API requests',

430

['endpoint']

431

)

432

433

request_errors = Counter(

434

'api_request_errors_total',

435

'API request errors',

436

['endpoint', 'error_type']

437

)

438

439

def api_endpoint(method, endpoint):

440

"""Fully instrumented API endpoint handler."""

441

442

# Track active requests

443

with active_requests.labels(endpoint).track_inprogress():

444

# Time the request

445

with request_duration.labels(method, endpoint).time():

446

# Count exceptions

447

with request_errors.labels(endpoint, 'connection').count_exceptions(ConnectionError):

448

with request_errors.labels(endpoint, 'timeout').count_exceptions(TimeoutError):

449

with request_errors.labels(endpoint, 'validation').count_exceptions(ValueError):

450

try:

451

# Simulate API processing

452

processing_time = random.uniform(0.1, 2.0)

453

time.sleep(processing_time)

454

455

# Simulate occasional errors

456

error_chance = random.random()

457

if error_chance < 0.05:

458

raise ConnectionError("External service unavailable")

459

elif error_chance < 0.08:

460

raise TimeoutError("Request timeout")

461

elif error_chance < 0.1:

462

raise ValueError("Invalid request data")

463

464

# Success

465

request_count.labels(method, endpoint, '200').inc()

466

return f"Success: {method} {endpoint}"

467

468

except (ConnectionError, TimeoutError, ValueError):

469

# Error cases

470

request_count.labels(method, endpoint, '500').inc()

471

return f"Error: {method} {endpoint}"

472

473

# Start metrics server

474

start_http_server(8000)

475

476

# Simulate API traffic

477

def generate_traffic():

478

endpoints = ['/users', '/orders', '/products', '/health']

479

methods = ['GET', 'POST', 'PUT', 'DELETE']

480

481

while True:

482

endpoint = random.choice(endpoints)

483

method = random.choice(methods)

484

485

# Process request in separate thread to allow concurrency

486

thread = threading.Thread(

487

target=lambda: api_endpoint(method, endpoint),

488

daemon=True

489

)

490

thread.start()

491

492

# Variable request rate

493

time.sleep(random.uniform(0.1, 0.5))

494

495

# Run traffic generator

496

traffic_thread = threading.Thread(target=generate_traffic, daemon=True)

497

traffic_thread.start()

498

499

print("API simulation running with comprehensive instrumentation:")

500

print("- Request counting by method, endpoint, and status")

501

print("- Request duration timing")

502

print("- Active request tracking")

503

print("- Exception counting by type")

504

print("- Metrics available at http://localhost:8000")

505

506

try:

507

while True:

508

time.sleep(5)

509

# Print current status

510

print(f"Active requests: {sum(g._value._value for g in active_requests._metrics.values())}")

511

except KeyboardInterrupt:

512

print("Shutting down...")

513

```

514

515

### Advanced Context Manager Patterns

516

517

Custom context managers that combine multiple instrumentation types.

518

519

**Usage Example:**

520

521

```python

522

from prometheus_client import Counter, Gauge, Histogram

523

import contextlib

524

import time
import random

525

526

# Create metrics

527

operation_count = Counter('operations_total', 'Operations', ['type', 'status'])

528

operation_duration = Histogram('operation_duration_seconds', 'Operation duration', ['type'])

529

active_operations = Gauge('operations_active', 'Active operations', ['type'])

530

operation_errors = Counter('operation_errors_total', 'Operation errors', ['type', 'error'])

531

532

@contextlib.contextmanager

533

def instrument_operation(operation_type):

534

"""Custom context manager that applies multiple instrumentations."""

535

536

# Start tracking

537

active_operations.labels(operation_type).inc()

538

start_time = time.time()

539

540

try:

541

yield

542

# Success

543

operation_count.labels(operation_type, 'success').inc()

544

545

except Exception as e:

546

# Error

547

operation_count.labels(operation_type, 'error').inc()

548

operation_errors.labels(operation_type, type(e).__name__).inc()

549

raise

550

551

finally:

552

# Always record duration and decrement active count

553

duration = time.time() - start_time

554

operation_duration.labels(operation_type).observe(duration)

555

active_operations.labels(operation_type).dec()

556

557

# Usage

558

with instrument_operation('database_query'):

559

time.sleep(0.1) # Simulate database query

560

561

with instrument_operation('api_call'):

562

if random.random() < 0.2:

563

raise ConnectionError("API failed")

564

time.sleep(0.2) # Simulate API call

565

566

# Decorator version

567

def instrumented_operation(operation_type):

568

def decorator(func):

569

def wrapper(*args, **kwargs):

570

with instrument_operation(operation_type):

571

return func(*args, **kwargs)

572

return wrapper

573

return decorator

574

575

@instrumented_operation('file_processing')

576

def process_file(filename):

577

time.sleep(random.uniform(0.5, 2.0))

578

if random.random() < 0.1:

579

        raise IOError(f"Cannot read {filename}")

580

    return f"Processed {filename}"

581

582

# Test the decorated function

583

for i in range(10):

584

try:

585

result = process_file(f"file_{i}.txt")

586

print(result)

587

except IOError as e:

588

print(f"Error: {e}")

589

```