CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap-client

CDAP Java Client library providing programmatic APIs for interacting with the CDAP platform

Pending
Overview
Eval results
Files

docs/metrics-monitoring.md

Metrics and Monitoring

The MetricsClient and MonitorClient provide comprehensive metrics querying, system service monitoring, and performance data collection. Monitor system health, query time series data, and collect performance metrics from CDAP applications and services.

MetricsClient

/**
 * Client for discovering and querying metrics.
 * NOTE(review): this is a signature listing only; method bodies and any declared
 * exceptions are not shown here.
 */
public class MetricsClient {
    // Constructors: the second overload additionally accepts the RESTClient to use
    public MetricsClient(ClientConfig config);
    public MetricsClient(ClientConfig config, RESTClient restClient);
    
    // Metrics search methods: both take a tag-name -> tag-value map as the search context
    public List<MetricTagValue> searchTags(Map<String, String> tags);
    public List<String> searchMetrics(Map<String, String> tags);
    
    // Metrics query methods
    // Single metric within the given tag context
    public MetricQueryResult query(Map<String, String> tags, String metric);
    // Several metrics, grouped by the given tag names, between start and end
    // (the usage examples in this document pass epoch seconds as strings - TODO confirm other accepted formats)
    public MetricQueryResult query(Map<String, String> tags, List<String> metrics, List<String> groupBys, String start, String end);
    // Same, with the time range given as a parameter map
    // (the usage examples pass the keys "start", "end", "resolution" and "aggregate")
    public MetricQueryResult query(Map<String, String> tags, List<String> metrics, List<String> groupBys, Map<String, String> timeRangeParams);
    // Instance counts and metric values for one service
    public RuntimeMetrics getServiceMetrics(ServiceId serviceId);
}

MonitorClient

/**
 * Client for inspecting and managing system services.
 * NOTE(review): this is a signature listing only; method bodies and any declared
 * exceptions are not shown here.
 */
public class MonitorClient {
    // Constructors: the second overload additionally accepts the RESTClient to use
    public MonitorClient(ClientConfig config);
    public MonitorClient(ClientConfig config, RESTClient restClient);
    
    // System service information methods
    // Live details (status, containers, YARN app id) for one service
    public SystemServiceLiveInfo getSystemServiceLiveInfo(String serviceName);
    // Metadata for every system service
    public List<SystemServiceMeta> listSystemServices();
    // Status string of one service (the usage examples compare it against "OK")
    public String getSystemServiceStatus(String serviceName);
    // Service name -> status string for all services
    public Map<String, String> getAllSystemServiceStatus();
    // Single boolean answer for "are all system services OK"
    public boolean allSystemServicesOk();
    
    // System service management methods
    // Request a new instance count for a service
    public void setSystemServiceInstances(String serviceName, int instances);
    // Read the instance count of a service
    public int getSystemServiceInstances(String serviceName);
}

Metrics Types and Results

/** Result of a metrics query: the queried time window, its resolution and the returned series. */
public class MetricQueryResult {
    // Start and end of the queried window, as strings (format not shown in this listing)
    public String getStartTime();
    public String getEndTime();
    // One entry per returned time series
    public List<MetricTimeSeries> getSeries();
    // Resolution of the data points, as a string
    public String getResolution();
}

/** One time series inside a MetricQueryResult. */
public class MetricTimeSeries {
    // Name of the metric this series belongs to
    public String getMetricName();
    // Grouping key/value pairs of this series (presumably the values of the query's groupBy tags - TODO confirm)
    public Map<String, String> getGrouping();
    // The data points of the series
    public List<TimeValue> getData();
}

/** A single data point: a timestamp paired with a value. */
public class TimeValue {
    // Timestamp of the point; the usage example multiplies it by 1000 before building a
    // java.util.Date, i.e. treats it as epoch seconds - TODO confirm
    public long getTime();
    // Value recorded at that time
    public long getValue();
}

/** A tag name/value pair as returned by MetricsClient.searchTags. */
public class MetricTagValue {
    // Tag name
    public String getName();
    // Tag value
    public String getValue();
}

/** Runtime information for a service as returned by MetricsClient.getServiceMetrics. */
public class RuntimeMetrics {
    // Instance counts; the usage example treats a difference between the two as "scaling in progress"
    public int getCurrentInstances();
    public int getRequestedInstances();
    // Metric name -> value, both as strings
    public Map<String, String> getMetrics();
}

