0
# Context Managers
1
2
Context managers and decorators for automatic instrumentation including timing operations, counting exceptions, and tracking in-progress work. These utilities provide convenient ways to instrument code without manual metric updates.
3
4
## Capabilities
5
6
### Timer
7
8
A context manager and decorator that automatically measures elapsed time and records it to a metric. Works with Histogram, Summary, and Gauge metrics.
9
10
```python { .api }
11
class Timer:
12
def __init__(self, metric, callback_name):
13
"""
14
Create a Timer for the given metric.
15
16
Parameters:
17
- metric: Metric instance (Histogram, Summary, or Gauge)
18
- callback_name: Method name to call on metric ('observe' for Histogram/Summary, 'set' for Gauge)
19
"""
20
21
def __enter__(self):
22
"""Enter the timing context."""
23
return self
24
25
def __exit__(self, typ, value, traceback):
26
"""Exit the timing context and record elapsed time."""
27
28
def labels(self, *args, **kw) -> None:
29
"""
30
Update the Timer to use the labeled metric instance.
31
32
Parameters:
33
- args: Label values as positional arguments
34
- kw: Label values as keyword arguments
35
36
Note: Modifies the Timer instance in place
37
"""
38
39
def __call__(self, f) -> Callable:
40
"""
41
Use as a decorator.
42
43
Parameters:
44
- f: Function to decorate
45
46
Returns:
47
Decorated function that times execution
48
"""
49
```
50
51
**Usage Example:**
52
53
```python
54
from prometheus_client import Histogram, Summary, Gauge
55
import time
56
import random
57
58
# Create metrics that support timing
59
request_duration = Histogram(
60
'http_request_duration_seconds',
61
'HTTP request duration',
62
['method', 'endpoint']
63
)
64
65
response_time = Summary(
66
'http_response_time_seconds',
67
'HTTP response time summary'
68
)
69
70
last_request_duration = Gauge(
71
'last_request_duration_seconds',
72
'Duration of the last request'
73
)
74
75
# Context manager usage
76
with request_duration.labels('GET', '/api/users').time():
77
# Simulate API call
78
time.sleep(random.uniform(0.1, 0.5))
79
80
with response_time.time():
81
# Simulate processing
82
time.sleep(0.2)
83
84
# Gauge timing (records last duration)
85
with last_request_duration.time():
86
time.sleep(0.3)
87
88
# Decorator usage
89
@request_duration.labels('POST', '/api/orders').time()
90
def create_order():
91
time.sleep(random.uniform(0.2, 0.8))
92
return {'order_id': 12345}
93
94
@response_time.time()
95
def process_request():
96
time.sleep(random.uniform(0.1, 0.4))
97
return "processed"
98
99
# Call decorated functions
100
order = create_order()
101
result = process_request()
102
103
# Multiple label combinations
104
endpoints = ['/users', '/orders', '/products']
105
methods = ['GET', 'POST', 'PUT']
106
107
for endpoint in endpoints:
108
for method in methods:
109
with request_duration.labels(method, endpoint).time():
110
time.sleep(random.uniform(0.05, 0.2))
111
```
112
113
### ExceptionCounter
114
115
A context manager and decorator that automatically counts exceptions of specified types. Useful for monitoring error rates and debugging application issues.
116
117
```python { .api }
118
class ExceptionCounter:
119
def __init__(self, counter: "Counter", exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]]) -> None:
120
"""
121
Create an ExceptionCounter.
122
123
Parameters:
124
- counter: Counter metric instance
125
- exception: Exception type(s) to count (single type or tuple of types)
126
"""
127
128
def __enter__(self) -> None:
129
"""Enter the exception counting context."""
130
131
def __exit__(self, typ, value, traceback) -> Literal[False]:
132
"""
133
Exit the context and count exceptions if they occurred.
134
135
Parameters:
136
- typ: Exception type (None if no exception)
137
- value: Exception instance
138
- traceback: Exception traceback
139
140
Returns:
141
False (does not suppress exceptions)
142
"""
143
144
def __call__(self, f) -> Callable:
145
"""
146
Use as a decorator.
147
148
Parameters:
149
- f: Function to decorate
150
151
Returns:
152
Decorated function that counts exceptions
153
"""
154
```
155
156
**Usage Example:**
157
158
```python
159
from prometheus_client import Counter
160
import random
161
import time
162
163
# Create counters for different exception types
164
http_errors = Counter(
165
'http_errors_total',
166
'Total HTTP errors',
167
['error_type', 'endpoint']
168
)
169
170
database_errors = Counter(
171
'database_errors_total',
172
'Database connection errors'
173
)
174
175
general_errors = Counter(
176
'application_errors_total',
177
'General application errors',
178
['function']
179
)
180
181
# Context manager usage - count specific exceptions
182
def risky_api_call():
183
if random.random() < 0.3:
184
raise ConnectionError("API unavailable")
185
elif random.random() < 0.2:
186
raise ValueError("Invalid response")
187
return "success"
188
189
# Count ConnectionError exceptions
190
with http_errors.labels('connection_error', '/api/external').count_exceptions(ConnectionError):
191
result = risky_api_call()
192
193
# Count multiple exception types
194
with http_errors.labels('client_error', '/api/users').count_exceptions((ValueError, TypeError)):
195
result = risky_api_call()
196
197
# Database operation with exception counting
198
def connect_to_database():
199
if random.random() < 0.1:
200
raise ConnectionError("Database unavailable")
201
return "connected"
202
203
with database_errors.count_exceptions(): # Counts all exceptions
204
connection = connect_to_database()
205
206
# Decorator usage
207
@general_errors.labels('data_processing').count_exceptions()
208
def process_data(data):
209
if not data:
210
raise ValueError("Empty data")
211
if len(data) > 1000:
212
raise MemoryError("Data too large")
213
return f"processed {len(data)} items"
214
215
@http_errors.labels('timeout', '/api/slow').count_exceptions(TimeoutError)
216
def slow_api_call():
217
time.sleep(2)
218
if random.random() < 0.2:
219
raise TimeoutError("Request timeout")
220
return "completed"
221
222
# Call decorated functions
223
try:
224
result = process_data([]) # Will raise ValueError and increment counter
225
except ValueError:
226
pass
227
228
try:
229
result = slow_api_call() # May raise TimeoutError and increment counter
230
except TimeoutError:
231
pass
232
233
# Labeled counters with exception counting
234
services = ['auth', 'payments', 'inventory']
235
error_types = ['connection', 'timeout', 'validation']
236
237
def simulate_service_call(service):
238
if random.random() < 0.15:
239
if random.random() < 0.5:
240
raise ConnectionError(f"{service} unavailable")
241
else:
242
raise TimeoutError(f"{service} timeout")
243
return f"{service} success"
244
245
for service in services:
246
# Count different error types separately
247
with http_errors.labels('connection', service).count_exceptions(ConnectionError):
248
with http_errors.labels('timeout', service).count_exceptions(TimeoutError):
249
try:
250
result = simulate_service_call(service)
251
print(f"{service}: {result}")
252
except (ConnectionError, TimeoutError) as e:
253
print(f"{service}: {e}")
254
```
255
256
### InprogressTracker
257
258
A context manager and decorator that tracks the number of operations currently in progress. Useful for monitoring concurrent operations, queue depths, and system load.
259
260
```python { .api }
261
class InprogressTracker:
262
def __init__(self, gauge):
263
"""
264
Create an InprogressTracker.
265
266
Parameters:
267
- gauge: Gauge metric instance to track in-progress operations
268
"""
269
270
def __enter__(self) -> None:
271
"""Enter the tracking context and increment the gauge."""
272
273
def __exit__(self, typ, value, traceback):
274
"""Exit the tracking context and decrement the gauge."""
275
276
def __call__(self, f) -> Callable:
277
"""
278
Use as a decorator.
279
280
Parameters:
281
- f: Function to decorate
282
283
Returns:
284
Decorated function that tracks execution
285
"""
286
```
287
288
**Usage Example:**
289
290
```python
291
from prometheus_client import Gauge
292
import time
293
import random
294
import threading
295
import concurrent.futures
296
297
# Create gauges for tracking in-progress operations
298
active_requests = Gauge(
299
'http_requests_inprogress',
300
'Number of HTTP requests currently being processed',
301
['endpoint']
302
)
303
304
database_connections = Gauge(
305
'database_connections_active',
306
'Number of active database connections'
307
)
308
309
background_jobs = Gauge(
310
'background_jobs_inprogress',
311
'Number of background jobs currently running',
312
['job_type']
313
)
314
315
# Context manager usage
316
def handle_request(endpoint):
317
with active_requests.labels(endpoint).track_inprogress():
318
# Simulate request processing
319
time.sleep(random.uniform(0.1, 1.0))
320
return f"Processed {endpoint}"
321
322
def database_query():
323
with database_connections.track_inprogress():
324
# Simulate database operation
325
time.sleep(random.uniform(0.05, 0.3))
326
return "query result"
327
328
# Decorator usage
329
@background_jobs.labels('email_sender').track_inprogress()
330
def send_email_batch():
331
time.sleep(random.uniform(1.0, 3.0))
332
return "emails sent"
333
334
@background_jobs.labels('data_sync').track_inprogress()
335
def sync_data():
336
time.sleep(random.uniform(2.0, 5.0))
337
return "data synced"
338
339
@active_requests.labels('/api/upload').track_inprogress()
340
def handle_file_upload():
341
time.sleep(random.uniform(0.5, 2.0))
342
return "file uploaded"
343
344
# Simulate concurrent operations
345
def simulate_concurrent_requests():
346
endpoints = ['/api/users', '/api/orders', '/api/products']
347
348
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
349
# Submit multiple concurrent requests
350
futures = []
351
for _ in range(10):
352
endpoint = random.choice(endpoints)
353
future = executor.submit(handle_request, endpoint)
354
futures.append(future)
355
356
# Wait for completion
357
for future in concurrent.futures.as_completed(futures):
358
result = future.result()
359
print(f"Request completed: {result}")
360
361
def simulate_database_load():
362
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
363
# Submit database queries
364
futures = [executor.submit(database_query) for _ in range(8)]
365
366
for future in concurrent.futures.as_completed(futures):
367
result = future.result()
368
print(f"Query completed: {result}")
369
370
def simulate_background_jobs():
371
job_functions = [send_email_batch, sync_data, handle_file_upload]
372
373
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
374
# Submit background jobs
375
futures = [executor.submit(random.choice(job_functions)) for _ in range(6)]
376
377
for future in concurrent.futures.as_completed(futures):
378
result = future.result()
379
print(f"Job completed: {result}")
380
381
# Run simulations in separate threads
382
request_thread = threading.Thread(target=simulate_concurrent_requests)
383
db_thread = threading.Thread(target=simulate_database_load)
384
job_thread = threading.Thread(target=simulate_background_jobs)
385
386
request_thread.start()
387
db_thread.start()
388
job_thread.start()
389
390
# Monitor progress
391
for i in range(10):
392
time.sleep(1)
393
print(f"Active requests: {active_requests._value._value}")
394
print(f"DB connections: {database_connections._value._value}")
395
print(f"Background jobs: {background_jobs._child_samples()}")
396
397
request_thread.join()
398
db_thread.join()
399
job_thread.join()
400
```
401
402
### Combined Usage
403
404
Example showing how to use multiple context managers together for comprehensive instrumentation.
405
406
**Usage Example:**
407
408
```python
409
from prometheus_client import Counter, Gauge, Histogram, start_http_server
410
import time
411
import random
412
import threading
413
414
# Create comprehensive metrics
415
request_count = Counter(
416
'api_requests_total',
417
'Total API requests',
418
['method', 'endpoint', 'status']
419
)
420
421
request_duration = Histogram(
422
'api_request_duration_seconds',
423
'API request duration',
424
['method', 'endpoint']
425
)
426
427
active_requests = Gauge(
428
'api_requests_inprogress',
429
'Active API requests',
430
['endpoint']
431
)
432
433
request_errors = Counter(
434
'api_request_errors_total',
435
'API request errors',
436
['endpoint', 'error_type']
437
)
438
439
def api_endpoint(method, endpoint):
440
"""Fully instrumented API endpoint handler."""
441
442
# Track active requests
443
with active_requests.labels(endpoint).track_inprogress():
444
# Time the request
445
with request_duration.labels(method, endpoint).time():
446
# Count exceptions
447
with request_errors.labels(endpoint, 'connection').count_exceptions(ConnectionError):
448
with request_errors.labels(endpoint, 'timeout').count_exceptions(TimeoutError):
449
with request_errors.labels(endpoint, 'validation').count_exceptions(ValueError):
450
try:
451
# Simulate API processing
452
processing_time = random.uniform(0.1, 2.0)
453
time.sleep(processing_time)
454
455
# Simulate occasional errors
456
error_chance = random.random()
457
if error_chance < 0.05:
458
raise ConnectionError("External service unavailable")
459
elif error_chance < 0.08:
460
raise TimeoutError("Request timeout")
461
elif error_chance < 0.1:
462
raise ValueError("Invalid request data")
463
464
# Success
465
request_count.labels(method, endpoint, '200').inc()
466
return f"Success: {method} {endpoint}"
467
468
except (ConnectionError, TimeoutError, ValueError):
469
# Error cases
470
request_count.labels(method, endpoint, '500').inc()
471
return f"Error: {method} {endpoint}"
472
473
# Start metrics server
474
start_http_server(8000)
475
476
# Simulate API traffic
477
def generate_traffic():
478
endpoints = ['/users', '/orders', '/products', '/health']
479
methods = ['GET', 'POST', 'PUT', 'DELETE']
480
481
while True:
482
endpoint = random.choice(endpoints)
483
method = random.choice(methods)
484
485
# Process request in separate thread to allow concurrency
486
thread = threading.Thread(
487
target=lambda: api_endpoint(method, endpoint),
488
daemon=True
489
)
490
thread.start()
491
492
# Variable request rate
493
time.sleep(random.uniform(0.1, 0.5))
494
495
# Run traffic generator
496
traffic_thread = threading.Thread(target=generate_traffic, daemon=True)
497
traffic_thread.start()
498
499
print("API simulation running with comprehensive instrumentation:")
500
print("- Request counting by method, endpoint, and status")
501
print("- Request duration timing")
502
print("- Active request tracking")
503
print("- Exception counting by type")
504
print("- Metrics available at http://localhost:8000")
505
506
try:
507
while True:
508
time.sleep(5)
509
# Print current status
510
print(f"Active requests: {sum(g._value._value for g in active_requests._metrics.values())}")
511
except KeyboardInterrupt:
512
print("Shutting down...")
513
```
514
515
### Advanced Context Manager Patterns
516
517
Custom context managers that combine multiple instrumentation types.
518
519
**Usage Example:**
520
521
```python
522
from prometheus_client import Counter, Gauge, Histogram
523
import contextlib
524
import time
525
526
# Create metrics
527
operation_count = Counter('operations_total', 'Operations', ['type', 'status'])
528
operation_duration = Histogram('operation_duration_seconds', 'Operation duration', ['type'])
529
active_operations = Gauge('operations_active', 'Active operations', ['type'])
530
operation_errors = Counter('operation_errors_total', 'Operation errors', ['type', 'error'])
531
532
@contextlib.contextmanager
533
def instrument_operation(operation_type):
534
"""Custom context manager that applies multiple instrumentations."""
535
536
# Start tracking
537
active_operations.labels(operation_type).inc()
538
start_time = time.time()
539
540
try:
541
yield
542
# Success
543
operation_count.labels(operation_type, 'success').inc()
544
545
except Exception as e:
546
# Error
547
operation_count.labels(operation_type, 'error').inc()
548
operation_errors.labels(operation_type, type(e).__name__).inc()
549
raise
550
551
finally:
552
# Always record duration and decrement active count
553
duration = time.time() - start_time
554
operation_duration.labels(operation_type).observe(duration)
555
active_operations.labels(operation_type).dec()
556
557
# Usage
558
with instrument_operation('database_query'):
559
time.sleep(0.1) # Simulate database query
560
561
with instrument_operation('api_call'):
562
if random.random() < 0.2:
563
raise ConnectionError("API failed")
564
time.sleep(0.2) # Simulate API call
565
566
# Decorator version
567
def instrumented_operation(operation_type):
568
def decorator(func):
569
def wrapper(*args, **kwargs):
570
with instrument_operation(operation_type):
571
return func(*args, **kwargs)
572
return wrapper
573
return decorator
574
575
@instrumented_operation('file_processing')
576
def process_file(filename):
577
time.sleep(random.uniform(0.5, 2.0))
578
if random.random() < 0.1:
579
raise IOError(f"Cannot read {filename}")
580
return f"Processed {filename}"
581
582
# Test the decorated function
583
for i in range(10):
584
try:
585
result = process_file(f"file_{i}.txt")
586
print(result)
587
except IOError as e:
588
print(f"Error: {e}")
589
```