CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap-client

CDAP Java Client library providing programmatic APIs for interacting with the CDAP platform

Pending
Overview
Eval results
Files

docs/data-operations.md

Data Operations and Utilities

Additional data operations including lineage tracking, preferences management, dataset modules, workflow management, and system utilities that complement the core CDAP functionality.

LineageClient

// API surface of LineageClient: queries dataset lineage over a [startTime, endTime)
// window. Timestamps appear to be epoch milliseconds (examples use
// System.currentTimeMillis()) — TODO confirm against the CDAP Javadoc.
public class LineageClient {
    // Constructors
    public LineageClient(ClientConfig config);
    public LineageClient(ClientConfig config, RESTClient restClient);
    
    // Data lineage operations
    // `levels` bounds how many hops of lineage are traversed (see example: "3 levels").
    public Lineage getLineage(DatasetId dataset, long startTime, long endTime, int levels);
    // Field-level lineage for one named field of the dataset.
    public FieldLineage getFieldLineage(DatasetId dataset, String field, long startTime, long endTime);
    public Set<DatasetId> getDatasetDependencies(DatasetId dataset, long startTime, long endTime);
    // Consumers read from the dataset; producers write to it, within the window.
    public Set<ProgramId> getDatasetConsumers(DatasetId dataset, long startTime, long endTime);
    public Set<ProgramId> getDatasetProducers(DatasetId dataset, long startTime, long endTime);
}

PreferencesClient

// API surface of PreferencesClient: get/set/delete string preferences at
// namespace, application, and program scope. Per the resolution example later
// in this document, program-level settings take priority over application,
// which takes priority over namespace.
public class PreferencesClient {
    // Constructors
    public PreferencesClient(ClientConfig config);
    public PreferencesClient(ClientConfig config, RESTClient restClient);
    
    // Namespace preferences
    public void setPreferences(NamespaceId namespace, Map<String, String> preferences);
    public Map<String, String> getPreferences(NamespaceId namespace);
    public void deletePreferences(NamespaceId namespace);
    
    // Application preferences
    public void setApplicationPreferences(ApplicationId application, Map<String, String> preferences);
    public Map<String, String> getApplicationPreferences(ApplicationId application);
    public void deleteApplicationPreferences(ApplicationId application);
    
    // Program preferences
    public void setProgramPreferences(ProgramId program, Map<String, String> preferences);
    public Map<String, String> getProgramPreferences(ProgramId program);
    public void deleteProgramPreferences(ProgramId program);
    
    // Resolved preferences (with inheritance) — the effective map after merging
    // namespace, application, and program levels.
    public Map<String, String> getResolvedPreferences(ProgramId program);
}

Dataset Module and Type Management

DatasetModuleClient

// API surface of DatasetModuleClient: deploy, list, inspect, and delete
// custom dataset modules packaged as JARs.
public class DatasetModuleClient {
    // Constructors
    public DatasetModuleClient(ClientConfig config);
    public DatasetModuleClient(ClientConfig config, RESTClient restClient);
    
    // Dataset module management
    // `className` is the module's main class inside `jarFile`.
    public void deploy(NamespaceId namespace, String moduleName, String className, File jarFile);
    public void deploy(DatasetModuleId moduleId, String className, File jarFile);
    public List<DatasetModuleMeta> list(NamespaceId namespace);
    public DatasetModuleMeta get(DatasetModuleId moduleId);
    // delete may fail if datasets depend on the module (see DatasetModuleInUseException example).
    public void delete(DatasetModuleId moduleId);
    public void deleteAll(NamespaceId namespace);
}

DatasetTypeClient

// API surface of DatasetTypeClient: read-only listing/lookup of dataset types
// available in a namespace.
public class DatasetTypeClient {
    // Constructors
    public DatasetTypeClient(ClientConfig config);
    public DatasetTypeClient(ClientConfig config, RESTClient restClient);
    
    // Dataset type operations
    public List<DatasetTypeMeta> list(NamespaceId namespace);
    public DatasetTypeMeta get(DatasetTypeId typeId);
}

Workflow Management

WorkflowClient