System Service Types

/** Metadata describing one system service, as returned by MonitorClient.listSystemServices. */
public class SystemServiceMeta {
    // Identification
    public String getName();
    public String getDescription();
    // Status string of the service
    public String getStatus();
    // Log information as a string (the usage example null-checks it before use)
    public String getLogs();
    // Instance counts, printed as "instances/requestedInstances" in the usage example
    public int getInstances();
    public int getRequestedInstances();
    // NOTE(review): presumably whether the service's status can be checked - confirm
    public boolean isCanCheck();
}

/** Live information for one system service, as returned by MonitorClient.getSystemServiceLiveInfo. */
public class SystemServiceLiveInfo {
    // Status string of the service
    public String getStatus();
    // Container counts per key; the usage example sums the values to get a total
    // (meaning of the keys is not shown in this listing)
    public Map<String, Integer> getContainers();
    // YARN application id of the service
    public String getYarnAppId();
}

Metrics Discovery and Search

Tag Discovery

// Tag context to search within: one application in the default namespace
Map<String, String> searchCriteria = Map.of(
    "namespace", "default",
    "app", "data-pipeline"
);

List<MetricTagValue> tags = metricsClient.searchTags(searchCriteria);
System.out.println("Available metric tags:");

// Bucket the returned tag values under their tag name
Map<String, List<String>> tagsByName = tags.stream()
    .collect(Collectors.groupingBy(
        tag -> tag.getName(),
        Collectors.mapping(tag -> tag.getValue(), Collectors.toList())
    ));

// One output line per tag name, followed by all values seen for it
tagsByName.forEach((tagName, tagValues) ->
    System.out.println("- " + tagName + ": " + tagValues));

// Narrow the context down to a single program to discover its tags
Map<String, String> programTags = Map.of(
    "namespace", "default",
    "app", "analytics-app",
    "program", "data-processor",
    "program.type", "worker"
);
List<MetricTagValue> programMetricTags = metricsClient.searchTags(programTags);

Metric Discovery

// List the metric names available in the search context
List<String> metrics = metricsClient.searchMetrics(searchCriteria);
System.out.println("Available metrics: " + metrics);

// Names containing "process." or "system." are treated as performance metrics
List<String> performanceMetrics = metrics.stream()
    .filter(name -> {
        boolean processMetric = name.contains("process.");
        return processMetric || name.contains("system.");
    })
    .collect(Collectors.toList());
// Names with the "user." prefix are treated as user metrics
List<String> userMetrics = metrics.stream()
    .filter(name -> name.startsWith("user."))
    .collect(Collectors.toList());

System.out.println("Performance metrics: " + performanceMetrics);
System.out.println("User metrics: " + userMetrics);

Metrics Querying

Basic Metrics Queries

// Tag context identifying the program whose metric is queried
Map<String, String> tags = Map.of(
    "namespace", "default",
    "app", "web-analytics",
    "program", "stream-processor",
    "program.type", "workflow"
);

String metricName = "system.process.memory.used.mb";
MetricQueryResult result = metricsClient.query(tags, metricName);

System.out.println("Query period: " + result.getStartTime() + " to " + result.getEndTime());
System.out.println("Resolution: " + result.getResolution());

// Report every returned series, ending with its most recent data point (if any)
result.getSeries().forEach(series -> {
    List<TimeValue> points = series.getData();
    System.out.println("Metric: " + series.getMetricName());
    System.out.println("Grouping: " + series.getGrouping());
    System.out.println("Data points: " + points.size());

    if (points.isEmpty()) {
        return;
    }
    // The last element is taken as the latest point; its time is scaled by 1000 for java.util.Date
    TimeValue latest = points.get(points.size() - 1);
    System.out.println("Latest value: " + latest.getValue() + " MB at " + new Date(latest.getTime() * 1000));
});

Advanced Metrics Queries

// Query multiple metrics with time range
List<String> multipleMetrics = List.of(
    "system.process.memory.used.mb",
    "system.process.cpu.percentage",
    "user.records.processed",
    "user.processing.time.ms"
);

List<String> groupBy = List.of("program", "program.type");

// Time range - last 24 hours, expressed in epoch seconds
long endTime = System.currentTimeMillis() / 1000;
long startTime = endTime - 24 * 3600; // 24 hours ago

