CDAP Watchdog provides comprehensive metrics collection, querying, and logging services for the CDAP platform
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
CDAP Watchdog provides comprehensive metrics collection, querying, and logging services for the CDAP (Cask Data Application Platform) ecosystem. It implements a distributed monitoring and observability system that gathers performance data from CDAP components, processes and aggregates this data for efficient storage and retrieval, and exposes REST APIs for real-time metrics queries and centralized log management.
pom.xml:<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-watchdog</artifactId>
<version>6.11.0</version>
</dependency>

// Metrics query functionality
import io.cdap.cdap.metrics.query.MetricsQueryService;
import io.cdap.cdap.metrics.query.MetricsHandler;
// Metrics collection and emission
import io.cdap.cdap.metrics.collect.MetricsEmitter;
import io.cdap.cdap.metrics.collect.AggregatedMetricsEmitter;
// Metrics processing services
import io.cdap.cdap.metrics.process.MetricsProcessorStatusService;
import io.cdap.cdap.metrics.process.MessagingMetricsProcessorService;
// Logging services
import io.cdap.cdap.logging.service.LogQueryService;
import io.cdap.cdap.logging.gateway.handlers.LogHttpHandler;
import io.cdap.cdap.logging.gateway.handlers.ErrorClassificationHttpHandler;
import io.cdap.cdap.logging.read.LogReader;
// Log buffer system
import io.cdap.cdap.logging.logbuffer.LogBufferService;
import io.cdap.cdap.logging.logbuffer.LogBufferWriter;
// Logging contexts
import io.cdap.cdap.logging.context.LoggingContextHelper;
import io.cdap.cdap.logging.context.ApplicationLoggingContext;
// Error classification
import io.cdap.cdap.logging.ErrorLogsClassifier;

import io.cdap.cdap.metrics.collect.AggregatedMetricsEmitter;
import io.cdap.cdap.api.metrics.MetricValue;
// Create metrics emitter for collecting metrics
AggregatedMetricsEmitter emitter = new AggregatedMetricsEmitter("my.metric.name");
// Emit different types of metrics
emitter.increment(5); // Counter metric
emitter.gauge(100); // Gauge metric
emitter.event(250); // Event for distribution
// Emit the collected metrics
MetricValue metricValue = emitter.emit();

import io.cdap.cdap.logging.context.LoggingContextHelper;
import io.cdap.cdap.logging.context.ApplicationLoggingContext;
import io.cdap.cdap.logging.read.LogReader;
import io.cdap.cdap.logging.read.LogEvent;
// Create logging context for an application
// NOTE(review): getLoggingContext is listed in this reference as returning LoggingContext,
// so assigning the result to ApplicationLoggingContext may not compile without a cast — confirm.
ApplicationLoggingContext context = LoggingContextHelper.getLoggingContext(
"myNamespace", "myApp", "myProgram", ProgramType.SERVICE
);
// Read logs using LogReader
LogReader logReader = // ... obtain LogReader instance
logReader.getLog(context, startTime, endTime, Filter.EMPTY);

CDAP Watchdog is built around several key components:
- MetricsEmitter implementations, centralized processing through MetricsProcessorService, and query capabilities via MetricsQueryService with REST API endpoints
- LogQueryService with context-aware filtering
- MetricsAdminMessage

REST endpoints for querying metrics data, including time series queries, aggregate queries, tag searches, and batch query processing.
/**
 * Service wrapper for the metrics query functionality, declared as an
 * {@code AbstractIdleService} with start-up and shut-down hooks.
 * Only signatures are listed here; method bodies are omitted in this reference.
 */
public class MetricsQueryService extends AbstractIdleService {
// Start-up hook; declared to throw any Exception.
protected void startUp() throws Exception;
// Shut-down hook; declared to throw any Exception.
protected void shutDown() throws Exception;
}
public class MetricsHandler extends AbstractHttpHandler {
// POST /v3/metrics/search - Search for tags or metrics
// POST /v3/metrics/query - Query metrics data (batch and single queries)
// GET /v3/metrics/processor/status - Get metrics processor status
}

Interfaces and implementations for collecting and emitting metrics data, including counter, gauge, and distribution metrics with aggregation capabilities.
/**
 * Contract for a metrics source: a single {@code emit()} call that yields a
 * {@code MetricValue}.
 */
public interface MetricsEmitter {
// Returns the collected metric as a MetricValue.
// NOTE(review): whether emitting resets internal state is not shown here — confirm.
MetricValue emit();
}
public final class AggregatedMetricsEmitter implements MetricsEmitter {
public void increment(long incrementValue);
public void gauge(long value);
public void event(long value);
public MetricValue emit();
}

