CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-data-fabric

Core data management capabilities for CDAP including dataset operations, metadata management, lineage tracking, audit functionality, and data registry services for Hadoop-based applications.

Pending
Overview
Eval results
Files

docs/usage-registry.md

Usage Registry

Program-dataset relationship tracking for governance, lineage analysis, and impact assessment with comprehensive query capabilities. The Usage Registry provides essential functionality for understanding dependencies between programs and datasets, enabling effective governance and impact analysis across the CDAP platform.

Capabilities

Core Usage Registry Operations

The primary interface for tracking and querying program-dataset relationships with comprehensive bidirectional queries.

/**
 * Primary interface for tracking and querying program-dataset relationships.
 * Extends {@link UsageWriter}, so implementations handle both registration
 * (writes) and the bidirectional lookups below (reads).
 */
public interface UsageRegistry extends UsageWriter {
    // Application lifecycle management
    /**
     * Removes usage records for the given application; after this call the
     * query methods return empty sets for it (see the lifecycle example below).
     */
    void unregister(ApplicationId applicationId);
    
    // Dataset relationship queries
    /** Datasets registered as used by the application. */
    Set<DatasetId> getDatasets(ApplicationId id);
    /** Datasets registered as used by the specific program. */
    Set<DatasetId> getDatasets(ProgramId id);
    
    // Stream relationship queries  
    /** Streams registered as used by the application. */
    Set<StreamId> getStreams(ApplicationId id);
    /** Streams registered as used by the specific program. */
    Set<StreamId> getStreams(ProgramId id);
    
    // Program relationship queries (reverse lookups)
    /** Reverse lookup: programs that registered usage of the dataset (impact analysis). */
    Set<ProgramId> getPrograms(DatasetId id);
    /** Reverse lookup: programs that registered usage of the stream. */
    Set<ProgramId> getPrograms(StreamId id);
}

/**
 * Write-side interface for recording program-to-dataset and program-to-stream
 * usage relationships.
 */
public interface UsageWriter {
    // Usage registration methods for tracking relationships
    /** Records that the program uses the dataset. */
    void register(ProgramId programId, DatasetId datasetId);
    /** Records that the program uses the stream. */
    void register(ProgramId programId, StreamId streamId);
    /** Bulk variant: records the dataset as used by every listed program. */
    void registerAll(Iterable<? extends ProgramId> programIds, DatasetId datasetId);
    /** Bulk variant: records the stream as used by every listed program. */
    void registerAll(Iterable<? extends ProgramId> programIds, StreamId streamId);
}

Usage Registry Implementations

Different implementations of the Usage Registry for various deployment scenarios and performance requirements.

// Standard usage registry implementation
public class BasicUsageRegistry implements UsageRegistry {
    // Full-featured usage tracking with persistent storage
    // (method bodies elided in this overview; contract is defined by UsageRegistry)
}

// No-operation implementation for testing
public class NoOpUsageRegistry implements UsageRegistry {
    // Null object pattern implementation for testing scenarios
    // (registrations are discarded; queries presumably return empty sets — confirm against source)
}

// Messaging-based usage writer for distributed systems
public class MessagingUsageWriter implements UsageWriter {
    // Asynchronous usage tracking via messaging system
    // (write-only: implements UsageWriter, not the query side of UsageRegistry)
}

Usage Data Models

Internal data structures for representing and managing usage relationships.

// Dataset usage information structure
/**
 * Read-only view of a dataset's usage: the dataset, its consuming programs,
 * and record timestamps. Accessors aligned with the Types section of this
 * document (previously shown here as a single getTimestamp(), which
 * contradicted the Types listing).
 */
public class DatasetUsage {
    /** The dataset this usage record describes. */
    public DatasetId getDatasetId();
    /** Programs that have registered usage of the dataset. */
    public Set<ProgramId> getPrograms();
    /** Timestamp at which the usage record was created. */
    public long getCreationTime();
    /** Timestamp of the most recent registered access. */
    public long getLastAccessTime();
}

// Usage record key for efficient storage and retrieval
/**
 * Composite key pairing a dataset with a program for storing usage records.
 * Members aligned with the Types section of this document, which additionally
 * lists getTimestamp() and a static of(...) factory.
 */
public class DatasetUsageKey {
    /** Dataset component of the key. */
    public DatasetId getDatasetId();
    /** Program component of the key. */
    public ProgramId getProgramId();
    /** Timestamp carried by the key (see the Types section). */
    public long getTimestamp();
    /** Serialized key form used for storage and retrieval. */
    public String getKey();
    /** Static factory combining the two id components. */
    public static DatasetUsageKey of(DatasetId datasetId, ProgramId programId);
}

