
tessl/maven-org-apache-ranger--ranger-plugins-audit

Apache Ranger Audit Plugin Framework providing centralized audit logging capabilities for Apache Ranger security plugins across various big data components.



Queue and Async Processing

Asynchronous audit processing with configurable queues, batching, and file spooling, providing reliability and throughput for high-volume audit workloads.

Capabilities

Asynchronous Audit Provider

Asynchronous audit provider that processes audit events in background threads with configurable queue sizes and batch intervals.

/**
 * Asynchronous audit provider with background processing
 */
public class AsyncAuditProvider extends BaseAuditHandler {
    /**
     * Create asynchronous audit provider with queue configuration
     * @param name String provider name identifier
     * @param maxQueueSize int maximum queue size before blocking
     * @param maxFlushInterval int maximum flush interval in milliseconds
     */
    public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval);
    
    /**
     * Create asynchronous audit provider with queue configuration and audit handler
     * @param name String provider name identifier
     * @param maxQueueSize int maximum queue size before blocking
     * @param maxFlushInterval int maximum flush interval in milliseconds
     * @param provider AuditHandler audit handler to add to this provider
     */
    public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval, AuditHandler provider);
    
    /**
     * Initialize async provider with configuration properties
     * @param props Properties configuration properties
     */
    public void init(Properties props);
    
    /**
     * Log audit event asynchronously (non-blocking)
     * @param event AuditEventBase event to log
     */
    public void log(AuditEventBase event);
    
    /**
     * Log collection of audit events asynchronously
     * @param events Collection<AuditEventBase> events to log
     */
    public void log(Collection<AuditEventBase> events);
    
    /**
     * Start background processing threads
     */
    public void start();
    
    /**
     * Stop async provider and background threads
     */
    public void stop();
    
    /**
     * Wait for all pending events to be processed
     */
    public void waitToComplete();
    
    /**
     * Get current queue size
     * @return int number of events in queue
     */
    public int getQueueSize();
    
    /**
     * Check if async provider is running
     * @return boolean true if running
     */
    public boolean isRunning();
}
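
The lifecycle above (start, non-blocking log, waitToComplete, stop) can be illustrated with a minimal, self-contained sketch that uses only the JDK. `AsyncLoggerSketch` and its `Consumer<String>` sink are hypothetical stand-ins, not Ranger classes, and rejecting events when the bounded queue is full is an assumption made here for simplicity; it is not a statement about how `AsyncAuditProvider` behaves internally. The interval parameter is used only as the worker's poll timeout.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for an asynchronous provider; not the Ranger implementation.
class AsyncLoggerSketch {
    private final BlockingQueue<String> queue;
    private final Consumer<String> sink;
    private final long maxFlushIntervalMs;
    private final AtomicInteger pending = new AtomicInteger();
    private volatile boolean running;
    private Thread worker;

    AsyncLoggerSketch(int maxQueueSize, long maxFlushIntervalMs, Consumer<String> sink) {
        this.queue = new ArrayBlockingQueue<>(maxQueueSize);
        this.maxFlushIntervalMs = maxFlushIntervalMs;
        this.sink = sink;
    }

    // Returns immediately; false means the bounded queue was full (assumed policy).
    boolean log(String event) {
        pending.incrementAndGet();
        if (queue.offer(event)) {
            return true;
        }
        pending.decrementAndGet();
        return false;
    }

    void start() {
        running = true;
        worker = new Thread(this::drainLoop, "async-audit-sketch");
        worker.setDaemon(true);
        worker.start();
    }

    private void drainLoop() {
        // Keep going after stop() until the queue has been emptied.
        while (running || !queue.isEmpty()) {
            String event;
            try {
                event = queue.poll(maxFlushIntervalMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (event == null) {
                continue; // poll timed out; re-check the running flag
            }
            try {
                sink.accept(event);
            } catch (RuntimeException e) {
                // A fuller version would count the failure; the sketch keeps the worker alive.
            } finally {
                pending.decrementAndGet();
            }
        }
    }

    // Blocks until every accepted event has been handed to the sink.
    void waitToComplete() {
        try {
            while (pending.get() > 0) {
                Thread.sleep(5);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void stop() {
        running = false;
        if (worker == null) {
            return;
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int getQueueSize() {
        return queue.size();
    }
}
```

Calling `waitToComplete()` before `stop()` mirrors the shutdown order used in the usage example later in this document.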

Audit Queue Base Class

Abstract base class for audit queues providing batching, file spooling, and drain management capabilities.

/**
 * Base class for audit queues with batching and file spooling
 */
public abstract class AuditQueue extends AuditDestination {
    // Queue configuration methods
    public void setMaxBatchSize(int maxBatchSize);
    public int getMaxBatchSize();
    public void setMaxBatchInterval(long maxBatchInterval);
    public long getMaxBatchInterval();
    public void setMaxQueueSize(int maxQueueSize);
    public int getMaxQueueSize();
    
    // File spooling configuration
    public void setSpoolEnabled(boolean spoolEnabled);
    public boolean isSpoolEnabled();
    public void setSpoolDirectory(String spoolDirectory);
    public String getSpoolDirectory();
    public void setSpoolFileName(String spoolFileName);
    public String getSpoolFileName();
    
    // Drain management
    public void startDrainThread();
    public void stopDrainThread();
    public boolean isDrainInProgress();
    
    // Statistics
    public long getProcessedCount();
    public long getErrorCount();
    public long getDroppedCount();
}

Multi-Destination Audit Provider

Audit provider that routes audit events to multiple destinations simultaneously, enabling parallel audit logging to different systems.

/**
 * Routes audit events to multiple destinations
 */
public class MultiDestAuditProvider extends BaseAuditHandler {
    /**
     * Add single audit provider to the multi-destination list
     * @param provider AuditHandler provider to add
     */
    public void addAuditProvider(AuditHandler provider);
    
    /**
     * Add multiple audit providers to the multi-destination list
     * @param providers List<AuditHandler> providers to add
     */
    public void addAuditProviders(List<AuditHandler> providers);
    
    /**
     * Remove audit provider from the multi-destination list
     * @param provider AuditHandler provider to remove
     */
    public void removeAuditProvider(AuditHandler provider);
    
    /**
     * Get list of configured audit providers
     * @return List<AuditHandler> current providers
     */
    public List<AuditHandler> getAuditProviders();
    
    /**
     * Log event to all configured providers
     * @param event AuditEventBase event to log
     */
    public void log(AuditEventBase event);
    
    /**
     * Log events to all configured providers
     * @param events Collection<AuditEventBase> events to log
     */
    public void log(Collection<AuditEventBase> events);
    
    /**
     * Initialize all configured providers
     * @param props Properties configuration properties
     */
    public void init(Properties props);
    
    /**
     * Start all configured providers
     */
    public void start();
    
    /**
     * Stop all configured providers
     */
    public void stop();
    
    /**
     * Flush all configured providers
     */
    public void flush();
}
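
The fan-out idea can be sketched without any Ranger dependency. The following is a minimal illustration in which `FanOutSketch` and its `Consumer<String>` destinations are hypothetical names; it delivers sequentially and isolates failures per destination, which is an assumption of the sketch. Whether `MultiDestAuditProvider` dispatches sequentially or concurrently is not stated in this document.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical fan-out handler; not the Ranger implementation.
class FanOutSketch {
    // Copy-on-write list so destinations can be added or removed while logging.
    private final List<Consumer<String>> destinations = new CopyOnWriteArrayList<>();

    void addDestination(Consumer<String> destination) {
        destinations.add(destination);
    }

    void removeDestination(Consumer<String> destination) {
        destinations.remove(destination);
    }

    // Delivers the event to every destination and returns how many accepted it.
    int log(String event) {
        int delivered = 0;
        for (Consumer<String> destination : destinations) {
            try {
                destination.accept(event);
                delivered++;
            } catch (RuntimeException e) {
                // One failing destination must not block delivery to the others.
            }
        }
        return delivered;
    }
}
```

Returning the delivery count lets a caller detect partial failure without one unavailable destination stopping the rest.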

Audit Index Record

Represents audit file index records for spool file management and tracking.

/**
 * Represents audit file index records for spool file management
 */
public class AuditIndexRecord {
    /**
     * Get unique record identifier
     * @return String record ID
     */
    public String getId();
    
    /**
     * Set unique record identifier
     * @param id String record ID
     */
    public void setId(String id);
    
    /**
     * Get file path for this record
     * @return String file path
     */
    public String getFilePath();
    
    /**
     * Set file path for this record
     * @param filePath String file path
     */
    public void setFilePath(String filePath);
    
    /**
     * Get line position in file
     * @return long line position
     */
    public long getLinePosition();
    
    /**
     * Set line position in file
     * @param linePosition long line position
     */
    public void setLinePosition(long linePosition);
    
    /**
     * Get current status of this record
     * @return SPOOL_FILE_STATUS current status
     */
    public SPOOL_FILE_STATUS getStatus();
    
    /**
     * Set status of this record
     * @param status SPOOL_FILE_STATUS new status
     */
    public void setStatus(SPOOL_FILE_STATUS status);
    
    /**
     * Get creation timestamp
     * @return Date creation time
     */
    public Date getCreatedTime();
    
    /**
     * Set creation timestamp
     * @param createdTime Date creation time
     */
    public void setCreatedTime(Date createdTime);
    
    /**
     * Get last attempt timestamp
     * @return Date last attempt time
     */
    public Date getLastAttempt();
    
    /**
     * Set last attempt timestamp
     * @param lastAttempt Date last attempt time
     */
    public void setLastAttempt(Date lastAttempt);
    
    /**
     * Get retry count
     * @return int number of retries
     */
    public int getRetryCount();
    
    /**
     * Set retry count
     * @param retryCount int number of retries
     */
    public void setRetryCount(int retryCount);
}

Usage Examples:

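import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.ranger.audit.destination.*;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.model.AuthzAuditEvent;
import org.apache.ranger.audit.provider.AsyncAuditProvider;
import org.apache.ranger.audit.provider.MultiDestAuditProvider;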

// Configure asynchronous audit provider
AsyncAuditProvider asyncProvider = new AsyncAuditProvider("async-hdfs", 10000, 5000);
Properties asyncProps = new Properties();
asyncProps.setProperty("xasecure.audit.async.queue.batch.size", "100");
asyncProps.setProperty("xasecure.audit.async.queue.flush.interval", "30000");
asyncProvider.init(asyncProps);
asyncProvider.start();

// Configure multi-destination provider
MultiDestAuditProvider multiProvider = new MultiDestAuditProvider();

// Add HDFS destination
HDFSAuditDestination hdfsDestination = new HDFSAuditDestination();
Properties hdfsProps = new Properties();
hdfsProps.setProperty("xasecure.audit.hdfs.is.enabled", "true");
hdfsProps.setProperty("xasecure.audit.hdfs.destination.directory", "/ranger/audit");
hdfsDestination.init(hdfsProps, "xasecure.audit.hdfs");

// Add Solr destination  
SolrAuditDestination solrDestination = new SolrAuditDestination();
Properties solrProps = new Properties();
solrProps.setProperty("xasecure.audit.solr.is.enabled", "true");
solrProps.setProperty("xasecure.audit.solr.urls", "http://solr:8983/solr");
solrDestination.init(solrProps, "xasecure.audit.solr");

// Add destinations to multi-provider
multiProvider.addAuditProvider(hdfsDestination);
multiProvider.addAuditProvider(solrDestination);
multiProvider.init(new Properties());
multiProvider.start();

// Log events - will go to both HDFS and Solr
AuthzAuditEvent event = new AuthzAuditEvent();
// ... configure event ...
multiProvider.log(event);

// Async logging (non-blocking)
asyncProvider.log(event);

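// Batch processing
AuthzAuditEvent event2 = new AuthzAuditEvent();
AuthzAuditEvent event3 = new AuthzAuditEvent();
// ... configure event2 and event3 ...
List<AuditEventBase> events = Arrays.asList(event, event2, event3);
multiProvider.log(events);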

// Graceful shutdown
multiProvider.flush(); // Ensure all events are processed
multiProvider.stop();
asyncProvider.waitToComplete(); // Wait for async processing to finish
asyncProvider.stop();

Async Queue Implementation

Non-blocking asynchronous queue with unbounded capacity, backed internally by a LinkedBlockingQueue.

/**
 * Non-blocking asynchronous queue with background processing
 */
public class AuditAsyncQueue extends AuditQueue implements Runnable {
    /**
     * Create async queue with consumer handler
     * @param consumer AuditHandler consumer to process events
     */
    public AuditAsyncQueue(AuditHandler consumer);
    
    /**
     * Log audit event asynchronously (non-blocking)
     * @param event AuditEventBase event to queue
     * @return boolean true if queued successfully
     */
    public boolean log(AuditEventBase event);
    
    /**
     * Log collection of audit events
     * @param events Collection<AuditEventBase> events to queue
     * @return boolean true if all events queued
     */
    public boolean log(Collection<AuditEventBase> events);
    
    /**
     * Start the queue and consumer thread
     */
    public void start();
    
    /**
     * Stop the queue and drain remaining events
     */
    public void stop();
    
    /**
     * Get current queue size
     * @return int number of events in queue
     */
    public int size();
}
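
To make the "unbounded" property concrete, the sketch below compares the JDK's `LinkedBlockingQueue` with and without a capacity argument. `QueueCapacitySketch` is a hypothetical helper written for this illustration; it shows only standard-library behavior and makes no claim about `AuditAsyncQueue` internals beyond what is stated above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper contrasting unbounded and bounded LinkedBlockingQueue.
class QueueCapacitySketch {
    // Offers `count` events without blocking and returns how many were accepted.
    static int offerAll(BlockingQueue<String> queue, int count) {
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            if (queue.offer("event-" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // No capacity argument: capacity is Integer.MAX_VALUE, so offer() keeps succeeding.
        System.out.println(offerAll(new LinkedBlockingQueue<>(), 1000));    // prints 1000
        // Explicit capacity: offer() returns false once 100 events are queued.
        System.out.println(offerAll(new LinkedBlockingQueue<>(100), 1000)); // prints 100
    }
}
```

The trade-off is that an unbounded queue never rejects a producer, but it can grow without limit if the consumer falls behind.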

Batch Queue Implementation

Blocking queue that batches audit events before sending to consumer with file spooling support.

/**
 * Blocking queue with batching and file spooling capabilities
 */
public class AuditBatchQueue extends AuditQueue implements Runnable {
    /**
     * Create batch queue with consumer handler
     * @param consumer AuditHandler consumer to process batched events
     */
    public AuditBatchQueue(AuditHandler consumer);
    
    /**
     * Initialize with configuration properties
     * @param prop Properties configuration properties
     * @param basePropertyName String base property name
     */
    public void init(Properties prop, String basePropertyName);
    
    /**
     * Log audit event (blocking if queue full)
     * @param event AuditEventBase event to queue
     * @return boolean true if queued successfully
     */
    public boolean log(AuditEventBase event);
    
    /**
     * Start the queue, consumer and file spooler
     */
    public synchronized void start();
    
    /**
     * Wait for completion with timeout
     * @param timeout long timeout in milliseconds
     */
    public void waitToComplete(long timeout);
    
    /**
     * Flush pending events to consumer
     */
    public void flush();
}
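
Batching by size and by interval can be sketched with `BlockingQueue.poll` and `drainTo` from the JDK. `BatchDrainSketch.nextBatch` is a hypothetical helper, not part of Ranger: it returns a batch as soon as `maxBatchSize` events are collected or `maxIntervalMs` has elapsed, whichever comes first. How `AuditBatchQueue` actually forms its batches is not described in this document.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical batching helper; not the Ranger implementation.
class BatchDrainSketch {
    // Collects up to maxBatchSize events, waiting no longer than maxIntervalMs in total.
    static List<String> nextBatch(BlockingQueue<String> queue, int maxBatchSize, long maxIntervalMs) {
        List<String> batch = new ArrayList<>(maxBatchSize);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxIntervalMs);
        try {
            while (batch.size() < maxBatchSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // interval elapsed: ship a partial batch
                }
                String event = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (event == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(event);
                // Take whatever is already waiting, without exceeding the batch size.
                queue.drainTo(batch, maxBatchSize - batch.size());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch;
    }
}
```

A consumer thread would call `nextBatch` in a loop and forward each non-empty list to its handler.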

File Queue Implementation

File-based queue providing persistence and failover through local filesystem spooling.

/**
 * File-based queue with persistence and failover capabilities
 */
public class AuditFileQueue extends BaseAuditHandler {
    /**
     * Create file queue with consumer handler
     * @param consumer AuditHandler consumer to process events from files
     */
    public AuditFileQueue(AuditHandler consumer);
    
    /**
     * Initialize with configuration properties
     * @param prop Properties configuration properties
     * @param basePropertyName String base property name
     */
    public void init(Properties prop, String basePropertyName);
    
    /**
     * Log audit event to file spool
     * @param event AuditEventBase event to spool to file
     * @return boolean true if spooled successfully
     */
    public boolean log(AuditEventBase event);
    
    /**
     * Start the consumer and file spooler
     */
    public void start();
    
    /**
     * Wait for completion with timeout
     * @param timeout long timeout in milliseconds
     */
    public void waitToComplete(long timeout);
}
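
The spool-then-replay pattern can be sketched with `java.nio.file`. `FileSpoolSketch` is a hypothetical class: it appends one event per line and later replays the file to a consumer. The one-line-per-event format and deleting the file after a successful replay are assumptions of this sketch, not a description of the on-disk format used by `AuditFileQueue`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical file spool; not the Ranger implementation.
class FileSpoolSketch {
    private final Path spoolFile;

    FileSpoolSketch(Path spoolFile) {
        this.spoolFile = spoolFile;
    }

    // Appends the event as one line so it survives a process restart.
    // Events containing line breaks would need escaping in a real format.
    void spool(String event) {
        try {
            Files.write(spoolFile,
                    (event + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Replays spooled events to the consumer, deletes the file, and returns the count.
    int drain(Consumer<String> consumer) {
        try {
            if (!Files.exists(spoolFile)) {
                return 0;
            }
            List<String> lines = Files.readAllLines(spoolFile, StandardCharsets.UTF_8);
            lines.forEach(consumer);
            Files.delete(spoolFile);
            return lines.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the consumer throws during replay, the file is left in place, so the same events are retried on the next `drain` call.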

Summary Queue Implementation

Queue that aggregates and summarizes similar audit events to reduce volume.

/**
 * Queue that summarizes similar audit events before sending to consumer
 */
public class AuditSummaryQueue extends AuditQueue implements Runnable {
    /**
     * Create summary queue with consumer handler
     * @param consumer AuditHandler consumer to process summarized events
     */
    public AuditSummaryQueue(AuditHandler consumer);
    
    /**
     * Initialize with summary-specific properties
     * @param props Properties configuration properties
     * @param propPrefix String property prefix
     */
    public void init(Properties props, String propPrefix);
    
    /**
     * Log audit event (adds to summary aggregation)
     * @param event AuditEventBase event to add to summary
     * @return boolean true if processed successfully
     */
    public boolean log(AuditEventBase event);
    
    /**
     * Start the queue and consumer thread
     */
    public void start();
    
    /**
     * Stop the queue and send remaining summaries
     */
    public void stop();
}
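
Summarization can be illustrated by collapsing events that share a key into a single entry with a count. In the sketch below, `SummarySketch` and the choice of user, resource, and action as the grouping key are assumptions made for illustration; this document does not specify which fields `AuditSummaryQueue` uses to decide that two events are similar.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical summarizer; not the Ranger implementation.
class SummarySketch {
    // Insertion-ordered so summaries come out in first-seen order.
    private final Map<String, Integer> counts = new LinkedHashMap<>();

    // Events with the same (user, resource, action) collapse into one counted entry.
    void add(String user, String resource, String action) {
        counts.merge(user + "|" + resource + "|" + action, 1, Integer::sum);
    }

    // Returns the summaries collected so far and resets the aggregation window.
    Map<String, Integer> flush() {
        Map<String, Integer> snapshot = new LinkedHashMap<>(counts);
        counts.clear();
        return snapshot;
    }
}
```

A periodic task would call `flush()` once per summary interval and forward the resulting entries downstream.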

Configuration Properties

Key configuration properties for queue and async processing:

Async Provider Configuration:

  • xasecure.audit.async.queue.batch.size: Batch size for async processing
  • xasecure.audit.async.queue.flush.interval: Flush interval in milliseconds
  • xasecure.audit.async.queue.max.size: Maximum queue size

Common Queue Configuration:

  • batch.size: Events per batch (default: 1000)
  • queue.size: Maximum queue capacity (default: 1024*1024)
  • batch.interval.ms: Batch processing interval (default: 3000ms)

Summary Queue Configuration:

  • summary.interval.ms: Summary aggregation interval (default: 5000ms)

File Spooling Configuration:

  • filespool.enable: Enable file spooling for failover
  • filespool.drain.threshold.percent: Threshold for draining spool files
  • filespool.drain.full.wait.ms: Wait time for full drain completion
  • xasecure.audit.spool.local.dir: Local spool directory
  • xasecure.audit.spool.local.filename: Spool filename pattern
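
Reading these settings with fallbacks can be sketched with `java.util.Properties`. `QueueConfigSketch` is a hypothetical helper, and joining a base property name to a key with a dot (for example `xasecure.audit.example` plus `batch.size`) is an assumption of this sketch; check the Ranger source for the exact composition. The default values used below are the ones listed above.

```java
import java.util.Properties;

// Hypothetical config reader; the prefix + "." + key convention is an assumption.
class QueueConfigSketch {
    static final int DEFAULT_BATCH_SIZE = 1000;
    static final int DEFAULT_QUEUE_SIZE = 1024 * 1024;
    static final int DEFAULT_BATCH_INTERVAL_MS = 3000;

    // Returns the integer at prefix.key, or defaultValue if missing or malformed.
    static int getInt(Properties props, String prefix, String key, int defaultValue) {
        String raw = props.getProperty(prefix + "." + key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("xasecure.audit.example.batch.size", "250");
        String prefix = "xasecure.audit.example"; // hypothetical base property name
        System.out.println(getInt(props, prefix, "batch.size", DEFAULT_BATCH_SIZE));
        System.out.println(getInt(props, prefix, "queue.size", DEFAULT_QUEUE_SIZE));
        System.out.println(getInt(props, prefix, "batch.interval.ms", DEFAULT_BATCH_INTERVAL_MS));
    }
}
```

Falling back to the default on a malformed value keeps a typo in one property from preventing the queue from starting.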

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-ranger--ranger-plugins-audit
