CDAP Java Client library providing programmatic APIs for interacting with the CDAP platform
—
The MetricsClient and MonitorClient classes provide metrics querying, system service monitoring, and performance data collection. Use them to monitor system health, query time-series data, and collect performance metrics from CDAP applications and services.
/**
 * Signature summary of the CDAP metrics client: tag and metric-name discovery plus
 * time-series queries.
 * NOTE: the methods below are listed without bodies or throws clauses, so this is reference
 * material rather than compilable source. The exception-handling example on this page
 * catches IOException and metric-specific exceptions around query() — confirm the real
 * throws clauses.
 */
public class MetricsClient {
// Constructors
/** Creates a client from {@code config}. NOTE(review): presumably creates its own RESTClient — confirm. */
public MetricsClient(ClientConfig config);
/** Creates a client that uses the supplied {@code restClient} (e.g. a shared instance — assumption). */
public MetricsClient(ClientConfig config, RESTClient restClient);
// Metrics search methods
/**
 * Returns the tag name/value pairs available under the given tag filter
 * (the examples filter by "namespace" and "app").
 */
public List<MetricTagValue> searchTags(Map<String, String> tags);
/** Returns the names of the metrics available under the given tag filter. */
public List<String> searchMetrics(Map<String, String> tags);
// Metrics query methods
/**
 * Queries a single metric in the given tag context. No time range is passed, so the window
 * and resolution are whatever the server applies by default — NOTE(review): confirm.
 */
public MetricQueryResult query(Map<String, String> tags, String metric);
/**
 * Queries several metrics, optionally split by {@code groupBys} tags, between {@code start}
 * and {@code end}. The bounds are strings; the examples pass epoch seconds.
 */
public MetricQueryResult query(Map<String, String> tags, List<String> metrics, List<String> groupBys, String start, String end);
/**
 * Like the start/end overload, but takes time parameters as a map; the examples use the
 * keys "start", "end", "resolution" and "aggregate".
 */
public MetricQueryResult query(Map<String, String> tags, List<String> metrics, List<String> groupBys, Map<String, String> timeRangeParams);
/** Returns instance counts and a name-to-value metrics map for one service. */
public RuntimeMetrics getServiceMetrics(ServiceId serviceId);
}
/**
 * Signature summary of the CDAP system-service monitor: status checks, service metadata,
 * and instance scaling. Methods are listed without bodies (reference only).
 */