// API surface of WorkflowClient: workflow tokens, per-node run state, and
// aggregate run statistics.
public class WorkflowClient {
    // Constructors
    public WorkflowClient(ClientConfig config);
    public WorkflowClient(ClientConfig config, RESTClient restClient);
    
    // Workflow token operations
    // Overloads narrow the result: whole token, one scope, or one key within a scope.
    public WorkflowToken getWorkflowToken(ProgramRunId workflowRun);
    public WorkflowToken getWorkflowToken(ProgramRunId workflowRun, WorkflowToken.Scope scope);
    public WorkflowToken getWorkflowToken(ProgramRunId workflowRun, WorkflowToken.Scope scope, String key);
    
    // Workflow state operations  
    public List<WorkflowNodeStateDetail> getNodeStates(ProgramRunId workflowRun);
    public WorkflowNodeStateDetail getNodeState(ProgramRunId workflowRun, String nodeId);
    
    // Workflow statistics
    // startTime/endTime appear to be epoch millis (see usage examples) — TODO confirm.
    public WorkflowStatistics getStatistics(ProgramId workflow, long startTime, long endTime);
}

System Information

MetaClient

// API surface of MetaClient: instance-level metadata and health checks.
public class MetaClient {
    // Constructors
    public MetaClient(ClientConfig config);
    public MetaClient(ClientConfig config, RESTClient restClient);
    
    // System information
    public String getVersion();
    // ping throws (IOException per the example below) when the instance is unreachable.
    public void ping();
    public Map<String, String> getConfiguration();
    public CDAPCapabilities getCapabilities();
}

Data Lineage Operations

Basic Lineage Tracking

// Example: fetch and print dataset lineage for the last 7 days.
// Assumes `lineageClient` (LineageClient) and `namespace` are already in scope.
// Get lineage for a dataset
DatasetId userProfiles = DatasetId.of(namespace, "user-profiles");
long endTime = System.currentTimeMillis();
long startTime = endTime - TimeUnit.DAYS.toMillis(7); // Last 7 days
int levels = 3; // 3 levels of lineage

Lineage lineage = lineageClient.getLineage(userProfiles, startTime, endTime, levels);

System.out.println("Lineage for dataset: " + userProfiles.getDataset());
System.out.println("Programs that read from this dataset:");
for (ProgramId consumer : lineage.getConsumers()) {
    System.out.println("- " + consumer.getApplication() + "." + consumer.getProgram() + 
                      " (" + consumer.getType() + ")");
}

System.out.println("Programs that write to this dataset:");
for (ProgramId producer : lineage.getProducers()) {
    System.out.println("- " + producer.getApplication() + "." + producer.getProgram() + 
                      " (" + producer.getType() + ")");
}

// Get upstream and downstream datasets
Set<DatasetId> upstreamDatasets = lineage.getUpstreamDatasets();
Set<DatasetId> downstreamDatasets = lineage.getDownstreamDatasets();

System.out.println("Upstream datasets: " + upstreamDatasets);
System.out.println("Downstream datasets: " + downstreamDatasets);

Field-Level Lineage

// Example: field-level lineage for a single column of the dataset.
// Reuses `userProfiles`, `startTime`, `endTime` from the previous example.
// Get field lineage for specific field
String fieldName = "user_id";
FieldLineage fieldLineage = lineageClient.getFieldLineage(userProfiles, fieldName, startTime, endTime);

System.out.println("Field lineage for " + fieldName + ":");
System.out.println("Source fields: " + fieldLineage.getSourceFields());
System.out.println("Destination fields: " + fieldLineage.getDestinationFields());

// Analyze field transformations
// Each transformation records which program mapped a source field to a destination field.
for (FieldTransformation transformation : fieldLineage.getTransformations()) {
    System.out.println("Transformation in " + transformation.getProgram() + ":");
    System.out.println("  From: " + transformation.getSourceField());
    System.out.println("  To: " + transformation.getDestinationField());
    System.out.println("  Operation: " + transformation.getOperation());
}

Dependency Analysis

// Example: list datasets this one depends on, and the programs reading/writing it.
// Get dataset dependencies
Set<DatasetId> dependencies = lineageClient.getDatasetDependencies(userProfiles, startTime, endTime);
System.out.println("Datasets that " + userProfiles.getDataset() + " depends on: " + dependencies);