MetricQueryResult multiResult = metricsClient.query(
    tags,
    multipleMetrics,
    groupBy,
    String.valueOf(startTime),
    String.valueOf(endTime)
);

// Process results by metric.
// Because the query is grouped by "program" and "program.type", the result can hold
// several series with the same metric name (one per grouping). Collectors.toMap would
// throw IllegalStateException on the first duplicate key, so collect a list per metric.
Map<String, List<MetricTimeSeries>> seriesByMetric = multiResult.getSeries().stream()
    .collect(Collectors.groupingBy(MetricTimeSeries::getMetricName));

for (String metric : multipleMetrics) {
    // Metrics without any returned series are simply skipped
    for (MetricTimeSeries series : seriesByMetric.getOrDefault(metric, List.of())) {
        analyzeMetricTrend(metric, series);
    }
}

// Query with custom time parameters
Map<String, String> timeParams = Map.of(
    "start", String.valueOf(startTime),
    "end", String.valueOf(endTime),
    "resolution", "1h",  // 1 hour resolution
    "aggregate", "true"
);

MetricQueryResult customResult = metricsClient.query(tags, multipleMetrics, groupBy, timeParams);

Service-Specific Metrics

// Look up the runtime metrics of one service
ServiceId serviceId = ServiceId.of(appId, "user-service");
RuntimeMetrics runtimeMetrics = metricsClient.getServiceMetrics(serviceId);

System.out.println("Current instances: " + runtimeMetrics.getCurrentInstances());
System.out.println("Requested instances: " + runtimeMetrics.getRequestedInstances());

// Dump every reported metric as "name: value"
Map<String, String> metrics = runtimeMetrics.getMetrics();
metrics.forEach((name, value) -> System.out.println(name + ": " + value));

// A mismatch between current and requested instance counts is reported as scaling
boolean instanceCountsMatch =
    runtimeMetrics.getCurrentInstances() == runtimeMetrics.getRequestedInstances();
if (!instanceCountsMatch) {
    System.out.println("Service is scaling...");
}

System Service Monitoring

System Service Status

// One-shot health answer for the whole system
boolean allOk = monitorClient.allSystemServicesOk();
System.out.println("All system services OK: " + allOk);

// Per-service status, with a warning for every service that is not "OK"
Map<String, String> allStatuses = monitorClient.getAllSystemServiceStatus();
System.out.println("System service statuses:");

allStatuses.forEach((serviceName, status) -> {
    System.out.println("- " + serviceName + ": " + status);

    boolean healthy = "OK".equals(status);
    if (!healthy) {
        System.err.println("  WARNING: Service " + serviceName + " is not OK!");
    }
});

// Status of a single, named service
String datasetServiceStatus = monitorClient.getSystemServiceStatus("dataset.service");
System.out.println("Dataset service status: " + datasetServiceStatus);

System Service Information

// Fetch the metadata of every system service and print a short summary of each
List<SystemServiceMeta> services = monitorClient.listSystemServices();
System.out.println("System services (" + services.size() + "):");

services.forEach(service -> {
    System.out.println("Service: " + service.getName());
    System.out.println("  Description: " + service.getDescription());
    System.out.println("  Status: " + service.getStatus());
    System.out.println("  Instances: " + service.getInstances() + "/" + service.getRequestedInstances());
    System.out.println("  Can check: " + service.isCanCheck());

    // Only mention logs when there is something to show
    String logs = service.getLogs();
    boolean hasLogs = logs != null && !logs.isEmpty();
    if (hasLogs) {
        System.out.println("  Logs available: " + logs.length() + " characters");
    }
});

System Service Live Information

// Services whose live information is inspected
String[] criticalServices = {"dataset.service", "transaction.service", "metadata.service"};

for (int i = 0; i < criticalServices.length; i++) {
    String serviceName = criticalServices[i];
    try {
        SystemServiceLiveInfo liveInfo = monitorClient.getSystemServiceLiveInfo(serviceName);

        System.out.println("Live info for " + serviceName + ":");
        System.out.println("  Status: " + liveInfo.getStatus());
        System.out.println("  YARN App ID: " + liveInfo.getYarnAppId());
        System.out.println("  Containers: " + liveInfo.getContainers());

        // Add up the per-key container counts to get the overall total
        int totalContainers = 0;
        for (int count : liveInfo.getContainers().values()) {
            totalContainers += count;
        }
        System.out.println("  Total containers: " + totalContainers);

    } catch (Exception e) {
        // A failure for one service is reported and the loop moves on to the next one
        System.err.println("Error getting live info for " + serviceName + ": " + e.getMessage());
    }
}