public class MonitorClient {
// Constructors
/** Creates a monitor client from {@code config}. */
public MonitorClient(ClientConfig config);
/** Creates a monitor client that uses the supplied {@code restClient}. */
public MonitorClient(ClientConfig config, RESTClient restClient);
// System service information methods
/** Returns live runtime information (status, containers, YARN app id) for one system service. */
public SystemServiceLiveInfo getSystemServiceLiveInfo(String serviceName);
/** Lists metadata for the system services. */
public List<SystemServiceMeta> listSystemServices();
/** Returns the status string of one system service; the examples treat "OK" as healthy. */
public String getSystemServiceStatus(String serviceName);
/** Returns a map from system-service name to its status string. */
public Map<String, String> getAllSystemServiceStatus();
/** Returns whether every system service is OK (per the method name — confirm exact semantics). */
public boolean allSystemServicesOk();
// System service management methods
/**
 * Requests {@code instances} instances of a system service. The scaling example polls the
 * count afterwards, suggesting the change is applied asynchronously — NOTE(review): confirm.
 */
public void setSystemServiceInstances(String serviceName, int instances);
/** Returns the instance count currently reported for a system service. */
public int getSystemServiceInstances(String serviceName);
}
/** Result of a metrics query: the query window, resolution, and one or more time series. */
public class MetricQueryResult {
/** Start of the queried window, as a string. */
public String getStartTime();
/** End of the queried window, as a string. */
public String getEndTime();
/**
 * Returned series; with groupBys there is presumably one series per metric and grouping —
 * see {@code MetricTimeSeries.getGrouping()}.
 */
public List<MetricTimeSeries> getSeries();
/** Resolution of the returned data points (the examples request e.g. "1h"). */
public String getResolution();
}
/** One time series inside a {@code MetricQueryResult}. */
public class MetricTimeSeries {
/** Name of the metric this series belongs to. */
public String getMetricName();
/**
 * Tag values identifying this series when the query used groupBys
 * (NOTE(review): presumably empty for ungrouped queries — confirm).
 */
public Map<String, String> getGrouping();
/** Data points of the series; the examples treat the last element as the most recent. */
public List<TimeValue> getData();
}
/** A single (time, value) data point of a metric time series. */
public class TimeValue {
/** Timestamp of the point; the examples treat it as epoch seconds — NOTE(review): confirm. */
public long getTime();
/** Metric value at that time, as a long. */
public long getValue();
}
/** A tag name/value pair returned by {@code MetricsClient.searchTags}. */
public class MetricTagValue {
/** Tag name, e.g. "namespace" or "app" in the examples. */
public String getName();
/** Value of the tag. */
public String getValue();
}
/** Runtime view of a service: instance counts plus a metrics map. */
public class RuntimeMetrics {
/** Number of instances currently running. */
public int getCurrentInstances();
/** Number of instances requested; differs from the current count while scaling. */
public int getRequestedInstances();
/** Metric name-to-value pairs, with values as strings. */
public Map<String, String> getMetrics();
}
/** Metadata describing one CDAP system service, as returned by {@code listSystemServices}. */
public class SystemServiceMeta {
/** Service name, e.g. "dataset.service". */
public String getName();
/** Human-readable description of the service. */
public String getDescription();
/** Status string of the service. */
public String getStatus();
/** Log text for the service; may be null or empty (the listing example checks both). */
public String getLogs();
/** Instances currently running. */
public int getInstances();
/** Instances requested. */
public int getRequestedInstances();
/** Whether this service's status can be checked (inferred from the name — confirm). */
public boolean isCanCheck();
}
/** Live runtime information for one system service. */
public class SystemServiceLiveInfo {
/** Current status string of the service. */
public String getStatus();
/** Per-container counts; the live-info example sums the values into a total. */
public Map<String, Integer> getContainers();
/** Identifier of the YARN application hosting the service. */
public String getYarnAppId();
}
// Search for available metric tags
// Tag filter for the "data-pipeline" app; the metric-name search that follows reuses it.
Map<String, String> searchCriteria = Map.of("namespace", "default", "app", "data-pipeline");
List<MetricTagValue> availableTags = metricsClient.searchTags(searchCriteria);
System.out.println("Available metric tags:");

// Collect every value seen for each tag name, then print one line per name.
Map<String, List<String>> valuesByTagName = availableTags.stream()
    .collect(Collectors.groupingBy(MetricTagValue::getName,
        Collectors.mapping(MetricTagValue::getValue, Collectors.toList())));
valuesByTagName.forEach((tagName, tagValues) ->
    System.out.println("- " + tagName + ": " + tagValues));

// Narrow the tag search down to a single program.
Map<String, String> programTags = Map.of(
    "namespace", "default",
    "app", "analytics-app",
    "program", "data-processor",
    "program.type", "worker");
List<MetricTagValue> programMetricTags = metricsClient.searchTags(programTags);
// Search for available metrics
// Metric names available under the tag filter defined in the tag-search example.
List<String> metricNames = metricsClient.searchMetrics(searchCriteria);
System.out.println("Available metrics: " + metricNames);

// Split names by the usual CDAP prefixes: platform-emitted vs. user-defined metrics.
List<String> systemMetricNames = metricNames.stream()
    .filter(m -> m.contains("system.") || m.contains("process."))
    .collect(Collectors.toList());
List<String> userMetricNames = metricNames.stream()
    .filter(m -> m.startsWith("user."))
    .collect(Collectors.toList());

