or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

changelog-writers.mdconfiguration-options.mdindex.mdmetrics-monitoring.mdrecovery-system.mdstorage-factory.mdstorage-implementation.mdupload-system.md

metrics-monitoring.mddocs/

0

# Metrics and Monitoring

1

2

Comprehensive metrics collection for monitoring upload performance, failure rates, and system health in production environments. The metrics system provides visibility into changelog storage operations and helps with performance tuning.

3

4

## Capabilities

5

6

### ChangelogStorageMetricGroup

7

8

Main metrics container providing thread-safe collection of changelog storage performance data.

9

10

```java { .api }

11

/**

12

* Metrics related to the Changelog Storage used by the Changelog State Backend.

13

* Thread-safe implementation for use by multiple uploader threads.

14

*/

15

public class ChangelogStorageMetricGroup extends ProxyMetricGroup<MetricGroup> {

16

17

/**

18

* Creates a changelog storage metric group.

19

* @param parent Parent metric group for hierarchical organization

20

*/

21

public ChangelogStorageMetricGroup(MetricGroup parent);

22

23

/**

24

* Records a successful upload operation

25

* @param batchSize Number of change sets in the upload batch

26

* @param uploadSize Total size of uploaded data in bytes

27

* @param latencyNanos Upload latency in nanoseconds

28

* @param attempts Number of attempts needed for successful upload

29

* @param totalAttempts Total attempts including retries

30

*/

31

public void recordUpload(

32

int batchSize,

33

long uploadSize,

34

long latencyNanos,

35

int attempts,

36

int totalAttempts

37

);

38

39

/**

40

* Records a failed upload operation

41

* @param batchSize Number of change sets in the failed batch

42

* @param uploadSize Total size of failed data in bytes

43

* @param attempts Number of attempts made before failure

44

*/

45

public void recordUploadFailure(int batchSize, long uploadSize, int attempts);

46

47

/**

48

* Updates the current in-flight data gauge

49

* @param inFlightBytes Current amount of in-flight data in bytes

50

*/

51

public void updateInFlightData(long inFlightBytes);

52

53

/**

54

* Records queue size metrics

55

* @param queueSize Current number of tasks in upload queue

56

*/

57

public void updateQueueSize(int queueSize);

58

}

59

```

60

61

### Core Metrics

62

63

The metric group provides several categories of metrics for comprehensive monitoring:

64

65

```java { .api }

66

/**

67

* Counter metrics for tracking upload operations

68

*/

69

public class CounterMetrics {

70

/** Total number of upload requests initiated */

71

private final Counter uploadsCounter;

72

73

/** Total number of upload failures */

74

private final Counter uploadFailuresCounter;

75

}

76

77

/**

78

* Histogram metrics for tracking distributions and performance

79

*/

80

public class HistogramMetrics {

81

/** Distribution of batch sizes in upload operations */

82

private final Histogram uploadBatchSizes;

83

84

/** Distribution of upload sizes in bytes */

85

private final Histogram uploadSizes;

86

87

/** Distribution of upload latencies in nanoseconds */

88

private final Histogram uploadLatenciesNanos;

89

90

/** Distribution of attempts per successful upload */

91

private final Histogram attemptsPerUpload;

92

93

/** Distribution of total attempts including failed uploads */

94

private final Histogram totalAttemptsPerUpload;

95

}

96

97

/**

98

* Gauge metrics for real-time status monitoring

99

*/

100

public class GaugeMetrics {

101

/** Current amount of in-flight data in bytes */

102

private final Gauge<Long> inFlightDataGauge;

103

104

/** Current size of upload queue */

105

private final Gauge<Integer> queueSizeGauge;

106

}

107

```

108

109

### Metric Constants

110

111

Standard metric names for consistent reporting across Flink installations:

112

113