System Service Management

Instance Management

// Get current instance count
String serviceName = "metadata.service";
int currentInstances = monitorClient.getSystemServiceInstances(serviceName);
System.out.println("Current instances of " + serviceName + ": " + currentInstances);

// Scale service instances
int newInstanceCount = 3;
try {
    monitorClient.setSystemServiceInstances(serviceName, newInstanceCount);
    System.out.println("Scaled " + serviceName + " to " + newInstanceCount + " instances");

    // Wait and verify scaling
    Thread.sleep(30000); // Wait 30 seconds

    int actualInstances = monitorClient.getSystemServiceInstances(serviceName);
    if (actualInstances == newInstanceCount) {
        System.out.println("Scaling completed successfully");
    } else {
        System.out.println("Scaling in progress: " + actualInstances + "/" + newInstanceCount);
    }

} catch (InterruptedException e) {
    // Thread.sleep clears the interrupt flag when it throws; restore it so code further
    // up the stack can still observe the cancellation request.
    Thread.currentThread().interrupt();
    System.err.println("Interrupted while waiting for " + serviceName + " to scale");
} catch (Exception e) {
    System.err.println("Error scaling service: " + e.getMessage());
}

Advanced Monitoring and Analytics

Comprehensive System Health Check

/**
 * Builds health reports from system service statuses (MonitorClient) and system
 * resource metrics (MetricsClient), either on demand or on a fixed schedule.
 *
 * <p>Periodic monitoring started with {@link #monitorSystemHealth} keeps a background
 * thread alive until {@link #stopMonitoring()} is called.
 */
public class SystemHealthChecker {
    private final MonitorClient monitorClient;
    private final MetricsClient metricsClient;

    // Scheduler shared by all monitorSystemHealth registrations; created on first use and
    // reset by stopMonitoring(). Access is guarded by "this".
    private ScheduledExecutorService executor;

    public SystemHealthChecker(MonitorClient monitorClient, MetricsClient metricsClient) {
        this.monitorClient = monitorClient;
        this.metricsClient = metricsClient;
    }

