or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdcore-metrics.mdimplementations.mdindex.mdmetric-groups.mdreporters.mdspecialized-groups.mdtracing.md

specialized-groups.mddocs/

0

# Specialized Metric Groups

1

2

Component-specific metric groups tailored for different parts of the Flink runtime. These specialized interfaces provide context-aware metric organization and expose relevant sub-groups for specific Flink components like operators, sources, sinks, and coordinators.

3

4

## Capabilities

5

6

### OperatorMetricGroup Interface

7

8

Special metric group representing Flink operators, providing access to I/O-specific metrics through a dedicated sub-group.

9

10

```java { .api }

11

/**

12

* Special MetricGroup representing an Operator.

13

* You should only update the metrics in the main operator thread.

14

*/

15

@PublicEvolving

16

public interface OperatorMetricGroup extends MetricGroup {

17

/**

18

* Returns the I/O metric group for this operator.

19

* @return OperatorIOMetricGroup for I/O metrics

20

*/

21

OperatorIOMetricGroup getIOMetricGroup();

22

}

23

```

24

25

**Usage Examples:**

26

27

```java

28

// In an operator implementation

29

public class MyMapOperator extends AbstractStreamOperator<String> {

30

private Counter processedRecords;

31

private Counter droppedRecords;

32

private Gauge<Integer> bufferSize;

33

34

@Override

35

public void open() throws Exception {

36

super.open();

37

38

// Get operator-specific metric group

39

OperatorMetricGroup operatorMetrics = getRuntimeContext().getMetricGroup();

40

41

// Register operator-level metrics

42

processedRecords = operatorMetrics.counter("records-processed");

43

droppedRecords = operatorMetrics.counter("records-dropped");

44

bufferSize = operatorMetrics.gauge("buffer-size", () -> internalBuffer.size());

45

46

// Access I/O metrics sub-group

47

OperatorIOMetricGroup ioMetrics = operatorMetrics.getIOMetricGroup();

48

// I/O metrics are typically managed by the runtime, but can be accessed here

49

}

50

51

public void processElement(StreamRecord<String> element) {

52

String value = element.getValue();

53

54

if (shouldProcess(value)) {

55

// Process element...

56

processedRecords.inc();

57

} else {

58

droppedRecords.inc();

59

}

60

}

61

}

62

```

63

64

### OperatorIOMetricGroup Interface

65

66

Metric group for an operator's pre-defined I/O metrics: counters for the records and bytes the operator receives and emits. Each counter also feeds a corresponding per-second rate meter.

67

68

```java { .api }

69

/**

70

* Metric group that contains shareable pre-defined IO-related metrics for operators.

71

* You should only update the metrics in the main operator thread.

72

*/

73

public interface OperatorIOMetricGroup extends MetricGroup {

74

/**

75

* The total number of input records since the operator started.

76

* Will also populate numRecordsInPerSecond meter.

77

*/

78

Counter getNumRecordsInCounter();

79

80

/**

81

* The total number of output records since the operator started.

82

* Will also populate numRecordsOutPerSecond meter.

83

*/

84

Counter getNumRecordsOutCounter();

85

86

/**

87

* The total number of input bytes since the task started.

88

* Will also populate numBytesInPerSecond meter.

89

*/

90

Counter getNumBytesInCounter();

91

92

/**

93

* The total number of output bytes since the task started.

94

* Will also populate numBytesOutPerSecond meter.

95

*/

96

Counter getNumBytesOutCounter();

97

}

98

```

99

100

**Usage Examples:**

101

102

```java

103

// Accessing I/O metrics (typically read-only from operator code)

104

public class StreamingOperator extends AbstractStreamOperator<Output> {

105

106

@Override

107

public void open() throws Exception {

108

super.open();

109

110

OperatorMetricGroup operatorGroup = getRuntimeContext().getMetricGroup();

111

OperatorIOMetricGroup ioGroup = operatorGroup.getIOMetricGroup();

112

113

// Access pre-defined I/O counters provided by Flink runtime

114

Counter recordsIn = ioGroup.getNumRecordsInCounter();

115

Counter recordsOut = ioGroup.getNumRecordsOutCounter();

116

Counter bytesIn = ioGroup.getNumBytesInCounter();

117

Counter bytesOut = ioGroup.getNumBytesOutCounter();

118

119

// These counters are automatically maintained by Flink runtime

120

// and also populate corresponding rate meters

121

122

// You can still add custom I/O-related metrics

123

ioGroup.gauge("custom-io-metric", () -> getCustomIOValue());

124

}

125

}

126

```

127

128

### SourceReaderMetricGroup Interface

129

130

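Metric group for source readers in the new Source API. In addition to everything an `OperatorMetricGroup` offers, it exposes a pre-defined counter for records that failed to be consumed. It also lets a reader register optional gauges for pending bytes and pending records.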

131

132

```java { .api }

133

/**

134

* Pre-defined metrics for SourceReader.

135

* You should only update the metrics in the main operator thread.

136

*/

137

public interface SourceReaderMetricGroup extends OperatorMetricGroup {

138

/** The total number of record that failed to consume, process, or emit. */

139

Counter getNumRecordsInErrorsCounter();

140

141

/**

142

* Sets an optional gauge for the number of bytes that have not been fetched by the source.

143

* e.g. the remaining bytes in a file after the file descriptor reading position.

144

*

145

* Note that not every source can report this metric in an plausible and efficient way.

146

*/

147

void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);

148

149

/**

150

* Sets an optional gauge for the number of records that have not been fetched by the source.

151

* e.g. the available records after the consumer offset in a Kafka partition.

152

*

153

* Note that not every source can report this metric in an plausible and efficient way.

154

*/

155

void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);

156

}

157

```

158

159

**Usage Examples:**

160

161

```java

162

// In a source reader implementation

163

public class KafkaSourceReader implements SourceReader<Record, KafkaPartitionSplit> {

164

private SourceReaderMetricGroup readerMetrics;

165

private Counter recordsRead;

166

private Counter bytesRead;

167

private Gauge<Integer> pendingSplits;

168

private Histogram recordSize;

169

170

public void initialize(SourceReaderContext context) {

171

this.readerMetrics = context.getMetricGroup();

172

173

// Access pre-defined error counter

174

Counter errorCounter = readerMetrics.getNumRecordsInErrorsCounter();

175

176

// Set optional pending gauges if source can provide these metrics

177

if (canTrackPendingBytes()) {

178

readerMetrics.setPendingBytesGauge(() -> calculatePendingBytes());

179

}

180

if (canTrackPendingRecords()) {

181

readerMetrics.setPendingRecordsGauge(() -> calculatePendingRecords());

182

}

183

184

// Register additional custom source reader metrics

185

recordsRead = readerMetrics.counter("records-read");

186

bytesRead = readerMetrics.counter("bytes-read");

187

pendingSplits = readerMetrics.gauge("pending-splits", () -> splitQueue.size());

188

recordSize = readerMetrics.histogram("record-size", new MyHistogram());

189

}

190

191

public InputStatus pollNext(ReaderOutput<Record> output) {

192

Record record = pollRecord();

193

if (record != null) {

194

output.collect(record);

195

recordsRead.inc();

196

bytesRead.inc(record.sizeInBytes());

197

recordSize.update(record.sizeInBytes());

198

return InputStatus.MORE_AVAILABLE;

199

}

200

return InputStatus.NOTHING_AVAILABLE;

201

}

202

}

203

```

204

205

### SinkWriterMetricGroup Interface

206

207

Metric group for sink writers. It exposes pre-defined counters for records and bytes sent and for send errors, plus an optional gauge for the time taken to send the last record.

208

209

```java { .api }

210

/**

211

* Pre-defined metrics for sinks.

212

* You should only update the metrics in the main operator thread.

213

*/

214

public interface SinkWriterMetricGroup extends OperatorMetricGroup {

215

/** The total number of records failed to send. */

216

Counter getNumRecordsOutErrorsCounter();

217

218

/**

219

* The total number of records failed to send.

220

* This metric has the same value as numRecordsOutError.

221

*/

222

Counter getNumRecordsSendErrorsCounter();

223

224

/**

225

* The total number of records have been sent to the downstream system.

226

* This metric has the same value as numRecordsOut of the operator.

227

* Note: this counter will count all records the SinkWriter sent.

228

*/

229

Counter getNumRecordsSendCounter();

230

231

/**

232

* The total number of output send bytes since the task started.

233

* This metric has the same value as numBytesOut of the operator.

234

*/

235

Counter getNumBytesSendCounter();

236

237

/**

238

* Sets an optional gauge for the time it takes to send the last record.

239

* This metric is an instantaneous value recorded for the last processed record.

240

*/

241

void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge);

242

}

243

```

244

245

**Usage Examples:**

246

247

```java

248

// In a sink writer implementation

249

public class DatabaseSinkWriter implements SinkWriter<Record> {

250

private SinkWriterMetricGroup writerMetrics;

251

private Counter recordsWritten;

252

private Counter writesSucceeded;

253

private Counter writesFailed;

254

private Histogram writeLatency;

255

private Meter writeRate;

256

257

public void initialize(SinkWriter.Context context) {

258

this.writerMetrics = context.getMetricGroup();

259

260

// Access pre-defined sink writer counters

261

Counter outErrors = writerMetrics.getNumRecordsOutErrorsCounter();

262

Counter sendErrors = writerMetrics.getNumRecordsSendErrorsCounter();

263

Counter recordsSent = writerMetrics.getNumRecordsSendCounter();

264

Counter bytesSent = writerMetrics.getNumBytesSendCounter();

265

266

// Set optional send time gauge if sink can provide this metric

267

if (canTrackSendTime()) {

268

writerMetrics.setCurrentSendTimeGauge(() -> lastSendTimeMillis);

269

}

270

271

// Register additional custom sink writer metrics

272

recordsWritten = writerMetrics.counter("records-written");

273

writeLatency = writerMetrics.histogram("write-latency", new LatencyHistogram());

274

writeRate = writerMetrics.meter("write-rate", new MeterView(30));

275

276

// Connection-specific metrics

277

MetricGroup connectionGroup = writerMetrics.addGroup("connection");

278

connectionGroup.gauge("pool-size", () -> connectionPool.getActiveConnections());

279

connectionGroup.gauge("pool-utilization", () -> connectionPool.getUtilization());

280

}

281

282

@Override

283

public void write(Record record, Context context) {

284

long startTime = System.currentTimeMillis();

285

286

try {

287

database.write(record);

288

recordsWritten.inc();

289

writesSucceeded.inc();

290

writeRate.markEvent();

291

292

long latency = System.currentTimeMillis() - startTime;

293

writeLatency.update(latency);

294

295

} catch (Exception e) {

296

writesFailed.inc();

297

throw new RuntimeException("Failed to write record", e);

298

}

299

}

300

}

301

```

302

303

### SinkCommitterMetricGroup Interface

304

305

Metric group for sink committers. It exposes pre-defined counters for committables (total, successful, failed, retried, and already committed) and an optional gauge for the number of pending committables.

306

307

```java { .api }

308

/**

309

* Pre-defined metrics for sinks.

310

* You should only update the metrics in the main operator thread.

311

*/

312

public interface SinkCommitterMetricGroup extends OperatorMetricGroup {

313

/** The total number of committables arrived. */

314

Counter getNumCommittablesTotalCounter();

315

316

/** The total number of committable failures. */

317

Counter getNumCommittablesFailureCounter();

318

319

/** The total number of committable retry. */

320

Counter getNumCommittablesRetryCounter();

321

322

/** The total number of successful committables. */

323

Counter getNumCommittablesSuccessCounter();

324

325

/** The total number of already committed committables. */

326

Counter getNumCommittablesAlreadyCommittedCounter();

327

328

/** The pending committables. */

329

void setCurrentPendingCommittablesGauge(Gauge<Integer> currentPendingCommittablesGauge);

330

}

331

```

332

333

**Usage Examples:**

334

335

```java

336

// In a sink committer implementation

337

public class TransactionalSinkCommitter implements SinkCommitter<CommitInfo> {

338

private SinkCommitterMetricGroup committerMetrics;

339

private Counter commitsAttempted;

340

private Counter commitsSucceeded;

341

private Counter commitsFailed;

342

private Histogram commitLatency;

343

private Gauge<Integer> pendingCommits;

344

345

public void initialize(SinkCommitter.Context context) {

346

this.committerMetrics = context.getMetricGroup();

347

348

// Access pre-defined committer counters

349

Counter totalCommittables = committerMetrics.getNumCommittablesTotalCounter();

350

Counter failureCommittables = committerMetrics.getNumCommittablesFailureCounter();

351

Counter retryCommittables = committerMetrics.getNumCommittablesRetryCounter();

352

Counter successCommittables = committerMetrics.getNumCommittablesSuccessCounter();

353

Counter alreadyCommitted = committerMetrics.getNumCommittablesAlreadyCommittedCounter();

354

355

// Set optional pending committables gauge if committer can provide this metric

356

if (canTrackPendingCommittables()) {

357

committerMetrics.setCurrentPendingCommittablesGauge(() -> pendingCommittables.size());

358

}

359

360

// Register additional custom committer metrics

361

commitLatency = committerMetrics.histogram("commit-latency", new LatencyHistogram());

362

363

// Transaction-specific metrics

364

MetricGroup txnGroup = committerMetrics.addGroup("transactions");

365

txnGroup.counter("transactions-started");

366

txnGroup.counter("transactions-committed");

367

txnGroup.counter("transactions-aborted");

368

}

369

370

@Override

371

public List<CommitInfo> commit(List<CommitInfo> commitInfos) {

372

List<CommitInfo> failedCommits = new ArrayList<>();

373

374

for (CommitInfo commitInfo : commitInfos) {

375

long startTime = System.currentTimeMillis();

376

commitsAttempted.inc();

377

378

try {

379

database.commit(commitInfo.getTransactionId());

380

commitsSucceeded.inc();

381

382

long latency = System.currentTimeMillis() - startTime;

383

commitLatency.update(latency);

384

385

} catch (Exception e) {

386

commitsFailed.inc();

387

failedCommits.add(commitInfo);

388

}

389

}

390

391

return failedCommits; // Return failed commits for retry

392

}

393

}

394

```

395

396

### CacheMetricGroup Interface

397

398

Metric group for cache operations, enabling tracking of cache performance, hit rates, and memory usage. This interface provides pre-defined methods for registering cache-related metrics.

399

400

```java { .api }

401

/**

402

* Pre-defined metrics for cache.

403

* Please note that these methods should only be invoked once.

404

* Registering a metric with same name for multiple times would lead to an undefined behavior.

405

*/

406

public interface CacheMetricGroup extends MetricGroup {

407

/** The number of cache hits. */

408

void hitCounter(Counter hitCounter);

409

410

/** The number of cache misses. */

411

void missCounter(Counter missCounter);

412

413

/** The number of times to load data into cache from external system. */

414

void loadCounter(Counter loadCounter);

415

416

/** The number of load failures. */

417

void numLoadFailuresCounter(Counter numLoadFailuresCounter);

418

419

/** The time spent for the latest load operation. */

420

void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

421

422

/** The number of records in cache. */

423

void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

424

425

/** The number of bytes used by cache. */

426

void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);

427

}

428

```

429

430

**Usage Examples:**

431

432

```java

433

// In a caching operator or function

434

public class CachingFunction extends RichMapFunction<Input, Output> {

435

private CacheMetricGroup cacheMetrics;

436

private Counter hits;

437

private Counter misses;

438

private Counter loads;

439

private Counter loadFailures;

440

private Gauge<Long> loadTime;

441

private Gauge<Long> cachedRecords;

442

private Gauge<Long> cachedBytes;

443

444

@Override

445

public void open(Configuration config) throws Exception {

446

super.open(config);

447

448

// Get cache metric group

449

this.cacheMetrics = getRuntimeContext().getMetricGroup().addGroup("cache");

450

451

// Register cache metrics - each method should only be called once

452

this.hits = new SimpleCounter();

453

cacheMetrics.hitCounter(hits);

454

455

this.misses = new SimpleCounter();

456

cacheMetrics.missCounter(misses);

457

458

this.loads = new SimpleCounter();

459

cacheMetrics.loadCounter(loads);

460

461

this.loadFailures = new SimpleCounter();

462

cacheMetrics.numLoadFailuresCounter(loadFailures);

463

464

// Register gauges for cache state

465

cacheMetrics.latestLoadTimeGauge(() -> lastLoadTimeMillis);

466

cacheMetrics.numCachedRecordsGauge(() -> cache.size());

467

cacheMetrics.numCachedBytesGauge(() -> cache.getEstimatedSize());

468

}

469

470

@Override

471

public Output map(Input input) throws Exception {

472

Output cached = cache.get(input.getKey());

473

if (cached != null) {

474

hits.inc();

475

return cached;

476

}

477

478

// Cache miss - load from external system

479

misses.inc();

480

long startTime = System.currentTimeMillis();

481

482

try {

483

loads.inc();

484

Output result = loadFromExternalSystem(input);

485

cache.put(input.getKey(), result);

486

487

lastLoadTimeMillis = System.currentTimeMillis() - startTime;

488

return result;

489

490

} catch (Exception e) {

491

loadFailures.inc();

492

throw e;

493

}

494

}

495

}

496

```

497

498

### SplitEnumeratorMetricGroup Interface

499

500

Metric group for split enumerators in the Source API. The usage example below registers custom metrics for split discovery and assignment on it.

501

502

```java { .api }

503

/**

504

* Metric group for split enumerators.

505

*/

506

public interface SplitEnumeratorMetricGroup extends MetricGroup {

507

// Extends MetricGroup with split enumerator context

508

}

509

```

510

511

**Usage Examples:**

512

513

```java

514

// In a split enumerator implementation

515

public class KafkaSplitEnumerator implements SplitEnumerator<KafkaPartitionSplit, KafkaState> {

516

private SplitEnumeratorMetricGroup enumeratorMetrics;

517

private Counter splitsDiscovered;

518

private Counter splitsAssigned;

519

private Gauge<Integer> unassignedSplits;

520

private Gauge<Integer> activeReaders;

521

522

public void initialize(SplitEnumeratorContext<KafkaPartitionSplit> context) {

523

this.enumeratorMetrics = context.getMetricGroup();

524

525

// Register enumerator metrics

526

splitsDiscovered = enumeratorMetrics.counter("splits-discovered");

527

splitsAssigned = enumeratorMetrics.counter("splits-assigned");

528

unassignedSplits = enumeratorMetrics.gauge("unassigned-splits", () -> unassignedSplitQueue.size());

529

activeReaders = enumeratorMetrics.gauge("active-readers", () -> readerStates.size());

530

531

// Topic-specific metrics

532

MetricGroup topicGroup = enumeratorMetrics.addGroup("topics");

533

for (String topic : monitoredTopics) {

534

MetricGroup tGroup = topicGroup.addGroup("topic", topic);

535

tGroup.gauge("partitions", () -> getPartitionCount(topic));

536

tGroup.counter("splits-for-topic");

537

}

538

}

539

540

@Override

541

public void handleSplitRequest(int subtaskId, String requesterHostname) {

542

List<KafkaPartitionSplit> availableSplits = getAvailableSplits();

543

544

if (!availableSplits.isEmpty()) {

545

context.assignSplit(availableSplits.get(0), subtaskId);

546

splitsAssigned.inc();

547

}

548

}

549

550

@Override

551

public void addSplitsBack(List<KafkaPartitionSplit> splits, int subtaskId) {

552

unassignedSplitQueue.addAll(splits);

553

// Splits are back in the pool for reassignment

554

}

555

}

556

```

557

558

### OperatorCoordinatorMetricGroup Interface

559

560

Metric group for operator coordinators, which manage coordination between parallel operator instances.

561

562

```java { .api }

563

/**

564

* Metric group for operator coordinators.

565

*/

566

public interface OperatorCoordinatorMetricGroup extends MetricGroup {

567

// Extends MetricGroup with operator coordinator context

568

}

569

```

570

571

**Usage Examples:**

572

573

```java

574

// In an operator coordinator implementation

575

public class CheckpointCoordinator implements OperatorCoordinator {

576

private OperatorCoordinatorMetricGroup coordinatorMetrics;

577

private Counter coordinationEvents;

578

private Counter successfulCoordinations;

579

private Counter failedCoordinations;

580

private Gauge<Integer> pendingOperations;

581

582

public void initialize(OperatorCoordinator.Context context) {

583

this.coordinatorMetrics = context.getMetricGroup();

584

585

// Register coordinator metrics

586

coordinationEvents = coordinatorMetrics.counter("coordination-events");

587

successfulCoordinations = coordinatorMetrics.counter("successful-coordinations");

588

failedCoordinations = coordinatorMetrics.counter("failed-coordinations");

589

pendingOperations = coordinatorMetrics.gauge("pending-operations", () -> operationQueue.size());

590

591

// Per-subtask metrics

592

MetricGroup subtaskGroup = coordinatorMetrics.addGroup("subtasks");

593

for (int i = 0; i < context.currentParallelism(); i++) {

594

MetricGroup stGroup = subtaskGroup.addGroup("subtask", String.valueOf(i));

595

stGroup.counter("messages-sent");

596

stGroup.counter("messages-received");

597

stGroup.gauge("last-seen", () -> getLastSeenTime(i));

598

}

599

}

600

601

@Override

602

public void handleEventFromOperator(int subtask, OperatorEvent event) {

603

coordinationEvents.inc();

604

605

try {

606

processCoordinationEvent(subtask, event);

607

successfulCoordinations.inc();

608

} catch (Exception e) {

609

failedCoordinations.inc();

610

throw new RuntimeException("Coordination failed", e);

611

}

612

}

613

}

614

```

615

616

### CacheMetricGroup Interface (Lookup Cache Example)

617

618

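A second `CacheMetricGroup` example, this time for a standalone lookup cache. Because `CacheMetricGroup` extends `MetricGroup`, custom metrics can be registered alongside the pre-defined ones.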

619

620

```java { .api }

621

/**

622

* Metric group for cache metrics.

623

*/

624

public interface CacheMetricGroup extends MetricGroup {

625

// Extends MetricGroup with cache-specific context

626

}

627

```

628

629

**Usage Examples:**

630

631

```java

632

// In a caching component

633

public class LookupCache {

634

private CacheMetricGroup cacheMetrics;

635

private Counter hits;

636

private Counter misses;

637

private Counter evictions;

638

private Gauge<Integer> cacheSize;

639

private Gauge<Double> hitRate;

640

641

public void initialize(CacheMetricGroup metrics) {

642

this.cacheMetrics = metrics;

643

644

// Standard cache metrics

645

hits = cacheMetrics.counter("hits");

646

misses = cacheMetrics.counter("misses");

647

evictions = cacheMetrics.counter("evictions");

648

cacheSize = cacheMetrics.gauge("size", () -> cache.size());

649

hitRate = cacheMetrics.gauge("hit-rate", this::calculateHitRate);

650

651

// Memory metrics

652

MetricGroup memoryGroup = cacheMetrics.addGroup("memory");

653

memoryGroup.gauge("used-bytes", () -> cache.estimatedSize());

654

memoryGroup.gauge("max-bytes", () -> cache.maximumSize());

655

}

656

657

public Object lookup(String key) {

658

Object value = cache.get(key);

659

660

if (value != null) {

661

hits.inc();

662

return value;

663

} else {

664

misses.inc();

665

return null;

666

}

667

}

668

669

private double calculateHitRate() {

670

long totalHits = hits.getCount();

671

long totalMisses = misses.getCount();

672

long total = totalHits + totalMisses;

673

674

return total > 0 ? (double) totalHits / total : 0.0;

675

}

676

}

677

```