CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-data-fabric

Core data management capabilities for CDAP including dataset operations, metadata management, lineage tracking, audit functionality, and data registry services for Hadoop-based applications.

Pending
Overview
Eval results
Files

docs/audit-compliance.md

Audit and Compliance

A comprehensive audit logging system for compliance, monitoring, and debugging, built around pluggable publishers and structured payload builders. It records data operations, changes, and access patterns within the CDAP platform to support regulatory compliance and operational monitoring.

Capabilities

Core Audit Publishing

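`AuditPublisher` is the primary interface for publishing audit events. Each event targets either an `EntityId` or a `MetadataEntity`, and carries an `AuditType` plus an `AuditPayload` with the event details.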

/**
 * Publishes audit events describing operations performed on CDAP entities.
 *
 * <p>Two overloads are provided so that an event can target either a typed {@code EntityId}
 * or a {@code MetadataEntity}.
 */
public interface AuditPublisher {
    // Primary audit publishing methods

    /**
     * Publishes an audit event whose target is identified by an {@code EntityId}.
     *
     * @param entityId the entity the event is about
     * @param auditType the category of the audited operation
     * @param auditPayload structured key/value details of the event
     */
    void publish(EntityId entityId, AuditType auditType, AuditPayload auditPayload);

    /**
     * Publishes an audit event whose target is identified by a {@code MetadataEntity}.
     *
     * @param metadataEntity the metadata entity the event is about
     * @param auditType the category of the audited operation
     * @param auditPayload structured key/value details of the event
     */
    void publish(MetadataEntity metadataEntity, AuditType auditType, AuditPayload auditPayload);
}

Audit Publisher Implementations

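`AuditPublisher` implementations for different deployment scenarios: a full-featured default publisher, an in-memory publisher for tests and development, and a no-op publisher for environments where auditing is disabled.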

/**
 * Standard audit publisher with configurable backends.
 *
 * <p>The implementation is elided in this overview; only the type relationship is shown.
 */
public class DefaultAuditPublisher implements AuditPublisher {
    // Full-featured audit publishing with persistent storage and messaging
}

/**
 * In-memory audit publisher for testing and development.
 *
 * <p>Published messages can be inspected with {@code getAuditMessages()} and discarded with
 * {@code clear()}.
 */
public class InMemoryAuditPublisher implements AuditPublisher {
    // Fast in-memory audit storage for testing scenarios

    /** Returns the audit messages captured by this publisher. */
    public List<AuditMessage> getAuditMessages();

    /** Discards the captured audit messages. */
    public void clear();
}

/**
 * No-operation publisher for environments where auditing is disabled.
 */
public class NoOpAuditPublisher implements AuditPublisher {
    // Null object pattern implementation for non-auditing environments
    // (presumably every publish call is simply ignored -- implementation not shown here)
}

Audit Payload Builders

Static helper builders that create structured audit payloads for metadata, dataset, and program operations.

/**
 * Static factory methods that build {@code AuditPayload}s for metadata operations.
 */
public class MetadataPayloadBuilder {
    /**
     * Builds the payload for a change of metadata properties.
     *
     * @param oldProperties properties before the change
     * @param newProperties properties after the change
     */
    public static AuditPayload buildForPropertyChange(Map<String, String> oldProperties, 
                                                      Map<String, String> newProperties);
    /**
     * Builds the payload for a change of tags.
     *
     * @param oldTags tags before the change
     * @param newTags tags after the change
     */
    public static AuditPayload buildForTagChange(Set<String> oldTags, Set<String> newTags);
    /**
     * Builds the payload for a change of an entity's metadata as a whole.
     *
     * @param oldMetadata metadata before the change
     * @param newMetadata metadata after the change
     */
    public static AuditPayload buildForMetadataChange(Metadata oldMetadata, Metadata newMetadata);
    /**
     * Builds the payload for a metadata search.
     * NOTE(review): which parts of {@code searchRequest} / {@code results} end up in the payload
     * is not shown here -- confirm before relying on it for compliance reporting.
     */
    public static AuditPayload buildForSearch(SearchRequest searchRequest, SearchResults results);
}

/**
 * Static factory methods that build {@code AuditPayload}s for dataset operations.
 */
public class DatasetPayloadBuilder {
    /** Builds the payload for creating {@code datasetId} with the given {@code properties}. */
    public static AuditPayload buildForDatasetCreation(DatasetId datasetId, DatasetProperties properties);
    /** Builds the payload for accessing {@code datasetId} with the given {@code accessType}. */
    public static AuditPayload buildForDatasetAccess(DatasetId datasetId, AccessType accessType);
    /** Builds the payload for deleting {@code datasetId}. */
    public static AuditPayload buildForDatasetDeletion(DatasetId datasetId);
    /**
     * Builds the payload for updating the properties of {@code datasetId}.
     *
     * @param oldProps properties before the update
     * @param newProps properties after the update
     */
    public static AuditPayload buildForDatasetUpdate(DatasetId datasetId, DatasetProperties oldProps, 
                                                     DatasetProperties newProps);
}

/**
 * Static factory methods that build {@code AuditPayload}s for program lifecycle events.
 */
public class ProgramPayloadBuilder {
    /** Builds the payload for starting {@code programId} with the given runtime {@code arguments}. */
    public static AuditPayload buildForProgramStart(ProgramId programId, Map<String, String> arguments);
    /** Builds the payload for stopping {@code programId}; {@code status} is its final run status. */
    public static AuditPayload buildForProgramStop(ProgramId programId, ProgramStatus status);
    /** Builds the payload for suspending {@code programId}. */
    public static AuditPayload buildForProgramSuspend(ProgramId programId);
    /** Builds the payload for resuming {@code programId}. */
    public static AuditPayload buildForProgramResume(ProgramId programId);
}

Audit Types and Categories

`AuditType` enumerates the categories of audited operations, grouped into entity lifecycle, access, metadata, security, program, search, and administrative (namespace) events.

/**
 * Categories of auditable operations in the CDAP platform.
 *
 * <p>Constants are grouped by the area of the platform they relate to. Declaration order
 * determines {@code ordinal()} and the order returned by {@code values()}.
 */
public enum AuditType {
    /* Entity lifecycle operations */
    CREATE, UPDATE, DELETE,

    /* Access operations */
    ACCESS, READ, WRITE,

    /* Metadata operations */
    METADATA_CHANGE, TAG_CHANGE, PROPERTY_CHANGE,

    /* Security operations */
    AUTHORIZATION_SUCCESS, AUTHORIZATION_FAILURE, AUTHENTICATION,

    /* Program operations */
    PROGRAM_START, PROGRAM_STOP, PROGRAM_SUSPEND, PROGRAM_RESUME,

    /* Search operations */
    SEARCH,

    /* Administrative operations */
    NAMESPACE_CREATE, NAMESPACE_DELETE, NAMESPACE_UPDATE
}

Usage Examples

Basic Audit Publishing

// Access audit publisher (typically injected)
AuditPublisher auditPublisher = // ... obtain instance

// Audit dataset creation
DatasetId datasetId = NamespaceId.DEFAULT.dataset("userProfiles");
DatasetProperties properties = DatasetProperties.builder()
    .add("hbase.splits", "10")
    .add("format", "avro")
    .build();

AuditPayload creationPayload = DatasetPayloadBuilder.buildForDatasetCreation(datasetId, properties);
auditPublisher.publish(datasetId, AuditType.CREATE, creationPayload);

// Audit dataset access
AuditPayload accessPayload = DatasetPayloadBuilder.buildForDatasetAccess(datasetId, AccessType.READ_WRITE);
auditPublisher.publish(datasetId, AuditType.ACCESS, accessPayload);

// Audit metadata property changes.
// MetadataEntity factories take the namespace *name* as a String, not a NamespaceId object.
MetadataEntity entity = MetadataEntity.ofDataset(NamespaceId.DEFAULT.getNamespace(), "userProfiles");
Map<String, String> oldProperties = Map.of("environment", "dev");
Map<String, String> newProperties = Map.of("environment", "production", "owner", "team-alpha");

// A property-change payload is published as PROPERTY_CHANGE so it can be told apart from
// whole-metadata (METADATA_CHANGE) events.
AuditPayload metadataPayload = MetadataPayloadBuilder.buildForPropertyChange(oldProperties, newProperties);
auditPublisher.publish(entity, AuditType.PROPERTY_CHANGE, metadataPayload);

Program Execution Auditing

/**
 * Publishes audit events for the lifecycle transitions of a program
 * (start, stop, suspend, resume).
 */
public class ProgramAuditTracker {
    private final AuditPublisher auditPublisher;

    /**
     * @param auditPublisher destination for every program lifecycle audit event
     */
    public ProgramAuditTracker(AuditPublisher auditPublisher) {
        this.auditPublisher = auditPublisher;
    }

    /** Audits a program start, recording the runtime arguments it was given. */
    public void auditProgramStart(ProgramId programId, Map<String, String> runtimeArgs) {
        emit(programId, AuditType.PROGRAM_START,
             ProgramPayloadBuilder.buildForProgramStart(programId, runtimeArgs));
        System.out.println("Audited program start: " + programId);
    }

    /** Audits a program stop, recording the status the program finished with. */
    public void auditProgramStop(ProgramId programId, ProgramStatus finalStatus) {
        emit(programId, AuditType.PROGRAM_STOP,
             ProgramPayloadBuilder.buildForProgramStop(programId, finalStatus));
        System.out.println("Audited program stop: " + programId + " with status: " + finalStatus);
    }

    /** Audits a program being suspended. */
    public void auditProgramSuspend(ProgramId programId) {
        emit(programId, AuditType.PROGRAM_SUSPEND,
             ProgramPayloadBuilder.buildForProgramSuspend(programId));
    }

    /** Audits a program being resumed. */
    public void auditProgramResume(ProgramId programId) {
        emit(programId, AuditType.PROGRAM_RESUME,
             ProgramPayloadBuilder.buildForProgramResume(programId));
    }

    /** Forwards one program event to the publisher, targeting the program's id. */
    private void emit(ProgramId programId, AuditType type, AuditPayload payload) {
        auditPublisher.publish(programId, type, payload);
    }
}

// Usage in program runtime: bracket one program run with start/stop audit events
ProgramId programId = NamespaceId.DEFAULT.app("dataProcessor").mr("batchProcessor");
ProgramAuditTracker tracker = new ProgramAuditTracker(auditPublisher);

Map<String, String> runtimeArgs = Map.ofEntries(
    Map.entry("input.path", "/data/input/2023-06-20"),
    Map.entry("output.path", "/data/output/2023-06-20"),
    Map.entry("batch.size", "1000"));

tracker.auditProgramStart(programId, runtimeArgs);
// ... program execution ...
tracker.auditProgramStop(programId, ProgramStatus.COMPLETED);

Security and Authorization Auditing

// Security audit helper
public class SecurityAuditHelper {
    private final AuditPublisher auditPublisher;
    
    public SecurityAuditHelper(AuditPublisher auditPublisher) {
        this.auditPublisher = auditPublisher;
    }
    
    public void auditAuthorizationSuccess(EntityId entityId, String user, String action) {
        AuditPayload payload = AuditPayload.builder()
            .add("user", user)
            .add("action", action)
            .add("result", "SUCCESS")
            .add("timestamp", String.valueOf(System.currentTimeMillis()))
            .build();
        
        auditPublisher.publish(entityId, AuditType.AUTHORIZATION_SUCCESS, payload);
    }
    
    public void auditAuthorizationFailure(EntityId entityId, String user, String action, String reason) {
        AuditPayload payload = AuditPayload.builder()
            .add("user", user)
            .add("action", action)
            .add("result", "FAILURE")
            .add("reason", reason)
            .add("timestamp", String.valueOf(System.currentTimeMillis()))
            .build();
        
        auditPublisher.publish(entityId, AuditType.AUTHORIZATION_FAILURE, payload);
    }
    
    public void auditAuthentication(String user, boolean success, String method) {
        // Use system namespace for authentication events
        EntityId systemEntity = NamespaceId.SYSTEM;
        
        AuditPayload payload = AuditPayload.builder()
            .add("user", user)
            .add("method", method)
            .add("success", String.valueOf(success))
            .add("timestamp", String.valueOf(System.currentTimeMillis()))
            .build();
        
        auditPublisher.publish(systemEntity, AuditType.AUTHENTICATION, payload);
    }
}

// Usage in security layer: audit decisions about access to a PII dataset
SecurityAuditHelper securityAudit = new SecurityAuditHelper(auditPublisher);
DatasetId piiDataset = NamespaceId.DEFAULT.dataset("customerPII");

// A permitted read by an analyst
securityAudit.auditAuthorizationSuccess(piiDataset, "analyst@company.com", "READ");

// A rejected write by an intern; the reason is stored in the event payload
String denialReason = "Insufficient privileges for PII data";
securityAudit.auditAuthorizationFailure(piiDataset, "intern@company.com", "WRITE", denialReason);

// A successful Kerberos authentication by an administrator
securityAudit.auditAuthentication("admin@company.com", true, "KERBEROS");

Metadata Change Auditing

// Comprehensive metadata audit tracking
public class MetadataAuditTracker {
    private final AuditPublisher auditPublisher;
    
    public MetadataAuditTracker(AuditPublisher auditPublisher) {
        this.auditPublisher = auditPublisher;
    }
    
    public void auditMetadataChange(MetadataEntity entity, Metadata oldMetadata, Metadata newMetadata) {
        // Audit complete metadata change
        AuditPayload completePayload = MetadataPayloadBuilder.buildForMetadataChange(oldMetadata, newMetadata);
        auditPublisher.publish(entity, AuditType.METADATA_CHANGE, completePayload);
        
        // Audit specific property changes
        Map<String, String> oldProps = oldMetadata.getProperties();
        Map<String, String> newProps = newMetadata.getProperties();
        
        if (!oldProps.equals(newProps)) {
            AuditPayload propPayload = MetadataPayloadBuilder.buildForPropertyChange(oldProps, newProps);
            auditPublisher.publish(entity, AuditType.PROPERTY_CHANGE, propPayload);
        }
        
        // Audit specific tag changes
        Set<String> oldTags = oldMetadata.getTags();
        Set<String> newTags = newMetadata.getTags();
        
        if (!oldTags.equals(newTags)) {
            AuditPayload tagPayload = MetadataPayloadBuilder.buildForTagChange(oldTags, newTags);
            auditPublisher.publish(entity, AuditType.TAG_CHANGE, tagPayload);
        }
    }
    
    public void auditSearchOperation(SearchRequest request, SearchResults results) {
        // Create payload for search auditing (for compliance tracking)
        AuditPayload searchPayload = MetadataPayloadBuilder.buildForSearch(request, results);
        
        // Use system namespace for search operations
        MetadataEntity systemEntity = MetadataEntity.ofNamespace(NamespaceId.SYSTEM);
        auditPublisher.publish(systemEntity, AuditType.SEARCH, searchPayload);
    }
}

// Usage in metadata service.
// `entity` must be a MetadataEntity defined earlier (e.g. the dataset entity from the basic
// publishing example).
MetadataAuditTracker metadataAudit = new MetadataAuditTracker(auditPublisher);

// Placeholders: obtain the entity's metadata before and after the update
Metadata oldMetadata = // ... get existing metadata
Metadata newMetadata = // ... get updated metadata

// Publishes METADATA_CHANGE, plus PROPERTY_CHANGE and/or TAG_CHANGE when those parts differ
metadataAudit.auditMetadataChange(entity, oldMetadata, newMetadata);

Audit Message Retrieval and Analysis

// For testing and development environments using InMemoryAuditPublisher
/**
 * Prints human-readable summaries of the audit messages captured by an
 * {@code InMemoryAuditPublisher}. Intended for tests and local development.
 */
public class AuditAnalyzer {
    /**
     * Audit types reported in the security section: the whole "security operations" group of
     * AuditType. Successful authorizations are reported alongside failures, just as
     * AUTHENTICATION events are reported whether or not they succeeded.
     */
    private static final java.util.Set<AuditType> SECURITY_TYPES = java.util.EnumSet.of(
        AuditType.AUTHORIZATION_SUCCESS,
        AuditType.AUTHORIZATION_FAILURE,
        AuditType.AUTHENTICATION);

    private final InMemoryAuditPublisher inMemoryPublisher;

    public AuditAnalyzer(InMemoryAuditPublisher publisher) {
        this.inMemoryPublisher = publisher;
    }

    /**
     * Prints, for all captured messages: counts per audit type, counts per entity type, and the
     * list of security events.
     */
    public void analyzeAuditMessages() {
        List<AuditMessage> messages = inMemoryPublisher.getAuditMessages();
        printTypeSummary(messages);
        printEntityTypeSummary(messages);
        printSecurityEvents(messages);
    }

    /** Prints event counts per audit type, in AuditType declaration order. */
    private static void printTypeSummary(List<AuditMessage> messages) {
        // EnumMap gives a stable report order instead of depending on HashMap iteration order
        Map<AuditType, Long> typeCount = messages.stream()
            .collect(Collectors.groupingBy(AuditMessage::getAuditType,
                                           () -> new java.util.EnumMap<>(AuditType.class),
                                           Collectors.counting()));

        System.out.println("=== Audit Summary ===");
        typeCount.forEach((type, count) ->
            System.out.println(type + ": " + count + " events"));
    }

    /** Prints event counts per entity class name, sorted alphabetically. */
    private static void printEntityTypeSummary(List<AuditMessage> messages) {
        Map<String, Long> entityCount = messages.stream()
            .collect(Collectors.groupingBy(AuditAnalyzer::entityTypeOf,
                                           java.util.TreeMap::new,
                                           Collectors.counting()));

        System.out.println("\n=== Entity Type Summary ===");
        entityCount.forEach((entityType, count) ->
            System.out.println(entityType + ": " + count + " events"));
    }

    /** Prints every security-related event with its timestamp and target entity. */
    private static void printSecurityEvents(List<AuditMessage> messages) {
        System.out.println("\n=== Security Events ===");
        messages.stream()
            .filter(msg -> SECURITY_TYPES.contains(msg.getAuditType()))
            .forEach(event -> System.out.println(event.getTimestamp() + ": " + event.getAuditType() +
                                                 " for " + event.getEntityId()));
    }

    /**
     * Returns the simple class name of the message's entity id.
     * NOTE(review): messages published through the MetadataEntity overload may not carry an
     * EntityId. They are grouped under "<no EntityId>" instead of aborting the report with an
     * NPE -- confirm against the publisher implementation.
     */
    private static String entityTypeOf(AuditMessage msg) {
        Object entityId = msg.getEntityId();
        return entityId == null ? "<no EntityId>" : entityId.getClass().getSimpleName();
    }

    /** Prints how many messages are being discarded, then clears the in-memory publisher. */
    public void clearAuditHistory() {
        System.out.println("Clearing audit history (" + 
                          inMemoryPublisher.getAuditMessages().size() + " messages)");
        inMemoryPublisher.clear();
    }
}

Namespace Operation Auditing

// Audit namespace operations
/**
 * Demonstrates auditing namespace creation and deletion, including failed attempts.
 *
 * <p>{@code AuditPayload} has no {@code add} method; only its {@code Builder} does. Each
 * payload is therefore kept as a builder until the outcome of the operation is known. On
 * failure, the exception message is added under {@code "error"* before the payload is built
 * and published exactly once.
 *
 * @param namespaceStore store used to create and delete the namespace
 * @param auditPublisher destination for the NAMESPACE_CREATE / NAMESPACE_DELETE events
 */
public void auditNamespaceOperations(NamespaceStore namespaceStore, AuditPublisher auditPublisher) {

    // Audit namespace creation
    NamespaceMeta namespace = NamespaceMeta.builder()
        .setName("audit-test")
        .setDescription("Test namespace for auditing")
        .build();
    NamespaceId namespaceId = NamespaceId.of(namespace.getName());

    AuditPayload.Builder creationPayload = AuditPayload.builder()
        .add("operation", "create")
        .add("namespace", namespace.getName())
        .add("description", namespace.getDescription())
        .add("creator", "admin-user");

    try {
        namespaceStore.create(namespace);
    } catch (NamespaceAlreadyExistsException e) {
        // Audit the attempt even if it failed
        creationPayload = creationPayload.add("error", e.getMessage());
    }
    auditPublisher.publish(namespaceId, AuditType.NAMESPACE_CREATE, creationPayload.build());

    // Audit namespace deletion (of the same "audit-test" namespace created above)
    NamespaceId namespaceToDelete = namespaceId;
    AuditPayload.Builder deletionPayload = AuditPayload.builder()
        .add("operation", "delete")
        .add("namespace", namespaceToDelete.getNamespace())
        .add("deletor", "admin-user")
        .add("timestamp", String.valueOf(System.currentTimeMillis()));

    try {
        namespaceStore.delete(namespaceToDelete);
    } catch (NamespaceNotFoundException | NamespaceCannotBeDeletedException e) {
        // Audit the attempt even if it failed
        deletionPayload = deletionPayload.add("error", e.getMessage());
    }
    auditPublisher.publish(namespaceToDelete, AuditType.NAMESPACE_DELETE, deletionPayload.build());
}

Types

// Audit payload structure
/**
 * Key/value details attached to an audit event.
 *
 * <p>Instances are created through {@code builder()}. The payload itself exposes only read
 * accessors, so entries must be added on the {@code Builder} before calling {@code build()}.
 */
public final class AuditPayload {
    /** Returns a builder for an audit payload. */
    public static Builder builder();

    /** Returns the payload entries as a map. */
    public Map<String, String> getPayload();
    /** Returns the value stored under {@code key} (behavior for a missing key not shown -- confirm). */
    public String get(String key);
    /** Returns whether the payload has an entry for {@code key}. */
    public boolean contains(String key);

    /** Fluent builder for {@code AuditPayload}; {@code add}/{@code addAll} return a builder for chaining. */
    public static class Builder {
        /** Adds one entry. */
        public Builder add(String key, String value);
        /** Adds every entry of {@code properties}. */
        public Builder addAll(Map<String, String> properties);
        /** Builds the payload from the entries added so far. */
        public AuditPayload build();
    }
}

// Audit message structure
/**
 * A published audit event as seen by consumers (e.g. via {@code InMemoryAuditPublisher}).
 */
public final class AuditMessage {
    /** The entity the event targets. NOTE(review): may be absent for MetadataEntity-targeted events -- confirm. */
    public EntityId getEntityId();
    /** The category of the audited operation. */
    public AuditType getAuditType();
    /** The structured details of the event. */
    public AuditPayload getPayload();
    /** When the event occurred; presumably epoch milliseconds -- confirm the unit. */
    public long getTimestamp();
    /** The user associated with the event (how it is determined is not shown here). */
    public String getUser();
    /** Presumably the message format version -- confirm. */
    public String getVersion();
}

// Audit types enumeration, each constant paired with a human-readable description
/**
 * Categories of auditable operations, each carrying a short human-readable description.
 */
public enum AuditType {
    // Entity lifecycle
    CREATE("Entity creation"),
    UPDATE("Entity update"),
    DELETE("Entity deletion"),

    // Access
    ACCESS("Entity access"),
    READ("Read operation"),
    WRITE("Write operation"),

    // Metadata
    METADATA_CHANGE("Metadata change"),
    TAG_CHANGE("Tag change"),
    PROPERTY_CHANGE("Property change"),

    // Security
    AUTHORIZATION_SUCCESS("Authorization success"),
    AUTHORIZATION_FAILURE("Authorization failure"),
    AUTHENTICATION("Authentication event"),

    // Program lifecycle
    PROGRAM_START("Program start"),
    PROGRAM_STOP("Program stop"),
    PROGRAM_SUSPEND("Program suspend"),
    PROGRAM_RESUME("Program resume"),

    // Search
    SEARCH("Search operation"),

    // Namespace administration
    NAMESPACE_CREATE("Namespace creation"),
    NAMESPACE_DELETE("Namespace deletion"),
    NAMESPACE_UPDATE("Namespace update");

    /** Human-readable label for this audit type. */
    private final String label;

    AuditType(String label) {
        this.label = label;
    }

    /** @return the human-readable description of this audit type */
    public String getDescription() {
        return this.label;
    }
}

// Program status enumeration for audit tracking
/**
 * Run states a program can report; a value of this type is recorded as the final status in
 * program-stop audit events. Declaration order determines {@code ordinal()}.
 */
public enum ProgramStatus {
    PENDING, STARTING, RUNNING,
    SUSPENDED, RESUMING,
    STOPPING, STOPPED,
    COMPLETED, FAILED, KILLED
}

// Exception types
/**
 * Checked base exception for audit failures (extends {@code Exception}, so callers must
 * handle or declare it).
 */
public class AuditException extends Exception {
    /** @param message description of the failure */
    public AuditException(String message);
    /**
     * @param message description of the failure
     * @param cause the underlying failure
     */
    public AuditException(String message, Throwable cause);
}

/**
 * An {@code AuditException} raised for failures while publishing audit events.
 * NOTE(review): the AuditPublisher methods shown in this document declare no checked
 * exceptions, so where this is thrown is unclear -- confirm.
 */
public class AuditPublishException extends AuditException {
    /** @param message description of the failure */
    public AuditPublishException(String message);
    /**
     * @param message description of the failure
     * @param cause the underlying failure
     */
    public AuditPublishException(String message, Throwable cause);
}

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-data-fabric

docs

audit-compliance.md

dataset-management.md

index.md

metadata-management.md

namespace-management.md

stream-processing.md

transaction-management.md

usage-registry.md

tile.json