Usage Examples

Basic Usage Registration and Queries

// Access usage registry (typically injected)
// NOTE(review): the initializer below is a documentation placeholder, not
// compilable Java — obtain the instance via dependency injection in real code.
UsageRegistry usageRegistry = // ... obtain instance

// Define application and program identifiers
ApplicationId appId = NamespaceId.DEFAULT.app("dataProcessingApp");
ProgramId mapReduceProgram = appId.mr("dataProcessor");
ProgramId workflowProgram = appId.workflow("dataWorkflow");

// Define dataset and stream identifiers
DatasetId inputDataset = NamespaceId.DEFAULT.dataset("rawData");
DatasetId outputDataset = NamespaceId.DEFAULT.dataset("processedData");
StreamId inputStream = NamespaceId.DEFAULT.stream("events");

// Register program-dataset relationships (one call per program/dataset pair)
usageRegistry.register(mapReduceProgram, inputDataset);
usageRegistry.register(mapReduceProgram, outputDataset);
usageRegistry.register(workflowProgram, inputDataset);

// Register program-stream relationships
usageRegistry.register(mapReduceProgram, inputStream);

// Query datasets used by application
Set<DatasetId> appDatasets = usageRegistry.getDatasets(appId);
System.out.println("Application datasets: " + appDatasets);

// Query datasets used by specific program
Set<DatasetId> programDatasets = usageRegistry.getDatasets(mapReduceProgram);
System.out.println("MapReduce program datasets: " + programDatasets);

// Query streams used by program
Set<StreamId> programStreams = usageRegistry.getStreams(mapReduceProgram);
System.out.println("MapReduce program streams: " + programStreams);

Impact Analysis and Dependency Discovery

// Find all programs that use a specific dataset (impact analysis)
DatasetId criticalDataset = NamespaceId.DEFAULT.dataset("customerData");
Set<ProgramId> affectedPrograms = usageRegistry.getPrograms(criticalDataset);

System.out.println("Programs affected by dataset changes:");
// NOTE(review): ProgramId as listed in the Types section exposes
// getProgram()/getParent() but no getApplication() — confirm this accessor.
for (ProgramId program : affectedPrograms) {
    System.out.println("  - " + program.getApplication() + "." + program.getProgram());
}

// Find all programs consuming from a stream
StreamId eventStream = NamespaceId.DEFAULT.stream("userEvents");
Set<ProgramId> streamConsumers = usageRegistry.getPrograms(eventStream);

System.out.println("Programs consuming from stream:");
for (ProgramId consumer : streamConsumers) {
    System.out.println("  - " + consumer.getApplication() + "." + consumer.getProgram());
}

// Comprehensive dependency analysis: for each affected program, gather every
// dataset and stream it touches, tagged with a "dataset:"/"stream:" prefix.
Map<String, Set<String>> dependencies = new HashMap<>();
for (ProgramId program : affectedPrograms) {
    Set<DatasetId> datasets = usageRegistry.getDatasets(program);
    Set<StreamId> streams = usageRegistry.getStreams(program);
    
    Set<String> resources = new HashSet<>();
    datasets.forEach(ds -> resources.add("dataset:" + ds.getDataset()));
    streams.forEach(st -> resources.add("stream:" + st.getStream()));
    
    dependencies.put(program.toString(), resources);
}

System.out.println("Full dependency map: " + dependencies);

Batch Registration Operations

// Register multiple programs with the same dataset
List<ProgramId> batchProcessingPrograms = Arrays.asList(
    appId.mr("processor1"),
    appId.mr("processor2"), 
    appId.mr("processor3"),
    appId.workflow("batchWorkflow")
);

DatasetId sharedDataset = NamespaceId.DEFAULT.dataset("sharedLookupData");

// Efficient batch registration — registerAll replaces four register() calls
usageRegistry.registerAll(batchProcessingPrograms, sharedDataset);

// Verify all programs are registered
// (plain `assert` — only active when the JVM runs with -ea)
for (ProgramId program : batchProcessingPrograms) {
    Set<DatasetId> datasets = usageRegistry.getDatasets(program);
    assert datasets.contains(sharedDataset);
}

// Batch registration for stream relationships
StreamId eventStream = NamespaceId.DEFAULT.stream("auditEvents");
usageRegistry.registerAll(batchProcessingPrograms, eventStream);

Application Lifecycle Management

// Application deployment - register all program relationships
ApplicationId newApp = NamespaceId.DEFAULT.app("analyticsApp");
ProgramId sparkProgram = newApp.spark("analyticsEngine");
ProgramId serviceProgram = newApp.service("analyticsService");

