CDAP Java Client library providing programmatic APIs for interacting with the CDAP platform
—
The ProgramClient provides comprehensive program lifecycle control including start, stop, restart operations, status monitoring, instance management, and run history tracking. Programs are the executable components within CDAP applications.
/**
 * Client for program lifecycle control: start, stop and restart operations, status monitoring,
 * instance management, and run history tracking.
 *
 * <p>API summary only: the signatures below are listed without bodies and without any
 * {@code throws} clauses. NOTE(review): confirm the checked exceptions against the client library.
 */
public class ProgramClient {
// Constructors
public ProgramClient(ClientConfig config);
public ProgramClient(ClientConfig config, RESTClient restClient);
public ProgramClient(ClientConfig config, RESTClient restClient, ApplicationClient applicationClient);
// Program control methods
public void start(ProgramId program);
public void start(ProgramId program, boolean debug);
public void start(ProgramId program, boolean debug, Map<String, String> runtimeArgs);
// Batch variant: returns one result per requested program
public List<BatchProgramResult> start(NamespaceId namespace, List<BatchProgramStart> programs);
// Time range is expressed in seconds, per the parameter names
public void restart(ApplicationId applicationId, long startTimeSeconds, long endTimeSeconds);
public void stop(ProgramId programId);
public List<BatchProgramResult> stop(NamespaceId namespace, List<BatchProgram> programs);
public void stopAll(NamespaceId namespace);
// Program status methods
// Single-program status is returned as a String rather than a ProgramStatus constant
public String getStatus(ProgramId programId);
public List<BatchProgramStatus> getStatus(NamespaceId namespace, List<BatchProgram> programs);
public void waitForStatus(ProgramId program, ProgramStatus status, long timeout, TimeUnit timeoutUnit);
public DistributedProgramLiveInfo getLiveInfo(ProgramId program);
// Instance management methods
public int getWorkerInstances(ProgramId worker);
public void setWorkerInstances(ProgramId worker, int instances);
public int getServiceInstances(ServiceId service);
public void setServiceInstances(ServiceId service, int instances);
// Program data methods
// NOTE(review): units of startTime/endTime and start/stop are not stated here — confirm (seconds vs. milliseconds)
public List<RunRecord> getProgramRuns(ProgramId program, String state, long startTime, long endTime, int limit);
public List<RunRecord> getAllProgramRuns(ProgramId program, long startTime, long endTime, int limit);
public String getProgramLogs(ProgramId program, long start, long stop);
public Map<String, String> getRuntimeArgs(ProgramId program);
public void setRuntimeArgs(ProgramId program, Map<String, String> runtimeArgs);
}
/** Program kinds accepted by {@code ProgramId.of(application, type, program)}. */
public enum ProgramType {
MAPREDUCE, WORKFLOW, SERVICE, SPARK, WORKER
}
/** Program states; this is the type of the target-status argument of {@code ProgramClient.waitForStatus}. */
public enum ProgramStatus {
PENDING, STARTING, RUNNING, SUSPENDED, RESUMING, COMPLETED, FAILED, KILLED, STOPPED
}
/**
 * Identifier of a program, built from its application, its program type and its name.
 * API summary only: signatures are listed without bodies.
 */
public class ProgramId {
// Static factory; the three arguments correspond to the three getters below
public static ProgramId of(ApplicationId application, ProgramType type, String program);
public ApplicationId getApplication();
public ProgramType getType();
public String getProgram();
}
/**
 * Record of a single program run, as returned by {@code ProgramClient.getProgramRuns} and
 * {@code ProgramClient.getAllProgramRuns}. API summary only: signatures are listed without bodies.
 */
public class RunRecord {
public String getPid();
// NOTE(review): the *Ts values look like epoch seconds (the usage example multiplies getStartTs() by 1000
// to build a Date) — confirm the unit for all five timestamps
public long getStartTs();
public long getRunTs();
public long getStopTs();
public long getSuspendTs();
public long getResumeTs();
public String getStatus();
public Map<String, String> getProperties();
public ProgramRunCluster getCluster();
}
/**
 * Live information about a program, as returned by {@code ProgramClient.getLiveInfo}.
 * API summary only: signatures are listed without bodies.
 */
