CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap-client

CDAP Java Client library providing programmatic APIs for interacting with the CDAP platform

Pending
Overview
Eval results
Files

docs/program-control.md

Program Control and Monitoring

The ProgramClient provides comprehensive program lifecycle control including start, stop, restart operations, status monitoring, instance management, and run history tracking. Programs are the executable components within CDAP applications.

ProgramClient

/**
 * API summary of the client used throughout this page to control and inspect programs.
 * Only the public signatures are listed; method bodies are omitted.
 */
public class ProgramClient {
    // Constructors: the REST client and application client can optionally be supplied
    public ProgramClient(ClientConfig config);
    public ProgramClient(ClientConfig config, RESTClient restClient);
    public ProgramClient(ClientConfig config, RESTClient restClient, ApplicationClient applicationClient);
    
    // Program control methods: single-program and batch start/stop, plus application-level restart.
    // restart takes its time range in seconds, as its parameter names indicate.
    public void start(ProgramId program);
    public void start(ProgramId program, boolean debug);
    public void start(ProgramId program, boolean debug, Map<String, String> runtimeArgs);
    public List<BatchProgramResult> start(NamespaceId namespace, List<BatchProgramStart> programs);
    public void restart(ApplicationId applicationId, long startTimeSeconds, long endTimeSeconds);
    public void stop(ProgramId programId);
    public List<BatchProgramResult> stop(NamespaceId namespace, List<BatchProgram> programs);
    public void stopAll(NamespaceId namespace);
    
    // Program status methods: note that the single-program getStatus returns the status as a String,
    // while waitForStatus takes a ProgramStatus constant.
    public String getStatus(ProgramId programId);
    public List<BatchProgramStatus> getStatus(NamespaceId namespace, List<BatchProgram> programs);
    public void waitForStatus(ProgramId program, ProgramStatus status, long timeout, TimeUnit timeoutUnit);
    public DistributedProgramLiveInfo getLiveInfo(ProgramId program);
    
    // Instance management methods: workers are addressed by ProgramId, services by ServiceId
    public int getWorkerInstances(ProgramId worker);
    public void setWorkerInstances(ProgramId worker, int instances);
    public int getServiceInstances(ServiceId service);
    public void setServiceInstances(ServiceId service, int instances);
    
    // Program data methods: run history, logs, and runtime arguments.
    // NOTE(review): the units of startTime/endTime and start/stop are not shown in these signatures;
    // presumably epoch seconds, like the RunRecord timestamps used in the examples — confirm.
    public List<RunRecord> getProgramRuns(ProgramId program, String state, long startTime, long endTime, int limit);
    public List<RunRecord> getAllProgramRuns(ProgramId program, long startTime, long endTime, int limit);
    public String getProgramLogs(ProgramId program, long start, long stop);
    public Map<String, String> getRuntimeArgs(ProgramId program);
    public void setRuntimeArgs(ProgramId program, Map<String, String> runtimeArgs);
}

Program Types and Status

/** Program types that can be named when building a program identifier. */
public enum ProgramType {
    MAPREDUCE,
    WORKFLOW,
    SERVICE,
    SPARK,
    WORKER;
}

/** Program states that can be passed to a status wait. */
public enum ProgramStatus {
    PENDING,
    STARTING,
    RUNNING,
    SUSPENDED,
    RESUMING,
    COMPLETED,
    FAILED,
    KILLED,
    STOPPED;
}

/**
 * Identifier of a single program, made up of its application, its type, and its name
 * (signatures only; bodies omitted).
 */
public class ProgramId {
    // Factory taking the three parts that the accessors below expose
    public static ProgramId of(ApplicationId application, ProgramType type, String program);
    public ApplicationId getApplication();
    public ProgramType getType();
    public String getProgram();
}

/**
 * Record of one program run as returned by the run-history calls (signatures only; bodies omitted).
 */
public class RunRecord {
    // Identifier of the run
    public String getPid();
    // Lifecycle timestamps.
    // NOTE(review): the run-history example on this page treats getStartTs()/getStopTs() as epoch
    // seconds; the units of the remaining timestamps are presumably the same — confirm.
    public long getStartTs();
    public long getRunTs();
    public long getStopTs();
    public long getSuspendTs();
    public long getResumeTs();
    // Status is exposed as a String rather than a ProgramStatus constant
    public String getStatus();
    public Map<String, String> getProperties();
    public ProgramRunCluster getCluster();
}

/**
 * Live information about a program as returned by getLiveInfo (signatures only; bodies omitted).
 */
