# Metrics and Monitoring

The metrics framework provides utilities for querying and validating Flink job metrics via REST API. This enables performance testing, behavior validation, and monitoring of connector operations during test execution.

## Capabilities

### Metric Querier

Main utility class for querying job metrics from a Flink cluster via REST API.

```java { .api }
/**
 * Utility for querying job metrics via Flink REST API
 */
public class MetricQuerier {

    /**
     * Create metric querier with configuration
     * @param configuration Flink configuration for REST client
     * @throws ConfigurationException if configuration is invalid
     */
    public MetricQuerier(Configuration configuration) throws ConfigurationException;

    /**
     * Get job details including vertex information
     * @param client REST client instance
     * @param endpoint Test environment REST endpoint
     * @param jobId Job ID to query
     * @return Job details with vertex information
     * @throws Exception if query fails
     */
    public static JobDetailsInfo getJobDetails(
            RestClient client,
            TestEnvironment.Endpoint endpoint,
            JobID jobId
    ) throws Exception;

    /**
     * Get aggregated metrics for source/sink operator
     * @param endpoint Test environment REST endpoint
     * @param jobId Job ID to query
     * @param sourceOrSinkName Name of source or sink operator
     * @param metricName Name of metric to query (e.g., "numRecordsIn", "numRecordsOut")
     * @param filter Optional filter for metric selection (e.g., "Writer" for Sink V2)
     * @return Aggregated metric value across all subtasks
     * @throws Exception if query fails or metric not found
     */
    public Double getAggregatedMetricsByRestAPI(
            TestEnvironment.Endpoint endpoint,
            JobID jobId,
            String sourceOrSinkName,
            String metricName,
            String filter
    ) throws Exception;

    /**
     * Get list of available metrics for job vertex
     * @param endpoint Test environment REST endpoint
     * @param jobId Job ID to query
     * @param vertexId Vertex ID to query metrics for
     * @return Response containing available metrics
     * @throws Exception if query fails
     */
    public AggregatedMetricsResponseBody getMetricList(
            TestEnvironment.Endpoint endpoint,
            JobID jobId,
            JobVertexID vertexId
    ) throws Exception;

    /**
     * Get specific metrics with filtering
     * @param endpoint Test environment REST endpoint
     * @param jobId Job ID to query
     * @param vertexId Vertex ID to query
     * @param filters Comma-separated list of metric filters
     * @return Response containing filtered metrics
     * @throws Exception if query fails
     */
    public AggregatedMetricsResponseBody getMetrics(
            TestEnvironment.Endpoint endpoint,
            JobID jobId,
            JobVertexID vertexId,
            String filters
    ) throws Exception;
}
```

**Usage Examples:**

```java
// Create metric querier
MetricQuerier metricQuerier = new MetricQuerier(new Configuration());

// Query sink metrics
Double numRecordsOut = metricQuerier.getAggregatedMetricsByRestAPI(
        testEnv.getRestEndpoint(),
        jobClient.getJobID(),
        "MySink",                     // sink operator name
        MetricNames.NUM_RECORDS_SEND, // metric name
        "Writer"                      // filter for Sink V2
);

// Validate expected record count
assertThat(numRecordsOut).isEqualTo(expectedRecordCount);
```

### Common Metrics

Standard metrics available for source and sink connectors.

```java { .api }
/**
 * Configuration constants for connector testing
 */
public class ConnectorTestConstants {
    public static final long METRIC_FETCHER_UPDATE_INTERVAL_MS = 1000L;
    public static final long SLOT_REQUEST_TIMEOUT_MS = 10_000L;
    public static final long HEARTBEAT_TIMEOUT_MS = 5_000L;
    public static final long HEARTBEAT_INTERVAL_MS = 1000L;
    public static final Duration DEFAULT_COLLECT_DATA_TIMEOUT = Duration.ofSeconds(120L);
}
```