// Get consumers and producers
Set<ProgramId> consumers = lineageClient.getDatasetConsumers(userProfiles, startTime, endTime);
Set<ProgramId> producers = lineageClient.getDatasetProducers(userProfiles, startTime, endTime);

System.out.println("Programs consuming from " + userProfiles.getDataset() + ":");
consumers.forEach(program -> 
    System.out.println("- " + program.getApplication() + "." + program.getProgram()));

System.out.println("Programs producing to " + userProfiles.getDataset() + ":");
producers.forEach(program -> 
    System.out.println("- " + program.getApplication() + "." + program.getProgram()));

Preferences Management

Namespace-Level Preferences

// Example: set, read back, and update namespace-level preferences.
// Note: Map.of produces an immutable map, hence the HashMap copy before mutating.
// Set namespace preferences
Map<String, String> namespacePrefs = Map.of(
    "default.batch.size", "1000",
    "default.timeout.minutes", "30",
    "log.level", "INFO",
    "metrics.collection.enabled", "true",
    "data.retention.default.days", "90"
);

preferencesClient.setPreferences(namespace, namespacePrefs);
System.out.println("Set namespace preferences");

// Get namespace preferences
Map<String, String> retrievedPrefs = preferencesClient.getPreferences(namespace);
System.out.println("Namespace preferences: " + retrievedPrefs);

// Update specific preference
Map<String, String> updatedPrefs = new HashMap<>(retrievedPrefs);
updatedPrefs.put("log.level", "DEBUG");
preferencesClient.setPreferences(namespace, updatedPrefs);

Application-Level Preferences

// Example: application-scoped preferences for a specific app version.
// Set application preferences
ApplicationId appId = ApplicationId.of(namespace, "data-pipeline", "1.0.0");
Map<String, String> appPrefs = Map.of(
    "input.path", "/data/pipeline/input",
    "output.path", "/data/pipeline/output", 
    "parallel.workers", "4",
    "checkpoint.interval", "60000",
    "error.handling", "skip"
);

preferencesClient.setApplicationPreferences(appId, appPrefs);
System.out.println("Set application preferences for: " + appId.getApplication());

// Get application preferences
Map<String, String> appRetrievedPrefs = preferencesClient.getApplicationPreferences(appId);
System.out.println("Application preferences: " + appRetrievedPrefs);

Program-Level Preferences

// Example: program-scoped preferences and resolved (inherited) preferences.
// Set program-specific preferences
ProgramId workflowId = ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow");
Map<String, String> programPrefs = Map.of(
    "workflow.timeout", "7200000", // 2 hours
    "max.retries", "3",
    "retry.delay.seconds", "30",
    "notification.email", "team@company.com"
);

preferencesClient.setProgramPreferences(workflowId, programPrefs);

// Get resolved preferences (with inheritance from namespace and application)
Map<String, String> resolvedPrefs = preferencesClient.getResolvedPreferences(workflowId);
System.out.println("Resolved preferences for " + workflowId.getProgram() + ": " + resolvedPrefs);

// Analyze preference inheritance
System.out.println("Preference resolution order (highest to lowest priority):");
System.out.println("1. Program level: " + preferencesClient.getProgramPreferences(workflowId).keySet());
System.out.println("2. Application level: " + preferencesClient.getApplicationPreferences(appId).keySet());
System.out.println("3. Namespace level: " + preferencesClient.getPreferences(namespace).keySet());

Dataset Module Management

Module Deployment

// Example: deploy a custom dataset module JAR and list what is deployed.
// Deploy custom dataset module
File moduleJar = new File("/path/to/custom-dataset-module.jar");
String moduleName = "custom-table-module";
String mainClassName = "com.company.dataset.CustomTableModule";

datasetModuleClient.deploy(namespace, moduleName, mainClassName, moduleJar);
System.out.println("Deployed dataset module: " + moduleName);

// Deploy with module ID
DatasetModuleId moduleId = DatasetModuleId.of(namespace, moduleName);
datasetModuleClient.deploy(moduleId, mainClassName, moduleJar);

