CDAP Watchdog provides comprehensive metrics collection, querying, and logging services for the CDAP platform.
Backend metrics processing infrastructure for consuming metrics data from message queues, processing and persisting metrics to storage systems, and providing status monitoring for metrics processing pipelines.
Status service for metrics processing with HTTP endpoints for health checks and discovery during CDAP services startup.
/**
* Status service with PingHandler used for discovery during CDAP-services startup
* Provides HTTP endpoints for monitoring metrics processor health and status
*/
public class MetricsProcessorStatusService extends AbstractIdleService {
/**
* Create metrics processor status service
* @param cConf CDAP configuration
* @param sConf Security configuration
* @param discoveryService Service discovery for registration
* @param handlers HTTP handlers for status endpoints
* @param commonNettyHttpServiceFactory Factory for creating HTTP service
*/
public MetricsProcessorStatusService(CConfiguration cConf,
SConfiguration sConf,
DiscoveryService discoveryService,
Set<HttpHandler> handlers,
CommonNettyHttpServiceFactory commonNettyHttpServiceFactory);
/**
* Start status service and register with discovery
* @throws Exception if service startup fails
*/
protected void startUp() throws Exception;
/**
* Stop status service and deregister from discovery
* @throws Exception if service shutdown fails
*/
protected void shutDown() throws Exception;
}
Core metrics processing service that consumes metrics data from the messaging system, then processes and persists the metrics to storage.
/**
* Service that consumes metrics from messaging system and processes them
* Reads metrics data from message queues, processes and persists to storage systems
*/
public class MessagingMetricsProcessorService extends AbstractExecutionThreadService {
/**
* Create messaging metrics processor service
* @param cConf CDAP configuration
* @param metricsWriter Writer for persisting processed metrics
* @param messagingService Messaging service for consuming metrics data
* @param topicId Topic identifier for metrics message queue
* @param metricsContext Context for processor metrics collection
* @param instanceId Unique instance identifier for this processor
*/
public MessagingMetricsProcessorService(CConfiguration cConf,
MetricsWriter metricsWriter,
MessagingService messagingService,
TopicId topicId,
MetricsContext metricsContext,
int instanceId);
/**
* Main processing loop for consuming and processing metrics
* Continuously reads from messaging system and processes metrics
*/
protected void run() throws Exception;
/**
* Graceful shutdown of processing service
* Stops message consumption and completes in-flight processing
*/
protected void shutDown() throws Exception;
}
Manager service for coordinating multiple metrics processor instances and handling administrative operations.
/**
* Manager service for coordinating multiple metrics processor instances
* Handles lifecycle management and administrative operations for processor services
*/
public class MessagingMetricsProcessorManagerService extends AbstractIdleService {
/**
* Create metrics processor manager service
* @param cConf CDAP configuration
* @param messagingService Messaging service for metrics consumption
* @param metricsWriterProvider Provider for metrics writers
* @param metricsCollectionService Service for collecting processor metrics
*/
public MessagingMetricsProcessorManagerService(CConfiguration cConf,
MessagingService messagingService,
Provider<MetricsWriter> metricsWriterProvider,
MetricsCollectionService metricsCollectionService);
/**
* Start manager service and processor instances
* @throws Exception if startup fails
*/
protected void startUp() throws Exception;
/**
* Stop manager service and all processor instances
* @throws Exception if shutdown fails
*/
protected void shutDown() throws Exception;
}
Factory for creating metrics processor services with proper configuration and dependencies.
/**
* Factory for creating messaging metrics processor services
* Provides configured instances of processor services with proper dependencies
*/
public interface MessagingMetricsProcessorServiceFactory {
/**
* Create messaging metrics processor service
* @param metricsWriter Writer for persisting metrics data
* @param topicId Topic identifier for metrics consumption
* @param instanceId Unique instance identifier
* @return Configured MessagingMetricsProcessorService instance
*/
MessagingMetricsProcessorService create(MetricsWriter metricsWriter,
TopicId topicId,
int instanceId);
}
Runtime management services for metrics processing in distributed environments.
/**
* Runtime service for messaging metrics processor in distributed environments
* Manages processor lifecycle and integration with CDAP runtime systems
*/
public class MessagingMetricsProcessorRuntimeService extends AbstractIdleService {
/**
* Create runtime service for metrics processor
* @param cConf CDAP configuration
* @param sConf Security configuration
* @param discoveryService Service discovery
* @param messagingService Messaging service
* @param metricsCollectionService Metrics collection service
*/
public MessagingMetricsProcessorRuntimeService(CConfiguration cConf,
SConfiguration sConf,
DiscoveryService discoveryService,
MessagingService messagingService,
MetricsCollectionService metricsCollectionService);
/**
* Start runtime service and all managed components
* @throws Exception if startup fails
*/
protected void startUp() throws Exception;
/**
* Stop runtime service and cleanup resources
* @throws Exception if shutdown fails
*/
protected void shutDown() throws Exception;
}
/**
* Manager for metrics processor status service instances
* Coordinates status services across multiple processor instances
*/
public class MetricsProcessorStatusServiceManager extends AbstractIdleService {
/**
* Create status service manager
* @param cConf CDAP configuration
* @param sConf Security configuration
* @param discoveryService Service discovery
* @param handlers HTTP handlers for status endpoints
* @param httpServiceFactory Factory for creating HTTP services
*/
public MetricsProcessorStatusServiceManager(CConfiguration cConf,
SConfiguration sConf,
DiscoveryService discoveryService,
Set<HttpHandler> handlers,
CommonNettyHttpServiceFactory httpServiceFactory);
/**
* Start status service manager
* @throws Exception if startup fails
*/
protected void startUp() throws Exception;
/**
* Stop status service manager
* @throws Exception if shutdown fails
*/
protected void shutDown() throws Exception;
}
Services and data models for metrics processing administration and maintenance.
/**
* Administrative message for metrics processing operations
* Used for coordinating administrative actions across processor instances
*/
public final class MetricsAdminMessage {
/**
* Get the administrative operation type
* @return Type of administrative operation
*/
public Type getType();
/**
* Get the message payload
* @param gson Gson instance for deserialization
* @param type Target type for payload deserialization (presumably a {@code java.lang.reflect.Type}, not the nested {@code Type} enum declared below)
* @return Deserialized payload object
*/
public <T> T getPayload(Gson gson, Type type);
/**
* Administrative operation types
*/
public enum Type {
/** Delete metrics operation */
DELETE
}
}
/**
* Key provider for subscriber metrics processing
* Provides topic-based keys for metrics processing coordination
*/
public interface TopicSubscriberMetricsKeyProvider {
/**
* Get metrics key for subscriber processing
* @param topicId Topic identifier
* @param instanceId Processor instance identifier
* @return Metrics key for this subscriber instance
*/
String getMetricsKey(TopicId topicId, int instanceId);
}
/**
* Key provider for topic-based metrics processing
* Provides keys for organizing metrics by topic
*/
public interface TopicIdMetricsKeyProvider {
/**
* Get metrics key for topic processing
* @param topicId Topic identifier
* @return Metrics key for this topic
*/
String getMetricsKey(TopicId topicId);
}
Usage Examples:
import io.cdap.cdap.metrics.process.*;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.api.metrics.MetricsWriter;
import io.cdap.cdap.proto.id.TopicId;
// Create and start status service
Set<HttpHandler> statusHandlers = // ... configure handlers
MetricsProcessorStatusService statusService = new MetricsProcessorStatusService(
cConf, sConf, discoveryService, statusHandlers, httpServiceFactory
);
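// startUp() is a protected lifecycle hook; start the service through the public Service API
statusService.startAsync().awaitRunning();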
// Create processor service factory
MessagingMetricsProcessorServiceFactory factory = // ... obtain factory
// Create processor service for specific topic
TopicId metricsTopicId = new TopicId("system", "metrics");
MetricsWriter metricsWriter = // ... obtain metrics writer
MessagingMetricsProcessorService processor = factory.create(
metricsWriter,
metricsTopicId,
1 // instance ID
);
// Start processor (runs in background thread)
processor.startAsync().awaitRunning();
// Create manager service to coordinate multiple processors
MessagingMetricsProcessorManagerService manager = new MessagingMetricsProcessorManagerService(
cConf, messagingService, metricsWriterProvider, metricsCollectionService
);
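// startUp() is a protected lifecycle hook; start the service through the public Service API
manager.startAsync().awaitRunning();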
// Administrative operations
MetricsAdminMessage deleteMessage = // ... create delete message
// Process admin message through appropriate channels
// Shutdown services
processor.stopAsync().awaitTerminated();
manager.stopAsync().awaitTerminated();
statusService.stopAsync().awaitTerminated();
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-watchdog