System.out.println("Performance metrics: " + systemMetricNames);
System.out.println("User metrics: " + userMetricNames);
// Query single metric
// Tag context identifying one program; later examples reuse `tags` and `metricName`.
Map<String, String> tags = Map.of(
    "namespace", "default",
    "app", "web-analytics",
    "program", "stream-processor",
    "program.type", "workflow"
);
String metricName = "system.process.memory.used.mb";
MetricQueryResult result = metricsClient.query(tags, metricName);
System.out.println("Query period: " + result.getStartTime() + " to " + result.getEndTime());
System.out.println("Resolution: " + result.getResolution());
for (MetricTimeSeries series : result.getSeries()) {
    System.out.println("Metric: " + series.getMetricName());
    System.out.println("Grouping: " + series.getGrouping());
    List<TimeValue> data = series.getData();
    System.out.println("Data points: " + data.size());
    // Show the most recent value (the last data point).
    if (!data.isEmpty()) {
        TimeValue latest = data.get(data.size() - 1);
        // Point times are epoch seconds; java.time converts them directly instead of the
        // legacy java.util.Date API with a manual *1000 millisecond conversion.
        System.out.println("Latest value: " + latest.getValue() + " MB at "
            + java.time.Instant.ofEpochSecond(latest.getTime()));
    }
}
// Query multiple metrics with time range
List<String> multipleMetrics = List.of(
    "system.process.memory.used.mb",
    "system.process.cpu.percentage",
    "user.records.processed",
    "user.processing.time.ms"
);
List<String> groupBy = List.of("program", "program.type");
// Time range - last 24 hours, expressed in epoch seconds
long endTime = System.currentTimeMillis() / 1000;
long startTime = endTime - 24 * 3600; // 24 hours ago
MetricQueryResult multiResult = metricsClient.query(
    tags,
    multipleMetrics,
    groupBy,
    String.valueOf(startTime),
    String.valueOf(endTime)
);
// Process results by metric. Because of groupBy, the same metric name appears once per
// (program, program.type) grouping, so collect a list per metric: Collectors.toMap would
// throw IllegalStateException on the duplicate keys.
Map<String, List<MetricTimeSeries>> seriesByMetric = multiResult.getSeries().stream()
    .collect(Collectors.groupingBy(MetricTimeSeries::getMetricName));
for (String metric : multipleMetrics) {
    for (MetricTimeSeries series : seriesByMetric.getOrDefault(metric, List.of())) {
        // Include the grouping in the label so per-program results are distinguishable.
        analyzeMetricTrend(metric + " " + series.getGrouping(), series);
    }
}
// Query with custom time parameters
Map<String, String> timeParams = Map.of(
    "start", String.valueOf(startTime),
    "end", String.valueOf(endTime),
    "resolution", "1h", // 1 hour resolution
    "aggregate", "true"
);
MetricQueryResult customResult = metricsClient.query(tags, multipleMetrics, groupBy, timeParams);
// Get runtime metrics for a service
// Runtime view of the "user-service" service of application `appId`.
ServiceId userServiceId = ServiceId.of(appId, "user-service");
RuntimeMetrics userServiceRuntime = metricsClient.getServiceMetrics(userServiceId);
int current = userServiceRuntime.getCurrentInstances();
int requested = userServiceRuntime.getRequestedInstances();
System.out.println("Current instances: " + current);
System.out.println("Requested instances: " + requested);

// One "name: value" line per reported metric.
userServiceRuntime.getMetrics().forEach((key, value) -> System.out.println(key + ": " + value));

// Differing counts mean a scale request has not completed yet.
if (current != requested) {
    System.out.println("Service is scaling...");
}
// Check all system services
// Coarse check first: a single flag for the whole platform.
boolean everythingHealthy = monitorClient.allSystemServicesOk();
System.out.println("All system services OK: " + everythingHealthy);

// Then list each service and flag any that does not report "OK".
Map<String, String> statusByService = monitorClient.getAllSystemServiceStatus();
System.out.println("System service statuses:");
statusByService.forEach((svc, state) -> {
    System.out.println("- " + svc + ": " + state);
    boolean healthy = "OK".equals(state);
    if (!healthy) {
        System.err.println(" WARNING: Service " + svc + " is not OK!");
    }
});

