CDAP Watchdog provides comprehensive metrics collection, querying, and logging services for the CDAP platform
—
High-throughput log buffering infrastructure for temporary log storage, pipeline processing, automatic recovery, and cleanup operations. The Log Buffer system provides resilient log processing capabilities for CDAP platform components.
Main service managing log buffer pipelines, recovery, cleanup, and HTTP endpoints for high-throughput log processing scenarios.
/**
* Manages log buffer pipelines, recovery, cleanup, and the HTTP endpoint.
* Responsible for loading, starting and stopping log buffer pipelines, creating concurrent writers,
* starting the cleaner service, and recovering logs from the buffer.
*
* NOTE(review): both lifecycle methods below are declared {@code protected}; they are presumably
* invoked through the {@code AbstractIdleService} lifecycle rather than called directly -- confirm.
*/
public class LogBufferService extends AbstractIdleService {
/**
* Starts the log buffer service, including pipeline loading and recovery.
* @throws Exception if service startup fails
*/
protected void startUp() throws Exception;
/**
* Stops the log buffer service and cleans up resources.
* @throws Exception if service shutdown fails
*/
protected void shutDown() throws Exception;
}

File-based log writer that appends logs to rotating buffer files, with automatic file rotation when the maximum size is reached.
/**
* Appends logs to a log buffer file with automatic rotation.
* File format: {@code <length><log_event>}, where {@code length} is an Avro encoded int32.
* Files are named with monotonically increasing numbers: {@code (max_file_id + 1).buf}.
*/
public class LogBufferWriter implements Flushable, Closeable {
/**
* Creates a log buffer writer with the specified configuration.
* @param logEventSerializer Serializer for log events
* @param locationFactory Factory for creating file locations
* @param maxFileSizeInBytes Maximum file size, in bytes, before rotation
* @param cleaner Cleanup runnable for old files
*/
public LogBufferWriter(LoggingEventSerializer logEventSerializer, LocationFactory locationFactory,
long maxFileSizeInBytes, Runnable cleaner);
/**
* Appends a log event to the buffer file.
* @param logEvent Log event to append
* @throws IOException if write operation fails
*/
public void append(LogBufferEvent logEvent) throws IOException;
/**
* Flushes buffered data to the file system ({@code Flushable} contract).
* @throws IOException if flush operation fails
*/
public void flush() throws IOException;
/**
* Closes the writer and cleans up resources ({@code Closeable} contract).
* @throws IOException if close operation fails
*/
public void close() throws IOException;
}

Thread-safe wrapper around LogBufferWriter providing concurrent access for multiple threads writing to the same buffer.
/**
* Thread-safe wrapper for {@link LogBufferWriter} supporting concurrent writes.
* Provides synchronization for multiple threads writing to the same log buffer:
* {@code append}, {@code flush} and {@code close} are all declared {@code synchronized}.
*/
public class ConcurrentLogBufferWriter implements Flushable, Closeable {
/**
* Creates a concurrent log buffer writer.
* @param logBufferWriter Underlying writer to wrap
*/
public ConcurrentLogBufferWriter(LogBufferWriter logBufferWriter);
/**
* Thread-safe append operation.
* @param logEvent Log event to append
* @throws IOException if write operation fails
*/
public synchronized void append(LogBufferEvent logEvent) throws IOException;
/**
* Thread-safe flush operation.
* @throws IOException if flush operation fails
*/
public synchronized void flush() throws IOException;
/**
* Thread-safe close operation.
* @throws IOException if close operation fails
*/
public synchronized void close() throws IOException;
}

HTTP request handler for processing log buffer requests through REST endpoints.
/**
* HTTP handler for log buffer requests.
* Processes incoming log events through HTTP endpoints.
*/
public class LogBufferHandler extends AbstractHttpHandler {
// Handles HTTP requests for log buffer operations.
// Endpoint: POST /logBuffer - processes log events through the buffer.
// NOTE(review): this listing declares no handler methods; their signatures are not shown here.
}

Components for recovering logs from buffer files after system restarts or failures.
/**
* Service for recovering logs from buffer files.
* Handles recovery operations after system restarts or failures.
*
* NOTE(review): both lifecycle methods below are declared {@code protected}, so they cannot be
* called from unrelated code; presumably the service is driven through the
* {@code AbstractIdleService} lifecycle -- confirm.
*/
public class LogBufferRecoveryService extends AbstractIdleService {
/**
* Starts the recovery service.
* @throws Exception if recovery startup fails
*/
protected void startUp() throws Exception;
/**
* Stops the recovery service.
* @throws Exception if recovery shutdown fails
*/
protected void shutDown() throws Exception;
}
/**
* Reader for recovering log events from buffer files.
* Provides sequential access to log events stored in a buffer file.
*/
public class LogBufferReader implements Closeable {
/**
* Creates a log buffer reader for the specified file.
* @param bufferFile File containing buffered log events
* @param serializer Serializer used to deserialize log events
*/
public LogBufferReader(File bufferFile, LoggingEventSerializer serializer);
/**
* Reads the next log event from the buffer.
* @return Next log event, or {@code null} if the end of the file has been reached
* @throws IOException if read operation fails
*/
public LogBufferEvent readNext() throws IOException;
/**
* Closes the reader and cleans up resources ({@code Closeable} contract).
* @throws IOException if close operation fails
*/
public void close() throws IOException;
}

