CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap-client

CDAP Java Client library providing programmatic APIs for interacting with the CDAP platform

Pending
Overview
Eval results
Files

docs/schedule-management.md

Schedule Management

The ScheduleClient provides operations to create, update, list, suspend, resume, and delete schedules, and to query a workflow's upcoming runs. Schedules let CDAP run workflows and other programs on a timetable, in response to new data, or when other programs reach a given status.

ScheduleClient

/**
 * Client for creating, updating, listing, suspending, resuming, deleting, and inspecting
 * schedules of CDAP workflows and other programs.
 *
 * <p>NOTE(review): the signatures below omit {@code throws} clauses, yet the Error Handling
 * section of this page catches exceptions such as IOException from {@code add}; check the
 * actual class for the checked exceptions each method declares.
 */
public class ScheduleClient {
    // Constructors
    public ScheduleClient(ClientConfig config);
    public ScheduleClient(ClientConfig config, RESTClient restClient);
    
    // Schedule management methods
    /** Creates the schedule identified by {@code scheduleId} using {@code detail}. */
    public void add(ScheduleId scheduleId, ScheduleDetail detail);
    /** Replaces the configuration of an existing schedule with {@code detail}. */
    public void update(ScheduleId scheduleId, ScheduleDetail detail);
    /** Lists the schedules attached to the given workflow. */
    public List<ScheduleDetail> listSchedules(WorkflowId workflow);
    /** Returns the upcoming runs predicted for the given workflow (across its schedules). */
    public List<ScheduledRuntime> nextRuntimes(WorkflowId workflow);
    /** Pauses the schedule so it stops triggering runs. */
    public void suspend(ScheduleId scheduleId);
    /** Resumes a previously suspended schedule. */
    public void resume(ScheduleId scheduleId);
    /** Deletes the schedule. */
    public void delete(ScheduleId scheduleId);
    /** Returns the schedule status as a string (the examples on this page compare it to "SUSPENDED"). */
    public String getStatus(ScheduleId scheduleId);
    /** Re-enables schedules in the namespace that were suspended between the two epoch-millisecond bounds. */
    public void reEnableSuspendedSchedules(NamespaceId namespaceId, long startTimeMillis, long endTimeMillis);
    
    // Static utility methods
    /** Returns the URL-encoded form of a schedule name, for use in REST paths. */
    public static String getEncodedScheduleName(String scheduleName);
}

Schedule Types and Configuration

/**
 * Describes a schedule: its name, description, target program, properties, trigger,
 * constraints, and timeout. Instances are assembled with {@link #builder()}.
 */
public class ScheduleDetail {
    public String getName();
    public String getDescription();
    /** The program (typically a workflow in these examples) that the schedule launches. */
    public ProgramId getProgram();
    /** Key/value properties attached to the schedule (presumably passed to the run — confirm). */
    public Map<String, String> getProperties();
    /** The condition that fires the schedule (time, partition, program status, or a composite). */
    public Trigger getTrigger();
    /** Extra conditions checked before a triggered run starts; may be empty. */
    public List<Constraint> getConstraints();
    /**
     * Run timeout in milliseconds (the examples set it with {@code TimeUnit.X.toMillis}).
     * NOTE(review): the value reported when no timeout was set is not shown here — confirm.
     */
    public long getTimeoutMillis();
    
    public static Builder builder();
    
    /** Fluent builder; each setter returns the builder so calls can be chained. */
    public static class Builder {
        public Builder setName(String name);
        public Builder setDescription(String description);
        public Builder setProgram(ProgramId program);
        public Builder setProperties(Map<String, String> properties);
        public Builder setTrigger(Trigger trigger);
        public Builder setConstraints(List<Constraint> constraints);
        public Builder setTimeoutMillis(long timeoutMillis);
        public ScheduleDetail build();
    }
}

/** Identifies a schedule by its owning application and its schedule name. */
public class ScheduleId {
    public static ScheduleId of(ApplicationId application, String schedule);
    public ApplicationId getApplication();
    /** The plain schedule name (the examples print it and reuse it as the ScheduleDetail name). */
    public String getSchedule();
}