public class DistributedProgramLiveInfo {
public String getStatus();
// NOTE(review): presumably container name -> instance count, as the usage example prints it — confirm
public Map<String, Integer> getContainers();
public String getYarnAppId();
}
// Start a program
// NOTE(review): `namespace` and `programClient` are not declared in this snippet — assumed to be set up beforehand
ApplicationId appId = ApplicationId.of(namespace, "data-processing-app", "1.0.0");
ProgramId workflowId = ProgramId.of(appId, ProgramType.WORKFLOW, "data-pipeline");
programClient.start(workflowId);
System.out.println("Started workflow: " + workflowId.getProgram());
// NOTE(review): the three start(...) calls are presumably alternatives, not a sequence to run back-to-back
// Start with debug mode enabled
programClient.start(workflowId, true);
// Start with runtime arguments
// Map.of requires Java 9+ and returns an immutable map
Map<String, String> runtimeArgs = Map.of(
"input.path", "/data/input/2023/12/01",
"output.path", "/data/output/processed",
"batch.size", "1000",
"num.partitions", "10"
);
programClient.start(workflowId, false, runtimeArgs);
// Stop a program
programClient.stop(workflowId);
System.out.println("Stopped workflow: " + workflowId.getProgram());
// Start multiple programs at once
// Each BatchProgramStart pairs a program with the runtime arguments to start it with
List<BatchProgramStart> programsToStart = List.of(
new BatchProgramStart(
ProgramId.of(appId, ProgramType.SERVICE, "data-service"),
Map.of("port", "8080", "threads", "10")
),
new BatchProgramStart(
ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow"),
Map.of("schedule", "daily")
)
);
List<BatchProgramResult> startResults = programClient.start(namespace, programsToStart);
// Results are inspected one by one; this loop treats a null error as a successful start
for (BatchProgramResult result : startResults) {
if (result.getError() != null) {
System.err.println("Failed to start " + result.getProgramId() + ": " + result.getError());
} else {
System.out.println("Started " + result.getProgramId());
}
}
// Stop multiple programs
List<BatchProgram> programsToStop = List.of(
new BatchProgram(ProgramId.of(appId, ProgramType.SERVICE, "data-service")),
new BatchProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow"))
);
// stopResults is not inspected here; it can be checked the same way as startResults above
List<BatchProgramResult> stopResults = programClient.stop(namespace, programsToStop);
// Stop all programs in namespace (use with caution!)
programClient.stopAll(namespace);
// Restart all programs in application within time range
// The window is computed in milliseconds here and converted to seconds at the call site
long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(24); // Last 24 hours
long endTime = System.currentTimeMillis();
// restart(...) declares startTimeSeconds/endTimeSeconds, hence the division by 1000
programClient.restart(appId, startTime / 1000, endTime / 1000);
System.out.println("Restarted all programs in application: " + appId.getApplication());
// Get current program status
// Single-program status comes back as a plain String
String status = programClient.getStatus(workflowId);
System.out.println("Workflow status: " + status);
// Get status of multiple programs
List<BatchProgram> programs = List.of(
new BatchProgram(ProgramId.of(appId, ProgramType.SERVICE, "data-service")),
new BatchProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow"))
);
// One BatchProgramStatus is printed per entry returned by the batch call
List<BatchProgramStatus> statuses = programClient.getStatus(namespace, programs);
for (BatchProgramStatus programStatus : statuses) {
System.out.println(programStatus.getProgramId() + ": " + programStatus.getStatus());
}
// Wait for program to reach specific status
// Start the workflow, wait up to 60 seconds for RUNNING, then up to 30 minutes for COMPLETED.
try {
    programClient.start(workflowId);
    programClient.waitForStatus(workflowId, ProgramStatus.RUNNING, 60, TimeUnit.SECONDS);
    System.out.println("Workflow is now running");
    // Wait for completion
    programClient.waitForStatus(workflowId, ProgramStatus.COMPLETED, 30, TimeUnit.MINUTES);
    System.out.println("Workflow completed successfully");
} catch (TimeoutException e) {
    System.err.println("Timeout waiting for status change");
} catch (InterruptedException e) {
    // Catching InterruptedException clears the thread's interrupt status; restore it so
    // code further up the call stack can still observe the interruption.
    Thread.currentThread().interrupt();
    System.err.println("Interrupted while waiting");
}
// Get detailed live information about running program
// A single getLiveInfo call supplies everything printed below
DistributedProgramLiveInfo liveInfo = programClient.getLiveInfo(workflowId);
System.out.println("Status: " + liveInfo.getStatus());
System.out.println("YARN Application ID: " + liveInfo.getYarnAppId());
System.out.println("Containers: " + liveInfo.getContainers());
// Monitor container information
// Each map entry is printed as "<key>: <value> instances"
Map<String, Integer> containers = liveInfo.getContainers();
for (Map.Entry<String, Integer> entry : containers.entrySet()) {
System.out.println("Container " + entry.getKey() + ": " + entry.getValue() + " instances");
}
// Get current worker instances
ProgramId workerId = ProgramId.of(appId, ProgramType.WORKER, "data-processor");
int currentInstances = programClient.getWorkerInstances(workerId);
System.out.println("Current worker instances: " + currentInstances);
// Scale worker instances
int newInstances = 5;
programClient.setWorkerInstances(workerId, newInstances);
System.out.println("Scaled worker to " + newInstances + " instances");
// Dynamic scaling based on load
// NOTE(review): calculateOptimalInstances() is a placeholder for caller-supplied logic; it is not defined in this snippet
int targetInstances = calculateOptimalInstances(); // Your scaling logic
// NOTE(review): currentInstances was read before the setWorkerInstances call above, so it may no longer
// reflect the live count when compared here
if (currentInstances != targetInstances) {
programClient.setWorkerInstances(workerId, targetInstances);
System.out.println("Scaled from " + currentInstances + " to " + targetInstances + " instances");
}
// Get current service instances
// Service instance calls take a ServiceId rather than a ProgramId
ServiceId serviceId = ServiceId.of(appId, "api-service");
int currentServiceInstances = programClient.getServiceInstances(serviceId);
System.out.println("Current service instances: " + currentServiceInstances);
// Scale service instances
int newServiceInstances = 3;
programClient.setServiceInstances(serviceId, newServiceInstances);
System.out.println("Scaled service to " + newServiceInstances + " instances");
// Get current runtime arguments
// currentArgs is captured here, before the update below, and reused for the merge at the end
Map<String, String> currentArgs = programClient.getRuntimeArgs(workflowId);
System.out.println("Current runtime arguments: " + currentArgs);
// Set new runtime arguments
Map<String, String> newArgs = Map.of(
"input.path", "/data/input/latest",
"output.path", "/data/output/" + System.currentTimeMillis(),
"batch.size", "2000",
"enable.compression", "true",
"log.level", "INFO"
);
programClient.setRuntimeArgs(workflowId, newArgs);
System.out.println("Updated runtime arguments");
// Merge with existing arguments
// Copy into a mutable HashMap first, then overlay the additions
// NOTE(review): the merge starts from currentArgs (fetched before newArgs was applied), so this call
// passes the old arguments plus the two entries below — confirm that is intended
Map<String, String> mergedArgs = new HashMap<>(currentArgs);
mergedArgs.putAll(Map.of(
"new.parameter", "new-value",
"updated.parameter", "updated-value"
));
programClient.setRuntimeArgs(workflowId, mergedArgs);
// Get recent program runs
// The query window is expressed in epoch seconds, the same unit this snippet assumes for
// RunRecord timestamps below (getStartTs() * 1000, durations printed in seconds).
// Passing raw System.currentTimeMillis() values would place the window far in the future.
long endTime = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
long startTime = endTime - TimeUnit.DAYS.toSeconds(7); // Last 7 days
int limit = 50;
List<RunRecord> runs = programClient.getProgramRuns(workflowId, "ALL", startTime, endTime, limit);
System.out.println("Found " + runs.size() + " runs in the last 7 days");
for (RunRecord run : runs) {
    System.out.println("Run ID: " + run.getPid());
    System.out.println(" Status: " + run.getStatus());
    // java.util.Date expects milliseconds, hence the conversion from seconds
    System.out.println(" Start: " + new Date(run.getStartTs() * 1000));
    System.out.println(" Duration: " + (run.getStopTs() - run.getStartTs()) + " seconds");
    System.out.println(" Properties: " + run.getProperties());
}
// Get runs by status
List<RunRecord> failedRuns = programClient.getProgramRuns(workflowId, "FAILED", startTime, endTime, limit);
List<RunRecord> completedRuns = programClient.getProgramRuns(workflowId, "COMPLETED", startTime, endTime, limit);
// Get all runs regardless of status
List<RunRecord> allRuns = programClient.getAllProgramRuns(workflowId, startTime, endTime, limit);
// Get program logs for a time range
// Log queries take their start/stop bounds in epoch seconds, so convert from the
// millisecond clock once and derive the window from that single reading.
long logStop = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
long logStart = logStop - TimeUnit.HOURS.toSeconds(1); // Last hour
String logs = programClient.getProgramLogs(workflowId, logStart, logStop);
System.out.println("Program logs:");
System.out.println(logs);
// Parse and filter logs
String[] logLines = logs.split("\n");
for (String line : logLines) {
    if (line.contains("ERROR")) {
        System.err.println("Error found: " + line);
    }
}
// Complete workflow with status monitoring
/**
 * Sets the runtime arguments, starts the workflow, waits up to two minutes for it to report
 * RUNNING, then polls its status every 30 seconds until it is neither RUNNING nor STARTING.
 * On any final status other than COMPLETED the last ten minutes of logs are printed.
 *
 * <p>Errors are reported on stderr rather than rethrown; on any error a best-effort stop is
 * attempted. If the calling thread was interrupted, its interrupt status is restored.
 *
 * @param workflowId the workflow to run
 * @param args runtime arguments applied before the workflow is started
 */
