or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced.mdcollectors.mdcontext-managers.mdcore-metrics.mdexposition.mdindex.mdregistry.md

advanced.mddocs/

0

# Advanced Features

1

2

Advanced functionality for building custom metric collectors, multiprocess support, and specialized integrations including metric families, sample types, and bridge components. These features enable sophisticated monitoring scenarios and integration with complex deployment architectures.

3

4

## Capabilities

5

6

### Custom Metric Collectors

7

8

Building custom collectors that generate metrics dynamically, integrate with external systems, or provide specialized monitoring capabilities.

9

10

```python { .api }

11

class Collector:

12

def collect(self) -> Iterable[Metric]:

13

"""

14

Collect and return metrics.

15

16

This abstract method must be implemented by all collectors.

17

18

Returns:

19

Iterable of Metric objects

20

"""

21

22

def describe(self) -> Iterable[Metric]:

23

"""

24

Describe the metrics that this collector will provide.

25

26

Default implementation returns collect() results.

27

Override for better performance if metrics are expensive to collect.

28

29

Returns:

30

Iterable of Metric objects (without samples)

31

"""

32

```

33

34

**Usage Example:**

35

36

```python

37

from prometheus_client import Collector, Metric, CollectorRegistry, start_http_server

38

import psutil

39

import json

40

import requests

41

import time

42

43

class SystemResourceCollector(Collector):

44

"""Comprehensive system resource collector."""

45

46

def collect(self):

47

# CPU metrics

48

cpu_metric = Metric('system_cpu_usage_percent', 'CPU usage by core', 'gauge')

49

for i, percent in enumerate(psutil.cpu_percent(percpu=True)):

50

cpu_metric.add_sample('system_cpu_usage_percent', {'core': str(i)}, percent)

51

yield cpu_metric

52

53

# Memory metrics

54

memory = psutil.virtual_memory()

55

memory_metric = Metric('system_memory_bytes', 'System memory usage', 'gauge')

56

memory_metric.add_sample('system_memory_bytes', {'type': 'total'}, memory.total)

57

memory_metric.add_sample('system_memory_bytes', {'type': 'available'}, memory.available)

58

memory_metric.add_sample('system_memory_bytes', {'type': 'used'}, memory.used)

59

memory_metric.add_sample('system_memory_bytes', {'type': 'cached'}, memory.cached)

60

yield memory_metric

61

62

# Disk I/O

63

disk_io = psutil.disk_io_counters()

64

if disk_io:

65

io_metric = Metric('system_disk_io_bytes_total', 'Disk I/O bytes', 'counter')

66

io_metric.add_sample('system_disk_io_bytes_total', {'direction': 'read'}, disk_io.read_bytes)

67

io_metric.add_sample('system_disk_io_bytes_total', {'direction': 'write'}, disk_io.write_bytes)

68

yield io_metric

69

70

class APIHealthCollector(Collector):

71

"""Collector that monitors external API health."""

72

73

def __init__(self, services):

74

self.services = services # Dict of service_name -> url

75

76

def collect(self):

77

health_metric = Metric('external_service_up', 'External service availability', 'gauge')

78

response_time_metric = Metric('external_service_response_time_seconds', 'Response time', 'gauge')

79

80

for service_name, url in self.services.items():

81

try:

82

start_time = time.time()

83

response = requests.get(f"{url}/health", timeout=5)

84

response_time = time.time() - start_time

85

86

# Service is up if we get any response

87

health_metric.add_sample('external_service_up', {'service': service_name}, 1.0)

88

response_time_metric.add_sample(

89

'external_service_response_time_seconds',

90

{'service': service_name},

91

response_time

92

)

93

94

except (requests.RequestException, requests.Timeout):

95

# Service is down

96

health_metric.add_sample('external_service_up', {'service': service_name}, 0.0)

97

98

yield health_metric

99

yield response_time_metric

100

101

class DatabaseCollector(Collector):

102

"""Collector for database metrics."""

103

104

def __init__(self, db_connection):

105

self.db_connection = db_connection

106

107

def collect(self):

108

# Query database for metrics

109

try:

110

cursor = self.db_connection.cursor()

111

112

# Table sizes

113

cursor.execute("SELECT table_name, table_rows FROM information_schema.tables WHERE table_schema = 'myapp'")

114

table_metric = Metric('database_table_rows', 'Rows in database tables', 'gauge')

115

116

for table_name, row_count in cursor.fetchall():

117

table_metric.add_sample('database_table_rows', {'table': table_name}, row_count or 0)

118

119

yield table_metric

120

121

# Connection pool status

122

cursor.execute("SHOW STATUS LIKE 'Threads_connected'")

123

result = cursor.fetchone()

124

if result:

125

conn_metric = Metric('database_connections_active', 'Active database connections', 'gauge')

126

conn_metric.add_sample('database_connections_active', {}, float(result[1]))

127

yield conn_metric

128

129

except Exception as e:

130

# Return error metric if database is unavailable

131

error_metric = Metric('database_collector_errors_total', 'Database collector errors', 'counter')

132

error_metric.add_sample('database_collector_errors_total', {'error': str(type(e).__name__)}, 1.0)

133

yield error_metric

134

135

# Register custom collectors

136

registry = CollectorRegistry()

137

138

# System resources

139

system_collector = SystemResourceCollector()

140

registry.register(system_collector)

141

142

# External services

143

api_services = {

144

'user_service': 'http://user-service:8080',

145

'payment_service': 'http://payment-service:8080',

146

'inventory_service': 'http://inventory-service:8080'

147

}

148

api_collector = APIHealthCollector(api_services)

149

registry.register(api_collector)

150

151

# Database (mock connection for example)

152

class MockDBConnection:

153

def cursor(self):

154

return MockCursor()

155

156

class MockCursor:

157

def execute(self, query):

158

pass

159

def fetchall(self):

160

return [('users', 1000), ('orders', 500), ('products', 200)]

161

def fetchone(self):

162

return ('Threads_connected', '25')

163

164

db_collector = DatabaseCollector(MockDBConnection())

165

registry.register(db_collector)

166

167

# Start server with custom collectors

168

start_http_server(8000, registry=registry)

169

```

