CDAP Java Client library providing programmatic APIs for interacting with the CDAP platform.
—
Additional data operations — including lineage tracking, preferences management, dataset module administration, workflow management, and system utilities — that complement the core CDAP functionality.
/**
 * Client for querying dataset lineage from CDAP: full lineage graphs, field-level
 * lineage, dataset dependencies, and the programs reading/writing a dataset.
 * NOTE(review): signature listing only — method bodies live in the client library.
 */
public class LineageClient {
// Constructors
public LineageClient(ClientConfig config);
public LineageClient(ClientConfig config, RESTClient restClient);
// Data lineage operations; time bounds are epoch milliseconds, levels bounds graph depth
public Lineage getLineage(DatasetId dataset, long startTime, long endTime, int levels);
public FieldLineage getFieldLineage(DatasetId dataset, String field, long startTime, long endTime);
public Set<DatasetId> getDatasetDependencies(DatasetId dataset, long startTime, long endTime);
public Set<ProgramId> getDatasetConsumers(DatasetId dataset, long startTime, long endTime);
public Set<ProgramId> getDatasetProducers(DatasetId dataset, long startTime, long endTime);
}

/**
 * Client for managing CDAP preferences at namespace, application, and program scope.
 * Resolution order (per the usage example below): program overrides application,
 * which overrides namespace.
 */
public class PreferencesClient {
// Constructors
public PreferencesClient(ClientConfig config);
public PreferencesClient(ClientConfig config, RESTClient restClient);
// Namespace preferences
public void setPreferences(NamespaceId namespace, Map<String, String> preferences);
public Map<String, String> getPreferences(NamespaceId namespace);
public void deletePreferences(NamespaceId namespace);
// Application preferences
public void setApplicationPreferences(ApplicationId application, Map<String, String> preferences);
public Map<String, String> getApplicationPreferences(ApplicationId application);
public void deleteApplicationPreferences(ApplicationId application);
// Program preferences
public void setProgramPreferences(ProgramId program, Map<String, String> preferences);
public Map<String, String> getProgramPreferences(ProgramId program);
public void deleteProgramPreferences(ProgramId program);
// Resolved preferences (with inheritance)
public Map<String, String> getResolvedPreferences(ProgramId program);
}

/**
 * Client for deploying and managing custom dataset modules (JARs providing dataset types).
 */
public class DatasetModuleClient {
// Constructors
public DatasetModuleClient(ClientConfig config);
public DatasetModuleClient(ClientConfig config, RESTClient restClient);
// Dataset module management
public void deploy(NamespaceId namespace, String moduleName, String className, File jarFile);
public void deploy(DatasetModuleId moduleId, String className, File jarFile);
public List<DatasetModuleMeta> list(NamespaceId namespace);
public DatasetModuleMeta get(DatasetModuleId moduleId);
// delete fails while datasets still depend on the module (see usage example)
public void delete(DatasetModuleId moduleId);
public void deleteAll(NamespaceId namespace);
}

/**
 * Read-only client for listing and inspecting dataset types available in a namespace.
 */
public class DatasetTypeClient {
// Constructors
public DatasetTypeClient(ClientConfig config);
public DatasetTypeClient(ClientConfig config, RESTClient restClient);
// Dataset type operations
public List<DatasetTypeMeta> list(NamespaceId namespace);
public DatasetTypeMeta get(DatasetTypeId typeId);
}

/**
 * Client for workflow runtime introspection: workflow tokens, per-node state, and
 * aggregate run statistics.
 */
