or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-exchange.md · execution-graph.md · high-availability.md · index.md · job-management.md · message-passing.md · metrics.md · mini-cluster.md · rpc-framework.md · state-management.md · task-execution.md

metrics.mddocs/

0

# Metrics System

1

2

The Metrics System provides comprehensive infrastructure for collecting, registering, and reporting runtime metrics from Flink applications. This system enables monitoring of job performance, resource usage, throughput, and custom application metrics across the distributed cluster.

3

4

## Core Components

5

6

### MetricRegistry

7

8

Central registry for Flink metrics that handles metric registration, reporter management, and metric lifecycle.

9

10

```java { .api }

11

public class MetricRegistry implements MetricRegistryImpl {

12

public static MetricRegistry create(MetricRegistryConfiguration config);

13

14

public void register(Metric metric, String metricName, AbstractMetricGroup group);

15

public void unregister(Metric metric, String metricName, AbstractMetricGroup group);

16

17

public void startReporters(Configuration config);

18

public void stopReporters();

19

20

public char getDelimiter();

21

public int getNumberReporters();

22

23

public ScopeFormats getScopeFormats();

24

25

@Override

26

public void close();

27

}

28

```

29

30

### MetricRegistryConfiguration

31

32

Configuration class for metrics registry setup and behavior customization.

33

34

```java { .api }

35

public class MetricRegistryConfiguration {

36

public static MetricRegistryConfiguration fromConfiguration(Configuration config);

37

public static MetricRegistryConfiguration defaultMetricRegistryConfiguration();

38

39

public long getQueryServiceUpdateInterval();

40

public int getQueryServicePort();

41

public String getQueryServiceBindAddress();

42

43

public List<ReporterSetup> getReporterConfigurations();

44

public ScopeFormats getScopeFormats();

45

46

public char getDelimiter();

47

public List<String> getExcludedMetrics();

48

}

49

```

50

51

## Metric Types

52

53

### Counter

54

55

Metric that tracks a count that can only increase.

56

57

```java { .api }

58

public interface Counter extends Metric {

59

void inc();

60

void inc(long n);

61

long getCount();

62

63

// Static factory method

64

static Counter of(LongCounter longCounter);

65

}

66

```

67

68

### Gauge

69

70

Metric that provides an instantaneous measurement of a value.

71

72

```java { .api }

73

public interface Gauge<T> extends Metric {

74

T getValue();

75

76

// Static factory methods

77

static <T> Gauge<T> of(Supplier<T> supplier);

78

static Gauge<Double> of(DoubleSupplier supplier);

79

static Gauge<Long> of(LongSupplier supplier);

80

}

81

```

82

83

### Meter

84

85

Metric that tracks the rate of events over time.

86

87

```java { .api }

88

public interface Meter extends Metric {

89

void markEvent();

90

void markEvent(long n);

91

92

double getRate();

93

long getCount();

94

}

95

```

96

97

### Histogram

98

99

Metric that tracks the distribution of values over time.

100

101

```java { .api }

102

public interface Histogram extends Metric {

103

void update(long value);

104

105

long getCount();

106

HistogramStatistics getStatistics();

107

}

108

109

public interface HistogramStatistics {

110

double getQuantile(double quantile);

111

long[] getValues();

112

int size();

113

double getMean();

114

double getStdDev();

115

long getMax();

116

long getMin();

117

}

118

```

119

120

## Metric Groups

121

122

### MetricGroup

123

124

Base interface for organizing metrics into hierarchical groups with scoped naming.

125

126

```java { .api }

127

public interface MetricGroup {

128

Counter counter(String name);

129

Counter counter(String name, Counter counter);

130

131

<T, C extends Counter> C counter(String name, C counter);

132

133

<T> Gauge<T> gauge(String name, Gauge<T> gauge);

134

135

Histogram histogram(String name, Histogram histogram);

136

137

Meter meter(String name, Meter meter);

138

139

MetricGroup addGroup(String name);

140

MetricGroup addGroup(String key, String value);

141

142

String[] getScopeComponents();

143

Map<String, String> getAllVariables();

144

String getMetricIdentifier(String metricName);

145

String getMetricIdentifier(String metricName, CharacterFilter filter);

146

}

147

```