170

171

### Metric Families for Custom Collectors

172

173

Pre-built metric family classes that simplify creating metrics in custom collectors.

174

175

```python { .api }

176

class CounterMetricFamily(Metric):

177

def __init__(self, name: str, documentation: str, value: Optional[float] = None,

178

labels: Optional[Sequence[str]] = None, created: Optional[float] = None,

179

unit: str = '', exemplar: Optional[Exemplar] = None) -> None:

180

"""Create a CounterMetricFamily for custom collectors."""

181

182

def add_metric(self, labels: Sequence[str], value: float, created: Optional[float] = None,

183

timestamp: Optional[Union[Timestamp, float]] = None,

184

exemplar: Optional[Exemplar] = None) -> None:

185

"""Add a sample to this counter metric family."""

186

187

class GaugeMetricFamily(Metric):

188

def __init__(self, name: str, documentation: str, value=None, labels=None, unit='') -> None:

189

"""Create a GaugeMetricFamily for custom collectors."""

190

191

def add_metric(self, labels: Sequence[str], value: float, timestamp=None) -> None:

192

"""Add a sample to this gauge metric family."""

193

194

class HistogramMetricFamily(Metric):

195

def __init__(self, name: str, documentation: str, buckets=None, sum_value=None, labels=None, unit='') -> None:

196

"""Create a HistogramMetricFamily for custom collectors."""

197

198

def add_metric(self, labels: Sequence[str], buckets: Sequence, sum_value: Optional[float], timestamp=None) -> None:

199

"""Add a sample to this histogram metric family."""

200

201

class SummaryMetricFamily(Metric):

202

def __init__(self, name: str, documentation: str, count_value=None, sum_value=None, labels=None, unit='') -> None:

203

"""Create a SummaryMetricFamily for custom collectors."""

204

205

def add_metric(self, labels: Sequence[str], count_value: int, sum_value: float, timestamp=None) -> None:

206

"""Add a sample to this summary metric family."""

207

208

class InfoMetricFamily(Metric):

209

def __init__(self, name: str, documentation: str, value=None, labels=None) -> None:

210

"""Create an InfoMetricFamily for custom collectors."""

211

212

def add_metric(self, labels: Sequence[str], value: Dict[str, str], timestamp=None) -> None:

213

"""Add a sample to this info metric family."""

214

215

class StateSetMetricFamily(Metric):

216

def __init__(self, name: str, documentation: str, value=None, labels=None) -> None:

217

"""Create a StateSetMetricFamily for custom collectors."""

218

219

def add_metric(self, labels: Sequence[str], value: Dict[str, bool], timestamp=None) -> None:

220

"""Add a sample to this state set metric family."""

221

222

class GaugeHistogramMetricFamily(Metric):

223

def __init__(self, name: str, documentation: str, buckets: Optional[Sequence[Tuple[str, float]]] = None,

224

gsum_value: Optional[float] = None, labels: Optional[Sequence[str]] = None,

225

unit: str = '') -> None:

226

"""Create a GaugeHistogramMetricFamily for custom collectors."""

227

228

def add_metric(self, labels: Sequence[str], buckets: Sequence[Tuple[str, float]],

229

gsum_value: Optional[float], timestamp: Optional[Union[float, Timestamp]] = None) -> None:

230

"""Add a sample to this gauge histogram metric family."""

231

```