public void runWorkflowWithMonitoring(ProgramId workflowId, Map<String, String> args) {
    try {
        // Set runtime arguments
        programClient.setRuntimeArgs(workflowId, args);
        // Start the workflow
        programClient.start(workflowId);
        System.out.println("Started workflow: " + workflowId.getProgram());
        // Wait for running status
        programClient.waitForStatus(workflowId, ProgramStatus.RUNNING, 2, TimeUnit.MINUTES);
        System.out.println("Workflow is running");
        // Monitor progress
        String status;
        do {
            Thread.sleep(30000); // Check every 30 seconds
            status = programClient.getStatus(workflowId);
            System.out.println("Current status: " + status);
            // Constant-first comparisons stay safe if the status ever comes back null
            if ("RUNNING".equals(status)) {
                DistributedProgramLiveInfo liveInfo = programClient.getLiveInfo(workflowId);
                System.out.println("YARN App ID: " + liveInfo.getYarnAppId());
            }
        } while ("RUNNING".equals(status) || "STARTING".equals(status));
        // Check final status
        if ("COMPLETED".equals(status)) {
            System.out.println("Workflow completed successfully");
        } else {
            System.err.println("Workflow failed with status: " + status);
            // Get recent logs to diagnose failure; log queries take epoch seconds
            long nowSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
            long tenMinutesAgo = nowSeconds - TimeUnit.MINUTES.toSeconds(10);
            String errorLogs = programClient.getProgramLogs(workflowId, tenMinutesAgo, nowSeconds);
            System.err.println("Recent logs:\n" + errorLogs);
        }
    } catch (Exception e) {
        System.err.println("Error managing workflow: " + e.getMessage());
        try {
            programClient.stop(workflowId);
        } catch (Exception stopEx) {
            System.err.println("Error stopping workflow: " + stopEx.getMessage());
        }
        if (e instanceof InterruptedException) {
            // The broad catch above would otherwise swallow the interruption; restore the
            // flag after cleanup so the caller can still react to it.
            Thread.currentThread().interrupt();
        }
    }
}
// Auto-scaling example for worker programs
/**
 * Adjusts the instance count of a running worker to the value returned by
 * {@code calculateOptimalInstances}. Does nothing when the worker is not RUNNING or is
 * already at the computed count. After a change it waits 30 seconds and prints the
 * instance count read back from the client.
 *
 * <p>Errors are reported on stderr rather than rethrown. If the calling thread was
 * interrupted, its interrupt status is restored.
 *
 * @param workerId the worker program to scale
 */
