CDAP Watchdog provides comprehensive metrics collection, querying, and logging services for the CDAP platform
Install: npx @tessl/cli install tessl/maven-io-cdap-cdap--cdap-watchdog@6.11.0

CDAP Watchdog provides comprehensive metrics collection, querying, and logging services for the CDAP (Cask Data Application Platform) ecosystem. It implements a distributed monitoring and observability system that gathers performance data from CDAP components, processes and aggregates this data for efficient storage and retrieval, and exposes REST APIs for real-time metrics queries and centralized log management.
pom.xml:

<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-watchdog</artifactId>
<version>6.11.0</version>
</dependency>

// Metrics query functionality
import io.cdap.cdap.metrics.query.MetricsQueryService;
import io.cdap.cdap.metrics.query.MetricsHandler;
// Metrics collection and emission
import io.cdap.cdap.metrics.collect.MetricsEmitter;
import io.cdap.cdap.metrics.collect.AggregatedMetricsEmitter;
// Metrics processing services
import io.cdap.cdap.metrics.process.MetricsProcessorStatusService;
import io.cdap.cdap.metrics.process.MessagingMetricsProcessorService;
// Logging services
import io.cdap.cdap.logging.service.LogQueryService;
import io.cdap.cdap.logging.gateway.handlers.LogHttpHandler;
import io.cdap.cdap.logging.gateway.handlers.ErrorClassificationHttpHandler;
import io.cdap.cdap.logging.read.LogReader;
// Log buffer system
import io.cdap.cdap.logging.logbuffer.LogBufferService;
import io.cdap.cdap.logging.logbuffer.LogBufferWriter;
// Logging contexts
import io.cdap.cdap.logging.context.LoggingContextHelper;
import io.cdap.cdap.logging.context.ApplicationLoggingContext;
// Error classification
import io.cdap.cdap.logging.ErrorLogsClassifier;import io.cdap.cdap.metrics.collect.AggregatedMetricsEmitter;
import io.cdap.cdap.api.metrics.MetricValue;
// Create metrics emitter for collecting metrics
AggregatedMetricsEmitter emitter = new AggregatedMetricsEmitter("my.metric.name");
// Emit different types of metrics
emitter.increment(5); // Counter metric
emitter.gauge(100); // Gauge metric
emitter.event(250); // Event for distribution
// Emit the collected metrics
MetricValue metricValue = emitter.emit();import io.cdap.cdap.logging.context.LoggingContextHelper;
import io.cdap.cdap.logging.context.ApplicationLoggingContext;
import io.cdap.cdap.logging.read.LogReader;
import io.cdap.cdap.logging.read.LogEvent;
// Create logging context for an application
ApplicationLoggingContext context = LoggingContextHelper.getLoggingContext(
"myNamespace", "myApp", "myProgram", ProgramType.SERVICE
);
// Read logs using LogReader
LogReader logReader = // ... obtain LogReader instance
logReader.getLog(context, startTime, endTime, Filter.EMPTY);CDAP Watchdog is built around several key components:
- Metrics system: collection via MetricsEmitter implementations, centralized processing through MetricsProcessorService, and query capabilities via MetricsQueryService with REST API endpoints
- Logging system: centralized log querying via LogQueryService with context-aware filtering
- Administration: administrative operations expressed as MetricsAdminMessage

Metrics query service: REST endpoints for querying metrics data, including time series queries, aggregate queries, tag searches, and batch query processing.
/**
 * Service that provides the metrics query capability and its REST API endpoints.
 */
public class MetricsQueryService extends AbstractIdleService {
/** Start-up hook of the AbstractIdleService lifecycle. */
protected void startUp() throws Exception;
/** Shut-down hook of the AbstractIdleService lifecycle. */
protected void shutDown() throws Exception;
}
/**
 * HTTP handler for the metrics REST API; the routes it serves are listed below.
 */
public class MetricsHandler extends AbstractHttpHandler {
// POST /v3/metrics/search - Search for tags or metrics
// POST /v3/metrics/query - Query metrics data (batch and single queries)
// GET /v3/metrics/processor/status - Get metrics processor status
}

Interfaces and implementations for collecting and emitting metrics data, including counter, gauge, and distribution metrics with aggregation capabilities.
/**
 * Source of metric data that can be emitted as a single MetricValue.
 */
public interface MetricsEmitter {
/** Returns the collected metric data as a MetricValue. */
MetricValue emit();
}
/**
 * MetricsEmitter that collects counter, gauge and distribution (event) values
 * before they are emitted as a MetricValue.
 */
public final class AggregatedMetricsEmitter implements MetricsEmitter {
/** Records a counter increment of {@code incrementValue}. */
public void increment(long incrementValue);
/** Records a gauge value. */
public void gauge(long value);
/** Records a value for the distribution (event) metric. */
public void event(long value);
/** Returns the collected metric data as a MetricValue. */
public MetricValue emit();
}