public class WorkflowClient {
// Constructors
public WorkflowClient(ClientConfig config);
public WorkflowClient(ClientConfig config, RESTClient restClient);
// Workflow token operations (optionally filtered by scope and key)
public WorkflowToken getWorkflowToken(ProgramRunId workflowRun);
public WorkflowToken getWorkflowToken(ProgramRunId workflowRun, WorkflowToken.Scope scope);
public WorkflowToken getWorkflowToken(ProgramRunId workflowRun, WorkflowToken.Scope scope, String key);
// Workflow state operations
public List<WorkflowNodeStateDetail> getNodeStates(ProgramRunId workflowRun);
public WorkflowNodeStateDetail getNodeState(ProgramRunId workflowRun, String nodeId);
// Workflow statistics over [startTime, endTime], epoch milliseconds
public WorkflowStatistics getStatistics(ProgramId workflow, long startTime, long endTime);
}

/**
 * Client for system-level information: version, liveness (ping), configuration,
 * and platform capabilities.
 */
public class MetaClient {
// Constructors
public MetaClient(ClientConfig config);
public MetaClient(ClientConfig config, RESTClient restClient);
// System information
public String getVersion();
public void ping();
public Map<String, String> getConfiguration();
public CDAPCapabilities getCapabilities();
}

// --- Usage example: dataset lineage queries ---
// NOTE(review): snippet context — assumes `namespace` and `lineageClient` are in scope.
// Get lineage for a dataset
DatasetId userProfiles = DatasetId.of(namespace, "user-profiles");
long endTime = System.currentTimeMillis();
long startTime = endTime - TimeUnit.DAYS.toMillis(7); // Last 7 days
int levels = 3; // 3 levels of lineage
Lineage lineage = lineageClient.getLineage(userProfiles, startTime, endTime, levels);
System.out.println("Lineage for dataset: " + userProfiles.getDataset());
System.out.println("Programs that read from this dataset:");
for (ProgramId consumer : lineage.getConsumers()) {
System.out.println("- " + consumer.getApplication() + "." + consumer.getProgram() +
" (" + consumer.getType() + ")");
}
System.out.println("Programs that write to this dataset:");
for (ProgramId producer : lineage.getProducers()) {
System.out.println("- " + producer.getApplication() + "." + producer.getProgram() +
" (" + producer.getType() + ")");
}
// Get upstream and downstream datasets
Set<DatasetId> upstreamDatasets = lineage.getUpstreamDatasets();
Set<DatasetId> downstreamDatasets = lineage.getDownstreamDatasets();
System.out.println("Upstream datasets: " + upstreamDatasets);
System.out.println("Downstream datasets: " + downstreamDatasets);
// Get field lineage for specific field
String fieldName = "user_id";
FieldLineage fieldLineage = lineageClient.getFieldLineage(userProfiles, fieldName, startTime, endTime);
System.out.println("Field lineage for " + fieldName + ":");
System.out.println("Source fields: " + fieldLineage.getSourceFields());
System.out.println("Destination fields: " + fieldLineage.getDestinationFields());
// Analyze field transformations
for (FieldTransformation transformation : fieldLineage.getTransformations()) {
System.out.println("Transformation in " + transformation.getProgram() + ":");
System.out.println(" From: " + transformation.getSourceField());
System.out.println(" To: " + transformation.getDestinationField());
System.out.println(" Operation: " + transformation.getOperation());
}
// Get dataset dependencies
Set<DatasetId> dependencies = lineageClient.getDatasetDependencies(userProfiles, startTime, endTime);
System.out.println("Datasets that " + userProfiles.getDataset() + " depends on: " + dependencies);
// Get consumers and producers
Set<ProgramId> consumers = lineageClient.getDatasetConsumers(userProfiles, startTime, endTime);
Set<ProgramId> producers = lineageClient.getDatasetProducers(userProfiles, startTime, endTime);
System.out.println("Programs consuming from " + userProfiles.getDataset() + ":");
consumers.forEach(program ->
System.out.println("- " + program.getApplication() + "." + program.getProgram()));
System.out.println("Programs producing to " + userProfiles.getDataset() + ":");
producers.forEach(program ->
System.out.println("- " + program.getApplication() + "." + program.getProgram()));
// Set namespace preferences
// --- Usage example: preferences at namespace/application/program scope ---
// NOTE(review): snippet context — assumes `preferencesClient` and `namespace` are in scope.
Map<String, String> namespacePrefs = Map.of(
"default.batch.size", "1000",
"default.timeout.minutes", "30",
"log.level", "INFO",
"metrics.collection.enabled", "true",
"data.retention.default.days", "90"
);
preferencesClient.setPreferences(namespace, namespacePrefs);
System.out.println("Set namespace preferences");
// Get namespace preferences
Map<String, String> retrievedPrefs = preferencesClient.getPreferences(namespace);
System.out.println("Namespace preferences: " + retrievedPrefs);
// Update specific preference
// Copy into a mutable HashMap first — the retrieved map may be unmodifiable
Map<String, String> updatedPrefs = new HashMap<>(retrievedPrefs);
updatedPrefs.put("log.level", "DEBUG");
preferencesClient.setPreferences(namespace, updatedPrefs);
// Set application preferences
ApplicationId appId = ApplicationId.of(namespace, "data-pipeline", "1.0.0");
Map<String, String> appPrefs = Map.of(
"input.path", "/data/pipeline/input",
"output.path", "/data/pipeline/output",
"parallel.workers", "4",
"checkpoint.interval", "60000",
"error.handling", "skip"
);
preferencesClient.setApplicationPreferences(appId, appPrefs);
System.out.println("Set application preferences for: " + appId.getApplication());
// Get application preferences
Map<String, String> appRetrievedPrefs = preferencesClient.getApplicationPreferences(appId);
System.out.println("Application preferences: " + appRetrievedPrefs);
// Set program-specific preferences
ProgramId workflowId = ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow");
Map<String, String> programPrefs = Map.of(
"workflow.timeout", "7200000", // 2 hours
"max.retries", "3",
"retry.delay.seconds", "30",
"notification.email", "team@company.com"
);
preferencesClient.setProgramPreferences(workflowId, programPrefs);
// Get resolved preferences (with inheritance from namespace and application)
Map<String, String> resolvedPrefs = preferencesClient.getResolvedPreferences(workflowId);
System.out.println("Resolved preferences for " + workflowId.getProgram() + ": " + resolvedPrefs);
// Analyze preference inheritance
System.out.println("Preference resolution order (highest to lowest priority):");
System.out.println("1. Program level: " + preferencesClient.getProgramPreferences(workflowId).keySet());
System.out.println("2. Application level: " + preferencesClient.getApplicationPreferences(appId).keySet());
System.out.println("3. Namespace level: " + preferencesClient.getPreferences(namespace).keySet());
// Deploy custom dataset module
// --- Usage example: deploying and managing dataset modules ---
// NOTE(review): snippet context — assumes `datasetModuleClient` and `namespace` are in scope.
File moduleJar = new File("/path/to/custom-dataset-module.jar");
String moduleName = "custom-table-module";
String mainClassName = "com.company.dataset.CustomTableModule";
datasetModuleClient.deploy(namespace, moduleName, mainClassName, moduleJar);
System.out.println("Deployed dataset module: " + moduleName);
// Deploy with module ID
DatasetModuleId moduleId = DatasetModuleId.of(namespace, moduleName);
datasetModuleClient.deploy(moduleId, mainClassName, moduleJar);
// List deployed modules
List<DatasetModuleMeta> modules = datasetModuleClient.list(namespace);
System.out.println("Deployed dataset modules:");
for (DatasetModuleMeta module : modules) {
System.out.println("- " + module.getName());
System.out.println(" Class: " + module.getClassName());
System.out.println(" JAR: " + module.getJarLocationPath());
System.out.println(" Types: " + module.getTypes());
}
// Get specific module information
DatasetModuleMeta moduleInfo = datasetModuleClient.get(moduleId);
System.out.println("Module: " + moduleInfo.getName());
System.out.println("Class: " + moduleInfo.getClassName());
System.out.println("JAR location: " + moduleInfo.getJarLocationPath());
System.out.println("Provided types: " + moduleInfo.getTypes());
// Delete module
// Deletion is rejected while datasets still use types provided by this module
try {
datasetModuleClient.delete(moduleId);
System.out.println("Deleted module: " + moduleName);
} catch (DatasetModuleInUseException e) {
System.err.println("Cannot delete module - datasets depend on it: " + e.getMessage());
}
// Clean up all modules in namespace (use with caution!)
// datasetModuleClient.deleteAll(namespace);
// List available dataset types
// --- Usage example: inspecting dataset types ---
// NOTE(review): snippet context — assumes `datasetTypeClient` and `namespace` are in scope.
List<DatasetTypeMeta> types = datasetTypeClient.list(namespace);
System.out.println("Available dataset types:");
for (DatasetTypeMeta type : types) {
System.out.println("- " + type.getName());
System.out.println(" Modules: " + type.getModules());
}
// Get specific type information
DatasetTypeId typeId = DatasetTypeId.of(namespace, "custom-table");
try {
DatasetTypeMeta typeInfo = datasetTypeClient.get(typeId);
System.out.println("Type: " + typeInfo.getName());
System.out.println("Modules: " + typeInfo.getModules());
} catch (DatasetTypeNotFoundException e) {
System.err.println("Dataset type not found: " + typeId.getType());
}
// Get workflow token for a specific run
// --- Usage example: workflow tokens and node states ---
// NOTE(review): snippet context — assumes `workflowClient` and `namespace` are in scope.
ApplicationId appId = ApplicationId.of(namespace, "analytics-app", "1.0.0");
ProgramId workflowId = ProgramId.of(appId, ProgramType.WORKFLOW, "data-processing");
String runId = "run-12345"; // Obtained from program runs
ProgramRunId workflowRunId = ProgramRunId.of(workflowId, runId);
WorkflowToken token = workflowClient.getWorkflowToken(workflowRunId);
System.out.println("Workflow token for run: " + runId);
// Get all token data
Map<String, WorkflowToken.Value> allTokens = token.getAll();
for (Map.Entry<String, WorkflowToken.Value> entry : allTokens.entrySet()) {
WorkflowToken.Value value = entry.getValue();
System.out.println("Key: " + entry.getKey());
System.out.println(" Value: " + value.toString());
System.out.println(" Node: " + value.getNodeName());
}
// Get tokens by scope
WorkflowToken systemTokens = workflowClient.getWorkflowToken(workflowRunId, WorkflowToken.Scope.SYSTEM);
WorkflowToken userTokens = workflowClient.getWorkflowToken(workflowRunId, WorkflowToken.Scope.USER);
System.out.println("System tokens: " + systemTokens.getAll().keySet());
System.out.println("User tokens: " + userTokens.getAll().keySet());
// Get specific token value
String specificKey = "processed.records.count";
WorkflowToken specificToken = workflowClient.getWorkflowToken(workflowRunId, WorkflowToken.Scope.USER, specificKey);
if (specificToken.getAll().containsKey(specificKey)) {
WorkflowToken.Value recordCount = specificToken.get(specificKey);
System.out.println("Processed records: " + recordCount.toString());
}
// Get workflow node states
List<WorkflowNodeStateDetail> nodeStates = workflowClient.getNodeStates(workflowRunId);
System.out.println("Workflow nodes (" + nodeStates.size() + "):");
for (WorkflowNodeStateDetail nodeState : nodeStates) {
System.out.println("- Node: " + nodeState.getNodeId());
System.out.println(" Status: " + nodeState.getNodeStatus());
System.out.println(" Type: " + nodeState.getNodeType());
System.out.println(" Start: " + new Date(nodeState.getStartTime()));
// getEndTime() can be null — presumably while the node is still running; guarded here
if (nodeState.getEndTime() != null) {
System.out.println(" End: " + new Date(nodeState.getEndTime()));
long duration = nodeState.getEndTime() - nodeState.getStartTime();
System.out.println(" Duration: " + duration + " ms");
}
if (nodeState.getFailureCause() != null) {
System.out.println(" Failure: " + nodeState.getFailureCause());
}
}
// Get specific node state
String nodeId = "data-validation-action";
WorkflowNodeStateDetail specificNode = workflowClient.getNodeState(workflowRunId, nodeId);
System.out.println("Node " + nodeId + " status: " + specificNode.getNodeStatus());
// Get workflow statistics over time period
// --- Usage example: workflow statistics over the trailing 30 days ---
long endTime = System.currentTimeMillis();
long startTime = endTime - TimeUnit.DAYS.toMillis(30); // Last 30 days
WorkflowStatistics stats = workflowClient.getStatistics(workflowId, startTime, endTime);
System.out.println("Workflow statistics for " + workflowId.getProgram() + ":");
System.out.println("Total runs: " + stats.getTotalRuns());
System.out.println("Successful runs: " + stats.getSuccessfulRuns());
System.out.println("Failed runs: " + stats.getFailedRuns());
System.out.println("Average duration: " + stats.getAverageDuration() + " ms");
// Node-level statistics
Map<String, WorkflowNodeStatistics> nodeStats = stats.getNodeStatistics();
for (Map.Entry<String, WorkflowNodeStatistics> entry : nodeStats.entrySet()) {
WorkflowNodeStatistics nodeStat = entry.getValue();
System.out.println("Node " + entry.getKey() + ":");
// NOTE(review): floating-point division — prints NaN if a node has zero recorded runs
System.out.println(" Success rate: " + (nodeStat.getSuccessfulRuns() * 100.0 / nodeStat.getTotalRuns()) + "%");
System.out.println(" Average duration: " + nodeStat.getAverageDuration() + " ms");
}
// Get CDAP version
// --- Usage example: system information via MetaClient ---
// NOTE(review): snippet context — assumes `metaClient` is in scope.
String version = metaClient.getVersion();
System.out.println("CDAP Version: " + version);
// Ping CDAP instance
try {
metaClient.ping();
System.out.println("CDAP instance is responsive");
} catch (IOException e) {
System.err.println("CDAP instance is not responsive: " + e.getMessage());
}
// Get system configuration
Map<String, String> config = metaClient.getConfiguration();
System.out.println("System configuration:");
for (Map.Entry<String, String> entry : config.entrySet()) {
// Redact values whose keys look sensitive before printing
if (entry.getKey().contains("password") || entry.getKey().contains("secret")) {
System.out.println(" " + entry.getKey() + ": [REDACTED]");
} else {
System.out.println(" " + entry.getKey() + ": " + entry.getValue());
}
}
// Get system capabilities
CDAPCapabilities capabilities = metaClient.getCapabilities();
System.out.println("System capabilities:");
System.out.println(" Security enabled: " + capabilities.isSecurityEnabled());
System.out.println(" Namespaces supported: " + capabilities.isNamespacesSupported());
System.out.println(" Audit logging: " + capabilities.isAuditLoggingEnabled());