/** Identifies a workflow program within an application; used to list schedules and upcoming runs. */
public class WorkflowId {
    public static WorkflowId of(ApplicationId application, String workflow);
    public ApplicationId getApplication();
    public String getWorkflow();
}

/** One predicted future run, as returned by {@code ScheduleClient.nextRuntimes}. */
public class ScheduledRuntime {
    /** Run time in epoch milliseconds (the examples pass it to a millisecond-based date constructor). */
    public long getTime();
    /** Arguments associated with that run. */
    public Map<String, String> getArguments();
}

Trigger Types

// Time-based triggers
/**
 * Fires on a cron schedule. The examples on this page use five-field cron entries:
 * minute, hour, day-of-month, month, day-of-week (e.g. "0 2 * * *" = daily at 2 AM).
 */
public class TimeTrigger implements Trigger {
    public TimeTrigger(String cronExpression);
    public String getCronExpression();
}

// Data-based triggers  
/** Fires once {@code numPartitions} new partitions of {@code dataset} are available. */
public class PartitionTrigger implements Trigger {
    public PartitionTrigger(DatasetId dataset, int numPartitions);
    public DatasetId getDataset();
    public int getNumPartitions();
}

// Program status triggers
/** Fires when {@code program} reaches one of the expected statuses (e.g. COMPLETED). */
public class ProgramStatusTrigger implements Trigger {
    public ProgramStatusTrigger(ProgramId program, ProgramStatus... expectedStatuses);
    public ProgramId getProgram();
    public Set<ProgramStatus> getExpectedStatuses();
}

// Composite triggers
/** Fires only when all of the child triggers are satisfied. */
public class AndTrigger implements Trigger {
    public AndTrigger(Trigger... triggers);
    public List<Trigger> getTriggers();
}

/** Fires when any one of the child triggers is satisfied. */
public class OrTrigger implements Trigger {
    public OrTrigger(Trigger... triggers);
    public List<Trigger> getTriggers();
}

Schedule Creation and Management

Time-Based Schedules

// Create daily schedule
// appId, workflowId and scheduleId are reused by the later examples on this page.
ApplicationId appId = ApplicationId.of(namespace, "data-processing", "1.0.0");
WorkflowId workflowId = WorkflowId.of(appId, "daily-etl");
ScheduleId scheduleId = ScheduleId.of(appId, "daily-etl-schedule");

// Cron expression for daily execution at 2 AM
// (five fields: minute hour day-of-month month day-of-week)
TimeTrigger dailyTrigger = new TimeTrigger("0 2 * * *");

// The detail's name is the same string as the ScheduleId's schedule name.
ScheduleDetail dailySchedule = ScheduleDetail.builder()
    .setName("daily-etl-schedule")
    .setDescription("Daily ETL processing at 2 AM")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "daily-etl"))
    .setTrigger(dailyTrigger)
    .setProperties(Map.of(
        "input.path", "/data/daily/",
        "output.path", "/processed/daily/",
        "retention.days", "30"
    ))
    .setTimeoutMillis(TimeUnit.HOURS.toMillis(4)) // 4 hour timeout
    .build();

scheduleClient.add(scheduleId, dailySchedule);
System.out.println("Created daily schedule: " + scheduleId.getSchedule());

// Create hourly schedule
TimeTrigger hourlyTrigger = new TimeTrigger("0 * * * *"); // Every hour
ScheduleId hourlyScheduleId = ScheduleId.of(appId, "hourly-aggregation");

ScheduleDetail hourlySchedule = ScheduleDetail.builder()
    .setName("hourly-aggregation")
    .setDescription("Hourly data aggregation")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "aggregation-workflow"))
    .setTrigger(hourlyTrigger)
    .setTimeoutMillis(TimeUnit.MINUTES.toMillis(30))
    .build();

scheduleClient.add(hourlyScheduleId, hourlySchedule);

Advanced Cron Schedules

// All cron entries below use five fields: minute hour day-of-month month day-of-week.

// Weekly schedule (Sundays at 3 AM)
TimeTrigger weeklyTrigger = new TimeTrigger("0 3 * * 0");

// Monthly schedule (1st day of month at midnight)
TimeTrigger monthlyTrigger = new TimeTrigger("0 0 1 * *");