148

149

### AbstractMetricGroup

150

151

Abstract base implementation providing common metric group functionality.

152

153

```java { .api }

154

public abstract class AbstractMetricGroup implements MetricGroup {

155

protected AbstractMetricGroup(MetricRegistry registry, String[] scope, AbstractMetricGroup parent);

156

157

protected void addMetric(String name, Metric metric);

158

protected void removeMetric(String name);

159

160

public final MetricGroup addGroup(String name);

161

public final MetricGroup addGroup(String key, String value);

162

163

protected abstract String getGroupName(String name);

164

165

public final String[] getScopeComponents();

166

public final Map<String, String> getAllVariables();

167

public final String getMetricIdentifier(String metricName);

168

public final String getMetricIdentifier(String metricName, CharacterFilter filter);

169

170

@Override

171

public void close();

172

}

173

```

174

175

### ComponentMetricGroup

176

177

Specialized metric group for cluster components (JobManager, TaskManager, etc.).

178

179

```java { .api }

180

public class ComponentMetricGroup extends AbstractMetricGroup {

181

public ComponentMetricGroup(MetricRegistry registry, String componentName);

182

183

public <J> JobManagerMetricGroup addJobManager(Configuration config, String hostname, String jobManagerId);

184

public TaskManagerMetricGroup addTaskManager(Configuration config, String hostname, String taskManagerId);

185

186

protected String getGroupName(String name);

187

}

188

```

189

190

### JobManagerMetricGroup

191

192

Metric group for JobManager-specific metrics.

193

194

```java { .api }

195

public class JobManagerMetricGroup extends ComponentMetricGroup {

196

public JobManagerMetricGroup(MetricRegistry registry, String hostname, String jobManagerId);

197

198

public JobMetricGroup addJob(JobGraph jobGraph);

199

public JobMetricGroup addJob(JobID jobId, String jobName);

200

201

public String hostname();

202

public String jobManagerId();

203

204

protected String getGroupName(String name);

205

}

206

```

207

208

### TaskManagerMetricGroup

209

210

Metric group for TaskManager-specific metrics.

211

212

```java { .api }

213

public class TaskManagerMetricGroup extends ComponentMetricGroup {

214

public TaskManagerMetricGroup(MetricRegistry registry, String hostname, String taskManagerId);

215

216

public TaskMetricGroup addTaskForJob(JobID jobId, String jobName, JobVertexID jobVertexId,

217

ExecutionAttemptID executionAttemptId, String taskName,

218

int subtaskIndex, int attemptNumber);

219

220

public String hostname();

221

public String taskManagerId();

222

223

protected String getGroupName(String name);

224

}

225

```

226

227

## Reporters

228

229

### MetricReporter

230

231

Base interface for metric reporters that output metrics to external monitoring systems.

232

233

```java { .api }

234

public interface MetricReporter extends AutoCloseable {

235

void open(MetricConfig config);

236

237

void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);

238

void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);

239

240

@Override

241

void close();

242

}

243

```

244

245

### ScheduledDropwizardReporter

246

247

Reporter that integrates with Dropwizard metrics and supports scheduled reporting.

248

249

```java { .api }

250

public abstract class ScheduledDropwizardReporter implements MetricReporter, Scheduled {

251

protected final com.codahale.metrics.MetricRegistry registry = new com.codahale.metrics.MetricRegistry();

252

253

@Override

254

public void open(MetricConfig config);

255

256

@Override

257

public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);

258

259

@Override

260

public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);

261

262

public abstract void report();

263

264

protected com.codahale.metrics.MetricRegistry getRegistry();

265

266

@Override

267

public void close();

268

}

269

```