public class DistributedProgramLiveInfo {
    public String getStatus();
    // Map of container name to an integer; the examples on this page print it as an instance count
    public Map<String, Integer> getContainers();
    public String getYarnAppId();
}

Program Control Operations

Basic Program Control

// Start a program
// (namespace and programClient are assumed to be defined earlier by the caller)
ApplicationId appId = ApplicationId.of(namespace, "data-processing-app", "1.0.0");
ProgramId workflowId = ProgramId.of(appId, ProgramType.WORKFLOW, "data-pipeline");

programClient.start(workflowId);
System.out.println("Started workflow: " + workflowId.getProgram());

// Start with debug mode enabled (the second argument is the debug flag)
programClient.start(workflowId, true);

// Start with runtime arguments, supplied as string key/value pairs
Map<String, String> runtimeArgs = Map.of(
    "input.path", "/data/input/2023/12/01",
    "output.path", "/data/output/processed",
    "batch.size", "1000",
    "num.partitions", "10"
);
// debug stays false here; only the runtime arguments differ from a plain start
programClient.start(workflowId, false, runtimeArgs);

// Stop a program
programClient.stop(workflowId);
System.out.println("Stopped workflow: " + workflowId.getProgram());

Batch Operations

// Start multiple programs at once, each with its own runtime arguments
List<BatchProgramStart> programsToStart = List.of(
    new BatchProgramStart(
        ProgramId.of(appId, ProgramType.SERVICE, "data-service"),
        Map.of("port", "8080", "threads", "10")
    ),
    new BatchProgramStart(
        ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow"),
        Map.of("schedule", "daily")
    )
);

// Each entry in the result list carries its own error, so check them one by one
List<BatchProgramResult> startResults = programClient.start(namespace, programsToStart);
for (BatchProgramResult result : startResults) {
    if (result.getError() != null) {
        System.err.println("Failed to start " + result.getProgramId() + ": " + result.getError());
    } else {
        System.out.println("Started " + result.getProgramId());
    }
}

// Stop multiple programs
List<BatchProgram> programsToStop = List.of(
    new BatchProgram(ProgramId.of(appId, ProgramType.SERVICE, "data-service")),
    new BatchProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow"))
);

// Inspect the stop results the same way as the start results instead of discarding them
List<BatchProgramResult> stopResults = programClient.stop(namespace, programsToStop);
for (BatchProgramResult result : stopResults) {
    if (result.getError() != null) {
        System.err.println("Failed to stop " + result.getProgramId() + ": " + result.getError());
    } else {
        System.out.println("Stopped " + result.getProgramId());
    }
}

// Stop all programs in namespace (use with caution!)
programClient.stopAll(namespace);

Application-Level Operations

// Restart all programs in application within time range
// The range is computed from the millisecond clock and divided by 1000 in the call below,
// because restart takes startTimeSeconds/endTimeSeconds.
long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(24); // Last 24 hours
long endTime = System.currentTimeMillis();

programClient.restart(appId, startTime / 1000, endTime / 1000);
System.out.println("Restarted all programs in application: " + appId.getApplication());

Program Status Monitoring

Status Checking

// Single program: the status comes back as a plain string
String status = programClient.getStatus(workflowId);
System.out.println("Workflow status: " + status);

// Several programs: build the batch entries first, then query them in one call
BatchProgram serviceProgram = new BatchProgram(ProgramId.of(appId, ProgramType.SERVICE, "data-service"));
BatchProgram workflowProgram = new BatchProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow"));
List<BatchProgram> programs = List.of(serviceProgram, workflowProgram);

// Print one "<program>: <status>" line per returned entry, in list order
List<BatchProgramStatus> statuses = programClient.getStatus(namespace, programs);
statuses.forEach(entry -> System.out.println(entry.getProgramId() + ": " + entry.getStatus()));

Wait for Status

// Wait for program to reach specific status
try {
    programClient.start(workflowId);
    programClient.waitForStatus(workflowId, ProgramStatus.RUNNING, 60, TimeUnit.SECONDS);
    System.out.println("Workflow is now running");

    // Wait for completion
    programClient.waitForStatus(workflowId, ProgramStatus.COMPLETED, 30, TimeUnit.MINUTES);
    System.out.println("Workflow completed successfully");

} catch (TimeoutException e) {
    System.err.println("Timeout waiting for status change");
} catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still observe the interruption
    Thread.currentThread().interrupt();
    System.err.println("Interrupted while waiting");
}

Live Information

// Fetch the live information for the program and print its headline fields
DistributedProgramLiveInfo liveInfo = programClient.getLiveInfo(workflowId);
System.out.println("Status: " + liveInfo.getStatus());
System.out.println("YARN Application ID: " + liveInfo.getYarnAppId());
System.out.println("Containers: " + liveInfo.getContainers());