// Hourly during business hours (on the hour from 9 AM through 5 PM, Monday to Friday)
TimeTrigger businessHoursTrigger = new TimeTrigger("0 9-17 * * 1-5");

// Multiple times per day (6 AM, 12 PM, 6 PM)
TimeTrigger multipleTrigger = new TimeTrigger("0 6,12,18 * * *");

// Custom schedule with a stepped hour range.
// "9-17/2" in the hour field fires at 9, 11, 13, 15 and 17 o'clock. Stay with five
// fields: a six-field entry is read with a leading seconds field, which shifts every
// field by one position and changes the meaning of the whole expression.
ScheduleId customScheduleId = ScheduleId.of(appId, "business-hours-schedule");

ScheduleDetail customSchedule = ScheduleDetail.builder()
    .setName(customScheduleId.getSchedule()) // keep the detail name identical to the ScheduleId name
    .setDescription("Process data every 2 hours during business days")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "business-workflow"))
    .setTrigger(new TimeTrigger("0 9-17/2 * * 1-5")) // Every 2 hours, 9-5, Mon-Fri
    .setProperties(Map.of("environment", "production"))
    .build();

scheduleClient.add(customScheduleId, customSchedule);

Data-Driven Schedules

// Partition-based trigger
// inputDataset is reused by the composite-trigger example.
DatasetId inputDataset = DatasetId.of(namespace, "raw-events");
int partitionsPerBatch = 5; // single source of truth for the trigger, description and property
PartitionTrigger partitionTrigger = new PartitionTrigger(inputDataset, partitionsPerBatch);

// Create the ScheduleId first and reuse its name, so the detail name can never disagree
// with the ID the schedule is registered under (a mismatch may be rejected by the server).
ScheduleId partitionScheduleId = ScheduleId.of(appId, "partition-driven-schedule");

ScheduleDetail partitionSchedule = ScheduleDetail.builder()
    .setName(partitionScheduleId.getSchedule())
    .setDescription("Process data when " + partitionsPerBatch + " new partitions are available")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "batch-processor"))
    .setTrigger(partitionTrigger)
    .setProperties(Map.of(
        "source.dataset", "raw-events",
        "batch.size", String.valueOf(partitionsPerBatch)
    ))
    .build();

scheduleClient.add(partitionScheduleId, partitionSchedule);

// Program status trigger: run data-transformation after data-ingestion completes
ProgramId upstreamProgram = ProgramId.of(appId, ProgramType.WORKFLOW, "data-ingestion");
ProgramStatusTrigger statusTrigger = new ProgramStatusTrigger(upstreamProgram, ProgramStatus.COMPLETED);

ScheduleId statusScheduleId = ScheduleId.of(appId, "downstream-schedule");

ScheduleDetail statusSchedule = ScheduleDetail.builder()
    .setName(statusScheduleId.getSchedule())
    .setDescription("Start when upstream data ingestion completes")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "data-transformation"))
    .setTrigger(statusTrigger)
    .build();

scheduleClient.add(statusScheduleId, statusSchedule);

Composite Triggers

// AND trigger - both conditions must be met
TimeTrigger nightlyTrigger = new TimeTrigger("0 1 * * *"); // 1 AM daily
PartitionTrigger dataTrigger = new PartitionTrigger(inputDataset, 1); // At least 1 partition

AndTrigger compositeTrigger = new AndTrigger(nightlyTrigger, dataTrigger);

// Create the ScheduleId first and reuse its name, so the detail name always matches the ID
ScheduleId compositeScheduleId = ScheduleId.of(appId, "composite-schedule");

ScheduleDetail compositeSchedule = ScheduleDetail.builder()
    .setName(compositeScheduleId.getSchedule())
    .setDescription("Process daily at 1 AM if data is available")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "nightly-batch"))
    .setTrigger(compositeTrigger)
    .build();

scheduleClient.add(compositeScheduleId, compositeSchedule);

// OR trigger - either condition can trigger execution
ProgramId program1 = ProgramId.of(appId, ProgramType.WORKFLOW, "source-a");
ProgramId program2 = ProgramId.of(appId, ProgramType.WORKFLOW, "source-b");