Automatic cleanup of processed log buffer files to manage disk space usage.
/**
* Cleaner service for removing processed log buffer files.
* Automatically removes old buffer files that have been processed.
*/
public class LogBufferCleaner {
/**
* Creates a log buffer cleaner with the given configuration.
* @param retentionPeriodMs Retention period for buffer files, in milliseconds
* @param cleanupIntervalMs Interval between cleanup runs, in milliseconds
*/
public LogBufferCleaner(long retentionPeriodMs, long cleanupIntervalMs);
/**
* Starts cleanup operations.
* Begins periodic cleanup of old buffer files.
*/
public void start();
/**
* Stops cleanup operations.
* Stops periodic cleanup and cleans up resources.
*/
public void stop();
}

Data structures for log buffer operations and file management.
/**
* Represents a log event in the buffer system.
* Wrapper for a log event with buffer-specific metadata.
*/
public class LogBufferEvent {
/**
* Returns the underlying log event.
* @return {@code ILoggingEvent} containing the actual log data
*/
public ILoggingEvent getLoggingEvent();
/**
* Returns the timestamp of the event.
* @return Timestamp in milliseconds since epoch
*/
public long getTimestamp();
}
/**
* Request structure for log buffer operations.
* Contains log events and metadata for buffer processing.
*/
public class LogBufferRequest {
/**
* Returns the log events in this request.
* @return List of {@link LogBufferEvent}s to be buffered
*/
public List<LogBufferEvent> getLogEvents();
/**
* Returns the request metadata.
* @return Map of metadata key-value pairs
*/
public Map<String, String> getMetadata();
}
/**
* Represents a pending log buffer request awaiting processing.
* Used for managing queued requests in the buffer system.
*/
public class PendingLogBufferRequest {
/**
* Returns the underlying request.
* @return {@link LogBufferRequest} that is pending processing
*/
public LogBufferRequest getRequest();
/**
* Returns the submission timestamp.
* @return When this request was submitted for processing
*         (NOTE(review): unit is not stated in this listing; presumably epoch milliseconds -- confirm)
*/
public long getSubmissionTime();
}
/**
* File offset information for log buffer files.
* Tracks position information for reading/writing buffer files.
*/
public class LogBufferFileOffset {
/**
* Returns the file identifier.
* @return Unique identifier for the buffer file
*/
public String getFileId();
/**
* Returns the offset within the file.
* @return Byte offset within the buffer file
*/
public long getOffset();
}

Components for integrating the log buffer with CDAP logging pipelines.
/**
* Pipeline configuration for log buffer processing.
* Defines how the log buffer integrates with logging pipelines.
*/
public class LogBufferPipelineConfig {
/**
* Returns the buffer directory path.
* @return Directory path where buffer files are stored
*/
public String getBufferDir();
/**
* Returns the maximum file size for buffer files.
* @return Maximum size in bytes before file rotation
*/
public long getMaxFileSize();
/**
* Returns the retention period for buffer files.
* @return Retention period in milliseconds
*/
public long getRetentionPeriod();
}
/**
* Log processor pipeline specifically for buffer processing.
* Integrates buffer operations with the CDAP logging pipeline framework.
*/
public class LogBufferProcessorPipeline extends LogProcessorPipelineContext {
/**
* Creates a buffer processor pipeline with the given configuration.
* @param config Pipeline configuration
* @param checkpointManager Manager for tracking processing checkpoints, keyed to
*        {@link LogBufferFileOffset} positions
*/
public LogBufferProcessorPipeline(LogBufferPipelineConfig config, CheckpointManager<LogBufferFileOffset> checkpointManager);
/**
* Starts pipeline processing.
* @throws Exception if pipeline startup fails
*/
public void start() throws Exception;
/**
* Stops pipeline processing.
* @throws Exception if pipeline shutdown fails
*/
public void stop() throws Exception;
}

Usage Examples:
import io.cdap.cdap.logging.logbuffer.*;
import io.cdap.cdap.logging.serialize.LoggingEventSerializer;
import org.apache.twill.filesystem.LocalLocationFactory;
// Create log buffer writer
LoggingEventSerializer serializer = new LoggingEventSerializer();
LocalLocationFactory locationFactory = new LocalLocationFactory();
long maxFileSize = 64 * 1024 * 1024; // 64 MiB; passed as the maximum file size before rotation
LogBufferWriter writer = new LogBufferWriter(
serializer,
locationFactory,
maxFileSize,
() -> System.out.println("Cleanup triggered") // Runnable passed as the writer's cleaner argument
);
// Create concurrent wrapper for multi-threaded access
ConcurrentLogBufferWriter concurrentWriter = new ConcurrentLogBufferWriter(writer);
// Append log events
LogBufferEvent event = // ... create log event (placeholder: supply an initializer; not compilable as written)
concurrentWriter.append(event);
concurrentWriter.flush();
// Cleanup
concurrentWriter.close();
// Recovery example
LogBufferRecoveryService recoveryService = // ... obtain recovery service (placeholder: supply an initializer)
// NOTE(review): startUp() is declared protected on LogBufferRecoveryService, so this call only
// compiles from a subclass or the same package. External callers presumably start the service
// through the public lifecycle methods inherited from AbstractIdleService -- confirm.
recoveryService.startUp(); // Recover any pending logs
// Cleanup configuration
LogBufferCleaner cleaner = new LogBufferCleaner(
24 * 60 * 60 * 1000L, // 24 hour retention (retentionPeriodMs)
60 * 60 * 1000L // 1 hour cleanup interval (cleanupIntervalMs)
);
cleaner.start();

Install with Tessl CLI:
npx tessl i tessl/maven-io-cdap-cdap--cdap-watchdog