or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · core-metrics.md · implementations.md · index.md · metric-groups.md · reporters.md · specialized-groups.md · tracing.md

docs/configuration.md

0

# Configuration and Utilities

1

2

Configuration management and utility classes for metric system setup. Provides type-safe property access, character filtering for metric names, and supporting interfaces for metric system operation.

3

4

## Capabilities

5

6

### MetricConfig Class

7

8

Type-safe configuration class extending `java.util.Properties`, with utility methods that read a property as a string or primitive value and fall back to a caller-supplied default when the key is not found.

9

10

```java { .api }

11

/**

12

* A properties class with added utility method to extract primitives.

13

*/

14

public class MetricConfig extends Properties {

15

16

/**

17

* Gets string property with default value.

18

* @param key the property key

19

* @param defaultValue default value if key not found

20

* @return property value or default

21

*/

22

public String getString(String key, String defaultValue) { /* implementation */ }

23

24

/**

25

* Gets integer property with default value.

26

* @param key the property key

27

* @param defaultValue default value if key not found

28

* @return property value parsed as int or default

29

*/

30

public int getInteger(String key, int defaultValue) { /* implementation */ }

31

32

/**

33

* Gets long property with default value.

34

* @param key the property key

35

* @param defaultValue default value if key not found

36

* @return property value parsed as long or default

37

*/

38

public long getLong(String key, long defaultValue) { /* implementation */ }

39

40

/**

41

* Gets float property with default value.

42

* @param key the property key

43

* @param defaultValue default value if key not found

44

* @return property value parsed as float or default

45

*/

46

public float getFloat(String key, float defaultValue) { /* implementation */ }

47

48

/**

49

* Gets double property with default value.

50

* @param key the property key

51

* @param defaultValue default value if key not found

52

* @return property value parsed as double or default

53

*/

54

public double getDouble(String key, double defaultValue) { /* implementation */ }

55

56

/**

57

* Gets boolean property with default value.

58

* @param key the property key

59

* @param defaultValue default value if key not found

60

* @return property value parsed as boolean or default

61

*/

62

public boolean getBoolean(String key, boolean defaultValue) { /* implementation */ }

63

}

64

```

65

66

**Usage Examples:**

67

68

```java

69

// Creating and using MetricConfig

70

MetricConfig config = new MetricConfig();

71

72

// Set properties (inherited from Properties)

73

config.setProperty("reporter.host", "metrics.example.com");

74

config.setProperty("reporter.port", "8080");

75

config.setProperty("reporter.enabled", "true");

76

config.setProperty("reporter.batch.size", "100");

77

config.setProperty("reporter.timeout", "30.5");

78

79

// Type-safe property retrieval with defaults

80

String host = config.getString("reporter.host", "localhost");

81

int port = config.getInteger("reporter.port", 9090);

82

boolean enabled = config.getBoolean("reporter.enabled", false);

83

int batchSize = config.getInteger("reporter.batch.size", 50);

84

double timeout = config.getDouble("reporter.timeout", 10.0);

85

86

// Missing keys return defaults

87

String missingKey = config.getString("non.existent", "default-value");

88

int missingInt = config.getInteger("missing.int", 42);

89

90

// Use in reporter configuration

91

public class ConfigurableReporter implements MetricReporter {

92

private String endpoint;

93

private int batchSize;

94

private long flushInterval;

95

private boolean compressionEnabled;

96

97

@Override

98

public void open(MetricConfig config) {

99

// Required configuration

100

this.endpoint = config.getString("endpoint", null);

101

if (endpoint == null) {

102

throw new IllegalArgumentException("endpoint is required");

103

}

104

105

// Optional configuration with sensible defaults

106

this.batchSize = config.getInteger("batch.size", 100);

107

this.flushInterval = config.getLong("flush.interval", 5000L);

108

this.compressionEnabled = config.getBoolean("compression.enabled", true);

109

110

// Validation

111

if (batchSize <= 0) {

112

throw new IllegalArgumentException("batch.size must be positive");

113

}

114

115

initializeReporter();

116

}

117

}

118

```

119

120

### CharacterFilter Interface

121

122

Functional interface for filtering and transforming strings, commonly used for metric name normalization across different backends.

123

124

```java { .api }

125

/**

126

* Interface for a character filter function. The filter function is given

127

* a string which the filter can transform. The returned string is the

128

* transformation result.

129

*/

130

public interface CharacterFilter {

131

132

/** No-operation filter that returns input unchanged. */

133

CharacterFilter NO_OP_FILTER = input -> input;

134

135

/**

136

* Filter the given string and generate a resulting string from it.

137

* For example, one implementation could filter out invalid characters

138

* from the input string.

139

* @param input Input string

140

* @return Filtered result string

141

*/

142

String filterCharacters(String input);

143

}

144

```

145

146

**Usage Examples:**

147

148