// List deployed modules
List<DatasetModuleMeta> modules = datasetModuleClient.list(namespace);
System.out.println("Deployed dataset modules:");
for (DatasetModuleMeta module : modules) {
    System.out.println("- " + module.getName());
    System.out.println("  Class: " + module.getClassName());
    System.out.println("  JAR: " + module.getJarLocationPath());
    System.out.println("  Types: " + module.getTypes());
}

Module Information and Management

// Example: inspect a module and delete it, handling the in-use case.
// Get specific module information
DatasetModuleMeta moduleInfo = datasetModuleClient.get(moduleId);
System.out.println("Module: " + moduleInfo.getName());
System.out.println("Class: " + moduleInfo.getClassName());
System.out.println("JAR location: " + moduleInfo.getJarLocationPath());
System.out.println("Provided types: " + moduleInfo.getTypes());

// Delete module
// Deletion fails while datasets still depend on types the module provides.
try {
    datasetModuleClient.delete(moduleId);
    System.out.println("Deleted module: " + moduleName);
} catch (DatasetModuleInUseException e) {
    System.err.println("Cannot delete module - datasets depend on it: " + e.getMessage());
}

// Clean up all modules in namespace (use with caution!)
// datasetModuleClient.deleteAll(namespace);

Dataset Type Operations

// Example: enumerate dataset types and look one up by id.
// List available dataset types
List<DatasetTypeMeta> types = datasetTypeClient.list(namespace);
System.out.println("Available dataset types:");
for (DatasetTypeMeta type : types) {
    System.out.println("- " + type.getName());
    System.out.println("  Modules: " + type.getModules());
}

// Get specific type information
DatasetTypeId typeId = DatasetTypeId.of(namespace, "custom-table");
try {
    DatasetTypeMeta typeInfo = datasetTypeClient.get(typeId);
    System.out.println("Type: " + typeInfo.getName());
    System.out.println("Modules: " + typeInfo.getModules());
} catch (DatasetTypeNotFoundException e) {
    System.err.println("Dataset type not found: " + typeId.getType());
}

Workflow Operations

Workflow Token Management

// Example: read workflow tokens for one run — all keys, by scope, and one key.
// `runId` is a placeholder; in practice it comes from the program-runs API.
// Get workflow token for a specific run
ApplicationId appId = ApplicationId.of(namespace, "analytics-app", "1.0.0");
ProgramId workflowId = ProgramId.of(appId, ProgramType.WORKFLOW, "data-processing");
String runId = "run-12345"; // Obtained from program runs
ProgramRunId workflowRunId = ProgramRunId.of(workflowId, runId);

WorkflowToken token = workflowClient.getWorkflowToken(workflowRunId);
System.out.println("Workflow token for run: " + runId);

// Get all token data
Map<String, WorkflowToken.Value> allTokens = token.getAll();
for (Map.Entry<String, WorkflowToken.Value> entry : allTokens.entrySet()) {
    WorkflowToken.Value value = entry.getValue();
    System.out.println("Key: " + entry.getKey());
    System.out.println("  Value: " + value.toString());
    // Node that wrote this token value.
    System.out.println("  Node: " + value.getNodeName()); 
}

// Get tokens by scope
WorkflowToken systemTokens = workflowClient.getWorkflowToken(workflowRunId, WorkflowToken.Scope.SYSTEM);
WorkflowToken userTokens = workflowClient.getWorkflowToken(workflowRunId, WorkflowToken.Scope.USER);

System.out.println("System tokens: " + systemTokens.getAll().keySet());
System.out.println("User tokens: " + userTokens.getAll().keySet());

// Get specific token value
String specificKey = "processed.records.count";
WorkflowToken specificToken = workflowClient.getWorkflowToken(workflowRunId, WorkflowToken.Scope.USER, specificKey);
// containsKey guard: the key may be absent if the workflow never set it.
if (specificToken.getAll().containsKey(specificKey)) {
    WorkflowToken.Value recordCount = specificToken.get(specificKey);
    System.out.println("Processed records: " + recordCount.toString());
}

Workflow State Tracking