```java { .api }

114

/**

115

* Standard metric names for changelog storage

116

*/

117

public class MetricNames {

118

/** Counter: Total number of upload requests */

119

public static final String CHANGELOG_STORAGE_NUM_UPLOAD_REQUESTS = "numUploadRequests";

120

121

/** Counter: Total number of upload failures */

122

public static final String CHANGELOG_STORAGE_NUM_UPLOAD_FAILURES = "numUploadFailures";

123

124

/** Histogram: Upload batch sizes */

125

public static final String CHANGELOG_STORAGE_UPLOAD_BATCH_SIZES = "uploadBatchSizes";

126

127

/** Histogram: Upload sizes in bytes */

128

public static final String CHANGELOG_STORAGE_UPLOAD_SIZES = "uploadSizes";

129

130

/** Histogram: Upload latencies in nanoseconds */

131

public static final String CHANGELOG_STORAGE_UPLOAD_LATENCIES_NANOS = "uploadLatenciesNanos";

132

133

/** Histogram: Attempts per successful upload */

134

public static final String CHANGELOG_STORAGE_ATTEMPTS_PER_UPLOAD = "attemptsPerUpload";

135

136

/** Histogram: Total attempts including failures */

137

public static final String CHANGELOG_STORAGE_TOTAL_ATTEMPTS_PER_UPLOAD = "totalAttemptsPerUpload";

138

139

/** Gauge: Current in-flight data in bytes */

140

public static final String CHANGELOG_STORAGE_IN_FLIGHT_DATA = "inFlightData";

141

142

/** Gauge: Current upload queue size */

143

public static final String CHANGELOG_STORAGE_QUEUE_SIZE = "queueSize";

144

}

145

```

146

147

**Usage Examples:**

148

149

```java

150

import org.apache.flink.changelog.fs.ChangelogStorageMetricGroup;

151

import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;

152

153

// Create metrics group (typically done by storage)

154

ChangelogStorageMetricGroup metricGroup =

155

new ChangelogStorageMetricGroup(taskManagerJobMetricGroup);

156

157

// Record successful upload

158

long startTime = System.nanoTime();

159

// ... perform upload ...

160

long endTime = System.nanoTime();

161

162

metricGroup.recordUpload(

163

batchSize, // Number of change sets uploaded

164

uploadSizeBytes, // Total bytes uploaded

165

endTime - startTime, // Upload latency in nanoseconds

166

1, // Attempts needed (1 for success on first try)

167

1 // Total attempts made

168

);

169

170

// Record failed upload

171

metricGroup.recordUploadFailure(

172

failedBatchSize, // Number of change sets that failed

173

failedUploadSize, // Total bytes that failed to upload

174

maxAttempts // Number of attempts made before giving up

175

);

176

177

// Update real-time gauges

178

metricGroup.updateInFlightData(currentInFlightBytes);

179

metricGroup.updateQueueSize(currentQueueSize);

180

```

181

182

### Integration with Upload System

183

184

The metrics system integrates seamlessly with the upload components:

185

186

```java

187

// Upload scheduler records metrics during operation

188

public class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler {

189

190

private void executeUpload(Collection<UploadTask> tasks) {

191

int totalBatchSize = tasks.stream().mapToInt(task -> task.getChangeSets().size()).sum();

192

long totalSize = tasks.stream().mapToLong(task -> task.getTotalSize()).sum();

193

194

// Update queue size before upload

195

metricGroup.updateQueueSize(pendingTasks.size());

196

197

long startTime = System.nanoTime();

198

try {

199

// Perform upload

200

UploadTasksResult result = uploader.upload(tasks);

201

long endTime = System.nanoTime();

202

203

// Record successful uploads

204

result.getSuccessful().forEach((task, uploadResults) -> {

205

metricGroup.recordUpload(

206

task.getChangeSets().size(),

207

task.getTotalSize(),

208

endTime - startTime,

209

1, // Successful on first attempt

210

1

211

);

212

});

213

214

// Record failed uploads

215

result.getFailed().forEach((task, exception) -> {

216

metricGroup.recordUploadFailure(

217

task.getChangeSets().size(),

218

task.getTotalSize(),

219

maxRetryAttempts

220

);

221

});

222

223

} catch (Exception e) {

224

// Record all as failures

225

metricGroup.recordUploadFailure(totalBatchSize, totalSize, maxRetryAttempts);

226

}

227

}

228

}

229

```

230

231

### Retry Metrics Integration

232

233

The metrics system tracks retry behavior and helps tune retry policies:

234

235

```java

236

// RetryingExecutor integration with metrics

237

public class RetryingExecutor {

238

239

public <T> T execute(Callable<T> operation, RetryPolicy retryPolicy) throws Exception {

240

int attempts = 0;

241

int totalAttempts = 0;

242

Exception lastException = null;

243

244

while (attempts < maxAttempts) {

245

attempts++;

246

totalAttempts++;

247

248

try {

249

long startTime = System.nanoTime();

250

T result = operation.call();

251

long endTime = System.nanoTime();

252

253

// Record successful operation with retry metrics

254

metricGroup.recordUpload(

255

batchSize,

256

uploadSize,

257

endTime - startTime,

258

attempts, // Attempts needed for success

259

totalAttempts // Total attempts made

260

);

261

262

return result;

263

264

} catch (Exception e) {

265

lastException = e;

266

long retryDelay = retryPolicy.retryAfter(attempts, e);

267

268

if (retryDelay < 0) {

269

// No more retries

270

break;

271

}

272

273

// Sleep before retry

274

Thread.sleep(retryDelay);

275

}

276

}

277

278

// Record failure with total attempts

279

metricGroup.recordUploadFailure(batchSize, uploadSize, totalAttempts);

280

throw lastException;

281

}

282

}

283

```