// Walk the container map and report one line per entry
Map<String, Integer> containers = liveInfo.getContainers();
containers.forEach((name, count) ->
    System.out.println("Container " + name + ": " + count + " instances"));

Instance Management

Worker Instance Management

// Get current worker instances
ProgramId workerId = ProgramId.of(appId, ProgramType.WORKER, "data-processor");
int currentInstances = programClient.getWorkerInstances(workerId);
System.out.println("Current worker instances: " + currentInstances);

// Scale worker instances
int newInstances = 5;
programClient.setWorkerInstances(workerId, newInstances);
System.out.println("Scaled worker to " + newInstances + " instances");

// Dynamic scaling based on load
// Re-read the count first: currentInstances was fetched before the explicit scale above,
// so comparing against it would use a stale value and log a wrong "from" count.
int instancesBeforeScaling = programClient.getWorkerInstances(workerId);
int targetInstances = calculateOptimalInstances(); // Your scaling logic
if (instancesBeforeScaling != targetInstances) {
    programClient.setWorkerInstances(workerId, targetInstances);
    System.out.println("Scaled from " + instancesBeforeScaling + " to " + targetInstances + " instances");
}

Service Instance Management

// Get current service instances
// The service instance calls take a ServiceId rather than a ProgramId
ServiceId serviceId = ServiceId.of(appId, "api-service");
int currentServiceInstances = programClient.getServiceInstances(serviceId);
System.out.println("Current service instances: " + currentServiceInstances);

// Scale service instances to a fixed count
int newServiceInstances = 3;
programClient.setServiceInstances(serviceId, newServiceInstances);
System.out.println("Scaled service to " + newServiceInstances + " instances");

Runtime Arguments

Managing Runtime Arguments

// Get current runtime arguments
Map<String, String> currentArgs = programClient.getRuntimeArgs(workflowId);
System.out.println("Current runtime arguments: " + currentArgs);

// Set new runtime arguments (all values are strings)
Map<String, String> newArgs = Map.of(
    "input.path", "/data/input/latest",
    "output.path", "/data/output/" + System.currentTimeMillis(),
    "batch.size", "2000",
    "enable.compression", "true",
    "log.level", "INFO"
);
programClient.setRuntimeArgs(workflowId, newArgs);
System.out.println("Updated runtime arguments");

// Merge with existing arguments
// A mutable HashMap copy is needed because the merged map is modified with putAll.
// NOTE(review): currentArgs was read before the setRuntimeArgs call above, so this merge starts
// from that earlier snapshot — re-read with getRuntimeArgs if the latest values are wanted.
Map<String, String> mergedArgs = new HashMap<>(currentArgs);
mergedArgs.putAll(Map.of(
    "new.parameter", "new-value",
    "updated.parameter", "updated-value"
));
programClient.setRuntimeArgs(workflowId, mergedArgs);

Run History and Logs

Program Run History

// Get recent program runs
// The query range is given in epoch seconds — the same unit as the RunRecord timestamps read
// below — so convert from the millisecond clock instead of passing milliseconds through.
long endTime = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
long startTime = endTime - TimeUnit.DAYS.toSeconds(7); // Last 7 days
int limit = 50;

List<RunRecord> runs = programClient.getProgramRuns(workflowId, "ALL", startTime, endTime, limit);
System.out.println("Found " + runs.size() + " runs in the last 7 days");

for (RunRecord run : runs) {
    System.out.println("Run ID: " + run.getPid());
    System.out.println("  Status: " + run.getStatus());
    // Date expects milliseconds, hence the multiplication
    System.out.println("  Start: " + new Date(run.getStartTs() * 1000));
    System.out.println("  Duration: " + (run.getStopTs() - run.getStartTs()) + " seconds");
    System.out.println("  Properties: " + run.getProperties());
}

// Get runs by status
List<RunRecord> failedRuns = programClient.getProgramRuns(workflowId, "FAILED", startTime, endTime, limit);
List<RunRecord> completedRuns = programClient.getProgramRuns(workflowId, "COMPLETED", startTime, endTime, limit);

// Get all runs regardless of status
List<RunRecord> allRuns = programClient.getAllProgramRuns(workflowId, startTime, endTime, limit);

Program Logs

// Get program logs for a time range
// The log window is given in epoch seconds, so convert from the millisecond clock
long logStop = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
long logStart = logStop - TimeUnit.HOURS.toSeconds(1); // Last hour

String logs = programClient.getProgramLogs(workflowId, logStart, logStop);
System.out.println("Program logs:");
System.out.println(logs);