// A single service can also be queried directly.
String datasetServiceStatus = monitorClient.getSystemServiceStatus("dataset.service");
System.out.println("Dataset service status: " + datasetServiceStatus);
// List all system services
// Print the metadata of every system service, one indented section per service.
List<SystemServiceMeta> systemServices = monitorClient.listSystemServices();
System.out.println("System services (" + systemServices.size() + "):");
for (SystemServiceMeta meta : systemServices) {
    System.out.println("Service: " + meta.getName());
    System.out.println(" Description: " + meta.getDescription());
    System.out.println(" Status: " + meta.getStatus());
    System.out.println(" Instances: " + meta.getInstances() + "/" + meta.getRequestedInstances());
    System.out.println(" Can check: " + meta.isCanCheck());
    // Logs may be absent (null) or empty; only report their size when present.
    String logs = meta.getLogs();
    boolean hasLogs = logs != null && !logs.isEmpty();
    if (hasLogs) {
        System.out.println(" Logs available: " + logs.length() + " characters");
    }
}
// Get live information for specific services
// Fetch live info for a few key services; a failure for one does not stop the others.
String[] servicesToInspect = {"dataset.service", "transaction.service", "metadata.service"};
for (String name : servicesToInspect) {
    try {
        SystemServiceLiveInfo info = monitorClient.getSystemServiceLiveInfo(name);
        System.out.println("Live info for " + name + ":");
        System.out.println(" Status: " + info.getStatus());
        System.out.println(" YARN App ID: " + info.getYarnAppId());
        Map<String, Integer> containers = info.getContainers();
        System.out.println(" Containers: " + containers);
        // Add up the per-entry container counts.
        int total = 0;
        for (Integer count : containers.values()) {
            total += count;
        }
        System.out.println(" Total containers: " + total);
    } catch (Exception e) {
        System.err.println("Error getting live info for " + name + ": " + e.getMessage());
    }
}
// Get current instance count
String serviceName = "metadata.service";
int currentInstances = monitorClient.getSystemServiceInstances(serviceName);
System.out.println("Current instances of " + serviceName + ": " + currentInstances);
// Scale service instances
int newInstanceCount = 3;
try {
    monitorClient.setSystemServiceInstances(serviceName, newInstanceCount);
    System.out.println("Scaled " + serviceName + " to " + newInstanceCount + " instances");
    // Wait and verify scaling by polling the reported instance count
    Thread.sleep(30000); // Wait 30 seconds
    int actualInstances = monitorClient.getSystemServiceInstances(serviceName);
    if (actualInstances == newInstanceCount) {
        System.out.println("Scaling completed successfully");
    } else {
        System.out.println("Scaling in progress: " + actualInstances + "/" + newInstanceCount);
    }
} catch (InterruptedException e) {
    // Restore the interrupt flag so the calling thread or executor can still observe it,
    // instead of swallowing it in the generic handler below.
    Thread.currentThread().interrupt();
    System.err.println("Interrupted while waiting for " + serviceName + " to scale");
} catch (Exception e) {
    System.err.println("Error scaling service: " + e.getMessage());
}

/**
 * Builds {@code SystemHealthReport}s from system-service statuses and recent system resource
 * metrics, either on demand or periodically.
 */
public class SystemHealthChecker {
    /** Length of the resource-metrics window queried by each check (5 minutes). */
    private static final long SYSTEM_METRICS_WINDOW_SECONDS = 300;

    private final MonitorClient monitorClient;
    private final MetricsClient metricsClient;
    /** Schedulers started by monitorSystemHealth, kept so stopMonitoring can shut them down. */
    private final List<ScheduledExecutorService> monitors =
        new java.util.concurrent.CopyOnWriteArrayList<>();

    public SystemHealthChecker(MonitorClient monitorClient, MetricsClient metricsClient) {
        this.monitorClient = monitorClient;
        this.metricsClient = metricsClient;
    }