ProgramStatusTrigger trigger1 = new ProgramStatusTrigger(program1, ProgramStatus.COMPLETED);
ProgramStatusTrigger trigger2 = new ProgramStatusTrigger(program2, ProgramStatus.COMPLETED);

OrTrigger orTrigger = new OrTrigger(trigger1, trigger2);

ScheduleId orScheduleId = ScheduleId.of(appId, "multi-source-processing");

ScheduleDetail orSchedule = ScheduleDetail.builder()
    .setName(orScheduleId.getSchedule())
    .setDescription("Process when either source A or B completes")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "merge-processor"))
    .setTrigger(orTrigger)
    .build();

// Building a ScheduleDetail does not register it; add() does.
scheduleClient.add(orScheduleId, orSchedule);

Schedule Control Operations

Schedule Lifecycle

// Suspend schedule (pause execution)
scheduleClient.suspend(scheduleId);
System.out.println("Schedule suspended: " + scheduleId.getSchedule());

// Resume schedule
scheduleClient.resume(scheduleId);
System.out.println("Schedule resumed: " + scheduleId.getSchedule());

// Check schedule status (a status string such as "SUSPENDED")
String status = scheduleClient.getStatus(scheduleId);
System.out.println("Schedule status: " + status);

// Delete schedule
// NOTE: this removes scheduleId. The Schedule Updates example that follows calls
// update(scheduleId, ...) and needs the schedule to exist, so if you run these snippets
// in order, re-create it with add(...) first or skip this delete.
scheduleClient.delete(scheduleId);
System.out.println("Schedule deleted: " + scheduleId.getSchedule());

Schedule Updates

// Update the existing daily ETL schedule: move it to 3 AM, extend retention and timeout,
// and turn on compression. The complete detail is rebuilt here rather than only the
// changed fields (presumably update() replaces the whole configuration — confirm).
TimeTrigger threeAmTrigger = new TimeTrigger("0 3 * * *"); // Changed from 2 AM to 3 AM

Map<String, String> updatedProperties = Map.of(
    "input.path", "/data/daily/",
    "output.path", "/processed/daily/",
    "retention.days", "45", // Extended retention
    "compression", "true"   // Added compression
);

long extendedTimeoutMillis = TimeUnit.HOURS.toMillis(6); // Extended timeout

ScheduleDetail updatedSchedule = ScheduleDetail.builder()
    .setName("daily-etl-schedule")
    .setDescription("Updated: Daily ETL processing at 3 AM")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "daily-etl"))
    .setTrigger(threeAmTrigger)
    .setProperties(updatedProperties)
    .setTimeoutMillis(extendedTimeoutMillis)
    .build();

scheduleClient.update(scheduleId, updatedSchedule);
System.out.println("Schedule updated: " + scheduleId.getSchedule());

Schedule Information and Monitoring

Schedule Listing

// Print every schedule attached to the workflow, one indented section per schedule.
List<ScheduleDetail> schedules = scheduleClient.listSchedules(workflowId);
System.out.println("Schedules for workflow " + workflowId.getWorkflow() + ":");

schedules.forEach(detail -> {
    System.out.println("- " + detail.getName());
    System.out.println("  Description: " + detail.getDescription());
    System.out.println("  Trigger: " + detail.getTrigger().getClass().getSimpleName());
    System.out.println("  Properties: " + detail.getProperties());
    System.out.println("  Timeout: " + detail.getTimeoutMillis() + " ms");

    List<Constraint> attachedConstraints = detail.getConstraints();
    if (attachedConstraints.isEmpty()) {
        return; // nothing more to report for this schedule
    }
    System.out.println("  Constraints: " + attachedConstraints.size());
});

Next Runtime Prediction

// Get next scheduled runtimes.
// getTime() is interpreted as epoch milliseconds. java.time replaces the legacy
// Date/SimpleDateFormat pair; DateTimeFormatter is immutable and thread-safe, so a
// single instance serves both listings below.
List<ScheduledRuntime> nextRuntimes = scheduleClient.nextRuntimes(workflowId);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    .withZone(ZoneId.systemDefault());