```java

149

// Common character filters

150

CharacterFilter noOp = CharacterFilter.NO_OP_FILTER;

151

CharacterFilter dotToUnderscore = input -> input.replace('.', '_');

152

CharacterFilter spacesToDashes = input -> input.replace(' ', '-');

153

CharacterFilter alphanumericOnly = input -> input.replaceAll("[^a-zA-Z0-9]", "");

154

155

// Combining filters

156

CharacterFilter combined = input -> {

157

String result = input.toLowerCase(); // lowercase

158

result = result.replace(' ', '-'); // spaces to dashes

159

result = result.replaceAll("[^a-z0-9-]", ""); // alphanumeric + dashes only

160

return result;

161

};

162

163

// Usage with metric identifiers

164

MetricGroup group = getRootGroup().addGroup("My Operator");

165

String metricName = "Records Processed Per Second";

166

167

String defaultId = group.getMetricIdentifier(metricName);

168

// Result: "My Operator.Records Processed Per Second"

169

170

String filteredId = group.getMetricIdentifier(metricName, combined);

171

// Result: "my-operator.records-processed-per-second"

172

173

// Custom filters for different backends

174

public class PrometheusCharacterFilter implements CharacterFilter {

175

@Override

176

public String filterCharacters(String input) {

177

// Prometheus naming conventions

178

return input.toLowerCase()

179

.replaceAll("[^a-z0-9_]", "_") // Replace invalid chars with underscores

180

.replaceAll("_{2,}", "_") // Collapse multiple underscores

181

.replaceAll("^_|_$", ""); // Remove leading/trailing underscores

182

}

183

}

184

185

public class GraphiteCharacterFilter implements CharacterFilter {

186

@Override

187

public String filterCharacters(String input) {

188

// Graphite naming conventions

189

return input.replace(' ', '_')

190

.replace(':', '_')

191

.replaceAll("[^a-zA-Z0-9._-]", "");

192

}

193

}

194

195

// Use in reporters

196

public class GraphiteReporter implements MetricReporter {

197

private final CharacterFilter filter = new GraphiteCharacterFilter();

198

199

@Override

200

public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {

201

String graphiteMetricName = group.getMetricIdentifier(metricName, filter);

202

registerWithGraphite(graphiteMetricName, metric);

203

}

204

}

205

```

206

207

### View Interface

208

209

Interface for metrics that require periodic background updates, enabling time-windowed calculations and derived metrics.

210

211

```java { .api }

212

/**

213

* An interface for metrics which should be updated in regular intervals

214

* by a background thread.

215

*/

216

public interface View {

217

/** The interval in which metrics are updated (5 seconds). */

218

int UPDATE_INTERVAL_SECONDS = 5;

219

220

/** This method will be called regularly to update the metric. */

221

void update();

222

}

223

```

224

225

**Usage Examples:**

226

227

```java

228

// Custom view implementation for rate calculation

229

public class CustomRateGauge implements Gauge<Double>, View {

230

private final AtomicLong counter = new AtomicLong(0);

231

private volatile long lastCount = 0;

232

private volatile double currentRate = 0.0;

233

234

public void increment() {

235

counter.incrementAndGet();

236

}

237

238

@Override

239

public Double getValue() {

240

return currentRate;

241

}

242

243

@Override

244

public void update() {

245

long currentCount = counter.get();

246

long deltaCount = currentCount - lastCount;

247

248

// Calculate rate per second over the update interval

249

currentRate = (double) deltaCount / UPDATE_INTERVAL_SECONDS;

250

251

lastCount = currentCount;

252

}

253

}

254

255

// Moving average implementation

256

public class MovingAverageView implements Gauge<Double>, View {

257

private final Queue<Double> samples = new LinkedList<>();

258

private final int windowSize;

259

private volatile double currentAverage = 0.0;

260

261

public MovingAverageView(int windowSize) {

262

this.windowSize = windowSize;

263

}

264

265

public synchronized void addSample(double value) {

266

samples.offer(value);

267

if (samples.size() > windowSize) {

268

samples.poll();

269

}

270

}

271

272

@Override

273

public Double getValue() {

274

return currentAverage;

275

}

276

277

@Override

278

public synchronized void update() {

279

if (!samples.isEmpty()) {

280

currentAverage = samples.stream()

281

.mapToDouble(Double::doubleValue)

282

.average()

283

.orElse(0.0);

284

}

285

}

286

}

287

288

// System resource monitoring

289

public class SystemResourceView implements Gauge<Map<String, Object>>, View {

290

private volatile Map<String, Object> resourceInfo = new HashMap<>();

291

292

@Override

293

public Map<String, Object> getValue() {

294

return new HashMap<>(resourceInfo);

295

}

296

297

@Override

298

public void update() {

299

Runtime runtime = Runtime.getRuntime();

300

Map<String, Object> newInfo = new HashMap<>();

301

302

// Memory information

303

long totalMemory = runtime.totalMemory();

304

long freeMemory = runtime.freeMemory();

305

long usedMemory = totalMemory - freeMemory;

306

307

newInfo.put("memory.total", totalMemory);

308

newInfo.put("memory.free", freeMemory);

309

newInfo.put("memory.used", usedMemory);

310

newInfo.put("memory.usage.ratio", (double) usedMemory / totalMemory);

311

312

// CPU information (simplified)

313

newInfo.put("processors", runtime.availableProcessors());

314

315

// Update atomically

316

resourceInfo = newInfo;

317

}

318

}

319

320

// Register views for automatic updates

321

CustomRateGauge processingRate = new CustomRateGauge();

322

metricGroup.gauge("processing-rate", processingRate);

323

// Flink will automatically call update() every 5 seconds

324

325

MovingAverageView avgLatency = new MovingAverageView(20);

326

metricGroup.gauge("average-latency", avgLatency);

327

// Background thread updates the moving average

328

329

SystemResourceView systemMetrics = new SystemResourceView();

330

metricGroup.gauge("system-resources", systemMetrics);

331

// Periodically updates system resource information

332

```