Backend metrics processing infrastructure for consuming metrics data from message queues, processing and persisting metrics to storage systems, and providing status monitoring.
/**
 * Service listed under the metrics processing infrastructure, declared as an
 * {@code AbstractIdleService} with start-up and shut-down hooks.
 * NOTE(review): presumably backs the processor status endpoint
 * (GET /v3/metrics/processor/status) — confirm.
 */
public class MetricsProcessorStatusService extends AbstractIdleService {
// Start-up hook; declared to throw any Exception.
protected void startUp() throws Exception;
// Shut-down hook; declared to throw any Exception.
protected void shutDown() throws Exception;
}
public class MessagingMetricsProcessorService extends AbstractExecutionThreadService {
protected void run() throws Exception;
protected void shutDown() throws Exception;
}

Core logging services for centralized log collection, querying, and management with REST endpoints for log retrieval and error analysis.
/**
 * Service listed under the core logging services, declared as an
 * {@code AbstractIdleService} with start-up and shut-down hooks.
 * Only signatures are listed here; method bodies are omitted in this reference.
 */
public class LogQueryService extends AbstractIdleService {
// Start-up hook; declared to throw any Exception.
protected void startUp() throws Exception;
// Shut-down hook; declared to throw any Exception.
protected void shutDown() throws Exception;
}
public interface LogReader {
void getLogNext(LoggingContext loggingContext, ReadRange readRange, int maxEvents, Filter filter, Callback callback) throws Exception;
void getLogPrev(LoggingContext loggingContext, ReadRange readRange, int maxEvents, Filter filter, Callback callback) throws Exception;
}

Context classes and utilities for organizing logs by CDAP program types, providing structured logging with consistent tagging and filtering capabilities.
/**
 * Static helpers that build a {@code LoggingContext} from a namespace id,
 * application id, entity id and program type.
 */
public final class LoggingContextHelper {
// Builds a logging context for the given namespace, application, entity and program type.
public static LoggingContext getLoggingContext(String namespaceId, String appId, String entityId, ProgramType programType);
// Same inputs as getLoggingContext, plus a run id.
public static LoggingContext getLoggingContextWithRunId(String namespaceId, String appId, String entityId, ProgramType programType, String runId);
}
public class ApplicationLoggingContext extends AbstractLoggingContext {
public String getLogPartition();
}

High-throughput log buffering infrastructure for temporary log storage, pipeline processing, automatic recovery, and cleanup operations with file-based buffering and concurrent writer support.
/**
 * Service listed under the log buffering infrastructure, declared as an
 * {@code AbstractIdleService} with start-up and shut-down hooks.
 * Only signatures are listed here; method bodies are omitted in this reference.
 */
public class LogBufferService extends AbstractIdleService {
// Start-up hook; declared to throw any Exception.
protected void startUp() throws Exception;
// Shut-down hook; declared to throw any Exception.
protected void shutDown() throws Exception;
}
public class LogBufferWriter implements Flushable, Closeable {
public void append(LogBufferEvent logEvent) throws IOException;
public void flush() throws IOException;
public void close() throws IOException;
}

// Logging data models
/**
 * Log data model exposing a logging event together with its offset.
 */
public class LogEvent {
// Returns the wrapped logging event.
// NOTE(review): ILoggingEvent is presumably Logback's interface — confirm.
public ILoggingEvent getLoggingEvent();
// Returns the offset associated with this event.
public LogOffset getOffset();
}
// Administrative messages
/**
 * Administrative message carrying a message type and a payload that is
 * decoded through a caller-supplied {@code Gson} instance.
 */
public final class MetricsAdminMessage {
// Returns the kind of administrative message (see the nested Type enum).
public Type getType();
// Decodes the payload with the given Gson instance into the requested type.
// NOTE(review): the `Type` parameter here presumably means java.lang.reflect.Type,
// but inside this class the simple name resolves to the nested enum — confirm.
public <T> T getPayload(Gson gson, Type type);
// Message kinds; DELETE is the only constant listed.
public enum Type {
DELETE
}
}
// Entity categorization
/**
 * Categories of metrics entities. Each constant is declared with a
 * one-letter string code; the constructor and any accessor for that code
 * are not shown in this reference.
 */
public enum MetricsEntityType {
CONTEXT("c"),
RUN("r"),
METRIC("m"),
TAG("t");
}