// Parse and filter logs: report every line that mentions ERROR
String[] logLines = logs.split("\n");
for (String line : logLines) {
    if (line.contains("ERROR")) {
        System.err.println("Error found: " + line);
    }
}

Advanced Program Management

Status-Based Workflow

// Complete workflow with status monitoring
/**
 * Runs a workflow end to end: applies the runtime arguments, starts it, waits for RUNNING, then
 * polls every 30 seconds until it leaves the RUNNING/STARTING states. Failures are reported on
 * standard error and followed by a best-effort stop; nothing is rethrown.
 *
 * @param workflowId workflow to run
 * @param args runtime arguments applied before the start
 */
public void runWorkflowWithMonitoring(ProgramId workflowId, Map<String, String> args) {
    try {
        // Set runtime arguments
        programClient.setRuntimeArgs(workflowId, args);

        // Start the workflow
        programClient.start(workflowId);
        System.out.println("Started workflow: " + workflowId.getProgram());

        // Wait for running status
        programClient.waitForStatus(workflowId, ProgramStatus.RUNNING, 2, TimeUnit.MINUTES);
        System.out.println("Workflow is running");

        // Monitor progress; constant-first equals keeps the checks safe if status is ever null
        String status;
        do {
            Thread.sleep(30000); // Check every 30 seconds
            status = programClient.getStatus(workflowId);
            System.out.println("Current status: " + status);

            if ("RUNNING".equals(status)) {
                DistributedProgramLiveInfo liveInfo = programClient.getLiveInfo(workflowId);
                System.out.println("YARN App ID: " + liveInfo.getYarnAppId());
            }
        } while ("RUNNING".equals(status) || "STARTING".equals(status));

        // Check final status
        if ("COMPLETED".equals(status)) {
            System.out.println("Workflow completed successfully");
        } else {
            System.err.println("Workflow failed with status: " + status);

            // Get the last 10 minutes of logs to diagnose the failure; the window is in epoch seconds
            long nowSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
            String errorLogs = programClient.getProgramLogs(
                workflowId, nowSeconds - TimeUnit.MINUTES.toSeconds(10), nowSeconds);
            System.err.println("Recent logs:\n" + errorLogs);
        }

    } catch (InterruptedException e) {
        System.err.println("Error managing workflow: " + e.getMessage());
        // Clean up first, then restore the interrupt flag so callers can still observe it
        stopAfterFailure(workflowId);
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        System.err.println("Error managing workflow: " + e.getMessage());
        stopAfterFailure(workflowId);
    }
}

/** Best-effort stop used after a failure; a failing stop is reported, not rethrown. */
private void stopAfterFailure(ProgramId workflowId) {
    try {
        programClient.stop(workflowId);
    } catch (Exception stopEx) {
        System.err.println("Error stopping workflow: " + stopEx.getMessage());
    }
}

Auto-scaling Based on Metrics

// Auto-scaling example for worker programs
/**
 * Adjusts the instance count of a running worker to the value chosen by
 * calculateOptimalInstances, then re-reads the count after a 30-second pause. Does nothing when
 * the worker is not RUNNING. Failures are reported on standard error and not rethrown.
 *
 * @param workerId worker program to scale
 */