270

271

### Scheduled

272

273

Interface for reporters that support scheduled/periodic reporting.

274

275

```java { .api }

276

public interface Scheduled {

277

void report();

278

}

279

```

280

281

## Configuration

282

283

### MetricConfig

284

285

Configuration container for metric reporters.

286

287

```java { .api }

288

public class MetricConfig {

289

public static final String REPORTER_CLASS = "class";

290

public static final String REPORTER_INTERVAL = "interval";

291

292

public String getString(String key, String defaultValue);

293

public int getInteger(String key, int defaultValue);

294

public long getLong(String key, long defaultValue);

295

public boolean getBoolean(String key, boolean defaultValue);

296

297

public Properties getProperties();

298

299

public void setString(String key, String value);

300

public void setInteger(String key, int value);

301

public void setLong(String key, long value);

302

public void setBoolean(String key, boolean value);

303

}

304

```

305

306

### ReporterSetup

307

308

Configuration setup for individual metric reporters.

309

310

```java { .api }

311

public class ReporterSetup {

312

public ReporterSetup(String name, MetricConfig configuration);

313

314

public String getName();

315

public MetricConfig getConfiguration();

316

317

public Optional<String> getClassName();

318

public Optional<String> getFactoryClassName();

319

public Optional<Long> getIntervalSettings();

320

}

321

```

322

323

## Scope and Formatting

324

325

### ScopeFormats

326

327

Defines scope format patterns for different component types.

328

329

```java { .api }

330

public class ScopeFormats {

331

public static final ScopeFormats fromConfig(Configuration config);

332

333

public String[] getJobManagerFormat();

334

public String[] getTaskManagerFormat();

335

public String[] getJobFormat();

336

public String[] getTaskFormat();

337

public String[] getOperatorFormat();

338

339

public String getJobManagerScope(Configuration config, String hostname, String jmId);

340

public String getTaskManagerScope(Configuration config, String hostname, String tmId);

341

342

// Format variables

343

public static final String SCOPE_HOST = "<host>";

344

public static final String SCOPE_TASKMANAGER_ID = "<tm_id>";

345

public static final String SCOPE_JOB_ID = "<job_id>";

346

public static final String SCOPE_JOB_NAME = "<job_name>";

347

public static final String SCOPE_TASK_VERTEX_ID = "<task_id>";

348

public static final String SCOPE_TASK_NAME = "<task_name>";

349

public static final String SCOPE_TASK_SUBTASK_INDEX = "<subtask_index>";

350

public static final String SCOPE_TASK_ATTEMPT_ID = "<task_attempt_id>";

351

public static final String SCOPE_TASK_ATTEMPT_NUM = "<task_attempt_num>";

352

public static final String SCOPE_OPERATOR_ID = "<operator_id>";

353

public static final String SCOPE_OPERATOR_NAME = "<operator_name>";

354

}

355

```

356

357

### CharacterFilter

358

359

Interface for filtering characters in metric names and identifiers.

360

361

```java { .api }

362

public interface CharacterFilter {

363

String filterCharacters(String input);

364

365

CharacterFilter NO_OP_FILTER = input -> input;

366

}

367

```

368

369

## Usage Examples

370

371

### Basic Metrics Registration

372

373