System.out.println("Next scheduled runs for " + workflowId.getWorkflow() + ":");
for (ScheduledRuntime runtime : nextRuntimes) {
    Instant nextRun = Instant.ofEpochMilli(runtime.getTime());
    System.out.println("- " + formatter.format(nextRun) + " with arguments: " + runtime.getArguments());
}

// Display at most the first five runs, numbered
int shown = Math.min(5, nextRuntimes.size());
for (int i = 0; i < shown; i++) {
    ScheduledRuntime runtime = nextRuntimes.get(i);
    System.out.println((i + 1) + ". " + formatter.format(Instant.ofEpochMilli(runtime.getTime())));
}

Advanced Schedule Management

Bulk Schedule Operations

// Create multiple related schedules.
// Each request name is used as both the schedule name and the workflow name.
// NOTE: staggered start times only approximate ordering, and with a 2-hour timeout runs
// one hour apart may overlap. If each step must wait for the previous one, chain the
// steps with ProgramStatusTrigger instead (see "Data-Driven Schedules").
public void createDataPipelineSchedules(ApplicationId appId) {
    List<ScheduleCreationRequest> requests = List.of(
        new ScheduleCreationRequest("data-ingestion", "0 1 * * *", "Nightly data ingestion"),
        new ScheduleCreationRequest("data-validation", "0 2 * * *", "Data quality validation"),
        new ScheduleCreationRequest("data-transformation", "0 3 * * *", "Data transformation"),
        new ScheduleCreationRequest("data-export", "0 5 * * *", "Export processed data")
    );

    for (ScheduleCreationRequest request : requests) {
        try {
            ScheduleId scheduleId = ScheduleId.of(appId, request.name);

            ScheduleDetail schedule = ScheduleDetail.builder()
                .setName(request.name)
                .setDescription(request.description)
                .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, request.name))
                .setTrigger(new TimeTrigger(request.cronExpression))
                .setTimeoutMillis(TimeUnit.HOURS.toMillis(2))
                .build();

            scheduleClient.add(scheduleId, schedule);
            System.out.println("Created schedule: " + request.name);

        } catch (Exception e) {
            // Best-effort: report the failure and continue with the remaining schedules.
            System.err.println("Failed to create schedule " + request.name + ": " + e.getMessage());
        }
    }
}

/** Immutable input for createDataPipelineSchedules: schedule/workflow name, cron entry, description. */
private static class ScheduleCreationRequest {
    final String name;
    final String cronExpression;
    final String description;

    ScheduleCreationRequest(String name, String cronExpression, String description) {
        this.name = name;
        this.cronExpression = cronExpression;
        this.description = description;
    }
}

Schedule Constraints and Policies

// Schedule with constraints: each constraint is an extra condition a triggered run must
// satisfy before it starts.
List<Constraint> constraints = List.of(
    new ConcurrencyConstraint(1), // At most one run of the program at a time
    new DelayConstraint(TimeUnit.MINUTES.toMillis(5)), // Wait 5 minutes after the trigger fires before starting
    new LastRunConstraint(TimeUnit.HOURS.toMillis(23)) // Start only if the last run was at least 23 hours ago
);
// NOTE: the trigger below fires every 6 hours, but the 23-hour last-run constraint means
// that at most about one run per day actually starts.

ScheduleId constrainedScheduleId = ScheduleId.of(appId, "constrained-schedule");

ScheduleDetail constrainedSchedule = ScheduleDetail.builder()
    .setName(constrainedScheduleId.getSchedule()) // keep the detail name identical to the ScheduleId name
    .setDescription("Processing with execution constraints")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "heavy-processing"))
    .setTrigger(new TimeTrigger("0 */6 * * *")) // Every 6 hours
    .setConstraints(constraints)
    .setTimeoutMillis(TimeUnit.HOURS.toMillis(5))
    .build();

scheduleClient.add(constrainedScheduleId, constrainedSchedule);

Schedule Recovery and Maintenance

// Re-enable suspended schedules after a maintenance window that has just ended.
// Re-enables schedules in `namespace` that were suspended during the last two hours.
public void performScheduleMaintenance(NamespaceId namespace) {
    performScheduleMaintenance(namespace, TimeUnit.HOURS.toMillis(2));
}