public class DataPipelineAnalyzer {
private final LineageClient lineageClient;
private final PreferencesClient preferencesClient;
private final WorkflowClient workflowClient;

/**
 * Creates an analyzer backed by the given CDAP clients.
 *
 * @param lineageClient     client used for lineage queries
 * @param preferencesClient client used to read application preferences
 * @param workflowClient    client used for workflow statistics
 */
public DataPipelineAnalyzer(LineageClient lineageClient,
                            PreferencesClient preferencesClient,
                            WorkflowClient workflowClient) {
    this.lineageClient = lineageClient;
    this.preferencesClient = preferencesClient;
    this.workflowClient = workflowClient;
}

/**
 * Builds a pipeline analysis report for {@code dataset} over the trailing
 * {@code analysisWindowDays} days: lineage (5 levels deep), producer and consumer
 * programs, workflow statistics for each WORKFLOW-type program, and application
 * preferences for every involved application.
 *
 * <p>Best-effort: any failure is recorded on the report via {@code error(...)}
 * rather than propagated, so a (possibly partial) report is always returned.
 *
 * @param dataset            the dataset to analyze
 * @param analysisWindowDays size of the analysis window in days, ending now
 * @return the built report, possibly partial with an error message recorded
 */
public PipelineAnalysisReport analyzePipeline(DatasetId dataset, long analysisWindowDays) {
    long endTime = System.currentTimeMillis();
    long startTime = endTime - TimeUnit.DAYS.toMillis(analysisWindowDays);
    PipelineAnalysisReport.Builder reportBuilder = PipelineAnalysisReport.builder()
        .dataset(dataset)
        .analysisWindow(startTime, endTime);
    try {
        // Lineage is the backbone of the report; everything else derives from it.
        Lineage lineage = lineageClient.getLineage(dataset, startTime, endTime, 5);
        reportBuilder.lineage(lineage);
        Set<ProgramId> producers = lineage.getProducers();
        Set<ProgramId> consumers = lineage.getConsumers();
        reportBuilder.producers(producers).consumers(consumers);
        // Collect workflow statistics for every workflow that writes or reads the dataset
        // (previously two duplicated loops — factored into one helper).
        addWorkflowStatistics(reportBuilder, producers, startTime, endTime);
        addWorkflowStatistics(reportBuilder, consumers, startTime, endTime);
        // Gather preferences for each distinct application touching the dataset.
        Set<ApplicationId> applications = new HashSet<>();
        producers.forEach(p -> applications.add(p.getApplication()));
        consumers.forEach(p -> applications.add(p.getApplication()));
        for (ApplicationId app : applications) {
            reportBuilder.addApplicationPreferences(app, preferencesClient.getApplicationPreferences(app));
        }
    } catch (Exception e) {
        // Mirrors the original broad catch: record the failure instead of throwing,
        // so callers still receive a report describing what went wrong.
        reportBuilder.error("Analysis failed: " + e.getMessage());
    }
    return reportBuilder.build();
}

/**
 * Adds workflow statistics to the report for each WORKFLOW-type program in
 * {@code programs}; non-workflow programs are skipped.
 * Declared {@code throws Exception} so any checked exception from the client call
 * flows to the caller's catch-all, exactly as in the original inline loops.
 */
private void addWorkflowStatistics(PipelineAnalysisReport.Builder reportBuilder,
                                   Set<ProgramId> programs,
                                   long startTime, long endTime) throws Exception {
    for (ProgramId program : programs) {
        if (program.getType() == ProgramType.WORKFLOW) {
            reportBuilder.addWorkflowStats(program, workflowClient.getStatistics(program, startTime, endTime));
        }
    }
}
}
// Analysis report data structure
// NOTE(review): skeleton listing — constructor, getters, and builder are elided in this page.
public class PipelineAnalysisReport {
private final DatasetId dataset;
// Analysis window bounds, epoch milliseconds (set via Builder.analysisWindow)
private final long startTime;
private final long endTime;
private final Lineage lineage;
private final Set<ProgramId> producers;
private final Set<ProgramId> consumers;
private final Map<ProgramId, WorkflowStatistics> workflowStats;
private final Map<ApplicationId, Map<String, String>> applicationPreferences;
// Set when analysis failed (see DataPipelineAnalyzer.analyzePipeline catch block)
private final String error;
// Constructor, getters, and builder
}
Data operations may throw these common exceptions, as the following example shows:
// --- Usage example: handling common lineage-query exceptions ---
// Catches are ordered most-specific first; IOException covers transport failures.
try {
Lineage lineage = lineageClient.getLineage(datasetId, startTime, endTime, 3);
System.out.println("Lineage retrieved successfully");
} catch (LineageNotFoundException e) {
System.err.println("No lineage data available for time range");
} catch (UnauthorizedException e) {
System.err.println("No permission to access lineage data: " + e.getMessage());
} catch (IOException e) {
System.err.println("Network error: " + e.getMessage());
}
// Good: Comprehensive data operations with proper error handling
/**
 * Coordinates preference setup (with read-back verification) and dataset impact
 * analysis using the CDAP lineage, preferences, and workflow clients.
 */