    /**
     * Runs one health check. Failures are recorded in the report rather than thrown: a metrics
     * failure is stored via {@code metricsError}, and any other failure via {@code error}.
     *
     * @return the report; fields after a failing step are left unset
     */
    public SystemHealthReport checkSystemHealth() {
        SystemHealthReport.Builder reportBuilder = SystemHealthReport.builder();
        try {
            // Check overall system status
            boolean allServicesOk = monitorClient.allSystemServicesOk();
            reportBuilder.allServicesHealthy(allServicesOk);
            // Get detailed service status
            Map<String, String> serviceStatuses = monitorClient.getAllSystemServiceStatus();
            reportBuilder.serviceStatuses(serviceStatuses);
            // Identify unhealthy services: anything not reporting exactly "OK"
            List<String> unhealthyServices = serviceStatuses.entrySet().stream()
                .filter(entry -> !"OK".equals(entry.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
            reportBuilder.unhealthyServices(unhealthyServices);
            // Get system resource metrics
            Map<String, String> systemTags = Map.of("component", "system");
            List<String> resourceMetrics = List.of(
                "system.total.memory.mb",
                "system.available.memory.mb",
                "system.cpu.percentage",
                "system.disk.used.percentage"
            );
            try {
                // Read the clock once so start and end describe exactly one window.
                long nowSeconds = System.currentTimeMillis() / 1000;
                MetricQueryResult systemMetrics = metricsClient.query(systemTags, resourceMetrics, List.of(),
                    String.valueOf(nowSeconds - SYSTEM_METRICS_WINDOW_SECONDS), // Last 5 minutes
                    String.valueOf(nowSeconds)
                );
                reportBuilder.systemMetrics(systemMetrics);
            } catch (Exception e) {
                reportBuilder.metricsError("Failed to retrieve system metrics: " + e.getMessage());
            }
        } catch (Exception e) {
            reportBuilder.error("System health check failed: " + e.getMessage());
        }
        return reportBuilder.build();
    }

    /**
     * Starts running {@link #checkSystemHealth()} every {@code intervalMs} ms (first run
     * immediately) on a dedicated scheduler thread. Call {@link #stopMonitoring()} to end it.
     *
     * @throws IllegalArgumentException if {@code intervalMs <= 0}
     * @throws NullPointerException if {@code callback} is null
     */
    public void monitorSystemHealth(long intervalMs, HealthReportCallback callback) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive: " + intervalMs);
        }
        java.util.Objects.requireNonNull(callback, "callback");
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        monitors.add(executor);
        executor.scheduleAtFixedRate(() -> {
            // An exception escaping this task (e.g. thrown by onError) suppresses all later
            // runs, per the ScheduledExecutorService contract.
            try {
                SystemHealthReport report = checkSystemHealth();
                callback.onHealthReport(report);
            } catch (Exception e) {
                callback.onError(e);
            }
        }, 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    /** Shuts down every scheduler started by {@link #monitorSystemHealth}, interrupting running checks. */
    public void stopMonitoring() {
        // CopyOnWriteArrayList iterates a snapshot, so removing while iterating is safe.
        for (ScheduledExecutorService executor : monitors) {
            executor.shutdownNow();
            monitors.remove(executor);
        }
    }

    /** Receives periodic health reports; errors default to a message on stderr. */
    @FunctionalInterface
    public interface HealthReportCallback {
        void onHealthReport(SystemHealthReport report);

        default void onError(Exception e) {
            System.err.println("Health monitoring error: " + e.getMessage());
        }
    }
}
// Health report data structure
/**
 * Snapshot produced by {@code SystemHealthChecker.checkSystemHealth()}; the fields mirror the
 * builder calls that method makes. The constructor, getters and builder are elided here.
 */
public class SystemHealthReport {
    private final boolean allServicesHealthy;
    private final Map<String, String> serviceStatuses;
    private final List<String> unhealthyServices;
    private final MetricQueryResult systemMetrics;
    private final String metricsError;
    private final String error;
    // NOTE(review): presumably the time the report was built — confirm in the elided builder.
    private final long timestamp;
    // Constructor, getters, and builder
}

/**
 * Prints summary statistics, the first-to-last percentage trend, and a count of points more
 * than two standard deviations from the mean for one metric series.
 *
 * @param metricName label printed for the series
 * @param series     series to analyze; fewer than two points (or no data) is reported and skipped
 */
public void analyzeMetricTrend(String metricName, MetricTimeSeries series) {
    List<TimeValue> data = series.getData();
    if (data == null || data.size() < 2) {
        System.out.println("Insufficient data for trend analysis: " + metricName);
        return;
    }
    // Calculate basic statistics
    DoubleSummaryStatistics stats = data.stream()
        .mapToDouble(TimeValue::getValue)
        .summaryStatistics();
    double mean = stats.getAverage();
    System.out.println("Metric: " + metricName);
    System.out.println(" Data points: " + stats.getCount());
    System.out.println(" Min: " + stats.getMin());
    System.out.println(" Max: " + stats.getMax());
    System.out.println(" Average: " + String.format("%.2f", mean));
    // Calculate trend as the percentage change from the first to the last point
    double firstValue = data.get(0).getValue();
    double lastValue = data.get(data.size() - 1).getValue();
    if (firstValue == 0) {
        // A percentage change from zero is undefined; dividing would print "Infinity%" or "NaN%".
        System.out.println(" Trend: n/a (first value is 0)");
    } else {
        double percentChange = ((lastValue - firstValue) / firstValue) * 100;
        System.out.println(" Trend: " + String.format("%.2f", percentChange) + "%");
    }
    // Detect anomalies (values > 2 standard deviations from mean)
    double stdDev = calculateStandardDeviation(data, mean);
    long anomalies = data.stream()
        .mapToDouble(TimeValue::getValue)
        .filter(value -> Math.abs(value - mean) > 2 * stdDev)
        .count();
    if (anomalies > 0) {
        System.out.println(" Anomalies detected: " + anomalies + " data points");
    }
}

/** Population standard deviation (divides by n) of the values in {@code data} around {@code mean}; 0 if empty. */
private double calculateStandardDeviation(List<TimeValue> data, double mean) {
    double variance = data.stream()
        .mapToDouble(TimeValue::getValue)
        .map(value -> Math.pow(value - mean, 2))
        .average()
        .orElse(0.0);
    return Math.sqrt(variance);
}

/** Prints a console summary of a fixed set of key metrics for one application. */
public class MetricsDashboard {
    private final MetricsClient metricsClient;

    public MetricsDashboard(MetricsClient metricsClient) {
        this.metricsClient = metricsClient;
    }

    /** Queries each key metric in the application's namespace/app context and prints a summary line; per-metric errors are printed, not thrown. */
    public void displayApplicationMetrics(ApplicationId appId) {
        Map<String, String> appTags = Map.of(
            "namespace", appId.getNamespace().getId(),
            "app", appId.getApplication()
        );
        // Key metrics to monitor
        List<String> keyMetrics = List.of(
            "system.process.memory.used.mb",
            "system.process.cpu.percentage",
            "user.records.in",
            "user.records.out",
            "user.errors.total"
        );
        System.out.println("=== Application Metrics Dashboard ===");
        System.out.println("Application: " + appId.getApplication());
        System.out.println("Namespace: " + appId.getNamespace().getId());
        // java.time instead of the legacy java.util.Date API
        System.out.println("Timestamp: " + java.time.Instant.now());
        System.out.println();
        for (String metric : keyMetrics) {
            try {
                MetricQueryResult result = metricsClient.query(appTags, metric);
                displayMetricSummary(metric, result);
            } catch (Exception e) {
                System.out.println(metric + ": Error retrieving data - " + e.getMessage());
            }
        }
        System.out.println("========================================");
    }

    /** Prints latest, average and max of the FIRST series only (the query above uses no groupBys). */
    private void displayMetricSummary(String metricName, MetricQueryResult result) {
        if (result.getSeries().isEmpty()) {
            System.out.println(metricName + ": No data available");
            return;
        }
        MetricTimeSeries series = result.getSeries().get(0);
        List<TimeValue> data = series.getData();
        if (data.isEmpty()) {
            System.out.println(metricName + ": No data points");
            return;
        }
        TimeValue latest = data.get(data.size() - 1);
        DoubleSummaryStatistics stats = data.stream()
            .mapToDouble(TimeValue::getValue)
            .summaryStatistics();
        System.out.printf("%s: Current=%.2f, Avg=%.2f, Max=%.2f%n",
            metricName, (double) latest.getValue(), stats.getAverage(), stats.getMax());
    }
}
Metrics and monitoring operations may throw these exceptions:
try {
    MetricQueryResult result = metricsClient.query(tags, metricName);
    System.out.println("Query successful, " + result.getSeries().size() + " series returned");
} catch (MetricNotFoundException e) {
    System.err.println("Metric not found: " + metricName);
} catch (InvalidMetricException e) {
    System.err.println("Invalid metric query: " + e.getMessage());
} catch (IOException e) {
    System.err.println("Network error: " + e.getMessage());
}
// Good: scheduled system-service and application-metric checks with alerting, on a
// scheduler the monitor owns and can stop
public class ProductionMonitor {
    private static final long SYSTEM_CHECK_PERIOD_SECONDS = 60;
    private static final long APP_CHECK_PERIOD_SECONDS = 300;

    private final MetricsClient metricsClient;
    private final MonitorClient monitorClient;
    private final AlertingService alertingService;
    /** Scheduler running both checks; null until startMonitoring() and after stopMonitoring(). */
    private ScheduledExecutorService systemMonitor;

    public ProductionMonitor(MetricsClient metricsClient, MonitorClient monitorClient, AlertingService alertingService) {
        this.metricsClient = metricsClient;
        this.monitorClient = monitorClient;
        this.alertingService = alertingService;
    }

    /**
     * Starts the system-service check (every minute) and the application-metrics check
     * (every 5 minutes). Calling it again while running has no effect, so checks and
     * alerts are never duplicated.
     */
    public synchronized void startMonitoring() {
        if (systemMonitor != null && !systemMonitor.isShutdown()) {
            return;
        }
        systemMonitor = Executors.newScheduledThreadPool(2);
        systemMonitor.scheduleAtFixedRate(this::checkSystemServices, 0, SYSTEM_CHECK_PERIOD_SECONDS, TimeUnit.SECONDS);
        systemMonitor.scheduleAtFixedRate(this::checkApplicationMetricsSafely, 0, APP_CHECK_PERIOD_SECONDS, TimeUnit.SECONDS);
    }

    /** Stops both checks and releases the scheduler threads; monitoring can be started again later. */
    public synchronized void stopMonitoring() {
        if (systemMonitor != null) {
            systemMonitor.shutdownNow();
            systemMonitor = null;
        }
    }

    /** Alerts with the per-service statuses when any system service is unhealthy. */
    private void checkSystemServices() {
        try {
            if (!monitorClient.allSystemServicesOk()) {
                Map<String, String> statuses = monitorClient.getAllSystemServiceStatus();
                alertingService.sendSystemAlert("System services unhealthy", statuses);
            }
        } catch (Exception e) {
            alertingService.sendSystemAlert("System monitoring failed", e.getMessage());
        }
    }

    /**
     * Runs checkApplicationMetrics, turning failures into alerts. An exception escaping a
     * scheduleAtFixedRate task silently cancels all its future runs.
     */
    private void checkApplicationMetricsSafely() {
        try {
            checkApplicationMetrics();
        } catch (Exception e) {
            alertingService.sendSystemAlert("Application metrics check failed", e.getMessage());
        }
    }

    private void checkApplicationMetrics() {
        // Implementation for application-specific metric monitoring
        // Check memory usage, error rates, throughput, etc.
        // Send alerts if thresholds are exceeded
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-client