Backend metrics processing infrastructure for consuming metrics data from message queues, processing and persisting metrics to storage systems, and providing status monitoring.
/**
 * Service related to metrics processor status; presumably backs the
 * GET /v3/metrics/processor/status endpoint (confirm).
 */
public class MetricsProcessorStatusService extends AbstractIdleService {
/** Start-up hook of the AbstractIdleService lifecycle. */
protected void startUp() throws Exception;
/** Shut-down hook of the AbstractIdleService lifecycle. */
protected void shutDown() throws Exception;
}
public class MessagingMetricsProcessorService extends AbstractExecutionThreadService {
protected void run() throws Exception;
protected void shutDown() throws Exception;
}Core logging services for centralized log collection, querying, and management with REST endpoints for log retrieval and error analysis.
/**
 * Service providing log querying with context-aware filtering.
 */
public class LogQueryService extends AbstractIdleService {
/** Start-up hook of the AbstractIdleService lifecycle. */
protected void startUp() throws Exception;
/** Shut-down hook of the AbstractIdleService lifecycle. */
protected void shutDown() throws Exception;
}
/**
 * Reads log events for a LoggingContext within a ReadRange.
 */
public interface LogReader {
/**
 * Reads up to {@code maxEvents} events matching {@code filter}; results are presumably
 * delivered through {@code callback}, reading forward from the range (confirm).
 */
void getLogNext(LoggingContext loggingContext, ReadRange readRange, int maxEvents, Filter filter, Callback callback) throws Exception;
/**
 * Reads up to {@code maxEvents} events matching {@code filter}; results are presumably
 * delivered through {@code callback}, reading backward from the range (confirm).
 */
void getLogPrev(LoggingContext loggingContext, ReadRange readRange, int maxEvents, Filter filter, Callback callback) throws Exception;
}

Context classes and utilities for organizing logs by CDAP program types, providing structured logging with consistent tagging and filtering capabilities.
/**
 * Static helpers that build a LoggingContext for a CDAP program.
 */
public final class LoggingContextHelper {
/** Returns a LoggingContext for the program identified by namespace, app, entity id and program type. */
public static LoggingContext getLoggingContext(String namespaceId, String appId, String entityId, ProgramType programType);
/** Same as getLoggingContext, additionally scoped to the given {@code runId}. */
public static LoggingContext getLoggingContextWithRunId(String namespaceId, String appId, String entityId, ProgramType programType, String runId);
}
/**
 * Logging context for an application, built on AbstractLoggingContext.
 */
public class ApplicationLoggingContext extends AbstractLoggingContext {
/** Returns the log partition associated with this context. */
public String getLogPartition();
}

High-throughput log buffering infrastructure for temporary log storage, pipeline processing, automatic recovery, and cleanup operations with file-based buffering and concurrent writer support.
/**
 * Service for the log buffer subsystem.
 */
public class LogBufferService extends AbstractIdleService {
/** Start-up hook of the AbstractIdleService lifecycle. */
protected void startUp() throws Exception;
/** Shut-down hook of the AbstractIdleService lifecycle. */
protected void shutDown() throws Exception;
}
/**
 * Writer that appends LogBufferEvents; flushable and closeable.
 */
public class LogBufferWriter implements Flushable, Closeable {
/** Appends one event to the buffer. */
public void append(LogBufferEvent logEvent) throws IOException;
/** Flushes buffered data (java.io.Flushable contract). */
public void flush() throws IOException;
/** Closes the writer (java.io.Closeable contract). */
public void close() throws IOException;
}// Logging data models
/**
 * Pairs a logging event with its LogOffset.
 */
public class LogEvent {
/** Returns the wrapped ILoggingEvent (presumably Logback's — confirm). */
public ILoggingEvent getLoggingEvent();
/** Returns the offset of this event. */
public LogOffset getOffset();
}
// Administrative messages
/**
 * Administrative message for the metrics system: a message {@link Type} plus a payload.
 */
public final class MetricsAdminMessage {
/** Returns the kind of this administrative message. */
public Type getType();
/**
 * Returns the payload decoded with {@code gson} as the given type.
 *
 * The type token is spelled {@code java.lang.reflect.Type} on purpose: inside this
 * class the bare name {@code Type} resolves to the nested enum below, which is not a
 * type token Gson can decode into.
 */
public <T> T getPayload(Gson gson, java.lang.reflect.Type type);
/** Kinds of administrative messages; only DELETE is listed here. */
public enum Type {
DELETE
}
}
// Entity categorization
/**
 * Categories of metrics entities, each tagged with a one-letter code
 * ("c", "r", "m", "t"). The constructor and accessor for that code are not shown here.
 */
public enum MetricsEntityType {
CONTEXT("c"),
RUN("r"),
METRIC("m"),
TAG("t");
}