333

334

### MetricType Enumeration

335

336

Enumeration defining the standard metric types supported by the Flink metrics system.

337

338

```java { .api }

339

/**

340

* Enum describing the different metric types.

341

*/

342

public enum MetricType {

343

COUNTER,

344

METER,

345

GAUGE,

346

HISTOGRAM

347

}

348

```

349

350

**Usage Examples:**

351

352

```java

353

// Type checking and handling

354

public void handleMetric(Metric metric) {

355

MetricType type = metric.getMetricType();

356

357

switch (type) {

358

case COUNTER:

359

Counter counter = (Counter) metric;

360

System.out.println("Counter value: " + counter.getCount());

361

break;

362

363

case GAUGE:

364

Gauge<?> gauge = (Gauge<?>) metric;

365

System.out.println("Gauge value: " + gauge.getValue());

366

break;

367

368

case METER:

369

Meter meter = (Meter) metric;

370

System.out.println("Meter rate: " + meter.getRate() + " events/sec");

371

System.out.println("Meter count: " + meter.getCount());

372

break;

373

374

case HISTOGRAM:

375

Histogram histogram = (Histogram) metric;

376

HistogramStatistics stats = histogram.getStatistics();

377

System.out.println("Histogram count: " + histogram.getCount());

378

System.out.println("Histogram mean: " + stats.getMean());

379

break;

380

}

381

}

382

383

// Metric type-specific processing in reporters

384

public class TypeAwareReporter implements MetricReporter {

385

private final Map<MetricType, MetricHandler> handlers = new EnumMap<>(MetricType.class);

386

387

public TypeAwareReporter() {

388

handlers.put(MetricType.COUNTER, this::handleCounter);

389

handlers.put(MetricType.GAUGE, this::handleGauge);

390

handlers.put(MetricType.METER, this::handleMeter);

391

handlers.put(MetricType.HISTOGRAM, this::handleHistogram);

392

}

393

394

@Override

395

public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {

396

MetricType type = metric.getMetricType();

397

MetricHandler handler = handlers.get(type);

398

399

if (handler != null) {

400

String identifier = group.getMetricIdentifier(metricName);

401

handler.handle(identifier, metric);

402

}

403

}

404

405

private void handleCounter(String name, Metric metric) {

406

Counter counter = (Counter) metric;

407

// Register counter with monitoring system

408

}

409

410

private void handleGauge(String name, Metric metric) {

411

Gauge<?> gauge = (Gauge<?>) metric;

412

// Register gauge with monitoring system

413

}

414

415

private void handleMeter(String name, Metric metric) {

416

Meter meter = (Meter) metric;

417

// Register meter with monitoring system, possibly as multiple metrics

418

}

419

420

private void handleHistogram(String name, Metric metric) {

421

Histogram histogram = (Histogram) metric;

422

// Register histogram with monitoring system, possibly as multiple metrics

423

}

424

425

@FunctionalInterface

426

private interface MetricHandler {

427

void handle(String name, Metric metric);

428

}

429

}

430

431

// Type validation

432

public class MetricValidator {

433

public static void validateMetricType(Metric metric, MetricType expectedType) {

434

MetricType actualType = metric.getMetricType();

435

if (actualType != expectedType) {

436

throw new IllegalArgumentException(

437

String.format("Expected metric type %s but got %s", expectedType, actualType));

438

}

439

}

440

441

public static boolean isCounterType(Metric metric) {

442

return metric.getMetricType() == MetricType.COUNTER;

443

}

444

445

public static boolean isNumericType(Metric metric) {

446

MetricType type = metric.getMetricType();

447

return type == MetricType.COUNTER || type == MetricType.METER || type == MetricType.HISTOGRAM;

448

}

449

}

450

```