```java

374

import org.apache.flink.metrics.*;

375

import org.apache.flink.runtime.metrics.MetricRegistry;

376

import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;

377

378

// Create metrics registry

379

MetricRegistryConfiguration config = MetricRegistryConfiguration.defaultMetricRegistryConfiguration();

380

MetricRegistry metricRegistry = MetricRegistry.create(config);

381

382

// Create metric group

383

ComponentMetricGroup rootGroup = new ComponentMetricGroup(metricRegistry, "MyApplication");

384

MetricGroup jobGroup = rootGroup.addGroup("job", "data-processing");

385

386

// Register different metric types

387

Counter recordsProcessed = jobGroup.counter("records_processed");

388

Gauge<Long> memoryUsage = jobGroup.gauge("memory_usage", () -> Runtime.getRuntime().totalMemory());

389

Meter throughput = jobGroup.meter("throughput", new MeterView(recordsProcessed, 60));

390

Histogram latency = jobGroup.histogram("latency", new DescriptiveStatisticsHistogram(1000));

391

392

// Use metrics in application

393

for (int i = 0; i < 1000; i++) {

394

// Process record

395

processRecord();

396

397

// Update metrics

398

recordsProcessed.inc();

399

latency.update(System.currentTimeMillis() - startTime);

400

throughput.markEvent();

401

}

402

403

// Clean up

404

metricRegistry.close();

405

```

406

407

### Custom Metric Reporter

408

409

```java

410

import org.apache.flink.metrics.reporter.MetricReporter;

411

import org.apache.flink.metrics.*;

412

413

public class CustomMetricReporter implements MetricReporter {

414

private final Map<String, Metric> metrics = new ConcurrentHashMap<>();

415

416

@Override

417

public void open(MetricConfig config) {

418

String endpoint = config.getString("endpoint", "localhost:8080");

419

String interval = config.getString("interval", "10");

420

421

System.out.println("Opening custom reporter with endpoint: " + endpoint);

422

423

// Start reporting thread

424

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

425

scheduler.scheduleAtFixedRate(this::report, 0, Long.parseLong(interval), TimeUnit.SECONDS);

426

}

427

428

@Override

429

public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {

430

String fullName = group.getMetricIdentifier(metricName);

431

metrics.put(fullName, metric);

432

System.out.println("Added metric: " + fullName);

433

}

434

435

@Override

436

public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {

437

String fullName = group.getMetricIdentifier(metricName);

438

metrics.remove(fullName);

439

System.out.println("Removed metric: " + fullName);

440

}

441

442

private void report() {

443

System.out.println("=== Custom Metric Report ===");

444

for (Map.Entry<String, Metric> entry : metrics.entrySet()) {

445

String name = entry.getKey();

446

Metric metric = entry.getValue();

447

448

if (metric instanceof Counter) {

449

Counter counter = (Counter) metric;

450

System.out.println(name + " (Counter): " + counter.getCount());

451

} else if (metric instanceof Gauge) {

452

Gauge<?> gauge = (Gauge<?>) metric;

453

System.out.println(name + " (Gauge): " + gauge.getValue());

454

} else if (metric instanceof Meter) {

455

Meter meter = (Meter) metric;

456

System.out.println(name + " (Meter): " + meter.getRate() + " events/sec");

457

} else if (metric instanceof Histogram) {

458

Histogram histogram = (Histogram) metric;

459

HistogramStatistics stats = histogram.getStatistics();

460

System.out.println(name + " (Histogram): count=" + histogram.getCount() +

461

", mean=" + stats.getMean() + ", max=" + stats.getMax());

462

}

463

}

464

}

465

466

@Override

467

public void close() {

468

System.out.println("Closing custom reporter");

469

}

470

}

471

```

472

473

### Metrics Configuration

474

475