public void autoScaleWorker(ProgramId workerId) {
    try {
        int currentInstances = programClient.getWorkerInstances(workerId);
        // Get program status and live info
        String status = programClient.getStatus(workerId);
        // Constant-first comparison stays safe if the status ever comes back null
        if (!"RUNNING".equals(status)) {
            return; // Don't scale if not running
        }
        DistributedProgramLiveInfo liveInfo = programClient.getLiveInfo(workerId);
        // Your scaling logic based on metrics
        // This is a simplified example
        int optimalInstances = calculateOptimalInstances(liveInfo);
        if (optimalInstances != currentInstances) {
            System.out.println("Scaling worker from " + currentInstances + " to " + optimalInstances);
            programClient.setWorkerInstances(workerId, optimalInstances);
            // Wait for scaling to take effect
            Thread.sleep(30000);
            // Verify scaling
            int newInstances = programClient.getWorkerInstances(workerId);
            System.out.println("Scaling completed. New instance count: " + newInstances);
        }
    } catch (Exception e) {
        System.err.println("Error during auto-scaling: " + e.getMessage());
        if (e instanceof InterruptedException) {
            // The broad catch would otherwise swallow the interruption from Thread.sleep
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Placeholder for the scaling policy: ignores its argument and always returns 3.
 *
 * @param liveInfo live information about the worker; unused by this simplified example
 * @return the desired number of instances
 */
private int calculateOptimalInstances(DistributedProgramLiveInfo liveInfo) {
    // A real implementation would base its decision on inputs such as:
    //   - container utilization
    //   - queue depths
    //   - processing rates
    //   - time of day
    final int simplifiedExampleCount = 3;
    return simplifiedExampleCount;
}
Program control operations may throw these exceptions:
// NOTE(review): the ProgramClient signatures listed in this document carry no throws clauses —
// confirm these exception types and their hierarchy against the client library
try {
programClient.start(workflowId);
} catch (AlreadyRunningException e) {
// Reported on stdout, unlike the other cases: this example treats it as informational
System.out.println("Program is already running");
} catch (ProgramNotFoundException e) {
System.err.println("Program not found: " + workflowId);
} catch (UnauthorizedException e) {
System.err.println("No permission to start program: " + e.getMessage());
} catch (IOException e) {
System.err.println("Network error: " + e.getMessage());
}
// Good: Comprehensive program management with error handling
/**
 * Defensive start/stop helpers around a {@code ProgramClient}: each helper checks the current
 * status first, waits for the target status with a timeout, and reports failures on stderr
 * instead of rethrowing them.
 */
public class ProgramManager {
    private final ProgramClient programClient;

    /**
     * Creates a manager backed by the given client.
     *
     * @param programClient client used for all program operations; must not be null
     * @throws NullPointerException if {@code programClient} is null
     */
    public ProgramManager(ProgramClient programClient) {
        // The field is final, so it has to be assigned here for the class to compile
        this.programClient = java.util.Objects.requireNonNull(programClient, "programClient");
    }

    /**
     * Starts the program unless it already reports RUNNING, applying {@code args} first when
     * they are non-empty, then waits up to five minutes for RUNNING. A timeout triggers a
     * best-effort stop.
     *
     * @param programId the program to start
     * @param args runtime arguments to set before starting; may be null or empty
     */
    public void safeStartProgram(ProgramId programId, Map<String, String> args) {
        try {
            // Check current status
            String status = programClient.getStatus(programId);
            if ("RUNNING".equals(status)) {
                System.out.println("Program already running: " + programId.getProgram());
                return;
            }
            // Set runtime arguments
            if (args != null && !args.isEmpty()) {
                programClient.setRuntimeArgs(programId, args);
            }
            // Start program
            programClient.start(programId);
            // Wait for startup with timeout
            programClient.waitForStatus(programId, ProgramStatus.RUNNING, 5, TimeUnit.MINUTES);
            System.out.println("Successfully started program: " + programId.getProgram());
        } catch (TimeoutException e) {
            System.err.println("Timeout starting program: " + programId.getProgram());
            safeStopProgram(programId); // Cleanup on timeout
        } catch (Exception e) {
            System.err.println("Error starting program: " + e.getMessage());
            restoreInterruptIfNeeded(e);
        }
    }

    /**
     * Stops the program unless it already reports STOPPED or COMPLETED, then waits up to two
     * minutes for STOPPED.
     *
     * @param programId the program to stop
     */
    public void safeStopProgram(ProgramId programId) {
        try {
            String status = programClient.getStatus(programId);
            if ("STOPPED".equals(status) || "COMPLETED".equals(status)) {
                System.out.println("Program already stopped: " + programId.getProgram());
                return;
            }
            programClient.stop(programId);
            programClient.waitForStatus(programId, ProgramStatus.STOPPED, 2, TimeUnit.MINUTES);
            System.out.println("Successfully stopped program: " + programId.getProgram());
        } catch (Exception e) {
            System.err.println("Error stopping program: " + e.getMessage());
            restoreInterruptIfNeeded(e);
        }
    }

    /** Re-sets the thread's interrupt status when a broad catch has absorbed an interruption. */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-client