Common metric names used in connector testing (from Flink's MetricNames class):

- `"numRecordsIn"` - Number of records received by source
- `"numRecordsOut"` - Number of records emitted by source
- `"numRecordsSend"` - Number of records sent by sink
- `"numBytesIn"` - Number of bytes received
- `"numBytesOut"` - Number of bytes emitted

**Usage Examples:**

```java
// Query different metric types using metric name strings
Double recordsIn = metricQuerier.getAggregatedMetricsByRestAPI(
        endpoint, jobId, "MySource", "numRecordsIn", null);

Double recordsOut = metricQuerier.getAggregatedMetricsByRestAPI(
        endpoint, jobId, "MySink", "numRecordsSend", "Writer");

Double bytesIn = metricQuerier.getAggregatedMetricsByRestAPI(
        endpoint, jobId, "MySource", "numBytesIn", null);
```

## Integration with Test Suites

### Automatic Metrics Testing

Test suites include automatic metrics validation for sources and sinks.

```java
// From SinkTestSuiteBase.testMetrics()
@TestTemplate
@DisplayName("Test sink metrics")
public void testMetrics(
        TestEnvironment testEnv,
        DataStreamSinkExternalContext<T> externalContext,
        CheckpointingMode semantic
) throws Exception {

    // Generate and send test data
    List<T> testRecords = generateTestData(sinkSettings, externalContext);

    // Create and execute job
    StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(settings);
    // ... job setup and execution

    // Validate metrics
    MetricQuerier queryRestClient = new MetricQuerier(new Configuration());
    waitUntilCondition(() -> {
        try {
            return compareSinkMetrics(
                    queryRestClient,
                    testEnv,
                    externalContext,
                    jobClient.getJobID(),
                    sinkName,
                    MetricNames.NUM_RECORDS_SEND,
                    testRecords.size()
            );
        } catch (Exception e) {
            return false; // Retry on failure
        }
    });
}
```

### Custom Metrics Validation

Implement custom metrics validation in your test classes.

```java
public class MyConnectorTestSuite extends SinkTestSuiteBase<String> {

    @Test
    public void testCustomMetrics() throws Exception {
        // Execute connector job
        JobClient jobClient = executeConnectorJob();

        // Query custom metrics
        MetricQuerier querier = new MetricQuerier(new Configuration());

        // Validate throughput metrics
        Double throughput = querier.getAggregatedMetricsByRestAPI(
                testEnv.getRestEndpoint(),
                jobClient.getJobID(),
                "MyConnector",
                "recordsPerSecond",
                null
        );

        assertThat(throughput).isGreaterThan(1000.0); // Minimum throughput requirement

        // Validate error metrics
        Double errorRate = querier.getAggregatedMetricsByRestAPI(
                testEnv.getRestEndpoint(),
                jobClient.getJobID(),
                "MyConnector",
                "errorRate",
                null
        );

        assertThat(errorRate).isLessThan(0.01); // Less than 1% error rate
    }
}
```

227

228

## Metric Validation Patterns

229

230

### Throughput Validation

231

232

Validate connector throughput meets performance requirements.

233

234

```java

235

public void validateThroughput(JobClient jobClient, int expectedRecords, Duration testDuration) throws Exception {

236

MetricQuerier querier = new MetricQuerier(new Configuration());

237

238

// Wait for job to process all records

239

waitUntilCondition(() -> {

240

try {

241

Double processedRecords = querier.getAggregatedMetricsByRestAPI(

242

testEnv.getRestEndpoint(),

243

jobClient.getJobID(),

244

"MySource",

245

NUM_RECORDS_OUT,

246

null

247

);

248

return processedRecords >= expectedRecords;

249

} catch (Exception e) {

250

return false;

251

}

252

});

253

254

// Calculate and validate throughput

255

double throughput = expectedRecords / testDuration.toSeconds();

256

assertThat(throughput).isGreaterThan(100.0); // Records per second

257

}

258

```

### Latency Validation

Validate connector latency remains within acceptable bounds.

```java
public void validateLatency(JobClient jobClient) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());

    // Query latency metrics
    Double avgLatency = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(),
            jobClient.getJobID(),
            "MyConnector",
            LATENCY,
            null
    );

    // Validate latency is under 100ms
    assertThat(avgLatency).isLessThan(100.0);
}
```

### Resource Usage Validation

Validate memory and CPU usage patterns.

```java
public void validateResourceUsage(JobClient jobClient) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());

    // Query memory usage
    Double heapUsed = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(),
            jobClient.getJobID(),
            "MyConnector",
            "memoryHeapUsed",
            null
    );

    Double heapMax = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(),
            jobClient.getJobID(),
            "MyConnector",
            "memoryHeapMax",
            null
    );

    // Validate memory usage is under 80% of max
    double memoryUsageRatio = heapUsed / heapMax;
    assertThat(memoryUsageRatio).isLessThan(0.8);
}
```

## Advanced Metrics Scenarios

### Multi-Operator Metrics

Query metrics across multiple operators in complex pipelines.

```java
public void validatePipelineMetrics(JobClient jobClient) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());

    // Query source metrics
    Double sourceRecords = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(), jobClient.getJobID(), "Source", NUM_RECORDS_OUT, null);

    // Query transformation metrics
    Double transformRecords = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(), jobClient.getJobID(), "Transform", NUM_RECORDS_OUT, null);

    // Query sink metrics
    Double sinkRecords = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(), jobClient.getJobID(), "Sink", NUM_RECORDS_SEND, "Writer");

    // Validate record flow through pipeline
    assertThat(sourceRecords).isEqualTo(transformRecords);
    assertThat(transformRecords).isEqualTo(sinkRecords);
}
```

### Historical Metrics Comparison

Compare metrics across test runs for performance regression detection.

```java
public void compareWithBaseline(JobClient jobClient, MetricsBaseline baseline) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());

    // Query current metrics
    Double currentThroughput = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(), jobClient.getJobID(), "MyConnector", "throughput", null);

    Double currentLatency = querier.getAggregatedMetricsByRestAPI(
            testEnv.getRestEndpoint(), jobClient.getJobID(), "MyConnector", LATENCY, null);

    // Compare with baseline (allow 10% deviation)
    assertThat(currentThroughput).isGreaterThan(baseline.getThroughput() * 0.9);
    assertThat(currentLatency).isLessThan(baseline.getLatency() * 1.1);
}
```

### Parallel Subtask Metrics

Analyze metrics across parallel subtasks for load balancing validation.

```java
public void validateLoadBalancing(JobClient jobClient) throws Exception {
    MetricQuerier querier = new MetricQuerier(new Configuration());

    // Get metrics for each subtask
    JobDetailsInfo jobDetails = MetricQuerier.getJobDetails(
            new RestClient(new Configuration(), Executors.newCachedThreadPool()),
            testEnv.getRestEndpoint(),
            jobClient.getJobID()
    );

    // Find source vertex
    JobVertexID sourceVertexId = jobDetails.getJobVertexInfos().stream()
            .filter(v -> v.getName().contains("Source"))
            .findFirst()
            .map(JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID)
            .orElseThrow();

    // Get per-subtask metrics
    AggregatedMetricsResponseBody metrics = querier.getMetrics(
            testEnv.getRestEndpoint(),
            jobClient.getJobID(),
            sourceVertexId,
            NUM_RECORDS_OUT
    );

    // Validate load distribution
    List<Double> subtaskValues = metrics.getMetrics().stream()
            .map(AggregatedMetric::getSum)
            .collect(Collectors.toList());

    double mean = subtaskValues.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
    double maxDeviation = subtaskValues.stream()
            .mapToDouble(value -> Math.abs(value - mean) / mean)
            .max().orElse(0.0);

    // Ensure load is balanced within 20%
    assertThat(maxDeviation).isLessThan(0.2);
}
```

## Error Handling

### Metric Query Failures

```java
try {
    Double metric = querier.getAggregatedMetricsByRestAPI(endpoint, jobId, operatorName, metricName, filter);
} catch (IllegalStateException e) {
    // Metric not found - operator name or metric name incorrect
    fail("Metric not found: " + e.getMessage());
} catch (Exception e) {
    // Network or cluster issues
    throw new AssumptionViolatedException("Unable to query metrics", e);
}
```

### Timeout Handling

```java
// Use waitUntilCondition for retry logic
waitUntilCondition(() -> {
    try {
        Double metric = querier.getAggregatedMetricsByRestAPI(endpoint, jobId, operatorName, metricName, filter);
        return Precision.equals(expectedValue, metric);
    } catch (Exception e) {
        // Retry on failure
        return false;
    }
}, Duration.ofMinutes(2)); // 2 minute timeout
```

### Debugging Metrics Issues

```java
// List all available metrics for debugging
AggregatedMetricsResponseBody allMetrics = querier.getMetricList(endpoint, jobId, vertexId);
allMetrics.getMetrics().forEach(metric -> {
    System.out.println("Available metric: " + metric.getId());
});

// Get job details for vertex information
JobDetailsInfo jobDetails = MetricQuerier.getJobDetails(restClient, endpoint, jobId);
jobDetails.getJobVertexInfos().forEach(vertex -> {
    System.out.println("Vertex: " + vertex.getName() + " ID: " + vertex.getJobVertexID());
});
```