232

233

**Usage Example:**

234

235

```python

236

from prometheus_client import (

237

Collector, CounterMetricFamily, GaugeMetricFamily,

238

HistogramMetricFamily, SummaryMetricFamily, InfoMetricFamily,

239

CollectorRegistry

240

)

241

import time

242

import random

243

244

class ApplicationMetricsCollector(Collector):

245

"""Example collector using metric families."""

246

247

def collect(self):

248

# Counter family - request counts by endpoint

249

request_counter = CounterMetricFamily(

250

'app_requests_total',

251

'Total application requests',

252

labels=['endpoint', 'method']

253

)

254

255

# Simulate request data

256

endpoints_data = {

257

('/api/users', 'GET'): 1500,

258

('/api/users', 'POST'): 300,

259

('/api/orders', 'GET'): 800,

260

('/api/orders', 'POST'): 200,

261

}

262

263

for (endpoint, method), count in endpoints_data.items():

264

request_counter.add_metric([endpoint, method], count)

265

266

yield request_counter

267

268

# Gauge family - current resource usage

269

resource_gauge = GaugeMetricFamily(

270

'app_resource_usage',

271

'Current resource usage',

272

labels=['resource', 'unit']

273

)

274

275

resource_gauge.add_metric(['memory', 'bytes'], 1073741824) # 1GB

276

resource_gauge.add_metric(['cpu', 'percent'], 45.2)

277

resource_gauge.add_metric(['connections', 'count'], 127)

278

279

yield resource_gauge

280

281

# Histogram family - response times

282

response_histogram = HistogramMetricFamily(

283

'app_response_time_seconds',

284

'Response time distribution',

285

labels=['endpoint']

286

)

287

288

# Simulate histogram data (buckets with cumulative counts)

289

buckets = [

290

('0.1', 100), # 100 requests <= 0.1s

291

('0.25', 200), # 200 requests <= 0.25s

292

('0.5', 280), # 280 requests <= 0.5s

293

('1.0', 300), # 300 requests <= 1.0s

294

('+Inf', 305), # 305 total requests

295

]

296

297

response_histogram.add_metric(['/api/users'], buckets, 45.7) # sum=45.7

298

response_histogram.add_metric(['/api/orders'], buckets, 32.1) # sum=32.1

299

300

yield response_histogram

301

302

# Summary family - request sizes

303

size_summary = SummaryMetricFamily(

304

'app_request_size_bytes',

305

'Request size summary',

306

labels=['endpoint']

307

)

308

309

size_summary.add_metric(['/api/users'], count_value=500, sum_value=250000)

310

size_summary.add_metric(['/api/orders'], count_value=300, sum_value=150000)

311

312

yield size_summary

313

314

# Info family - application metadata

315

app_info = InfoMetricFamily(

316

'app_info',

317

'Application information'

318

)

319

320

app_info.add_metric([], {

321

'version': '2.1.0',

322

'build_date': '2023-10-15',

323

'git_commit': 'abc123def',

324

'environment': 'production'

325

})

326

327

yield app_info

328

329

# Use the collector

330

registry = CollectorRegistry()

331

app_collector = ApplicationMetricsCollector()

332

registry.register(app_collector)

333

334

# Generate output

335

from prometheus_client import generate_latest

336

output = generate_latest(registry)

337

print(output.decode('utf-8'))

338

```