// Example: print status, timing, and failure cause of each workflow node.
// Get workflow node states
List<WorkflowNodeStateDetail> nodeStates = workflowClient.getNodeStates(workflowRunId);
System.out.println("Workflow nodes (" + nodeStates.size() + "):");

for (WorkflowNodeStateDetail nodeState : nodeStates) {
    System.out.println("- Node: " + nodeState.getNodeId());
    System.out.println("  Status: " + nodeState.getNodeStatus());
    System.out.println("  Type: " + nodeState.getNodeType());
    System.out.println("  Start: " + new Date(nodeState.getStartTime()));
    
    // getEndTime() is null-checked, so it presumably returns a boxed Long that
    // is null while the node is still running — TODO confirm.
    if (nodeState.getEndTime() != null) {
        System.out.println("  End: " + new Date(nodeState.getEndTime()));
        long duration = nodeState.getEndTime() - nodeState.getStartTime();
        System.out.println("  Duration: " + duration + " ms");
    }
    
    if (nodeState.getFailureCause() != null) {
        System.out.println("  Failure: " + nodeState.getFailureCause());
    }
}

// Get specific node state
String nodeId = "data-validation-action";
WorkflowNodeStateDetail specificNode = workflowClient.getNodeState(workflowRunId, nodeId);
System.out.println("Node " + nodeId + " status: " + specificNode.getNodeStatus());

Workflow Statistics

// Example: aggregate and per-node workflow statistics for the last 30 days.
// Get workflow statistics over time period
long endTime = System.currentTimeMillis();
long startTime = endTime - TimeUnit.DAYS.toMillis(30); // Last 30 days

WorkflowStatistics stats = workflowClient.getStatistics(workflowId, startTime, endTime);
System.out.println("Workflow statistics for " + workflowId.getProgram() + ":");
System.out.println("Total runs: " + stats.getTotalRuns());
System.out.println("Successful runs: " + stats.getSuccessfulRuns());
System.out.println("Failed runs: " + stats.getFailedRuns());
System.out.println("Average duration: " + stats.getAverageDuration() + " ms");

// Node-level statistics
Map<String, WorkflowNodeStatistics> nodeStats = stats.getNodeStatistics();
for (Map.Entry<String, WorkflowNodeStatistics> entry : nodeStats.entrySet()) {
    WorkflowNodeStatistics nodeStat = entry.getValue();
    System.out.println("Node " + entry.getKey() + ":");
    // Guard against dividing by zero (would print NaN) for nodes with no runs
    // in the analysis window.
    if (nodeStat.getTotalRuns() > 0) {
        System.out.println("  Success rate: " + (nodeStat.getSuccessfulRuns() * 100.0 / nodeStat.getTotalRuns()) + "%");
    } else {
        System.out.println("  Success rate: n/a (no runs in window)");
    }
    System.out.println("  Average duration: " + nodeStat.getAverageDuration() + " ms");
}

System Information and Health

System Metadata

// Example: version, liveness ping, configuration dump (secrets redacted),
// and capability flags.
// Get CDAP version
String version = metaClient.getVersion();
System.out.println("CDAP Version: " + version);

// Ping CDAP instance
try {
    metaClient.ping();
    System.out.println("CDAP instance is responsive");
} catch (IOException e) {
    System.err.println("CDAP instance is not responsive: " + e.getMessage());
}

// Get system configuration
Map<String, String> config = metaClient.getConfiguration();
System.out.println("System configuration:");
for (Map.Entry<String, String> entry : config.entrySet()) {
    // Case-insensitive redaction: keys like "Password" or "SECRET_KEY" must
    // not leak either. Locale.ROOT avoids locale-dependent case mapping.
    String keyLower = entry.getKey().toLowerCase(java.util.Locale.ROOT);
    if (keyLower.contains("password") || keyLower.contains("secret")) {
        System.out.println("  " + entry.getKey() + ": [REDACTED]");
    } else {
        System.out.println("  " + entry.getKey() + ": " + entry.getValue());
    }
}

// Get system capabilities
CDAPCapabilities capabilities = metaClient.getCapabilities();
System.out.println("System capabilities:");
System.out.println("  Security enabled: " + capabilities.isSecurityEnabled());
System.out.println("  Namespaces supported: " + capabilities.isNamespacesSupported());
System.out.println("  Audit logging: " + capabilities.isAuditLoggingEnabled());