public class DataOperationsManager {
    /** Consumer count above which a dataset change is flagged as wide-impact. */
    private static final int WIDE_IMPACT_CONSUMER_THRESHOLD = 10;

    /** Impact-analysis lookback window, in days. */
    private static final long IMPACT_WINDOW_DAYS = 30;

    private final LineageClient lineageClient;
    private final PreferencesClient preferencesClient;
    private final WorkflowClient workflowClient;

    public DataOperationsManager(LineageClient lineageClient,
                                 PreferencesClient preferencesClient,
                                 WorkflowClient workflowClient) {
        this.lineageClient = lineageClient;
        this.preferencesClient = preferencesClient;
        this.workflowClient = workflowClient;
    }

    /**
     * Sets the application's preferences and reads them back to verify they were applied.
     * A verification mismatch is reported on stderr but does not throw.
     *
     * @param appId              application to configure
     * @param defaultPreferences preferences to apply
     * @throws RuntimeException (with cause preserved) if the set or read-back call fails
     */
    public void setupApplicationDefaults(ApplicationId appId, Map<String, String> defaultPreferences) {
        try {
            preferencesClient.setApplicationPreferences(appId, defaultPreferences);
            // Read-back verification guards against silently dropped updates.
            Map<String, String> verifyPrefs = preferencesClient.getApplicationPreferences(appId);
            if (!verifyPrefs.equals(defaultPreferences)) {
                System.err.println("Preference verification failed for " + appId.getApplication());
            } else {
                System.out.println("Successfully configured preferences for " + appId.getApplication());
            }
        } catch (Exception e) {
            System.err.println("Failed to set preferences for " + appId.getApplication() + ": " + e.getMessage());
            // Preserve the cause so callers can diagnose the underlying failure.
            throw new RuntimeException("Preference setup failed", e);
        }
    }

