0
# Exposition
1
2
Functions and servers for outputting metrics in Prometheus format, including HTTP servers, WSGI/ASGI applications, file output, and push gateway integration. These components enable metrics to be scraped by Prometheus or pushed to centralized gateways.
3
4
## Capabilities
5
6
### Content Generation
7
8
Core functions for generating Prometheus format output from collected metrics.
9
10
```python { .api }
11
CONTENT_TYPE_LATEST: str = 'text/plain; version=0.0.4; charset=utf-8'
12
13
def generate_latest(registry: CollectorRegistry = REGISTRY) -> bytes:
14
"""
15
Generate the latest Prometheus text format output.
16
17
Parameters:
18
- registry: Registry to collect metrics from
19
20
Returns:
21
Metrics in Prometheus text format as bytes
22
"""
23
24
def choose_encoder(accept_header: str) -> Tuple[Callable, str]:
25
"""
26
Choose appropriate encoder based on Accept header.
27
28
Parameters:
29
- accept_header: HTTP Accept header string
30
31
Returns:
32
Tuple of (encoder_function, content_type)
33
"""
34
```
35
36
**Usage Example:**
37
38
```python
39
from prometheus_client import Counter, generate_latest, CONTENT_TYPE_LATEST
40
41
# Create some metrics
42
requests = Counter('http_requests_total', 'Total HTTP requests')
43
requests.inc(42)
44
45
# Generate Prometheus format output
46
metrics_data = generate_latest()
47
print(metrics_data.decode('utf-8'))
48
49
# Use in HTTP response
50
content_type = CONTENT_TYPE_LATEST
51
# Return metrics_data with content_type header
52
```
53
54
### HTTP Servers
55
56
Built-in HTTP servers for exposing metrics endpoints that Prometheus can scrape.
57
58
```python { .api }
59
def start_http_server(
60
port: int,
61
addr: str = '0.0.0.0',
62
registry: CollectorRegistry = REGISTRY,
63
certfile=None,
64
keyfile=None,
65
client_cafile=None,
66
client_capath=None,
67
protocol=ssl.PROTOCOL_TLS_SERVER,
68
client_auth_required: bool = False
69
) -> Tuple[WSGIServer, threading.Thread]:
70
"""
71
Start HTTP server for metrics.
72
73
Parameters:
74
- port: Port to listen on
75
- addr: Address to bind to
76
- registry: Registry to serve metrics from
77
- certfile: SSL certificate file for HTTPS
78
- keyfile: SSL private key file for HTTPS
79
- client_cafile: CA file for client certificate validation
80
- client_capath: CA directory for client certificate validation
81
- protocol: SSL protocol version
82
- client_auth_required: Whether to require client certificates
83
84
Returns:
85
Tuple of (server, thread); call server.shutdown() to stop the metrics server
86
"""
87
88
start_wsgi_server = start_http_server # Alias for backward compatibility
89
```
90
91
**Usage Example:**
92
93
```python
94
from prometheus_client import Counter, start_http_server
95
import time
96
import threading
97
98
# Create metrics
99
requests = Counter('app_requests_total', 'Total application requests')
100
101
# Start metrics server
102
server, thread = start_http_server(8000)
103
print("Metrics server started on http://localhost:8000")
104
105
# Application logic
106
def app_work():
107
while True:
108
requests.inc()
109
time.sleep(1)
110
111
# Run application
112
app_thread = threading.Thread(target=app_work, daemon=True)
113
app_thread.start()
114
115
# Keep main thread alive
116
try:
117
while True:
118
time.sleep(1)
119
except KeyboardInterrupt:
120
print("Shutting down...")
121
server.shutdown()
122
123
# HTTPS example
124
server, thread = start_http_server(
125
8443,
126
certfile='/path/to/cert.pem',
127
keyfile='/path/to/key.pem'
128
)
129
```
130
131
### WSGI Integration
132
133
WSGI application factory for integrating metrics into existing web applications.
134
135
```python { .api }
136
def make_wsgi_app(
137
registry: CollectorRegistry = REGISTRY,
138
disable_compression: bool = False
139
) -> Callable:
140
"""
141
Create a WSGI application that serves metrics.
142
143
Parameters:
144
- registry: Registry to serve metrics from
145
- disable_compression: Whether to disable gzip compression
146
147
Returns:
148
WSGI application callable
149
"""
150
```
151
152
**Usage Example:**
153
154
```python
155
from prometheus_client import Counter, make_wsgi_app
156
from wsgiref.simple_server import make_server
157
from urllib.parse import parse_qs
158
159
# Create metrics
160
requests = Counter('wsgi_requests_total', 'Total WSGI requests', ['path'])
161
162
# Create metrics WSGI app
163
metrics_app = make_wsgi_app()
164
165
# Main application
166
def application(environ, start_response):
167
path = environ['PATH_INFO']
168
169
if path == '/metrics':
170
# Serve metrics
171
return metrics_app(environ, start_response)
172
elif path == '/':
173
# Main application logic
174
requests.labels(path='/').inc()
175
status = '200 OK'
176
headers = [('Content-type', 'text/plain')]
177
start_response(status, headers)
178
return [b'Hello World!']
179
else:
180
# 404
181
requests.labels(path=path).inc()
182
status = '404 Not Found'
183
headers = [('Content-type', 'text/plain')]
184
start_response(status, headers)
185
return [b'Not Found']
186
187
# Run server
188
with make_server('', 8000, application) as httpd:
189
print("Server running on http://localhost:8000")
190
print("Metrics available at http://localhost:8000/metrics")
191
httpd.serve_forever()
192
```
193
194
### ASGI Integration
195
196
ASGI application factory for integrating metrics into modern async web applications.
197
198
```python { .api }
199
def make_asgi_app(
200
registry: CollectorRegistry = REGISTRY,
201
disable_compression: bool = False
202
) -> Callable:
203
"""
204
Create an ASGI application that serves metrics.
205
206
Parameters:
207
- registry: Registry to serve metrics from
208
- disable_compression: Whether to disable gzip compression
209
210
Returns:
211
ASGI application callable
212
"""
213
```
214
215
**Usage Example:**
216
217
```python
218
from prometheus_client import Counter, make_asgi_app
219
import uvicorn
220
221
# Create metrics
222
requests = Counter('asgi_requests_total', 'Total ASGI requests', ['path'])
223
224
# Create metrics ASGI app
225
metrics_app = make_asgi_app()
226
227
# Main ASGI application
228
async def application(scope, receive, send):
229
if scope['type'] == 'http':
230
path = scope['path']
231
232
if path == '/metrics':
233
# Serve metrics
234
await metrics_app(scope, receive, send)
235
elif path == '/':
236
# Main application logic
237
requests.labels(path='/').inc()
238
await send({
239
'type': 'http.response.start',
240
'status': 200,
241
'headers': [[b'content-type', b'text/plain']],
242
})
243
await send({
244
'type': 'http.response.body',
245
'body': b'Hello World!',
246
})
247
else:
248
# 404
249
requests.labels(path=path).inc()
250
await send({
251
'type': 'http.response.start',
252
'status': 404,
253
'headers': [[b'content-type', b'text/plain']],
254
})
255
await send({
256
'type': 'http.response.body',
257
'body': b'Not Found',
258
})
259
260
# Run with uvicorn
261
if __name__ == "__main__":
262
uvicorn.run(application, host="0.0.0.0", port=8000)
263
```
264
265
### MetricsHandler
266
267
HTTP request handler class for serving metrics, useful for custom HTTP server implementations.
268
269
```python { .api }
270
class MetricsHandler(BaseHTTPRequestHandler):
271
registry: CollectorRegistry = REGISTRY
272
273
def do_GET(self) -> None:
274
"""Handle GET requests to serve metrics."""
275
276
@classmethod
277
def factory(cls, registry: CollectorRegistry) -> type:
278
"""
279
Create a MetricsHandler class with custom registry.
280
281
Parameters:
282
- registry: Registry to serve metrics from
283
284
Returns:
285
MetricsHandler class configured with the registry
286
"""
287
```
288
289
**Usage Example:**
290
291
```python
292
from prometheus_client import Counter, MetricsHandler, CollectorRegistry
293
from http.server import HTTPServer
294
import threading
295
296
# Create custom registry
297
custom_registry = CollectorRegistry()
298
requests = Counter('custom_requests_total', 'Custom requests', registry=custom_registry)
299
requests.inc(10)
300
301
# Create handler with custom registry
302
CustomHandler = MetricsHandler.factory(custom_registry)
303
304
# Start server
305
server = HTTPServer(('', 8000), CustomHandler)
306
thread = threading.Thread(target=server.serve_forever, daemon=True)
307
thread.start()
308
309
print("Custom metrics server running on http://localhost:8000")
310
```
311
312
### File Output
313
314
Write metrics to text files for consumption by node exporter or other file-based collectors.
315
316
```python { .api }
317
def write_to_textfile(path: str, registry: CollectorRegistry) -> None:
318
"""
319
Write metrics to a text file in Prometheus format.
320
321
Parameters:
322
- path: File path to write to
323
- registry: Registry to collect metrics from
324
325
Note: This function is atomic - it writes to a temporary file and then renames it.
326
"""
327
```
328
329
**Usage Example:**
330
331
```python
332
from prometheus_client import Counter, CollectorRegistry, write_to_textfile
333
import os
334
import time
335
336
# Create registry with metrics
337
registry = CollectorRegistry()
338
batch_jobs = Counter('batch_jobs_completed_total', 'Completed batch jobs', registry=registry)
339
batch_duration = Counter('batch_processing_seconds_total', 'Total batch processing time', registry=registry)
340
341
# Simulate batch processing
342
start_time = time.time()
343
# ... do batch work ...
344
batch_jobs.inc()
345
batch_duration.inc(time.time() - start_time)
346
347
# Write metrics to file for node exporter
348
metrics_dir = '/var/lib/node_exporter/textfile_collector'
349
os.makedirs(metrics_dir, exist_ok=True)
350
write_to_textfile(f'{metrics_dir}/batch_metrics.prom', registry)
351
352
# File will contain:
353
# # HELP batch_jobs_completed_total Completed batch jobs
354
# # TYPE batch_jobs_completed_total counter
355
# batch_jobs_completed_total 1.0
356
# # HELP batch_processing_seconds_total Total batch processing time
357
# # TYPE batch_processing_seconds_total counter
358
# batch_processing_seconds_total 0.123
359
```
360
361
### Push Gateway Integration
362
363
Functions for pushing metrics to a Prometheus Pushgateway, intended for batch jobs and other short-lived processes that cannot be scraped directly.
364
365
```python { .api }
366
def push_to_gateway(
367
gateway: str,
368
job: str,
369
registry: CollectorRegistry,
370
grouping_key=None,
371
timeout: Optional[float] = 30,
372
handler: Callable = default_handler
373
) -> None:
374
"""
375
Push metrics to the push gateway (PUT method - replaces all metrics with the same job and grouping key).
376
377
Parameters:
378
- gateway: Push gateway URL (e.g., 'localhost:9091')
379
- job: Job name for grouping
380
- registry: Registry to push metrics from
381
- grouping_key: Additional labels for grouping
382
- timeout: Request timeout in seconds
383
- handler: Custom authentication/request handler
384
"""
385
386
def pushadd_to_gateway(
387
gateway: str,
388
job: str,
389
registry: Optional[CollectorRegistry],
390
grouping_key=None,
391
timeout: Optional[float] = 30,
392
handler: Callable = default_handler
393
) -> None:
394
"""
395
Push-add metrics to the push gateway (POST method - replaces only metrics with the same name, job and grouping key; other metrics in the group are kept).
396
397
Parameters:
398
- gateway: Push gateway URL
399
- job: Job name for grouping
400
- registry: Registry to push metrics from
401
- grouping_key: Additional labels for grouping
402
- timeout: Request timeout in seconds
403
- handler: Custom authentication/request handler
404
"""
405
406
def delete_from_gateway(
407
gateway: str,
408
job: str,
409
grouping_key=None,
410
timeout: Optional[float] = 30,
411
handler: Callable = default_handler
412
) -> None:
413
"""
414
Delete metrics from the push gateway.
415
416
Parameters:
417
- gateway: Push gateway URL
418
- job: Job name to delete
419
- grouping_key: Additional labels for grouping
420
- timeout: Request timeout in seconds
421
- handler: Custom authentication/request handler
422
"""
423
424
def instance_ip_grouping_key() -> Dict[str, Any]:
425
"""
426
Get grouping key with instance IP address.
427
428
Returns:
429
Dictionary with 'instance' key containing local IP address
430
"""
431
```
432
433
**Usage Example:**
434
435
```python
436
from prometheus_client import Counter, CollectorRegistry, push_to_gateway, instance_ip_grouping_key
437
import time
438
439
# Create registry for batch job
440
registry = CollectorRegistry()
441
job_duration = Counter('batch_job_duration_seconds', 'Batch job duration', registry=registry)
442
records_processed = Counter('batch_records_processed_total', 'Records processed', registry=registry)
443
444
# Simulate batch job
445
start_time = time.time()
446
447
# Process records
448
for i in range(1000):
449
# Process record
450
records_processed.inc()
451
time.sleep(0.001) # Simulate work
452
453
# Record total duration
454
job_duration.inc(time.time() - start_time)
455
456
# Push metrics to gateway
457
push_to_gateway(
458
'pushgateway.example.com:9091',
459
job='batch_processor',
460
registry=registry,
461
grouping_key={'batch_id': '20231015_001'}
462
)
463
464
# Use instance IP for grouping
465
push_to_gateway(
466
'localhost:9091',
467
job='worker',
468
registry=registry,
469
grouping_key=instance_ip_grouping_key()
470
)
471
472
# Clean up when job is done
473
delete_from_gateway(
474
'localhost:9091',
475
job='batch_processor',
476
grouping_key={'batch_id': '20231015_001'}
477
)
478
```
479
480
### Authentication Handlers
481
482
Authentication handlers for push gateway and other HTTP-based exposition methods.
483
484
```python { .api }
485
def default_handler(
486
url: str,
487
method: str,
488
timeout: Optional[float],
489
headers: List,
490
data: bytes
491
) -> Callable:
492
"""Default HTTP handler with no authentication."""
493
494
def basic_auth_handler(
495
url: str,
496
method: str,
497
timeout: Optional[float],
498
headers: List,
499
data: bytes,
500
username=None,
501
password=None
502
) -> Callable:
503
"""
504
HTTP handler with basic authentication.
505
506
Parameters:
507
- username: Username for basic auth
508
- password: Password for basic auth
509
"""
510
511
def tls_auth_handler(
512
url: str,
513
method: str,
514
timeout: Optional[float],
515
headers: List,
516
data: bytes,
517
certfile: str,
518
keyfile: str,
519
cafile=None,
520
protocol=ssl.PROTOCOL_TLS_CLIENT,
521
insecure_skip_verify: bool = False
522
) -> Callable:
523
"""
524
HTTP handler with TLS client certificate authentication.
525
526
Parameters:
527
- certfile: Client certificate file
528
- keyfile: Client private key file
529
- cafile: CA certificate file for server verification
530
- protocol: SSL protocol version
531
- insecure_skip_verify: Skip server certificate verification
532
"""
533
534
def passthrough_redirect_handler(
535
url: str,
536
method: str,
537
timeout: Optional[float],
538
headers: List,
539
data: bytes
540
) -> Callable:
541
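"""HTTP handler that follows redirects for all HTTP methods, preserving the original method, headers, and data. Use only with servers you trust."""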
542
```
543
544
**Usage Example:**
545
546
```python
547
from prometheus_client import CollectorRegistry, Counter, push_to_gateway
from prometheus_client.exposition import basic_auth_handler, tls_auth_handler
548
549
registry = CollectorRegistry()
550
counter = Counter('authenticated_pushes', 'Pushes with auth', registry=registry)
551
counter.inc()
552
553
# Push with basic authentication
554
push_to_gateway(
555
'secure-gateway.example.com:9091',
556
job='authenticated_job',
557
registry=registry,
558
handler=lambda url, method, timeout, headers, data: basic_auth_handler(
559
url, method, timeout, headers, data,
560
username='user',
561
password='pass'
562
)
563
)
564
565
# Push with TLS client certificate
566
push_to_gateway(
567
'tls-gateway.example.com:9091',
568
job='tls_job',
569
registry=registry,
570
handler=lambda url, method, timeout, headers, data: tls_auth_handler(
571
url, method, timeout, headers, data,
572
certfile='/path/to/client.crt',
573
keyfile='/path/to/client.key',
574
cafile='/path/to/ca.crt'
575
)
576
)
577
```