284

285

### Monitoring Dashboard Integration

286

287

The metrics can be integrated with monitoring dashboards and alerting systems:

288

289

```java

290

// Example metric queries for monitoring systems:

291

292

// Upload success rate

293

// (sum(rate(numUploadRequests[5m])) - sum(rate(numUploadFailures[5m]))) / sum(rate(numUploadRequests[5m]))

294

295

// Median (p50) upload latency

296

// histogram_quantile(0.5, uploadLatenciesNanos)

297

298

// P99 upload latency

299

// histogram_quantile(0.99, uploadLatenciesNanos)

300

301

// Current backpressure status

302

// inFlightData > in_flight_data_limit_threshold

303

304

// Queue buildup

305

// queueSize > queue_size_threshold

306

```

307

308

### Performance Analysis

309

310

Use metrics for performance analysis and optimization:

311

312

```java

313

/**

314

* Metrics analysis for performance tuning

315

*/

316

public class PerformanceAnalysis {

317

318

/**

319

* Analyzes upload performance metrics

320

* @param metricGroup Metrics to analyze

321

* @return Performance recommendations

322

*/

323

public PerformanceRecommendations analyze(ChangelogStorageMetricGroup metricGroup) {

324

// Analyze upload patterns

325

double failureRate = calculateFailureRate();

326

double averageLatency = calculateAverageLatency();

327

double p99Latency = calculateP99Latency();

328

long averageBatchSize = calculateAverageBatchSize();

329

330

// Generate recommendations

331

if (failureRate > 0.05) {

332

// High failure rate - increase retry attempts or timeout

333

return new PerformanceRecommendations()

334

.increaseRetryAttempts()

335

.increaseUploadTimeout();

336

}

337

338

if (p99Latency > Duration.ofSeconds(10).toNanos()) {

339

// High tail latency - increase parallelism or buffer size

340

return new PerformanceRecommendations()

341

.increaseUploadThreads()

342

.increaseBufferSize();

343

}

344

345

if (averageBatchSize < 5) {

346

// Small batches - increase batching delay or threshold

347

return new PerformanceRecommendations()

348

.increasePersistDelay()

349

.increasePersistSizeThreshold();

350

}

351

352

return PerformanceRecommendations.optimal();

353

}

354

}

355

```

356

357

### Alerting and Monitoring

358

359

Set up alerts based on key metrics:

360

361

```java

362

/**

363

* Monitoring thresholds for alerting

364

*/

365

public class MonitoringThresholds {

366

367

/** Alert when failure rate exceeds 5% */

368

public static final double MAX_FAILURE_RATE = 0.05;

369

370

/** Alert when P99 latency exceeds 30 seconds */

371

public static final long MAX_P99_LATENCY_NANOS = Duration.ofSeconds(30).toNanos();

372

373

/** Alert when in-flight data approaches limit */

374

public static final double IN_FLIGHT_DATA_WARNING_RATIO = 0.8;

375

376

/** Alert when queue size indicates backpressure */

377

public static final int MAX_QUEUE_SIZE = 1000;

378

379

/**

380

* Checks if any metrics exceed alert thresholds

381

* @param metrics Current metric values

382

* @return List of active alerts

383

*/

384

public List<Alert> checkAlerts(MetricSnapshot metrics) {

385

List<Alert> alerts = new ArrayList<>();

386

387

if (metrics.getFailureRate() > MAX_FAILURE_RATE) {

388

alerts.add(Alert.highFailureRate(metrics.getFailureRate()));

389

}

390

391

if (metrics.getP99LatencyNanos() > MAX_P99_LATENCY_NANOS) {

392

alerts.add(Alert.highLatency(metrics.getP99LatencyNanos()));

393

}

394

395

if (metrics.getInFlightDataRatio() > IN_FLIGHT_DATA_WARNING_RATIO) {

396

alerts.add(Alert.backpressureWarning(metrics.getInFlightDataRatio()));

397

}

398

399

if (metrics.getQueueSize() > MAX_QUEUE_SIZE) {

400

alerts.add(Alert.queueBacklog(metrics.getQueueSize()));

401

}

402

403

return alerts;

404

}

405

}

406

```