339

340

### Multiprocess Support

341

342

Support for collecting metrics across multiple processes using shared memory files.

343

344

```python { .api }

345

class MultiProcessCollector:

346

def __init__(self, registry, path=None) -> None:

347

"""

348

Create a MultiProcessCollector.

349

350

Parameters:

351

- registry: Registry to register with

352

- path: Path to multiprocess metrics directory (defaults to PROMETHEUS_MULTIPROC_DIR env var)

353

"""

354

355

@staticmethod

356

def merge(files, accumulate=True):

357

"""

358

Merge metrics from multiple process files.

359

360

Parameters:

361

- files: List of file paths containing metrics

362

- accumulate: Whether to accumulate values across processes

363

364

Returns:

365

Dictionary of merged metrics

366

"""

367

```

368

369

**Usage Example:**

370

371

```python

372

import os

373

import multiprocessing

374

import time

375

from prometheus_client import Counter, Gauge, Histogram, CollectorRegistry, generate_latest

376

from prometheus_client.multiprocess import MultiProcessCollector

377

378

# Set up multiprocess directory

379

os.environ['PROMETHEUS_MULTIPROC_DIR'] = '/tmp/prometheus_multiproc'

380

os.makedirs('/tmp/prometheus_multiproc', exist_ok=True)

381

382

def worker_function(worker_id, duration):

383

"""Worker function that generates metrics."""

384

385

# Create metrics in worker process

386

work_counter = Counter('worker_tasks_completed_total', 'Tasks completed by worker', ['worker_id'])

387

work_duration = Histogram('worker_task_duration_seconds', 'Task duration', ['worker_id'])

388

389

for i in range(10):

390

start_time = time.time()

391

392

# Simulate work

393

time.sleep(duration)

394

395

# Record metrics

396

work_counter.labels(worker_id=str(worker_id)).inc()

397

work_duration.labels(worker_id=str(worker_id)).observe(time.time() - start_time)

398

399

print(f"Worker {worker_id} completed task {i+1}")

400

401

def main_multiprocess_example():

402

"""Example of multiprocess metrics collection."""

403

404

# Create main process registry with multiprocess collector

405

registry = CollectorRegistry()

406

MultiProcessCollector(registry)

407

408

# Create some metrics in main process

409

main_counter = Counter('main_process_operations_total', 'Main process operations')

410

main_counter.inc(5)

411

412

# Start worker processes

413

processes = []

414

for worker_id in range(3):

415

p = multiprocessing.Process(

416

target=worker_function,

417

args=(worker_id, 0.1)

418

)

419

p.start()

420

processes.append(p)

421

422

# Wait for workers to complete

423

for p in processes:

424

p.join()

425

426

# Collect all metrics from all processes

427

print("Multiprocess metrics:")

428

output = generate_latest(registry)

429

print(output.decode('utf-8'))

430

431

# Clean up

432

import shutil

433

shutil.rmtree('/tmp/prometheus_multiproc')

434

435

if __name__ == '__main__':

436

main_multiprocess_example()

437

```

438

439

### Sample and Data Types

440

441

Advanced data types for representing metrics samples, timestamps, and exemplars.

442

443