```java

476

import org.apache.flink.configuration.Configuration;

477

import org.apache.flink.configuration.MetricOptions;

478

479

// Configure metrics system

480

Configuration config = new Configuration();

481

482

// Enable metrics

483

config.setString(MetricOptions.REPORTERS_LIST, "prometheus,slf4j");

484

485

// Configure Prometheus reporter

486

config.setString("metrics.reporter.prometheus.class", "org.apache.flink.metrics.prometheus.PrometheusReporter");

487

config.setInteger("metrics.reporter.prometheus.port", 9249);

488

489

// Configure SLF4J reporter

490

config.setString("metrics.reporter.slf4j.class", "org.apache.flink.metrics.slf4j.Slf4jReporter");

491

config.setString("metrics.reporter.slf4j.interval", "10 SECONDS");

492

493

// Configure scope formats

494

config.setString(MetricOptions.SCOPE_NAMING_JM, "<host>.jobmanager.<jm_id>");

495

config.setString(MetricOptions.SCOPE_NAMING_TM, "<host>.taskmanager.<tm_id>");

496

config.setString(MetricOptions.SCOPE_NAMING_JOB, "<host>.jobmanager.<jm_id>.<job_name>");

497

config.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>");

498

499

// Set metric exclusions

500

config.setString("metrics.reporter.prometheus.excludes", "*.taskmanager.Status.JVM.CPU.*;*.taskmanager.Status.JVM.Memory.Heap.Max");

501

502

// Create registry with configuration

503

MetricRegistryConfiguration registryConfig = MetricRegistryConfiguration.fromConfiguration(config);

504

MetricRegistry metricRegistry = MetricRegistry.create(registryConfig);

505

```

506

507

### Task-Level Metrics

508

509

```java

510

import org.apache.flink.streaming.api.functions.ProcessFunction;

511

import org.apache.flink.metrics.Counter;

512

import org.apache.flink.metrics.Histogram;

513

import org.apache.flink.metrics.Gauge;

514

515

public class MetricsAwareProcessFunction extends ProcessFunction<String, String> {

516

private transient Counter recordsProcessed;

517

private transient Counter recordsFiltered;

518

private transient Histogram processingLatency;

519

private transient Gauge<Long> currentBacklog;

520

521

private volatile long backlogSize = 0;

522

523

@Override

524

public void open(Configuration parameters) throws Exception {

525

super.open(parameters);

526

527

// Get metric group from runtime context

528

MetricGroup metricGroup = getRuntimeContext().getMetricGroup();

529

530

// Register metrics

531

recordsProcessed = metricGroup.counter("records_processed");

532

recordsFiltered = metricGroup.counter("records_filtered");

533

processingLatency = metricGroup.histogram("processing_latency_ms",

534

new DescriptiveStatisticsHistogram(10000));

535

currentBacklog = metricGroup.gauge("current_backlog", () -> backlogSize);

536

537

// Register custom metrics with specific names

538

metricGroup.gauge("memory_usage_mb", () ->

539

(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024);

540

}

541

542

@Override

543

public void processElement(String value, Context ctx, Collector<String> out) throws Exception {

544

long startTime = System.currentTimeMillis();

545

546

try {

547

// Process the record

548

if (shouldFilter(value)) {

549

recordsFiltered.inc();

550

return;

551

}

552

553

String processed = processRecord(value);

554

out.collect(processed);

555

556

recordsProcessed.inc();

557

558

} finally {

559

// Track processing latency

560

long latency = System.currentTimeMillis() - startTime;

561

processingLatency.update(latency);

562

563

// Update backlog estimate

564

backlogSize = estimateBacklogSize();

565

}

566

}

567

568

private boolean shouldFilter(String value) {

569

// Filtering logic

570

return value.isEmpty();

571

}

572

573

private String processRecord(String value) {

574

// Processing logic

575

return value.toUpperCase();

576

}

577

578

private long estimateBacklogSize() {

579

// Estimate current backlog size

580

return 0; // Simplified for example

581

}

582

}

583

```

584

585

### Operator Metrics

586

587

