0
# Metrics and Monitoring
1
2
`MetricsClient` discovers and queries time-series metrics emitted by CDAP applications and services. `MonitorClient` reports the status of CDAP system services and lets you scale them. Together they let you monitor system health, query time-series data, and collect performance metrics.
3
4
## MetricsClient
5
6
```java { .api }
7
public class MetricsClient {
8
// Constructors
9
public MetricsClient(ClientConfig config);
10
public MetricsClient(ClientConfig config, RESTClient restClient);
11
12
// Metrics search methods
13
public List<MetricTagValue> searchTags(Map<String, String> tags);
14
public List<String> searchMetrics(Map<String, String> tags);
15
16
// Metrics query methods
17
public MetricQueryResult query(Map<String, String> tags, String metric);
18
public MetricQueryResult query(Map<String, String> tags, List<String> metrics, List<String> groupBys, String start, String end);
19
public MetricQueryResult query(Map<String, String> tags, List<String> metrics, List<String> groupBys, Map<String, String> timeRangeParams);
20
public RuntimeMetrics getServiceMetrics(ServiceId serviceId);
21
}
22
```
23
24
## MonitorClient
25
26
```java { .api }
27
public class MonitorClient {
28
// Constructors
29
public MonitorClient(ClientConfig config);
30
public MonitorClient(ClientConfig config, RESTClient restClient);
31
32
// System service information methods
33
public SystemServiceLiveInfo getSystemServiceLiveInfo(String serviceName);
34
public List<SystemServiceMeta> listSystemServices();
35
public String getSystemServiceStatus(String serviceName);
36
public Map<String, String> getAllSystemServiceStatus();
37
public boolean allSystemServicesOk();
38
39
// System service management methods
40
public void setSystemServiceInstances(String serviceName, int instances);
41
public int getSystemServiceInstances(String serviceName);
42
}
43
```
44
45
## Metrics Types and Results
46
47
```java { .api }
48
public class MetricQueryResult {
49
public String getStartTime();
50
public String getEndTime();
51
public List<MetricTimeSeries> getSeries();
52
public String getResolution();
53
}
54
55
public class MetricTimeSeries {
56
public String getMetricName();
57
public Map<String, String> getGrouping();
58
public List<TimeValue> getData();
59
}
60
61
public class TimeValue {
62
public long getTime();
63
public long getValue();
64
}
65
66
public class MetricTagValue {
67
public String getName();
68
public String getValue();
69
}
70
71
public class RuntimeMetrics {
72
public int getCurrentInstances();
73
public int getRequestedInstances();
74
public Map<String, String> getMetrics();
75
}
76
```
77
78
## System Service Types
79
80
```java { .api }
81
public class SystemServiceMeta {
82
public String getName();
83
public String getDescription();
84
public String getStatus();
85
public String getLogs();
86
public int getInstances();
87
public int getRequestedInstances();
88
public boolean isCanCheck();
89
}
90
91
public class SystemServiceLiveInfo {
92
public String getStatus();
93
public Map<String, Integer> getContainers();
94
public String getYarnAppId();
95
}
96
```
97
98
## Metrics Discovery and Search
99
100
### Tag Discovery
101
102
```java
103
// Search for available metric tags
104
Map<String, String> searchCriteria = Map.of(
105
"namespace", "default",
106
"app", "data-pipeline"
107
);
108
109
List<MetricTagValue> tags = metricsClient.searchTags(searchCriteria);
110
System.out.println("Available metric tags:");
111
112
// Group tags by name
113
Map<String, List<String>> tagsByName = tags.stream()
114
.collect(Collectors.groupingBy(
115
MetricTagValue::getName,
116
Collectors.mapping(MetricTagValue::getValue, Collectors.toList())
117
));
118
119
for (Map.Entry<String, List<String>> entry : tagsByName.entrySet()) {
120
System.out.println("- " + entry.getKey() + ": " + entry.getValue());
121
}
122
123
// Search for program-specific tags
124
Map<String, String> programTags = Map.of(
125
"namespace", "default",
126
"app", "analytics-app",
127
"program", "data-processor",
128
"program.type", "worker"
129
);
130
List<MetricTagValue> programMetricTags = metricsClient.searchTags(programTags);
131
```
132
133
### Metric Discovery
134
135
```java
136
// Search for available metrics
137
List<String> metrics = metricsClient.searchMetrics(searchCriteria);
138
System.out.println("Available metrics: " + metrics);
139
140
// Common CDAP metrics patterns
141
List<String> performanceMetrics = metrics.stream()
142
.filter(name -> name.contains("process.") || name.contains("system."))
143
.collect(Collectors.toList());
144
145
List<String> userMetrics = metrics.stream()
146
.filter(name -> name.startsWith("user."))
147
.collect(Collectors.toList());
148
149
System.out.println("Performance metrics: " + performanceMetrics);
150
System.out.println("User metrics: " + userMetrics);
151
```
152
153
## Metrics Querying
154
155
### Basic Metrics Queries
156
157
```java
158
// Query single metric
159
Map<String, String> tags = Map.of(
160
"namespace", "default",
161
"app", "web-analytics",
162
"program", "stream-processor",
163
"program.type", "workflow"
164
);
165
166
String metricName = "system.process.memory.used.mb";
167
MetricQueryResult result = metricsClient.query(tags, metricName);
168
169
System.out.println("Query period: " + result.getStartTime() + " to " + result.getEndTime());
170
System.out.println("Resolution: " + result.getResolution());
171
172
for (MetricTimeSeries series : result.getSeries()) {
173
System.out.println("Metric: " + series.getMetricName());
174
System.out.println("Grouping: " + series.getGrouping());
175
System.out.println("Data points: " + series.getData().size());
176
177
// Show recent values
178
List<TimeValue> data = series.getData();
179
if (!data.isEmpty()) {
180
TimeValue latest = data.get(data.size() - 1);
181
System.out.println("Latest value: " + latest.getValue() + " MB at " + new Date(latest.getTime() * 1000));
182
}
183
}
184
```
185
186
### Advanced Metrics Queries
187
188
```java
189
// Query multiple metrics with time range
190
List<String> multipleMetrics = List.of(
191
"system.process.memory.used.mb",
192
"system.process.cpu.percentage",
193
"user.records.processed",
194
"user.processing.time.ms"
195
);
196
197
List<String> groupBy = List.of("program", "program.type");
198
199
// Time range - last 24 hours
200
long endTime = System.currentTimeMillis() / 1000;
201
long startTime = endTime - 24 * 3600; // 24 hours ago
202
203
MetricQueryResult multiResult = metricsClient.query(
204
tags,
205
multipleMetrics,
206
groupBy,
207
String.valueOf(startTime),
208
String.valueOf(endTime)
209
);
210
211
// Process results by metric
212
Map<String, MetricTimeSeries> seriesByMetric = multiResult.getSeries().stream()
213
.collect(Collectors.toMap(MetricTimeSeries::getMetricName, series -> series));
214
215
for (String metric : multipleMetrics) {
216
MetricTimeSeries series = seriesByMetric.get(metric);
217
if (series != null) {
218
analyzeMetricTrend(metric, series);
219
}
220
}
221
222
// Query with custom time parameters
223
Map<String, String> timeParams = Map.of(
224
"start", String.valueOf(startTime),
225
"end", String.valueOf(endTime),
226
"resolution", "1h", // 1 hour resolution
227
"aggregate", "true"
228
);
229
230
MetricQueryResult customResult = metricsClient.query(tags, multipleMetrics, groupBy, timeParams);
231
```
232
233
### Service-Specific Metrics
234
235
```java
236
// Get runtime metrics for a service
237
ServiceId serviceId = ServiceId.of(appId, "user-service");
238
RuntimeMetrics runtimeMetrics = metricsClient.getServiceMetrics(serviceId);
239
240
System.out.println("Current instances: " + runtimeMetrics.getCurrentInstances());
241
System.out.println("Requested instances: " + runtimeMetrics.getRequestedInstances());
242
243
Map<String, String> metrics = runtimeMetrics.getMetrics();
244
for (Map.Entry<String, String> entry : metrics.entrySet()) {
245
System.out.println(entry.getKey() + ": " + entry.getValue());
246
}
247
248
// Check if scaling is needed
249
if (runtimeMetrics.getCurrentInstances() != runtimeMetrics.getRequestedInstances()) {
250
System.out.println("Service is scaling...");
251
}
252
```
253
254
## System Service Monitoring
255
256
### System Service Status
257
258
```java
259
// Check all system services
260
boolean allOk = monitorClient.allSystemServicesOk();
261
System.out.println("All system services OK: " + allOk);
262
263
// Get status of all services
264
Map<String, String> allStatuses = monitorClient.getAllSystemServiceStatus();
265
System.out.println("System service statuses:");
266
267
for (Map.Entry<String, String> entry : allStatuses.entrySet()) {
268
String serviceName = entry.getKey();
269
String status = entry.getValue();
270
System.out.println("- " + serviceName + ": " + status);
271
272
if (!"OK".equals(status)) {
273
System.err.println(" WARNING: Service " + serviceName + " is not OK!");
274
}
275
}
276
277
// Check specific service
278
String datasetServiceStatus = monitorClient.getSystemServiceStatus("dataset.service");
279
System.out.println("Dataset service status: " + datasetServiceStatus);
280
```
281
282
### System Service Information
283
284
```java
285
// List all system services
286
List<SystemServiceMeta> services = monitorClient.listSystemServices();
287
System.out.println("System services (" + services.size() + "):");
288
289
for (SystemServiceMeta service : services) {
290
System.out.println("Service: " + service.getName());
291
System.out.println(" Description: " + service.getDescription());
292
System.out.println(" Status: " + service.getStatus());
293
System.out.println(" Instances: " + service.getInstances() + "/" + service.getRequestedInstances());
294
System.out.println(" Can check: " + service.isCanCheck());
295
296
if (service.getLogs() != null && !service.getLogs().isEmpty()) {
297
System.out.println(" Logs available: " + service.getLogs().length() + " characters");
298
}
299
}
300
```
301
302
### System Service Live Information
303
304
```java
305
// Get live information for specific services
306
String[] criticalServices = {"dataset.service", "transaction.service", "metadata.service"};
307
308
for (String serviceName : criticalServices) {
309
try {
310
SystemServiceLiveInfo liveInfo = monitorClient.getSystemServiceLiveInfo(serviceName);
311
312
System.out.println("Live info for " + serviceName + ":");
313
System.out.println(" Status: " + liveInfo.getStatus());
314
System.out.println(" YARN App ID: " + liveInfo.getYarnAppId());
315
System.out.println(" Containers: " + liveInfo.getContainers());
316
317
// Analyze container distribution
318
Map<String, Integer> containers = liveInfo.getContainers();
319
int totalContainers = containers.values().stream().mapToInt(Integer::intValue).sum();
320
System.out.println(" Total containers: " + totalContainers);
321
322
} catch (Exception e) {
323
System.err.println("Error getting live info for " + serviceName + ": " + e.getMessage());
324
}
325
}
326
```
327
328
## System Service Management
329
330
### Instance Management
331
332
```java
333
// Get current instance count
334
String serviceName = "metadata.service";
335
int currentInstances = monitorClient.getSystemServiceInstances(serviceName);
336
System.out.println("Current instances of " + serviceName + ": " + currentInstances);
337
338
// Scale service instances
339
int newInstanceCount = 3;
340
try {
341
monitorClient.setSystemServiceInstances(serviceName, newInstanceCount);
342
System.out.println("Scaled " + serviceName + " to " + newInstanceCount + " instances");
343
344
// Wait and verify scaling
345
Thread.sleep(30000); // Wait 30 seconds
346
347
int actualInstances = monitorClient.getSystemServiceInstances(serviceName);
348
if (actualInstances == newInstanceCount) {
349
System.out.println("Scaling completed successfully");
350
} else {
351
System.out.println("Scaling in progress: " + actualInstances + "/" + newInstanceCount);
352
}
353
354
} catch (Exception e) {
355
System.err.println("Error scaling service: " + e.getMessage());
356
}
357
```
358
359
## Advanced Monitoring and Analytics
360
361
### Comprehensive System Health Check
362
363
```java
364
public class SystemHealthChecker {
365
private final MonitorClient monitorClient;
366
private final MetricsClient metricsClient;
367
368
public SystemHealthChecker(MonitorClient monitorClient, MetricsClient metricsClient) {
369
this.monitorClient = monitorClient;
370
this.metricsClient = metricsClient;
371
}
372
373
public SystemHealthReport checkSystemHealth() {
374
SystemHealthReport.Builder reportBuilder = SystemHealthReport.builder();
375
376
try {
377
// Check overall system status
378
boolean allServicesOk = monitorClient.allSystemServicesOk();
379
reportBuilder.allServicesHealthy(allServicesOk);
380
381
// Get detailed service status
382
Map<String, String> serviceStatuses = monitorClient.getAllSystemServiceStatus();
383
reportBuilder.serviceStatuses(serviceStatuses);
384
385
// Identify unhealthy services
386
List<String> unhealthyServices = serviceStatuses.entrySet().stream()
387
.filter(entry -> !"OK".equals(entry.getValue()))
388
.map(Map.Entry::getKey)
389
.collect(Collectors.toList());
390
reportBuilder.unhealthyServices(unhealthyServices);
391
392
// Get system resource metrics
393
Map<String, String> systemTags = Map.of("component", "system");
394
List<String> resourceMetrics = List.of(
395
"system.total.memory.mb",
396
"system.available.memory.mb",
397
"system.cpu.percentage",
398
"system.disk.used.percentage"
399
);
400
401
try {
402
MetricQueryResult systemMetrics = metricsClient.query(systemTags, resourceMetrics, List.of(),
403
String.valueOf(System.currentTimeMillis() / 1000 - 300), // Last 5 minutes
404
String.valueOf(System.currentTimeMillis() / 1000)
405
);
406
reportBuilder.systemMetrics(systemMetrics);
407
} catch (Exception e) {
408
reportBuilder.metricsError("Failed to retrieve system metrics: " + e.getMessage());
409
}
410
411
} catch (Exception e) {
412
reportBuilder.error("System health check failed: " + e.getMessage());
413
}
414
415
return reportBuilder.build();
416
}
417
418
public void monitorSystemHealth(long intervalMs, HealthReportCallback callback) {
419
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
420
421
executor.scheduleAtFixedRate(() -> {
422
try {
423
SystemHealthReport report = checkSystemHealth();
424
callback.onHealthReport(report);
425
} catch (Exception e) {
426
callback.onError(e);
427
}
428
}, 0, intervalMs, TimeUnit.MILLISECONDS);
429
}
430
431
@FunctionalInterface
432
public interface HealthReportCallback {
433
void onHealthReport(SystemHealthReport report);
434
435
default void onError(Exception e) {
436
System.err.println("Health monitoring error: " + e.getMessage());
437
}
438
}
439
}
440
441
// Health report data structure
442
public class SystemHealthReport {
443
private final boolean allServicesHealthy;
444
private final Map<String, String> serviceStatuses;
445
private final List<String> unhealthyServices;
446
private final MetricQueryResult systemMetrics;
447
private final String metricsError;
448
private final String error;
449
private final long timestamp;
450
451
// Constructor, getters, and builder
452
}
453
```
454
455
### Performance Analysis
456
457
```java
458
public void analyzeMetricTrend(String metricName, MetricTimeSeries series) {
459
List<TimeValue> data = series.getData();
460
if (data.size() < 2) {
461
System.out.println("Insufficient data for trend analysis: " + metricName);
462
return;
463
}
464
465
// Calculate basic statistics
466
DoubleSummaryStatistics stats = data.stream()
467
.mapToDouble(TimeValue::getValue)
468
.summaryStatistics();
469
470
System.out.println("Metric: " + metricName);
471
System.out.println(" Data points: " + stats.getCount());
472
System.out.println(" Min: " + stats.getMin());
473
System.out.println(" Max: " + stats.getMax());
474
System.out.println(" Average: " + String.format("%.2f", stats.getAverage()));
475
476
// Calculate trend
477
double firstValue = data.get(0).getValue();
478
double lastValue = data.get(data.size() - 1).getValue();
479
double percentChange = ((lastValue - firstValue) / firstValue) * 100;
480
481
System.out.println(" Trend: " + String.format("%.2f", percentChange) + "%");
482
483
// Detect anomalies (values > 2 standard deviations from mean)
484
double stdDev = calculateStandardDeviation(data, stats.getAverage());
485
long anomalies = data.stream()
486
.mapToDouble(TimeValue::getValue)
487
.filter(value -> Math.abs(value - stats.getAverage()) > 2 * stdDev)
488
.count();
489
490
if (anomalies > 0) {
491
System.out.println(" Anomalies detected: " + anomalies + " data points");
492
}
493
}
494
495
private double calculateStandardDeviation(List<TimeValue> data, double mean) {
496
double variance = data.stream()
497
.mapToDouble(TimeValue::getValue)
498
.map(value -> Math.pow(value - mean, 2))
499
.average()
500
.orElse(0.0);
501
return Math.sqrt(variance);
502
}
503
```
504
505
### Custom Metrics Dashboard
506
507
```java
508
public class MetricsDashboard {
509
private final MetricsClient metricsClient;
510
511
public MetricsDashboard(MetricsClient metricsClient) {
512
this.metricsClient = metricsClient;
513
}
514
515
public void displayApplicationMetrics(ApplicationId appId) {
516
Map<String, String> appTags = Map.of(
517
"namespace", appId.getNamespace().getId(),
518
"app", appId.getApplication()
519
);
520
521
// Key metrics to monitor
522
List<String> keyMetrics = List.of(
523
"system.process.memory.used.mb",
524
"system.process.cpu.percentage",
525
"user.records.in",
526
"user.records.out",
527
"user.errors.total"
528
);
529
530
System.out.println("=== Application Metrics Dashboard ===");
531
System.out.println("Application: " + appId.getApplication());
532
System.out.println("Namespace: " + appId.getNamespace().getId());
533
System.out.println("Timestamp: " + new Date());
534
System.out.println();
535
536
for (String metric : keyMetrics) {
537
try {
538
MetricQueryResult result = metricsClient.query(appTags, metric);
539
displayMetricSummary(metric, result);
540
} catch (Exception e) {
541
System.out.println(metric + ": Error retrieving data - " + e.getMessage());
542
}
543
}
544
545
System.out.println("========================================");
546
}
547
548
private void displayMetricSummary(String metricName, MetricQueryResult result) {
549
if (result.getSeries().isEmpty()) {
550
System.out.println(metricName + ": No data available");
551
return;
552
}
553
554
MetricTimeSeries series = result.getSeries().get(0);
555
List<TimeValue> data = series.getData();
556
557
if (data.isEmpty()) {
558
System.out.println(metricName + ": No data points");
559
return;
560
}
561
562
TimeValue latest = data.get(data.size() - 1);
563
DoubleSummaryStatistics stats = data.stream()
564
.mapToDouble(TimeValue::getValue)
565
.summaryStatistics();
566
567
System.out.printf("%s: Current=%.2f, Avg=%.2f, Max=%.2f%n",
568
metricName, (double)latest.getValue(), stats.getAverage(), stats.getMax());
569
}
570
}
571
```
572
573
## Error Handling
574
575
Metrics and monitoring operations may throw these exceptions:
576
577
- **MetricNotFoundException**: Requested metric does not exist
578
- **InvalidMetricException**: Invalid metric query parameters
579
- **SystemServiceNotFoundException**: System service does not exist
580
- **UnauthenticatedException**: Authentication required
581
- **UnauthorizedException**: Insufficient permissions
- **IOException**: Network or I/O failure while communicating with the CDAP server
582
583
```java
// Handle each documented failure mode separately, most specific first
try {
  MetricQueryResult result = metricsClient.query(tags, metricName);
  System.out.println("Query successful, " + result.getSeries().size() + " series returned");
} catch (MetricNotFoundException e) {
  System.err.println("Metric not found: " + metricName);
} catch (InvalidMetricException e) {
  System.err.println("Invalid metric query: " + e.getMessage());
} catch (UnauthenticatedException e) {
  // Authentication required: credentials are missing or were rejected
  System.err.println("Authentication required: " + e.getMessage());
} catch (UnauthorizedException e) {
  // Authenticated, but lacking permission to read these metrics
  System.err.println("Insufficient permissions: " + e.getMessage());
} catch (IOException e) {
  System.err.println("Network error: " + e.getMessage());
}
```
595
596
## Best Practices
597
598
1. **Regular Monitoring**: Implement continuous system health monitoring
599
2. **Metric Selection**: Focus on key performance indicators and business metrics
600
3. **Alerting**: Set up alerts for critical metrics and system services
601
4. **Trend Analysis**: Track metrics over time to identify patterns and anomalies
602
5. **Resource Planning**: Use metrics for capacity planning and scaling decisions
603
6. **Performance Optimization**: Use metrics to identify and resolve performance bottlenecks
604
605
```java
606
// Good: Comprehensive monitoring with alerting and trend analysis
607
public class ProductionMonitor {
608
private final MetricsClient metricsClient;
609
private final MonitorClient monitorClient;
610
private final AlertingService alertingService;
611
612
public ProductionMonitor(MetricsClient metricsClient, MonitorClient monitorClient, AlertingService alertingService) {
613
this.metricsClient = metricsClient;
614
this.monitorClient = monitorClient;
615
this.alertingService = alertingService;
616
}
617
618
public void startMonitoring() {
619
// System health monitoring
620
ScheduledExecutorService systemMonitor = Executors.newScheduledThreadPool(2);
621
622
// Check system services every minute
623
systemMonitor.scheduleAtFixedRate(() -> {
624
try {
625
if (!monitorClient.allSystemServicesOk()) {
626
Map<String, String> statuses = monitorClient.getAllSystemServiceStatus();
627
alertingService.sendSystemAlert("System services unhealthy", statuses);
628
}
629
} catch (Exception e) {
630
alertingService.sendSystemAlert("System monitoring failed", e.getMessage());
631
}
632
}, 0, 60, TimeUnit.SECONDS);
633
634
// Check application metrics every 5 minutes
635
systemMonitor.scheduleAtFixedRate(() -> {
636
checkApplicationMetrics();
637
}, 0, 300, TimeUnit.SECONDS);
638
}
639
640
private void checkApplicationMetrics() {
641
// Implementation for application-specific metric monitoring
642
// Check memory usage, error rates, throughput, etc.
643
// Send alerts if thresholds are exceeded
644
}
645
}
646
```