```python { .api }

444

class Sample(NamedTuple):

445

name: str

446

labels: Dict[str, str]

447

value: float

448

timestamp: Optional[Union[float, Timestamp]] = None

449

exemplar: Optional[Exemplar] = None

450

native_histogram: Optional[NativeHistogram] = None

451

452

class Exemplar(NamedTuple):

453

labels: Dict[str, str]

454

value: float

455

timestamp: Optional[Union[float, Timestamp]] = None

456

457

class Timestamp:

458

def __init__(self, sec: float, nsec: float) -> None:

459

"""

460

Create a timestamp with nanosecond precision.

461

462

Parameters:

463

- sec: Seconds since Unix epoch

464

- nsec: Nanoseconds component

465

"""

466

467

def __str__(self) -> str:

468

"""String representation of timestamp."""

469

470

def __float__(self) -> float:

471

"""Convert to float (seconds since epoch)."""

472

473

class BucketSpan(NamedTuple):

474

offset: int

475

length: int

476

477

class NativeHistogram(NamedTuple):

478

count_value: float

479

sum_value: float

480

schema: int

481

zero_threshold: float

482

zero_count: float

483

pos_spans: Optional[Sequence[BucketSpan]] = None

484

neg_spans: Optional[Sequence[BucketSpan]] = None

485

pos_deltas: Optional[Sequence[int]] = None

486

neg_deltas: Optional[Sequence[int]] = None

487

```

488

489

**Usage Example:**

490

491

```python

492

from prometheus_client import Metric, Sample, Exemplar, Timestamp, CollectorRegistry

493

import time

494

495

class CustomSampleCollector:

496

"""Collector that demonstrates advanced sample types."""

497

498

def collect(self):

499

# Create metric with custom samples

500

custom_metric = Metric('custom_samples', 'Custom sample demonstration', 'gauge')

501

502

# Add sample with timestamp

503

now = time.time()

504

timestamp = Timestamp(int(now), int((now % 1) * 1e9))

505

506

custom_metric.add_sample(

507

'custom_samples',

508

{'instance': 'server1', 'job': 'app'},

509

42.0,

510

timestamp=timestamp

511

)

512

513

# Add sample with exemplar (for tracing)

514

exemplar = Exemplar(

515

labels={'trace_id': 'abc123', 'span_id': 'def456'},

516

value=42.0,

517

timestamp=timestamp

518

)

519

520

custom_metric.add_sample(

521

'custom_samples',

522

{'instance': 'server2', 'job': 'app'},

523

38.5,

524

exemplar=exemplar

525

)

526

527

yield custom_metric

528

529

# Access sample properties

530

for sample in custom_metric.samples:

531

print(f"Sample: {sample.name}")

532

print(f"Labels: {sample.labels}")

533

print(f"Value: {sample.value}")

534

if sample.timestamp:

535

print(f"Timestamp: {sample.timestamp}")

536

if sample.exemplar:

537

print(f"Exemplar: {sample.exemplar}")

538

539

# Use the collector

540

registry = CollectorRegistry()

541

collector = CustomSampleCollector()

542

registry.register(collector)

543

544

from prometheus_client import generate_latest

545

output = generate_latest(registry)

546

print(output.decode('utf-8'))

547

```

548

549

### Validation and Configuration

550

551

Utilities for metric validation and library configuration.

552

553

```python { .api }

554

def get_legacy_validation() -> bool:

555

"""Get current legacy validation setting."""

556

557

def disable_legacy_validation() -> None:

558

"""Disable legacy metric name validation."""

559

560

def enable_legacy_validation() -> None:

561

"""Enable legacy metric name validation."""

562

563

# Utility constants

564

INF: float = float("inf")

565

MINUS_INF: float = float("-inf")

566

NaN: float = float("NaN")

567

568

def floatToGoString(d) -> str:

569

"""Convert float to Go-style string representation."""

570

```

571

572

**Usage Example:**

573

574