// Re-enables schedules in `namespace` suspended during the last `windowMillis` milliseconds.
// Failures are reported, not propagated (best-effort maintenance).
public void performScheduleMaintenance(NamespaceId namespace, long windowMillis) {
    // Read the clock once so the window is exactly windowMillis wide.
    long maintenanceEnd = System.currentTimeMillis();
    long maintenanceStart = maintenanceEnd - windowMillis;

    try {
        // Re-enable schedules that were suspended during maintenance
        scheduleClient.reEnableSuspendedSchedules(namespace, maintenanceStart, maintenanceEnd);
        System.out.println("Re-enabled schedules suspended during maintenance window");

    } catch (Exception e) {
        System.err.println("Error during schedule maintenance: " + e.getMessage());
    }
}

// Schedule health check: prints each schedule's status and its workflow's next run,
// warning when a critical schedule is suspended or nothing is upcoming. A failure for one
// schedule is reported and does not stop the check of the others.
public void checkScheduleHealth(List<ScheduleId> criticalSchedules) {
    System.out.println("=== Schedule Health Check ===");

    for (ScheduleId scheduleId : criticalSchedules) {
        try {
            String status = scheduleClient.getStatus(scheduleId);
            System.out.println(scheduleId.getSchedule() + ": " + status);

            if ("SUSPENDED".equals(status)) {
                System.out.println("  WARNING: Critical schedule is suspended!");
            }

            // nextRuntimes is keyed by workflow, not by schedule, so map the schedule to
            // its workflow via the naming convention in extractWorkflowName. Runs from other
            // schedules of the same workflow are included in the result.
            WorkflowId workflowId = WorkflowId.of(scheduleId.getApplication(),
                extractWorkflowName(scheduleId.getSchedule()));
            List<ScheduledRuntime> nextRuns = scheduleClient.nextRuntimes(workflowId);

            if (nextRuns.isEmpty()) {
                System.out.println("  WARNING: No upcoming runs scheduled!");
            } else {
                // getTime() is epoch milliseconds; Instant prints it as ISO-8601 UTC.
                Instant nextRun = Instant.ofEpochMilli(nextRuns.get(0).getTime());
                System.out.println("  Next run: " + nextRun);
            }

        } catch (Exception e) {
            System.err.println(scheduleId.getSchedule() + ": ERROR - " + e.getMessage());
        }
    }
}

// Derives the workflow name from a schedule named "<workflow>-schedule". Only a trailing
// "-schedule" suffix is removed (String.replace would also strip it from the middle of a
// name); names without the suffix are returned unchanged. Adapt to your naming convention.
private String extractWorkflowName(String scheduleName) {
    String suffix = "-schedule";
    return scheduleName.endsWith(suffix)
        ? scheduleName.substring(0, scheduleName.length() - suffix.length())
        : scheduleName;
}

Schedule URL Encoding

// Schedule names containing spaces or other special characters must be URL-encoded when
// they appear in a REST path; getEncodedScheduleName produces that encoded form.
String scheduleNameWithSpaces = "Daily ETL Processing";
String encodedName = ScheduleClient.getEncodedScheduleName(scheduleNameWithSpaces);
System.out.println("Encoded schedule name: " + encodedName);

// Build the ScheduleId from the plain name, not the encoded one. The encoded form is for
// REST URLs you construct yourself; ScheduleClient presumably encodes names when it builds
// its own request paths (verify for your CDAP version), so an already-encoded name would
// be encoded twice (e.g. "%20" becoming "%2520").
ScheduleId scheduleIdWithSpaces = ScheduleId.of(appId, scheduleNameWithSpaces);

Error Handling

Schedule management operations may throw these exceptions:

  • ScheduleNotFoundException: Schedule does not exist
  • ScheduleAlreadyExistsException: Schedule already exists during creation
  • InvalidScheduleException: Invalid schedule configuration
  • WorkflowNotFoundException: Referenced workflow does not exist
  • BadRequestException: Invalid schedule parameters
  • UnauthenticatedException: Authentication required
  • UnauthorizedException: Insufficient permissions