// Register program dependencies
// (both programs share "processedAnalytics": spark writes, service reads)
usageRegistry.register(sparkProgram, NamespaceId.DEFAULT.dataset("rawAnalytics"));
usageRegistry.register(sparkProgram, NamespaceId.DEFAULT.dataset("processedAnalytics"));
usageRegistry.register(serviceProgram, NamespaceId.DEFAULT.dataset("processedAnalytics"));

// Verify application dependencies before deployment
Set<DatasetId> requiredDatasets = usageRegistry.getDatasets(newApp);
for (DatasetId dataset : requiredDatasets) {
    // Check if datasets exist and are accessible
    // (verifyDatasetExists is an application-supplied helper, not part of this API)
    verifyDatasetExists(dataset);
}

// Application undeployment - clean up usage tracking
System.out.println("Unregistering application: " + newApp);
usageRegistry.unregister(newApp);

// Verify cleanup: unregister() removes the app's usage records,
// so queries for it now return empty sets (assert requires -ea)
Set<DatasetId> remainingDatasets = usageRegistry.getDatasets(newApp);
assert remainingDatasets.isEmpty() : "Application still has registered datasets";

Governance and Compliance Reporting

// Generate compliance report for data usage
/**
 * Prints a governance report listing, for every dataset and stream in the
 * namespace, which applications have registered usage of it.
 *
 * @param namespace the namespace whose applications are scanned
 */
public void generateUsageReport(NamespaceId namespace) {
    Map<String, List<String>> datasetsToApps = new HashMap<>();
    Map<String, List<String>> streamsToApps = new HashMap<>();
    
    // Walk every application in the namespace and invert the usage relation
    // (resource -> applications instead of application -> resources).
    List<ApplicationId> applications = getApplicationsInNamespace(namespace);
    
    for (ApplicationId app : applications) {
        String appName = app.getApplication();
        
        usageRegistry.getDatasets(app).forEach(dataset ->
            datasetsToApps.computeIfAbsent(dataset.getDataset(), k -> new ArrayList<>())
                          .add(appName));
        
        usageRegistry.getStreams(app).forEach(stream ->
            streamsToApps.computeIfAbsent(stream.getStream(), k -> new ArrayList<>())
                         .add(appName));
    }
    
    System.out.println("=== Dataset Usage Report ===");
    for (Map.Entry<String, List<String>> entry : datasetsToApps.entrySet()) {
        System.out.println(entry.getKey() + " used by: " + String.join(", ", entry.getValue()));
    }
    
    System.out.println("\n=== Stream Usage Report ===");
    for (Map.Entry<String, List<String>> entry : streamsToApps.entrySet()) {
        System.out.println(entry.getKey() + " consumed by: " + String.join(", ", entry.getValue()));
    }
}

// Find unused datasets for cleanup
/**
 * Identifies datasets in the namespace that no application has registered
 * usage for — candidates for cleanup.
 *
 * @param namespace the namespace to scan
 * @return the datasets with no registered consumers (possibly empty, never null)
 */
public Set<DatasetId> findUnusedDatasets(NamespaceId namespace) {
    Set<DatasetId> allDatasets = getAllDatasetsInNamespace(namespace);
    
    // Union of every dataset referenced by at least one application.
    Set<DatasetId> referenced = new HashSet<>();
    for (ApplicationId app : getApplicationsInNamespace(namespace)) {
        referenced.addAll(usageRegistry.getDatasets(app));
    }
    
    // Whatever remains after subtracting the referenced set is unused.
    Set<DatasetId> unused = new HashSet<>(allDatasets);
    unused.removeAll(referenced);
    return unused;
}

Usage Writer for Real-time Tracking

// Real-time usage tracking during program execution
/**
 * Thin wrapper around a {@link UsageWriter} that records dataset/stream access
 * as it happens and logs each registration for audit purposes.
 */
public class ProgramExecutionTracker {
    private final UsageWriter usageWriter;
    
    /**
     * @param usageWriter destination for usage records; must not be null.
     *        Validated here so a misconfigured injection fails fast at
     *        construction rather than with an NPE on the first track call.
     */
    public ProgramExecutionTracker(UsageWriter usageWriter) {
        this.usageWriter = java.util.Objects.requireNonNull(usageWriter, "usageWriter");
    }
    
    /** Records that {@code program} accessed {@code dataset} and logs the event. */
    public void trackDatasetAccess(ProgramId program, DatasetId dataset) {
        // Register usage relationship in real-time
        usageWriter.register(program, dataset);
        
        // Log for audit purposes
        System.out.println("Registered dataset access: " + program + " -> " + dataset);
    }
    