    /**
     * Prints an impact summary for {@code datasetId}: counts of consumer and producer
     * programs over the last {@link #IMPACT_WINDOW_DAYS} days, warning when the
     * consumer count exceeds {@link #WIDE_IMPACT_CONSUMER_THRESHOLD}.
     * Best-effort: failures are logged to stderr, never thrown.
     *
     * @param datasetId dataset whose change impact is assessed
     */
    public void analyzeDatasetImpact(DatasetId datasetId) {
        try {
            long endTime = System.currentTimeMillis();
            long startTime = endTime - TimeUnit.DAYS.toMillis(IMPACT_WINDOW_DAYS);
            Set<ProgramId> consumers = lineageClient.getDatasetConsumers(datasetId, startTime, endTime);
            Set<ProgramId> producers = lineageClient.getDatasetProducers(datasetId, startTime, endTime);
            System.out.println("Impact analysis for dataset: " + datasetId.getDataset());
            System.out.println("Affected consumers: " + consumers.size());
            System.out.println("Data producers: " + producers.size());
            // Warn if dataset has many dependencies (was a magic number 10).
            if (consumers.size() > WIDE_IMPACT_CONSUMER_THRESHOLD) {
                System.out.println("WARNING: Dataset has many consumers - changes may have wide impact");
            }
        } catch (Exception e) {
            System.err.println("Impact analysis failed for " + datasetId.getDataset() + ": " + e.getMessage());
        }
    }
}
Install with the Tessl CLI:
npx tessl i tessl/maven-io-cdap-cdap--cdap-client