// `scheduleDetail` stands for any ScheduleDetail built as in the examples above.
// Each expected failure gets its own message; IOException is reported as a network error.
try {
    scheduleClient.add(scheduleId, scheduleDetail);
    System.out.println("Schedule created successfully");
} catch (ScheduleAlreadyExistsException e) {
    // A schedule with this ID is already registered; use update(...) to change it.
    System.err.println("Schedule already exists: " + scheduleId.getSchedule());
} catch (WorkflowNotFoundException e) {
    System.err.println("Referenced workflow not found: " + e.getMessage());
} catch (InvalidScheduleException e) {
    System.err.println("Invalid schedule configuration: " + e.getMessage());
} catch (IOException e) {
    System.err.println("Network error: " + e.getMessage());
}

Best Practices

  1. Naming Conventions: Use clear, descriptive names for schedules
  2. Resource Management: Set appropriate timeouts for long-running workflows
  3. Dependency Management: Use program status triggers for workflow dependencies
  4. Error Handling: Implement proper error handling and retry logic
  5. Monitoring: Regularly monitor schedule status and execution history
  6. Maintenance: Plan for schedule suspension during maintenance windows
// Good: Comprehensive schedule management with proper error handling
/**
 * Wraps a {@link ScheduleClient}: validates a schedule locally, then creates it, falling
 * back to an update when a schedule with the same ID already exists.
 */
public class ScheduleManager {
    private final ScheduleClient scheduleClient;

    public ScheduleManager(ScheduleClient scheduleClient) {
        this.scheduleClient = scheduleClient;
    }

    /**
     * Validates {@code scheduleDetail}, creates or updates the schedule, and prints its
     * resulting status.
     *
     * @param scheduleId     ID of the schedule to create or update
     * @param scheduleDetail desired schedule configuration
     * @throws IllegalArgumentException if {@code scheduleDetail} fails local validation
     * @throws RuntimeException wrapping the underlying client failure if a server call fails
     */
    public void createScheduleWithValidation(ScheduleId scheduleId, ScheduleDetail scheduleDetail) {
        // Validate before any server call so configuration mistakes surface directly as
        // IllegalArgumentException instead of being wrapped in the generic failure below.
        validateScheduleDetail(scheduleDetail);

        try {
            // Attempt the create first and fall back to update. A separate "does it exist?"
            // check followed by add() races with other clients creating the same schedule.
            try {
                scheduleClient.add(scheduleId, scheduleDetail);
                System.out.println("Created schedule: " + scheduleId.getSchedule());
            } catch (ScheduleAlreadyExistsException e) {
                scheduleClient.update(scheduleId, scheduleDetail);
                System.out.println("Updated existing schedule: " + scheduleId.getSchedule());
            }

            // Verify the schedule is now known to the server
            String status = scheduleClient.getStatus(scheduleId);
            System.out.println("Schedule status after create/update: " + status);

        } catch (Exception e) {
            System.err.println("Failed to create/update schedule " + scheduleId.getSchedule() + ": " + e.getMessage());
            throw new RuntimeException("Schedule operation failed", e);
        }
    }

    // Rejects details missing a name, program, or trigger, or with a non-positive timeout.
    private void validateScheduleDetail(ScheduleDetail scheduleDetail) {
        if (scheduleDetail == null) {
            throw new IllegalArgumentException("Schedule detail cannot be null");
        }

        if (scheduleDetail.getName() == null || scheduleDetail.getName().trim().isEmpty()) {
            throw new IllegalArgumentException("Schedule name cannot be empty");
        }

        if (scheduleDetail.getProgram() == null) {
            throw new IllegalArgumentException("Schedule must reference a program");
        }

        if (scheduleDetail.getTrigger() == null) {
            throw new IllegalArgumentException("Schedule must have a trigger");
        }

        // NOTE(review): several examples on this page never call setTimeoutMillis; confirm
        // what getTimeoutMillis() reports for them before relying on this check.
        if (scheduleDetail.getTimeoutMillis() <= 0) {
            throw new IllegalArgumentException("Schedule timeout must be positive");
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap-client

docs

application-management.md

artifact-management.md

configuration.md

data-operations.md

dataset-operations.md

index.md

metrics-monitoring.md

program-control.md

schedule-management.md

security-administration.md

service-management.md

tile.json