    /** Records that {@code program} accessed {@code stream} and logs the event. */
    public void trackStreamAccess(ProgramId program, StreamId stream) {
        // Register stream usage relationship
        usageWriter.register(program, stream);
        
        // Log for audit purposes
        System.out.println("Registered stream access: " + program + " -> " + stream);
    }
}

// Usage in program runtime
// (passing a UsageRegistry is valid here because UsageRegistry extends UsageWriter)
ProgramExecutionTracker tracker = new ProgramExecutionTracker(usageRegistry);
// NOTE(review): the initializer below is a documentation placeholder, not compilable Java.
ProgramId currentProgram = // ... get current program context

// Track dataset access as it happens
DatasetId dataset = NamespaceId.DEFAULT.dataset("userProfiles");
tracker.trackDatasetAccess(currentProgram, dataset);

// Track stream access
StreamId stream = NamespaceId.DEFAULT.stream("events");
tracker.trackStreamAccess(currentProgram, stream);

Types

// Core entity identifiers
/** Identifies an application within a namespace. */
public final class ApplicationId extends EntityId {
    /** Static factory from explicit namespace and application names. */
    public static ApplicationId of(String namespace, String application);
    public String getApplication();
    /** The namespace containing this application. */
    public NamespaceId getParent();
    
    // Program ID factory methods
    // (one factory per program type: MapReduce, Spark, Service, Worker, Workflow)
    public ProgramId mr(String program);
    public ProgramId spark(String program);
    public ProgramId service(String program);
    public ProgramId worker(String program);
    public ProgramId workflow(String program);
}

/**
 * Identifies a program within an application.
 * NOTE(review): some examples in this document call getApplication() on
 * ProgramId, which is not listed here — confirm against the actual API.
 */
public final class ProgramId extends EntityId {
    /** Static factory from namespace, application, program type, and name. */
    public static ProgramId of(String namespace, String application, ProgramType type, String program);
    public String getProgram();
    public ProgramType getType();
    /** The application containing this program. */
    public ApplicationId getParent();
}

/** Identifies a dataset within a namespace. */
public final class DatasetId extends EntityId {
    /** Static factory from explicit namespace and dataset names. */
    public static DatasetId of(String namespace, String dataset);
    public String getDataset();
    /** The namespace containing this dataset. */
    public NamespaceId getParent();
}

/** Identifies a stream within a namespace. */
public final class StreamId extends EntityId {
    /** Static factory from explicit namespace and stream names. */
    public static StreamId of(String namespace, String stream);
    public String getStream();
    /** The namespace containing this stream. */
    public NamespaceId getParent();
}

// Program types
/**
 * The kinds of programs that can register dataset/stream usage, each carrying
 * a human-readable display form (e.g. {@code MAPREDUCE -> "MapReduce"}).
 */
public enum ProgramType {
    MAPREDUCE("MapReduce"),
    WORKFLOW("Workflow"),
    SERVICE("Service"),
    SPARK("Spark"),
    WORKER("Worker");
    
    // Display form shown to users; immutable per constant.
    private final String displayName;
    
    ProgramType(String displayName) {
        this.displayName = displayName;
    }
    
    /** Returns the human-readable form of this program type. */
    public String getPrettyName() {
        return displayName;
    }
}

// Usage data structures
/**
 * Read-only view of a dataset's usage: the dataset, its consuming programs,
 * and record timestamps. (The Capabilities section earlier in this document
 * shows a single getTimestamp() instead — the listings disagree; this one is
 * taken as authoritative.)
 */
public final class DatasetUsage {
    public DatasetId getDatasetId();
    public Set<ProgramId> getPrograms();
    /** Timestamp at which the usage record was created. */
    public long getCreationTime();
    /** Timestamp of the most recent registered access. */
    public long getLastAccessTime();
}

/** Composite key pairing a dataset with a program for storing usage records. */
public final class DatasetUsageKey {
    public DatasetId getDatasetId();
    public ProgramId getProgramId();
    /** Timestamp carried by the key. */
    public long getTimestamp();
    
    /** Serialized key form used for storage and retrieval. */
    public String getKey();
    /** Static factory combining the two id components. */
    public static DatasetUsageKey of(DatasetId datasetId, ProgramId programId);
}

// Exception types
/**
 * Checked exception for usage-registry failures (inferred from the name — no
 * throw sites are visible in this document; confirm against the actual API).
 */
public class UsageException extends Exception {
    public UsageException(String message);
    /** Preserves the underlying cause for diagnostics. */
    public UsageException(String message, Throwable cause);
}

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-data-fabric

docs

audit-compliance.md

dataset-management.md

index.md

metadata-management.md

namespace-management.md

stream-processing.md

transaction-management.md

usage-registry.md

tile.json