    /**
     * Runs one health check.
     *
     * <p>Failures do not propagate: a failure while reading service status is recorded via
     * the builder's {@code error(...)}, a failure while querying metrics via
     * {@code metricsError(...)}, and a report is built either way.
     *
     * @return the report built from whatever information could be collected
     */
    public SystemHealthReport checkSystemHealth() {
        SystemHealthReport.Builder reportBuilder = SystemHealthReport.builder();

        try {
            // Check overall system status
            boolean allServicesOk = monitorClient.allSystemServicesOk();
            reportBuilder.allServicesHealthy(allServicesOk);

            // Get detailed service status
            Map<String, String> serviceStatuses = monitorClient.getAllSystemServiceStatus();
            reportBuilder.serviceStatuses(serviceStatuses);

            // Identify unhealthy services: every service whose status is not "OK"
            List<String> unhealthyServices = serviceStatuses.entrySet().stream()
                .filter(entry -> !"OK".equals(entry.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
            reportBuilder.unhealthyServices(unhealthyServices);

            // Get system resource metrics
            Map<String, String> systemTags = Map.of("component", "system");
            List<String> resourceMetrics = List.of(
                "system.total.memory.mb",
                "system.available.memory.mb",
                "system.cpu.percentage",
                "system.disk.used.percentage"
            );

            try {
                // Read the clock once so the window is exactly 300 seconds wide
                long nowSeconds = System.currentTimeMillis() / 1000;
                MetricQueryResult systemMetrics = metricsClient.query(systemTags, resourceMetrics, List.of(),
                    String.valueOf(nowSeconds - 300), // Last 5 minutes
                    String.valueOf(nowSeconds)
                );
                reportBuilder.systemMetrics(systemMetrics);
            } catch (Exception e) {
                // Metrics are optional for the report; keep the service information collected so far
                reportBuilder.metricsError("Failed to retrieve system metrics: " + e.getMessage());
            }

        } catch (Exception e) {
            reportBuilder.error("System health check failed: " + e.getMessage());
        }

        return reportBuilder.build();
    }

    /**
     * Starts running {@link #checkSystemHealth()} at a fixed rate, beginning immediately,
     * and hands every report to {@code callback}.
     *
     * <p>All registrations share one scheduler thread, so a slow callback delays the others.
     * Call {@link #stopMonitoring()} to end all of them and release the thread.
     *
     * @param intervalMs period between runs in milliseconds; must be positive
     * @param callback   receiver of reports and errors; must not be null
     * @throws NullPointerException     if {@code callback} is null
     * @throws IllegalArgumentException if {@code intervalMs} is not positive
     */
    public synchronized void monitorSystemHealth(long intervalMs, HealthReportCallback callback) {
        // Fail fast: a null callback would otherwise only fail inside the scheduled task,
        // where the failure silently cancels the schedule.
        if (callback == null) {
            throw new NullPointerException("callback");
        }
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive: " + intervalMs);
        }
        if (executor == null) {
            executor = Executors.newSingleThreadScheduledExecutor();
        }
        executor.scheduleAtFixedRate(() -> runScheduledCheck(callback), 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops every periodic check started through {@link #monitorSystemHealth} and releases
     * the scheduler thread. Does nothing when monitoring is not running.
     */
    public synchronized void stopMonitoring() {
        if (executor != null) {
            executor.shutdownNow();
            executor = null;
        }
    }

    /** Runs one scheduled check and routes any failure to the callback's error hook. */
    private void runScheduledCheck(HealthReportCallback callback) {
        try {
            SystemHealthReport report = checkSystemHealth();
            callback.onHealthReport(report);
        } catch (Exception e) {
            try {
                callback.onError(e);
            } catch (Exception handlerFailure) {
                // An exception escaping a scheduled task suppresses all later runs, so a
                // failing error hook is only logged here.
                System.err.println("Health monitoring error handler failed: " + handlerFailure.getMessage());
            }
        }
    }

    /** Receives the reports produced by periodic monitoring. */
    @FunctionalInterface
    public interface HealthReportCallback {
        /** Called with the report of each completed check. */
        void onHealthReport(SystemHealthReport report);

        /** Called when a check or {@link #onHealthReport} throws; prints to stderr by default. */
        default void onError(Exception e) {
            System.err.println("Health monitoring error: " + e.getMessage());
        }
    }
}

// Health report data structure.
// NOTE(review): skeleton only - the constructor, getters and builder referenced by
// SystemHealthChecker (builder(), Builder.build(), ...) are left out of this listing.
public class SystemHealthReport {
    // Result of MonitorClient.allSystemServicesOk() as passed to the builder
    private final boolean allServicesHealthy;
    // Service name -> status string for all system services
    private final Map<String, String> serviceStatuses;
    // Names of the services whose status is not "OK"
    private final List<String> unhealthyServices;
    // Result of the system resource metrics query, when it succeeded
    private final MetricQueryResult systemMetrics;
    // Message recorded when the metrics query failed
    private final String metricsError;
    // Message recorded when the health check as a whole failed
    private final String error;
    // NOTE(review): presumably the creation time of the report; nothing visible here sets it - confirm
    private final long timestamp;
    
    // Constructor, getters, and builder
}

Performance Analysis

/**
 * Prints summary statistics, the first-to-last trend and an outlier count for one series.
 *
 * <p>Output goes to standard out. Series with fewer than two data points are reported as
 * insufficient and skipped.
 *
 * @param metricName name used to label the output
 * @param series     series whose data points are analyzed
 */
public void analyzeMetricTrend(String metricName, MetricTimeSeries series) {
    List<TimeValue> data = series.getData();
    if (data.size() < 2) {
        System.out.println("Insufficient data for trend analysis: " + metricName);
        return;
    }

    // Calculate basic statistics
    DoubleSummaryStatistics stats = data.stream()
        .mapToDouble(TimeValue::getValue)
        .summaryStatistics();

    System.out.println("Metric: " + metricName);
    System.out.println("  Data points: " + stats.getCount());
    System.out.println("  Min: " + stats.getMin());
    System.out.println("  Max: " + stats.getMax());
    System.out.println("  Average: " + String.format("%.2f", stats.getAverage()));

    // Calculate trend as the relative change from the first to the last data point
    double firstValue = data.get(0).getValue();
    double lastValue = data.get(data.size() - 1).getValue();

    if (firstValue == 0) {
        // A zero baseline makes the relative change undefined: the division would print
        // "Infinity%" or "NaN%". Report the absolute change instead.
        System.out.println("  Trend: n/a (first value is 0, absolute change "
            + String.format("%.2f", lastValue - firstValue) + ")");
    } else {
        double percentChange = ((lastValue - firstValue) / firstValue) * 100;
        System.out.println("  Trend: " + String.format("%.2f", percentChange) + "%");
    }

    // Detect anomalies (values > 2 standard deviations from mean)
    double stdDev = calculateStandardDeviation(data, stats.getAverage());
    long anomalies = data.stream()
        .mapToDouble(TimeValue::getValue)
        .filter(value -> Math.abs(value - stats.getAverage()) > 2 * stdDev)
        .count();

    if (anomalies > 0) {
        System.out.println("  Anomalies detected: " + anomalies + " data points");
    }
}

/**
 * Computes the population standard deviation of the data point values around {@code mean}.
 *
 * @param data data points whose values are used
 * @param mean mean to measure the deviations from
 * @return square root of the average squared deviation; 0.0 for an empty list
 */
private double calculateStandardDeviation(List<TimeValue> data, double mean) {
    double meanSquaredDeviation = data.stream()
        .mapToDouble(point -> point.getValue())
        .map(sample -> {
            double deviation = sample - mean;
            return Math.pow(deviation, 2);
        })
        .average()
        .orElse(0.0);
    return Math.sqrt(meanSquaredDeviation);
}

Custom Metrics Dashboard

/**
 * Prints a plain-text dashboard of a fixed set of key metrics for one application.
 */
public class MetricsDashboard {
    private final MetricsClient metricsClient;

    public MetricsDashboard(MetricsClient metricsClient) {
        this.metricsClient = metricsClient;
    }

    /**
     * Queries each key metric for the given application and prints one summary line per metric.
     *
     * @param appId application whose namespace and name form the query's tag context
     */
    public void displayApplicationMetrics(ApplicationId appId) {
        Map<String, String> appTags = Map.of(
            "namespace", appId.getNamespace().getId(),
            "app", appId.getApplication()
        );

        // The metrics shown on the dashboard, in display order
        List<String> keyMetrics = List.of(
            "system.process.memory.used.mb",
            "system.process.cpu.percentage",
            "user.records.in",
            "user.records.out",
            "user.errors.total"
        );

        System.out.println("=== Application Metrics Dashboard ===");
        System.out.println("Application: " + appId.getApplication());
        System.out.println("Namespace: " + appId.getNamespace().getId());
        System.out.println("Timestamp: " + new Date());
        System.out.println();

        keyMetrics.forEach(metric -> printMetricLine(appTags, metric));

        System.out.println("========================================");
    }

    /** Queries one metric and prints its summary, or an error line if the query fails. */
    private void printMetricLine(Map<String, String> appTags, String metric) {
        try {
            MetricQueryResult queryResult = metricsClient.query(appTags, metric);
            displayMetricSummary(metric, queryResult);
        } catch (Exception e) {
            // One failing metric must not stop the remaining dashboard lines
            System.out.println(metric + ": Error retrieving data - " + e.getMessage());
        }
    }

    /**
     * Prints current, average and maximum value of the first series in {@code result},
     * or a placeholder line when there is no series or no data point.
     */
    private void displayMetricSummary(String metricName, MetricQueryResult result) {
        List<MetricTimeSeries> allSeries = result.getSeries();
        if (allSeries.isEmpty()) {
            System.out.println(metricName + ": No data available");
            return;
        }

        // Only the first series is summarized
        List<TimeValue> points = allSeries.get(0).getData();
        if (points.isEmpty()) {
            System.out.println(metricName + ": No data points");
            return;
        }

        // The last element is taken as the current value
        double current = points.get(points.size() - 1).getValue();
        DoubleSummaryStatistics summary = points.stream()
            .mapToDouble(point -> point.getValue())
            .summaryStatistics();

        System.out.printf("%s: Current=%.2f, Avg=%.2f, Max=%.2f%n",
            metricName, current, summary.getAverage(), summary.getMax());
    }
}

Error Handling

Metrics and monitoring operations may throw these exceptions:

  • MetricNotFoundException: Requested metric does not exist
  • InvalidMetricException: Invalid metric query parameters
  • SystemServiceNotFoundException: System service does not exist
  • UnauthenticatedException: Authentication required
  • UnauthorizedException: Insufficient permissions
try {
    // Run the query and report how many series came back
    int seriesCount = metricsClient.query(tags, metricName).getSeries().size();
    System.out.println("Query successful, " + seriesCount + " series returned");
} catch (MetricNotFoundException e) {
    // The requested metric does not exist
    System.err.println("Metric not found: " + metricName);
} catch (InvalidMetricException e) {
    // The query parameters were rejected
    System.err.println("Invalid metric query: " + e.getMessage());
} catch (IOException e) {
    // Reported as a network error
    System.err.println("Network error: " + e.getMessage());
}

Best Practices

  1. Regular Monitoring: Implement continuous system health monitoring
  2. Metric Selection: Focus on key performance indicators and business metrics
  3. Alerting: Set up alerts for critical metrics and system services
  4. Trend Analysis: Track metrics over time to identify patterns and anomalies
  5. Resource Planning: Use metrics for capacity planning and scaling decisions
  6. Performance Optimization: Use metrics to identify and resolve performance bottlenecks
// Good: Comprehensive monitoring with alerting and trend analysis
/**
 * Periodically checks system services and application metrics and raises alerts through
 * the given AlertingService.
 *
 * <p>Monitoring runs on background threads from {@link #startMonitoring()} until
 * {@link #stopMonitoring()} is called.
 */
public class ProductionMonitor {
    private final MetricsClient metricsClient;
    private final MonitorClient monitorClient;
    private final AlertingService alertingService;

    // Scheduler running both periodic checks; null while monitoring is not running.
    // Access is guarded by "this".
    private ScheduledExecutorService systemMonitor;

    public ProductionMonitor(MetricsClient metricsClient, MonitorClient monitorClient, AlertingService alertingService) {
        this.metricsClient = metricsClient;
        this.monitorClient = monitorClient;
        this.alertingService = alertingService;
    }

    /**
     * Starts the periodic checks: system services every minute and application metrics
     * every 5 minutes, both beginning immediately.
     *
     * <p>Calling this while monitoring is already running does nothing, so checks are
     * never scheduled twice.
     */
    public synchronized void startMonitoring() {
        if (systemMonitor != null) {
            return;
        }

        // System health monitoring
        systemMonitor = Executors.newScheduledThreadPool(2);

        // Check system services every minute
        systemMonitor.scheduleAtFixedRate(this::checkSystemServices, 0, 60, TimeUnit.SECONDS);

        // Check application metrics every 5 minutes
        systemMonitor.scheduleAtFixedRate(this::checkApplicationMetricsSafely, 0, 300, TimeUnit.SECONDS);
    }

    /**
     * Stops both periodic checks and releases the scheduler threads. Does nothing when
     * monitoring is not running; {@link #startMonitoring()} may be called again afterwards.
     */
    public synchronized void stopMonitoring() {
        if (systemMonitor != null) {
            systemMonitor.shutdownNow();
            systemMonitor = null;
        }
    }

    /** Alerts with the per-service statuses when not all system services are OK. */
    private void checkSystemServices() {
        try {
            if (!monitorClient.allSystemServicesOk()) {
                Map<String, String> statuses = monitorClient.getAllSystemServiceStatus();
                alertingService.sendSystemAlert("System services unhealthy", statuses);
            }
        } catch (Exception e) {
            alertingService.sendSystemAlert("System monitoring failed", e.getMessage());
        }
    }

    /**
     * Runs the application metrics check without letting an exception escape: an exception
     * thrown by a scheduled task suppresses all of its later executions, which would
     * silently end metric monitoring.
     */
    private void checkApplicationMetricsSafely() {
        try {
            checkApplicationMetrics();
        } catch (Exception e) {
            alertingService.sendSystemAlert("Application metrics monitoring failed", e.getMessage());
        }
    }

    private void checkApplicationMetrics() {
        // Implementation for application-specific metric monitoring
        // Check memory usage, error rates, throughput, etc.
        // Send alerts if thresholds are exceeded
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap-client

docs

application-management.md

artifact-management.md

configuration.md

data-operations.md

dataset-operations.md

index.md

metrics-monitoring.md

program-control.md

schedule-management.md

security-administration.md

service-management.md

tile.json