Advanced Data Operations

Comprehensive Data Pipeline Analysis

/**
 * Aggregates lineage, workflow statistics, and application preferences for a
 * dataset into a single {@code PipelineAnalysisReport}. Best-effort: any
 * failure is recorded on the report rather than thrown.
 */
public class DataPipelineAnalyzer {
    private final LineageClient lineageClient;
    private final PreferencesClient preferencesClient;
    private final WorkflowClient workflowClient;
    
    public DataPipelineAnalyzer(LineageClient lineageClient, 
                               PreferencesClient preferencesClient,
                               WorkflowClient workflowClient) {
        this.lineageClient = lineageClient;
        this.preferencesClient = preferencesClient;
        this.workflowClient = workflowClient;
    }
    
    /**
     * Analyzes the pipeline around {@code dataset} over the trailing
     * {@code analysisWindowDays} days (window ends at "now").
     *
     * @param dataset            the dataset at the center of the analysis
     * @param analysisWindowDays size of the trailing analysis window, in days
     * @return a report; if any step failed, the report carries the error message
     */
    public PipelineAnalysisReport analyzePipeline(DatasetId dataset, long analysisWindowDays) {
        long endTime = System.currentTimeMillis();
        long startTime = endTime - TimeUnit.DAYS.toMillis(analysisWindowDays);
        
        PipelineAnalysisReport.Builder reportBuilder = PipelineAnalysisReport.builder()
            .dataset(dataset)
            .analysisWindow(startTime, endTime);
        
        try {
            // Lineage up to 5 hops in each direction.
            Lineage lineage = lineageClient.getLineage(dataset, startTime, endTime, 5);
            reportBuilder.lineage(lineage);
            
            Set<ProgramId> producers = lineage.getProducers();
            Set<ProgramId> consumers = lineage.getConsumers();
            reportBuilder.producers(producers).consumers(consumers);
            
            // Workflow statistics for every workflow that reads or writes the dataset.
            addWorkflowStats(reportBuilder, producers, startTime, endTime);
            addWorkflowStats(reportBuilder, consumers, startTime, endTime);
            
            // Preferences for every application owning a producer or consumer
            // (deduplicated via the set).
            Set<ApplicationId> applications = new HashSet<>();
            producers.forEach(p -> applications.add(p.getApplication()));
            consumers.forEach(p -> applications.add(p.getApplication()));
            
            for (ApplicationId app : applications) {
                Map<String, String> prefs = preferencesClient.getApplicationPreferences(app);
                reportBuilder.addApplicationPreferences(app, prefs);
            }
            
        } catch (Exception e) {
            // Deliberate best-effort: record the failure instead of propagating,
            // so callers always get a report object back.
            reportBuilder.error("Analysis failed: " + e.getMessage());
        }
        
        return reportBuilder.build();
    }
    
    // Adds workflow statistics for each WORKFLOW-typed program in the set.
    // (Extracted: this loop was duplicated verbatim for producers and consumers.)
    private void addWorkflowStats(PipelineAnalysisReport.Builder reportBuilder,
                                  Set<ProgramId> programs,
                                  long startTime, long endTime) throws Exception {
        for (ProgramId program : programs) {
            if (program.getType() == ProgramType.WORKFLOW) {
                WorkflowStatistics stats = workflowClient.getStatistics(program, startTime, endTime);
                reportBuilder.addWorkflowStats(program, stats);
            }
        }
    }
}

// Analysis report data structure
public class PipelineAnalysisReport {
    private final DatasetId dataset;
    private final long startTime;
    private final long endTime;
    private final Lineage lineage;
    private final Set<ProgramId> producers;
    private final Set<ProgramId> consumers;
    private final Map<ProgramId, WorkflowStatistics> workflowStats;
    private final Map<ApplicationId, Map<String, String>> applicationPreferences;
    private final String error;
    
    // Constructor, getters, and builder
}

Error Handling

Data operations may throw these common exceptions:

  • LineageNotFoundException: Lineage data not available for specified time range
  • PreferencesNotFoundException: Preferences not set at requested level
  • DatasetModuleNotFoundException: Dataset module does not exist
  • WorkflowNotFoundException: Workflow or run not found
  • UnauthenticatedException: Authentication required
  • UnauthorizedException: Insufficient permissions
