0
# Configuration and Utilities
1
2
Configuration management and utility types for setting up the metric system. This section covers type-safe property access, character filtering for metric names, and the supporting interfaces the metric system relies on at runtime.
3
4
## Capabilities
5
6
### MetricConfig Class
7
8
Type-safe configuration class extending Properties with utility methods for extracting primitive values with defaults.
9
10
```java { .api }
11
/**
12
* A properties class with added utility method to extract primitives.
13
*/
14
public class MetricConfig extends Properties {
15
16
/**
17
* Gets string property with default value.
18
* @param key the property key
19
* @param defaultValue default value if key not found
20
* @return property value or default
21
*/
22
public String getString(String key, String defaultValue) { /* implementation */ }
23
24
/**
25
* Gets integer property with default value.
26
* @param key the property key
27
* @param defaultValue default value if key not found
28
* @return property value parsed as int or default
29
*/
30
public int getInteger(String key, int defaultValue) { /* implementation */ }
31
32
/**
33
* Gets long property with default value.
34
* @param key the property key
35
* @param defaultValue default value if key not found
36
* @return property value parsed as long or default
37
*/
38
public long getLong(String key, long defaultValue) { /* implementation */ }
39
40
/**
41
* Gets float property with default value.
42
* @param key the property key
43
* @param defaultValue default value if key not found
44
* @return property value parsed as float or default
45
*/
46
public float getFloat(String key, float defaultValue) { /* implementation */ }
47
48
/**
49
* Gets double property with default value.
50
* @param key the property key
51
* @param defaultValue default value if key not found
52
* @return property value parsed as double or default
53
*/
54
public double getDouble(String key, double defaultValue) { /* implementation */ }
55
56
/**
57
* Gets boolean property with default value.
58
* @param key the property key
59
* @param defaultValue default value if key not found
60
* @return property value parsed as boolean or default
61
*/
62
public boolean getBoolean(String key, boolean defaultValue) { /* implementation */ }
63
}
64
```
65
66
**Usage Examples:**
67
68
```java
69
// Creating and using MetricConfig
70
MetricConfig config = new MetricConfig();
71
72
// Set properties (inherited from Properties)
73
config.setProperty("reporter.host", "metrics.example.com");
74
config.setProperty("reporter.port", "8080");
75
config.setProperty("reporter.enabled", "true");
76
config.setProperty("reporter.batch.size", "100");
77
config.setProperty("reporter.timeout", "30.5");
78
79
// Type-safe property retrieval with defaults
80
String host = config.getString("reporter.host", "localhost");
81
int port = config.getInteger("reporter.port", 9090);
82
boolean enabled = config.getBoolean("reporter.enabled", false);
83
int batchSize = config.getInteger("reporter.batch.size", 50);
84
double timeout = config.getDouble("reporter.timeout", 10.0);
85
86
// Missing keys return defaults
87
String missingKey = config.getString("non.existent", "default-value");
88
int missingInt = config.getInteger("missing.int", 42);
89
90
// Use in reporter configuration
91
public class ConfigurableReporter implements MetricReporter {
92
private String endpoint;
93
private int batchSize;
94
private long flushInterval;
95
private boolean compressionEnabled;
96
97
@Override
98
public void open(MetricConfig config) {
99
// Required configuration
100
this.endpoint = config.getString("endpoint", null);
101
if (endpoint == null) {
102
throw new IllegalArgumentException("endpoint is required");
103
}
104
105
// Optional configuration with sensible defaults
106
this.batchSize = config.getInteger("batch.size", 100);
107
this.flushInterval = config.getLong("flush.interval", 5000L);
108
this.compressionEnabled = config.getBoolean("compression.enabled", true);
109
110
// Validation
111
if (batchSize <= 0) {
112
throw new IllegalArgumentException("batch.size must be positive");
113
}
114
115
initializeReporter();
116
}
117
}
118
```
119
120
### CharacterFilter Interface
121
122
Functional interface for filtering and transforming strings, commonly used to normalize metric names for different backends.
123
124
```java { .api }
125
/**
126
* Interface for a character filter function. The filter function is given
127
* a string which the filter can transform. The returned string is the
128
* transformation result.
129
*/
130
public interface CharacterFilter {
131
132
/** No-operation filter that returns input unchanged. */
133
CharacterFilter NO_OP_FILTER = input -> input;
134
135
/**
136
* Filter the given string and generate a resulting string from it.
137
* For example, one implementation could filter out invalid characters
138
* from the input string.
139
* @param input Input string
140
* @return Filtered result string
141
*/
142
String filterCharacters(String input);
143
}
144
```
145
146
**Usage Examples:**
147
148
```java
149
// Common character filters
150
CharacterFilter noOp = CharacterFilter.NO_OP_FILTER;
151
CharacterFilter dotToUnderscore = input -> input.replace('.', '_');
152
CharacterFilter spacesToDashes = input -> input.replace(' ', '-');
153
CharacterFilter alphanumericOnly = input -> input.replaceAll("[^a-zA-Z0-9]", "");
154
155
// Combining filters
156
CharacterFilter combined = input -> {
157
String result = input.toLowerCase(); // lowercase
158
result = result.replace(' ', '-'); // spaces to dashes
159
result = result.replaceAll("[^a-z0-9-]", ""); // alphanumeric + dashes only
160
return result;
161
};
162
163
// Usage with metric identifiers
164
MetricGroup group = getRootGroup().addGroup("My Operator");
165
String metricName = "Records Processed Per Second";
166
167
String defaultId = group.getMetricIdentifier(metricName);
168
// Result: "My Operator.Records Processed Per Second"
169
170
String filteredId = group.getMetricIdentifier(metricName, combined);
171
// Result: "my-operator.records-processed-per-second"
172
173
// Custom filters for different backends
174
public class PrometheusCharacterFilter implements CharacterFilter {
175
@Override
176
public String filterCharacters(String input) {
177
// Prometheus naming conventions
178
return input.toLowerCase()
179
.replaceAll("[^a-z0-9_]", "_") // Replace invalid chars with underscores
180
.replaceAll("_{2,}", "_") // Collapse multiple underscores
181
.replaceAll("^_|_$", ""); // Remove leading/trailing underscores
182
}
183
}
184
185
public class GraphiteCharacterFilter implements CharacterFilter {
186
@Override
187
public String filterCharacters(String input) {
188
// Graphite naming conventions
189
return input.replace(' ', '_')
190
.replace(':', '_')
191
.replaceAll("[^a-zA-Z0-9._-]", "");
192
}
193
}
194
195
// Use in reporters
196
public class GraphiteReporter implements MetricReporter {
197
private final CharacterFilter filter = new GraphiteCharacterFilter();
198
199
@Override
200
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
201
String graphiteMetricName = group.getMetricIdentifier(metricName, filter);
202
registerWithGraphite(graphiteMetricName, metric);
203
}
204
}
205
```
206
207
### View Interface
208
209
Interface for metrics that require periodic background updates, enabling time-windowed calculations and derived metrics.
210
211
```java { .api }
212
/**
213
* An interface for metrics which should be updated in regular intervals
214
* by a background thread.
215
*/
216
public interface View {
217
/** The interval in which metrics are updated (5 seconds). */
218
int UPDATE_INTERVAL_SECONDS = 5;
219
220
/** This method will be called regularly to update the metric. */
221
void update();
222
}
223
```
224
225
**Usage Examples:**
226
227
```java
228
// Custom view implementation for rate calculation
229
public class CustomRateGauge implements Gauge<Double>, View {
230
private final AtomicLong counter = new AtomicLong(0);
231
private volatile long lastCount = 0;
232
private volatile double currentRate = 0.0;
233
234
public void increment() {
235
counter.incrementAndGet();
236
}
237
238
@Override
239
public Double getValue() {
240
return currentRate;
241
}
242
243
@Override
244
public void update() {
245
long currentCount = counter.get();
246
long deltaCount = currentCount - lastCount;
247
248
// Calculate rate per second over the update interval
249
currentRate = (double) deltaCount / UPDATE_INTERVAL_SECONDS;
250
251
lastCount = currentCount;
252
}
253
}
254
255
// Moving average implementation
256
public class MovingAverageView implements Gauge<Double>, View {
257
private final Queue<Double> samples = new LinkedList<>();
258
private final int windowSize;
259
private volatile double currentAverage = 0.0;
260
261
public MovingAverageView(int windowSize) {
262
this.windowSize = windowSize;
263
}
264
265
public synchronized void addSample(double value) {
266
samples.offer(value);
267
if (samples.size() > windowSize) {
268
samples.poll();
269
}
270
}
271
272
@Override
273
public Double getValue() {
274
return currentAverage;
275
}
276
277
@Override
278
public synchronized void update() {
279
if (!samples.isEmpty()) {
280
currentAverage = samples.stream()
281
.mapToDouble(Double::doubleValue)
282
.average()
283
.orElse(0.0);
284
}
285
}
286
}
287
288
// System resource monitoring
289
public class SystemResourceView implements Gauge<Map<String, Object>>, View {
290
private volatile Map<String, Object> resourceInfo = new HashMap<>();
291
292
@Override
293
public Map<String, Object> getValue() {
294
return new HashMap<>(resourceInfo);
295
}
296
297
@Override
298
public void update() {
299
Runtime runtime = Runtime.getRuntime();
300
Map<String, Object> newInfo = new HashMap<>();
301
302
// Memory information
303
long totalMemory = runtime.totalMemory();
304
long freeMemory = runtime.freeMemory();
305
long usedMemory = totalMemory - freeMemory;
306
307
newInfo.put("memory.total", totalMemory);
308
newInfo.put("memory.free", freeMemory);
309
newInfo.put("memory.used", usedMemory);
310
newInfo.put("memory.usage.ratio", (double) usedMemory / totalMemory);
311
312
// CPU information (simplified)
313
newInfo.put("processors", runtime.availableProcessors());
314
315
// Update atomically
316
resourceInfo = newInfo;
317
}
318
}
319
320
// Register views for automatic updates
321
CustomRateGauge processingRate = new CustomRateGauge();
322
metricGroup.gauge("processing-rate", processingRate);
323
// Flink will automatically call update() every 5 seconds
324
325
MovingAverageView avgLatency = new MovingAverageView(20);
326
metricGroup.gauge("average-latency", avgLatency);
327
// Background thread updates the moving average
328
329
SystemResourceView systemMetrics = new SystemResourceView();
330
metricGroup.gauge("system-resources", systemMetrics);
331
// Periodically updates system resource information
332
```
333
334
### MetricType Enumeration
335
336
Enumeration defining the standard metric types supported by the Flink metrics system.
337
338
```java { .api }
339
/**
340
* Enum describing the different metric types.
341
*/
342
public enum MetricType {
343
COUNTER,
344
METER,
345
GAUGE,
346
HISTOGRAM
347
}
348
```
349
350
**Usage Examples:**
351
352
```java
353
// Type checking and handling
354
public void handleMetric(Metric metric) {
355
MetricType type = metric.getMetricType();
356
357
switch (type) {
358
case COUNTER:
359
Counter counter = (Counter) metric;
360
System.out.println("Counter value: " + counter.getCount());
361
break;
362
363
case GAUGE:
364
Gauge<?> gauge = (Gauge<?>) metric;
365
System.out.println("Gauge value: " + gauge.getValue());
366
break;
367
368
case METER:
369
Meter meter = (Meter) metric;
370
System.out.println("Meter rate: " + meter.getRate() + " events/sec");
371
System.out.println("Meter count: " + meter.getCount());
372
break;
373
374
case HISTOGRAM:
375
Histogram histogram = (Histogram) metric;
376
HistogramStatistics stats = histogram.getStatistics();
377
System.out.println("Histogram count: " + histogram.getCount());
378
System.out.println("Histogram mean: " + stats.getMean());
379
break;
380
}
381
}
382
383
// Metric type-specific processing in reporters
384
public class TypeAwareReporter implements MetricReporter {
385
private final Map<MetricType, MetricHandler> handlers = new EnumMap<>(MetricType.class);
386
387
public TypeAwareReporter() {
388
handlers.put(MetricType.COUNTER, this::handleCounter);
389
handlers.put(MetricType.GAUGE, this::handleGauge);
390
handlers.put(MetricType.METER, this::handleMeter);
391
handlers.put(MetricType.HISTOGRAM, this::handleHistogram);
392
}
393
394
@Override
395
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
396
MetricType type = metric.getMetricType();
397
MetricHandler handler = handlers.get(type);
398
399
if (handler != null) {
400
String identifier = group.getMetricIdentifier(metricName);
401
handler.handle(identifier, metric);
402
}
403
}
404
405
private void handleCounter(String name, Metric metric) {
406
Counter counter = (Counter) metric;
407
// Register counter with monitoring system
408
}
409
410
private void handleGauge(String name, Metric metric) {
411
Gauge<?> gauge = (Gauge<?>) metric;
412
// Register gauge with monitoring system
413
}
414
415
private void handleMeter(String name, Metric metric) {
416
Meter meter = (Meter) metric;
417
// Register meter with monitoring system, possibly as multiple metrics
418
}
419
420
private void handleHistogram(String name, Metric metric) {
421
Histogram histogram = (Histogram) metric;
422
// Register histogram with monitoring system, possibly as multiple metrics
423
}
424
425
@FunctionalInterface
426
private interface MetricHandler {
427
void handle(String name, Metric metric);
428
}
429
}
430
431
// Type validation
432
public class MetricValidator {
433
public static void validateMetricType(Metric metric, MetricType expectedType) {
434
MetricType actualType = metric.getMetricType();
435
if (actualType != expectedType) {
436
throw new IllegalArgumentException(
437
String.format("Expected metric type %s but got %s", expectedType, actualType));
438
}
439
}
440
441
public static boolean isCounterType(Metric metric) {
442
return metric.getMetricType() == MetricType.COUNTER;
443
}
444
445
public static boolean isNumericType(Metric metric) {
446
MetricType type = metric.getMetricType();
447
return type == MetricType.COUNTER || type == MetricType.METER || type == MetricType.HISTOGRAM;
448
}
449
}
450
```