```java

588

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;

589

import org.apache.flink.streaming.api.operators.OneInputStreamOperator;

590

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

591

592

public class MetricsAwareOperator extends AbstractStreamOperator<String>

593

implements OneInputStreamOperator<String, String> {

594

595

private transient Counter inputRecords;

596

private transient Counter outputRecords;

597

private transient Meter inputRate;

598

private transient Histogram processingTime;

599

600

@Override

601

public void open() throws Exception {

602

super.open();

603

604

// Get operator metric group

605

MetricGroup operatorGroup = getMetricGroup();

606

607

// Register operator metrics

608

inputRecords = operatorGroup.counter("input_records");

609

outputRecords = operatorGroup.counter("output_records");

610

inputRate = operatorGroup.meter("input_rate", new MeterView(inputRecords));

611

processingTime = operatorGroup.histogram("processing_time_ns",

612

new DescriptiveStatisticsHistogram(1000));

613

614

// Add operator-specific metrics

615

operatorGroup.gauge("operator_busy_time_per_sec", () -> calculateBusyTime());

616

}

617

618

@Override

619

public void processElement(StreamRecord<String> element) throws Exception {

620

long startTime = System.nanoTime();

621

622

inputRecords.inc();

623

inputRate.markEvent();

624

625

try {

626

// Process element

627

String result = element.getValue().toLowerCase();

628

629

// Emit result

630

output.collect(new StreamRecord<>(result, element.getTimestamp()));

631

outputRecords.inc();

632

633

} finally {

634

// Track processing time

635

long duration = System.nanoTime() - startTime;

636

processingTime.update(duration);

637

}

638

}

639

640

private double calculateBusyTime() {

641

// Calculate operator busy time percentage

642

return 0.0; // Simplified for example

643

}

644

}

645

```

646

647

## Common Patterns

648

649

### Metric Lifecycle Management

650

651

```java

652

public class MetricsManager implements AutoCloseable {

653

private final MetricRegistry registry;

654

private final Map<String, MetricGroup> groups = new ConcurrentHashMap<>();

655

656

public MetricsManager(Configuration config) {

657

MetricRegistryConfiguration registryConfig = MetricRegistryConfiguration.fromConfiguration(config);

658

this.registry = MetricRegistry.create(registryConfig);

659

registry.startReporters(config);

660

}

661

662

public MetricGroup createGroup(String... path) {

663

String groupKey = String.join(".", path);

664

return groups.computeIfAbsent(groupKey, k -> {

665

ComponentMetricGroup root = new ComponentMetricGroup(registry, path[0]);

666

MetricGroup current = root;

667

for (int i = 1; i < path.length; i++) {

668

current = current.addGroup(path[i]);

669

}

670

return current;

671

});

672

}

673

674

public void removeGroup(String... path) {

675

String groupKey = String.join(".", path);

676

MetricGroup group = groups.remove(groupKey);

677

if (group != null) {

678

group.close();

679

}

680

}

681

682

@Override

683

public void close() {

684

groups.values().forEach(MetricGroup::close);

685

groups.clear();

686

687

registry.stopReporters();

688

registry.close();

689

}

690

}

691

```

692

693

### Conditional Metric Registration

694

695

```java

696

public class ConditionalMetrics {

697

private final MetricGroup metricGroup;

698

private final boolean metricsEnabled;

699

700

public ConditionalMetrics(MetricGroup metricGroup, Configuration config) {

701

this.metricGroup = metricGroup;

702

this.metricsEnabled = config.getBoolean("metrics.enabled", true);

703

}

704

705

public Counter createCounter(String name) {

706

if (metricsEnabled) {

707

return metricGroup.counter(name);

708

} else {

709

return new NoOpCounter();

710

}

711

}

712

713

public <T> Gauge<T> createGauge(String name, Supplier<T> valueSupplier) {

714

if (metricsEnabled) {

715

return metricGroup.gauge(name, Gauge.of(valueSupplier));

716

} else {

717

return new NoOpGauge<>();

718

}

719

}

720

721

// No-op implementations for when metrics are disabled

722

private static class NoOpCounter implements Counter {

723

@Override public void inc() {}

724

@Override public void inc(long n) {}

725

@Override public long getCount() { return 0; }

726

}

727

728

private static class NoOpGauge<T> implements Gauge<T> {

729

@Override public T getValue() { return null; }

730

}

731

}

732

```