public void autoScaleWorker(ProgramId workerId) {
    try {
        int currentInstances = programClient.getWorkerInstances(workerId);

        // Get program status and live info
        String status = programClient.getStatus(workerId);
        if (!"RUNNING".equals(status)) {
            return; // Don't scale if not running
        }

        DistributedProgramLiveInfo liveInfo = programClient.getLiveInfo(workerId);

        // Your scaling logic based on metrics
        // This is a simplified example
        int optimalInstances = calculateOptimalInstances(liveInfo);

        if (optimalInstances != currentInstances) {
            System.out.println("Scaling worker from " + currentInstances + " to " + optimalInstances);
            programClient.setWorkerInstances(workerId, optimalInstances);

            // Wait for scaling to take effect
            Thread.sleep(30000);

            // Verify scaling
            int newInstances = programClient.getWorkerInstances(workerId);
            System.out.println("Scaling completed. New instance count: " + newInstances);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag rather than swallowing it in the generic handler
        Thread.currentThread().interrupt();
        System.err.println("Error during auto-scaling: " + e.getMessage());
    } catch (Exception e) {
        System.err.println("Error during auto-scaling: " + e.getMessage());
    }
}

/**
 * Placeholder for the scaling decision. As written it ignores {@code liveInfo} and always
 * returns 3; replace the body with real logic.
 *
 * @param liveInfo live information for the program being scaled (unused by this placeholder)
 * @return the desired instance count
 */
private int calculateOptimalInstances(DistributedProgramLiveInfo liveInfo) {
    // Implement your scaling logic based on:
    // - Container utilization
    // - Queue depths
    // - Processing rates
    // - Time of day
    // etc.
    return 3; // Simplified example
}

Error Handling

Program control operations may throw these exceptions:

  • ProgramNotFoundException: Program does not exist
  • NotRunningException: Program is not currently running (for stop operations)
  • AlreadyRunningException: Program is already running (for start operations)
  • BadRequestException: Invalid program state or parameters
  • UnauthenticatedException: Authentication required
  • UnauthorizedException: Insufficient permissions

For example, when starting a program:

// Each failure mode of start gets its own handler
try {
    programClient.start(workflowId);
} catch (AlreadyRunningException e) {
    // Reported on standard output, unlike the failures below
    System.out.println("Program is already running");
} catch (ProgramNotFoundException e) {
    System.err.println("Program not found: " + workflowId);
} catch (UnauthorizedException e) {
    System.err.println("No permission to start program: " + e.getMessage());
} catch (IOException e) {
    // Reported as a network error
    System.err.println("Network error: " + e.getMessage());
}

Best Practices

  1. Status Monitoring: Always check program status before performing operations
  2. Timeout Handling: Use appropriate timeouts for waitForStatus operations
  3. Resource Management: Monitor and manage instance counts based on workload
  4. Error Handling: Implement proper error handling and cleanup procedures
  5. Logging: Regularly check program logs for issues and performance insights
  6. Batch Operations: Use batch operations for managing multiple programs efficiently

// Good: Comprehensive program management with error handling
/**
 * Example wrapper around a ProgramClient that checks status before acting, waits with a timeout,
 * and reports failures on standard error instead of propagating them.
 */
public class ProgramManager {
    private final ProgramClient programClient;

    /**
     * Creates a manager that issues every call through the given client. Without this constructor
     * the final field above is never assigned and the class does not compile.
     *
     * @param programClient client used for all program operations
     */
    public ProgramManager(ProgramClient programClient) {
        this.programClient = programClient;
    }

    /**
     * Starts a program unless it is already RUNNING, applying the runtime arguments first when any
     * are given, and waits up to 5 minutes for it to reach RUNNING. On timeout the program is
     * stopped again as cleanup.
     *
     * @param programId program to start
     * @param args runtime arguments to set before starting; may be null or empty to skip
     */
    public void safeStartProgram(ProgramId programId, Map<String, String> args) {
        try {
            // Check current status
            String status = programClient.getStatus(programId);
            if ("RUNNING".equals(status)) {
                System.out.println("Program already running: " + programId.getProgram());
                return;
            }

            // Set runtime arguments
            if (args != null && !args.isEmpty()) {
                programClient.setRuntimeArgs(programId, args);
            }

            // Start program
            programClient.start(programId);

            // Wait for startup with timeout
            programClient.waitForStatus(programId, ProgramStatus.RUNNING, 5, TimeUnit.MINUTES);

            System.out.println("Successfully started program: " + programId.getProgram());

        } catch (TimeoutException e) {
            System.err.println("Timeout starting program: " + programId.getProgram());
            safeStopProgram(programId); // Cleanup on timeout
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing it in the generic handler
            Thread.currentThread().interrupt();
            System.err.println("Error starting program: " + e.getMessage());
        } catch (Exception e) {
            System.err.println("Error starting program: " + e.getMessage());
        }
    }

    /**
     * Stops a program unless it is already STOPPED or COMPLETED, then waits up to 2 minutes for it
     * to reach STOPPED.
     *
     * @param programId program to stop
     */
    public void safeStopProgram(ProgramId programId) {
        try {
            String status = programClient.getStatus(programId);
            if ("STOPPED".equals(status) || "COMPLETED".equals(status)) {
                System.out.println("Program already stopped: " + programId.getProgram());
                return;
            }

            programClient.stop(programId);
            programClient.waitForStatus(programId, ProgramStatus.STOPPED, 2, TimeUnit.MINUTES);

            System.out.println("Successfully stopped program: " + programId.getProgram());

        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing it in the generic handler
            Thread.currentThread().interrupt();
            System.err.println("Error stopping program: " + e.getMessage());
        } catch (Exception e) {
            System.err.println("Error stopping program: " + e.getMessage());
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap-client

docs

application-management.md

artifact-management.md

configuration.md

data-operations.md

dataset-operations.md

index.md

metrics-monitoring.md

program-control.md

schedule-management.md

security-administration.md

service-management.md

tile.json