```python

575

from prometheus_client import Counter, disable_legacy_validation, enable_legacy_validation

576

from prometheus_client.utils import INF, floatToGoString

577

578

# Configuration

579

print(f"Legacy validation enabled: {get_legacy_validation()}")

580

581

# Allow more flexible metric names

582

disable_legacy_validation()

583

584

# Create metric with name that would fail legacy validation

585

flexible_counter = Counter('my-app:request_count', 'Requests with flexible naming')

586

flexible_counter.inc()

587

588

# Re-enable strict validation

589

enable_legacy_validation()

590

591

# Utility functions

592

print(f"Infinity as Go string: {floatToGoString(INF)}")

593

print(f"Large number as Go string: {floatToGoString(1.23e10)}")

594

595

# Constants usage

596

histogram_buckets = [0.1, 0.5, 1.0, 2.5, 5.0, 10.0, INF]

597

```

598

599

### Bridge Integrations

600

601

Integration components for connecting with other monitoring systems.

602

603

```python { .api }

604

# Graphite Bridge

605

class GraphiteBridge:

606

def __init__(

607

self,

608

address: Tuple[str, int],

609

registry: CollectorRegistry = REGISTRY,

610

_push_interval: float = 30,

611

tags: bool = False

612

) -> None:

613

"""

614

Create a bridge to Graphite.

615

616

Parameters:

617

- address: (host, port) tuple for Graphite server

618

- registry: Registry to read metrics from

619

- _push_interval: Interval between pushes in seconds

620

- tags: Whether to use Graphite tags format

621

"""

622

```

623

624

**Usage Example:**

625

626

```python

627

from prometheus_client import Counter, Gauge, CollectorRegistry

628

from prometheus_client.bridge.graphite import GraphiteBridge

629

import time

630

import threading

631

632

# Create metrics

633

registry = CollectorRegistry()

634

requests = Counter('app_requests_total', 'Total requests', ['endpoint'], registry=registry)

635

memory_usage = Gauge('app_memory_bytes', 'Memory usage', registry=registry)

636

637

# Generate some data

638

requests.labels('/api/users').inc(100)

639

requests.labels('/api/orders').inc(50)

640

memory_usage.set(1024 * 1024 * 512) # 512MB

641

642

# Bridge to Graphite (example with mock address)

643

bridge = GraphiteBridge(

644

address=('graphite.example.com', 2003),

645

registry=registry,

646

_push_interval=10, # Push every 10 seconds

647

tags=True # Use Graphite tags format

648

)

649

650

# The bridge will automatically push metrics to Graphite

651

# Metrics will be formatted as:

652

# app_requests_total;endpoint=/api/users 100 timestamp

653

# app_requests_total;endpoint=/api/orders 50 timestamp

654

# app_memory_bytes 536870912 timestamp

655

656

print("Bridge configured to push metrics to Graphite")

657

```

658

659

### OpenMetrics Support

660

661

Support for the OpenMetrics exposition format.

662

663

```python { .api }

664

# From prometheus_client.openmetrics.exposition

665

CONTENT_TYPE_LATEST: str = 'application/openmetrics-text; version=1.0.0; charset=utf-8'

666

667

def generate_latest(registry) -> bytes:

668

"""Generate OpenMetrics format output."""

669

670

def escape_metric_name(s: str) -> str:

671

"""Escape metric name for OpenMetrics format."""

672

673

def escape_label_name(s: str) -> str:

674

"""Escape label name for OpenMetrics format."""

675

```

676

677

**Usage Example:**

678

679

```python

680

from prometheus_client import Counter, Gauge, CollectorRegistry

681

from prometheus_client.openmetrics.exposition import generate_latest as openmetrics_generate

682

683

# Create metrics

684

registry = CollectorRegistry()

685

counter = Counter('test_counter', 'Test counter', registry=registry)

686

gauge = Gauge('test_gauge', 'Test gauge', ['label'], registry=registry)

687

688

counter.inc(42)

689

gauge.labels('value1').set(3.14)

690

691

# Generate OpenMetrics format

692

openmetrics_output = openmetrics_generate(registry)

693

print("OpenMetrics format:")

694

print(openmetrics_output.decode('utf-8'))

695

696

# Compare with Prometheus format

697

from prometheus_client import generate_latest as prometheus_generate

698

prometheus_output = prometheus_generate(registry)

699

print("\nPrometheus format:")

700

print(prometheus_output.decode('utf-8'))

701

```