// Example: typical exception handling around a lineage call, from most
// specific to most general.
try {
    Lineage lineage = lineageClient.getLineage(datasetId, startTime, endTime, 3);
    System.out.println("Lineage retrieved successfully");
} catch (LineageNotFoundException e) {
    System.err.println("No lineage data available for time range");
} catch (UnauthorizedException e) {
    System.err.println("No permission to access lineage data: " + e.getMessage());
} catch (IOException e) {
    System.err.println("Network error: " + e.getMessage());
}

Best Practices

  1. Lineage Tracking: Regularly analyze data lineage for impact assessment
  2. Preference Hierarchy: Use preference inheritance effectively (namespace → application → program)
  3. Module Management: Keep dataset modules updated and remove unused ones
  4. Workflow Monitoring: Monitor workflow tokens and node states for debugging
  5. System Health: Regularly check system capabilities and configuration
  6. Performance Analysis: Use workflow statistics for performance optimization
// Good: Comprehensive data operations with proper error handling
/**
 * Facade combining lineage, preference, and workflow clients for routine
 * administrative tasks: configuring application defaults and assessing the
 * blast radius of dataset changes.
 */
public class DataOperationsManager {
    private final LineageClient lineageClient;
    private final PreferencesClient preferencesClient;
    private final WorkflowClient workflowClient;
    
    public DataOperationsManager(LineageClient lineageClient, 
                                PreferencesClient preferencesClient,
                                WorkflowClient workflowClient) {
        this.lineageClient = lineageClient;
        this.preferencesClient = preferencesClient;
        this.workflowClient = workflowClient;
    }
    
    /**
     * Stores {@code defaultPreferences} at application scope and reads them
     * back to confirm the server persisted exactly what was sent.
     *
     * @throws RuntimeException if the set/verify round trip fails
     */
    public void setupApplicationDefaults(ApplicationId appId, Map<String, String> defaultPreferences) {
        try {
            preferencesClient.setApplicationPreferences(appId, defaultPreferences);
            
            // Round-trip check: what came back must match what we sent.
            Map<String, String> stored = preferencesClient.getApplicationPreferences(appId);
            if (stored.equals(defaultPreferences)) {
                System.out.println("Successfully configured preferences for " + appId.getApplication());
            } else {
                System.err.println("Preference verification failed for " + appId.getApplication());
            }
            
        } catch (Exception e) {
            System.err.println("Failed to set preferences for " + appId.getApplication() + ": " + e.getMessage());
            throw new RuntimeException("Preference setup failed", e);
        }
    }
    
    /**
     * Prints a 30-day impact summary for {@code datasetId}: how many programs
     * consume from and produce into it, with a warning when the consumer count
     * is high. Failures are logged, not thrown (best-effort).
     */
    public void analyzeDatasetImpact(DatasetId datasetId) {
        try {
            long windowEnd = System.currentTimeMillis();
            long windowStart = windowEnd - TimeUnit.DAYS.toMillis(30);
            
            // Lineage over the trailing 30 days.
            Set<ProgramId> consumers = lineageClient.getDatasetConsumers(datasetId, windowStart, windowEnd);
            Set<ProgramId> producers = lineageClient.getDatasetProducers(datasetId, windowStart, windowEnd);
            
            System.out.println("Impact analysis for dataset: " + datasetId.getDataset());
            System.out.println("Affected consumers: " + consumers.size());
            System.out.println("Data producers: " + producers.size());
            
            // Flag datasets whose changes would ripple widely.
            if (consumers.size() > 10) {
                System.out.println("WARNING: Dataset has many consumers - changes may have wide impact");
            }
            
        } catch (Exception e) {
            System.err.println("Impact analysis failed for " + datasetId.getDataset() + ": " + e.getMessage());
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap-client

docs

application-management.md

artifact-management.md

configuration.md

data-operations.md

dataset-operations.md

index.md

metrics-monitoring.md

program-control.md

schedule-management.md

security-administration.md

service-management.md

tile.json