CDAP Java Client library providing programmatic APIs for interacting with the CDAP platform.

The ScheduleClient provides comprehensive schedule creation, management, and workflow scheduling operations. Schedules enable time-based and data-driven execution of workflows and other programs in CDAP.
// --- API surface (signatures only; some lines are fused by doc extraction) ---
// Client for creating and managing CDAP schedules over the platform's REST API.
public class ScheduleClient {
// Constructors
public ScheduleClient(ClientConfig config);
public ScheduleClient(ClientConfig config, RESTClient restClient);
// Schedule management methods
// add() creates a new schedule; update() replaces an existing one's definition.
public void add(ScheduleId scheduleId, ScheduleDetail detail);
public void update(ScheduleId scheduleId, ScheduleDetail detail);
// Read-only queries scoped to a single workflow.
public List<ScheduleDetail> listSchedules(WorkflowId workflow);
public List<ScheduledRuntime> nextRuntimes(WorkflowId workflow);
// Lifecycle control: a suspended schedule stops firing until resumed.
public void suspend(ScheduleId scheduleId);
public void resume(ScheduleId scheduleId);
public void delete(ScheduleId scheduleId);
// Returns the schedule's status as a string (e.g. "SUSPENDED" — see the health-check example).
public String getStatus(ScheduleId scheduleId);
// Re-enables schedules suspended within [startTimeMillis, endTimeMillis]; times are epoch millis.
public void reEnableSuspendedSchedules(NamespaceId namespaceId, long startTimeMillis, long endTimeMillis);
// Static utility methods
// Encodes a schedule name containing special characters for safe use in schedule IDs/URLs.
public static String getEncodedScheduleName(String scheduleName);
// NOTE(review): the closing brace below is fused with the next class declaration — extraction artifact.
}public class ScheduleDetail {
// Immutable description of a schedule: what to run, when, and under which constraints.
public String getName();
public String getDescription();
public ProgramId getProgram();
// Runtime properties passed to the scheduled program as arguments.
public Map<String, String> getProperties();
public Trigger getTrigger();
public List<Constraint> getConstraints();
// Maximum time the triggered run may wait/execute, in milliseconds.
public long getTimeoutMillis();
public static Builder builder();
// Builder for assembling a ScheduleDetail; all setters return this for chaining.
public static class Builder {
public Builder setName(String name);
public Builder setDescription(String description);
public Builder setProgram(ProgramId program);
public Builder setProperties(Map<String, String> properties);
public Builder setTrigger(Trigger trigger);
public Builder setConstraints(List<Constraint> constraints);
public Builder setTimeoutMillis(long timeoutMillis);
public ScheduleDetail build();
}
}
// Identifies a schedule within an application.
public class ScheduleId {
public static ScheduleId of(ApplicationId application, String schedule);
public ApplicationId getApplication();
public String getSchedule();
}
// Identifies a workflow within an application.
public class WorkflowId {
public static WorkflowId of(ApplicationId application, String workflow);
public ApplicationId getApplication();
public String getWorkflow();
}
// One upcoming scheduled run: fire time (epoch millis) plus runtime arguments.
public class ScheduledRuntime {
public long getTime();
public Map<String, String> getArguments();
}// Time-based triggers
// Fires on a cron expression (5-field form is used throughout this document:
// minute hour day-of-month month day-of-week).
public class TimeTrigger implements Trigger {
public TimeTrigger(String cronExpression);
public String getCronExpression();
}
// Data-based triggers
// Fires once the dataset has accumulated numPartitions new partitions.
public class PartitionTrigger implements Trigger {
public PartitionTrigger(DatasetId dataset, int numPartitions);
public DatasetId getDataset();
public int getNumPartitions();
}
// Program status triggers
// Fires when the referenced program reaches one of the expected statuses.
public class ProgramStatusTrigger implements Trigger {
public ProgramStatusTrigger(ProgramId program, ProgramStatus... expectedStatuses);
public ProgramId getProgram();
public Set<ProgramStatus> getExpectedStatuses();
}
// Composite triggers
// Fires only when ALL child triggers have fired.
public class AndTrigger implements Trigger {
public AndTrigger(Trigger... triggers);
public List<Trigger> getTriggers();
}
// Fires as soon as ANY child trigger fires.
public class OrTrigger implements Trigger {
public OrTrigger(Trigger... triggers);
public List<Trigger> getTriggers();
}// Create daily schedule
// --- Example: creating time-based (cron) schedules ---
// All cron expressions use the 5-field form: minute hour day-of-month month day-of-week.
ApplicationId appId = ApplicationId.of(namespace, "data-processing", "1.0.0");
WorkflowId workflowId = WorkflowId.of(appId, "daily-etl");
ScheduleId scheduleId = ScheduleId.of(appId, "daily-etl-schedule");
// Cron expression for daily execution at 2 AM
TimeTrigger dailyTrigger = new TimeTrigger("0 2 * * *");
ScheduleDetail dailySchedule = ScheduleDetail.builder()
    .setName("daily-etl-schedule")
    .setDescription("Daily ETL processing at 2 AM")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "daily-etl"))
    .setTrigger(dailyTrigger)
    // Runtime properties are handed to the workflow as arguments.
    .setProperties(Map.of(
        "input.path", "/data/daily/",
        "output.path", "/processed/daily/",
        "retention.days", "30"
    ))
    .setTimeoutMillis(TimeUnit.HOURS.toMillis(4)) // 4 hour timeout
    .build();
scheduleClient.add(scheduleId, dailySchedule);
System.out.println("Created daily schedule: " + scheduleId.getSchedule());
// Create hourly schedule
TimeTrigger hourlyTrigger = new TimeTrigger("0 * * * *"); // Every hour
ScheduleId hourlyScheduleId = ScheduleId.of(appId, "hourly-aggregation");
ScheduleDetail hourlySchedule = ScheduleDetail.builder()
    .setName("hourly-aggregation")
    .setDescription("Hourly data aggregation")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "aggregation-workflow"))
    .setTrigger(hourlyTrigger)
    .setTimeoutMillis(TimeUnit.MINUTES.toMillis(30))
    .build();
scheduleClient.add(hourlyScheduleId, hourlySchedule);
// Weekly schedule (Sundays at 3 AM)
TimeTrigger weeklyTrigger = new TimeTrigger("0 3 * * 0");
// Monthly schedule (1st day of month at midnight)
TimeTrigger monthlyTrigger = new TimeTrigger("0 0 1 * *");
// Business hours only (9 AM to 5 PM, Monday to Friday)
TimeTrigger businessHoursTrigger = new TimeTrigger("0 9-17 * * 1-5");
// Multiple times per day (6 AM, 12 PM, 6 PM)
TimeTrigger multipleTrigger = new TimeTrigger("0 6,12,18 * * *");
// Custom schedule with complex cron expression
ScheduleDetail customSchedule = ScheduleDetail.builder()
    .setName("business-hours-processing")
    .setDescription("Process data every 2 hours during business days")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "business-workflow"))
    // FIX: the original expression "0 */2 9-17 * * 1-5" was 6-field (inconsistent
    // with every other example here) and its "*/2" sat in the minutes field, i.e.
    // every 2 *minutes*. The 5-field "9-17/2" hour range fires every 2 hours, 9-17.
    .setTrigger(new TimeTrigger("0 9-17/2 * * 1-5")) // Every 2 hours, 9-5, Mon-Fri
    .setProperties(Map.of("environment", "production"))
    .build();
ScheduleId customScheduleId = ScheduleId.of(appId, "business-hours-schedule");
scheduleClient.add(customScheduleId, customSchedule);
// Partition-based trigger
// --- Example: data-driven and program-status triggers ---
DatasetId inputDataset = DatasetId.of(namespace, "raw-events");
// Fires once 5 new partitions have landed in the dataset.
PartitionTrigger partitionTrigger = new PartitionTrigger(inputDataset, 5); // Wait for 5 partitions
ScheduleDetail partitionSchedule = ScheduleDetail.builder()
.setName("partition-driven-processing")
.setDescription("Process data when 5 new partitions are available")
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "batch-processor"))
.setTrigger(partitionTrigger)
.setProperties(Map.of(
"source.dataset", "raw-events",
"batch.size", "5"
))
.build();
ScheduleId partitionScheduleId = ScheduleId.of(appId, "partition-driven-schedule");
scheduleClient.add(partitionScheduleId, partitionSchedule);
// Program status trigger
// Chains workflows: the downstream transformation starts only after the
// upstream ingestion workflow reaches COMPLETED.
ProgramId upstreamProgram = ProgramId.of(appId, ProgramType.WORKFLOW, "data-ingestion");
ProgramStatusTrigger statusTrigger = new ProgramStatusTrigger(upstreamProgram, ProgramStatus.COMPLETED);
ScheduleDetail statusSchedule = ScheduleDetail.builder()
.setName("downstream-processing")
.setDescription("Start when upstream data ingestion completes")
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "data-transformation"))
.setTrigger(statusTrigger)
.build();
ScheduleId statusScheduleId = ScheduleId.of(appId, "downstream-schedule");
scheduleClient.add(statusScheduleId, statusSchedule);// AND trigger - both conditions must be met
// AND: fire at 1 AM only if at least one new partition is also available.
TimeTrigger nightlyTrigger = new TimeTrigger("0 1 * * *"); // 1 AM daily
PartitionTrigger dataTrigger = new PartitionTrigger(inputDataset, 1); // At least 1 partition
AndTrigger compositeTrigger = new AndTrigger(nightlyTrigger, dataTrigger);
ScheduleDetail compositeSchedule = ScheduleDetail.builder()
    .setName("nightly-data-processing")
    .setDescription("Process daily at 1 AM if data is available")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "nightly-batch"))
    .setTrigger(compositeTrigger)
    .build();
ScheduleId compositeScheduleId = ScheduleId.of(appId, "composite-schedule");
scheduleClient.add(compositeScheduleId, compositeSchedule);
// OR trigger - either condition can trigger execution
ProgramId program1 = ProgramId.of(appId, ProgramType.WORKFLOW, "source-a");
ProgramId program2 = ProgramId.of(appId, ProgramType.WORKFLOW, "source-b");
ProgramStatusTrigger trigger1 = new ProgramStatusTrigger(program1, ProgramStatus.COMPLETED);
ProgramStatusTrigger trigger2 = new ProgramStatusTrigger(program2, ProgramStatus.COMPLETED);
OrTrigger orTrigger = new OrTrigger(trigger1, trigger2);
ScheduleDetail orSchedule = ScheduleDetail.builder()
    .setName("multi-source-processing")
    .setDescription("Process when either source A or B completes")
    .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "merge-processor"))
    .setTrigger(orTrigger)
    .build();
// FIX: the original built orSchedule but never registered it; every other example
// registers its schedule, so complete the example with the add() call.
ScheduleId orScheduleId = ScheduleId.of(appId, "multi-source-schedule");
scheduleClient.add(orScheduleId, orSchedule);
// Suspend schedule (pause execution)
// --- Example: schedule lifecycle operations ---
// Suspend stops the schedule from firing; existing runs are not affected here.
scheduleClient.suspend(scheduleId);
System.out.println("Schedule suspended: " + scheduleId.getSchedule());
// Resume schedule
scheduleClient.resume(scheduleId);
System.out.println("Schedule resumed: " + scheduleId.getSchedule());
// Check schedule status
String status = scheduleClient.getStatus(scheduleId);
System.out.println("Schedule status: " + status);
// Delete schedule
scheduleClient.delete(scheduleId);
System.out.println("Schedule deleted: " + scheduleId.getSchedule());// Update existing schedule
// NOTE(review): scheduleId was just deleted above — calling update() on a deleted
// schedule will fail in a real program. Treat each section as an independent
// snippet, or re-add the schedule before updating. TODO confirm intended ordering.
ScheduleDetail updatedSchedule = ScheduleDetail.builder()
.setName("daily-etl-schedule")
.setDescription("Updated: Daily ETL processing at 3 AM") // Changed time
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "daily-etl"))
.setTrigger(new TimeTrigger("0 3 * * *")) // Changed from 2 AM to 3 AM
.setProperties(Map.of(
"input.path", "/data/daily/",
"output.path", "/processed/daily/",
"retention.days", "45", // Extended retention
"compression", "true" // Added compression
))
.setTimeoutMillis(TimeUnit.HOURS.toMillis(6)) // Extended timeout
.build();
scheduleClient.update(scheduleId, updatedSchedule);
System.out.println("Schedule updated: " + scheduleId.getSchedule());// List all schedules for a workflow
// --- Example: inspecting schedules and upcoming runs ---
List<ScheduleDetail> schedules = scheduleClient.listSchedules(workflowId);
System.out.println("Schedules for workflow " + workflowId.getWorkflow() + ":");
for (ScheduleDetail schedule : schedules) {
    System.out.println("- " + schedule.getName());
    System.out.println(" Description: " + schedule.getDescription());
    System.out.println(" Trigger: " + schedule.getTrigger().getClass().getSimpleName());
    System.out.println(" Properties: " + schedule.getProperties());
    System.out.println(" Timeout: " + schedule.getTimeoutMillis() + " ms");
    if (!schedule.getConstraints().isEmpty()) {
        System.out.println(" Constraints: " + schedule.getConstraints().size());
    }
}
// Get next scheduled runtimes
List<ScheduledRuntime> nextRuntimes = scheduleClient.nextRuntimes(workflowId);
System.out.println("Next scheduled runs for " + workflowId.getWorkflow() + ":");
for (ScheduledRuntime runtime : nextRuntimes) {
    // getTime() is epoch millis.
    Date nextRun = new Date(runtime.getTime());
    System.out.println("- " + nextRun + " with arguments: " + runtime.getArguments());
}
// Display up to the next five runs in a readable format.
// FIX: replaced legacy SimpleDateFormat (mutable, not thread-safe) with the
// thread-safe java.time DateTimeFormatter; the output pattern is unchanged.
java.time.format.DateTimeFormatter formatter =
    java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
        .withZone(java.time.ZoneId.systemDefault());
for (int i = 0; i < Math.min(5, nextRuntimes.size()); i++) {
    ScheduledRuntime runtime = nextRuntimes.get(i);
    System.out.println((i + 1) + ". " + formatter.format(java.time.Instant.ofEpochMilli(runtime.getTime())));
}
// Create multiple related schedules
/**
 * Creates a standard set of nightly pipeline schedules (ingestion, validation,
 * transformation, export), each driven by a 5-field cron expression.
 * A failure for one schedule is logged and does not abort the remaining ones.
 * NOTE(review): relies on an enclosing scheduleClient field not shown in this snippet.
 */
public void createDataPipelineSchedules(ApplicationId appId) {
    List<ScheduleCreationRequest> schedules = List.of(
        new ScheduleCreationRequest("data-ingestion", "0 1 * * *", "Nightly data ingestion"),
        new ScheduleCreationRequest("data-validation", "0 2 * * *", "Data quality validation"),
        new ScheduleCreationRequest("data-transformation", "0 3 * * *", "Data transformation"),
        new ScheduleCreationRequest("data-export", "0 5 * * *", "Export processed data")
    );
    for (ScheduleCreationRequest request : schedules) {
        try {
            // FIX: dropped the original's unused local WorkflowId — it was created
            // but never referenced.
            ScheduleId scheduleId = ScheduleId.of(appId, request.name);
            ScheduleDetail schedule = ScheduleDetail.builder()
                .setName(request.name)
                .setDescription(request.description)
                .setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, request.name))
                .setTrigger(new TimeTrigger(request.cronExpression))
                .setTimeoutMillis(TimeUnit.HOURS.toMillis(2))
                .build();
            scheduleClient.add(scheduleId, schedule);
            System.out.println("Created schedule: " + request.name);
        } catch (Exception e) {
            System.err.println("Failed to create schedule " + request.name + ": " + e.getMessage());
        }
    }
}
/** Immutable value holder describing one schedule to create. */
private static class ScheduleCreationRequest {
    // FIX: fields made final — they are assigned only in the constructor.
    final String name;
    final String cronExpression;
    final String description;
    ScheduleCreationRequest(String name, String cronExpression, String description) {
        this.name = name;
        this.cronExpression = cronExpression;
        this.description = description;
    }
}
// Schedule with constraints
// --- Example: execution constraints gate when a triggered run may start ---
List<Constraint> constraints = List.of(
new ConcurrencyConstraint(1), // Only one instance can run at a time
new DelayConstraint(TimeUnit.MINUTES.toMillis(5)), // 5 minute delay between runs
new LastRunConstraint(TimeUnit.HOURS.toMillis(23)) // Don't run if last run was within 23 hours
);
ScheduleDetail constrainedSchedule = ScheduleDetail.builder()
.setName("constrained-processing")
.setDescription("Processing with execution constraints")
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "heavy-processing"))
.setTrigger(new TimeTrigger("0 */6 * * *")) // Every 6 hours
.setConstraints(constraints)
.setTimeoutMillis(TimeUnit.HOURS.toMillis(5))
.build();
ScheduleId constrainedScheduleId = ScheduleId.of(appId, "constrained-schedule");
scheduleClient.add(constrainedScheduleId, constrainedSchedule);// Re-enable suspended schedules after maintenance window
/**
 * Re-enables schedules that were suspended within the last two hours, treating
 * that interval as the maintenance window. Errors are reported to stderr
 * rather than propagated to the caller.
 */
public void performScheduleMaintenance(NamespaceId namespace) {
    final long windowStart = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2);
    final long windowEnd = System.currentTimeMillis();
    try {
        // Ask the platform to flip back every schedule suspended inside the window.
        scheduleClient.reEnableSuspendedSchedules(namespace, windowStart, windowEnd);
        System.out.println("Re-enabled schedules suspended during maintenance window");
    } catch (Exception e) {
        System.err.println("Error during schedule maintenance: " + e.getMessage());
    }
}
// Schedule health check
public void checkScheduleHealth(List<ScheduleId> criticalSchedules) {
System.out.println("=== Schedule Health Check ===");
for (ScheduleId scheduleId : criticalSchedules) {
try {
String status = scheduleClient.getStatus(scheduleId);
System.out.println(scheduleId.getSchedule() + ": " + status);
if ("SUSPENDED".equals(status)) {
System.out.println(" WARNING: Critical schedule is suspended!");
}
// Get next runtime information
WorkflowId workflowId = WorkflowId.of(scheduleId.getApplication(),
extractWorkflowName(scheduleId.getSchedule()));
List<ScheduledRuntime> nextRuns = scheduleClient.nextRuntimes(workflowId);
if (!nextRuns.isEmpty()) {
Date nextRun = new Date(nextRuns.get(0).getTime());
System.out.println(" Next run: " + nextRun);
} else {
System.out.println(" WARNING: No upcoming runs scheduled!");
}
} catch (Exception e) {
System.err.println(scheduleId.getSchedule() + ": ERROR - " + e.getMessage());
}
}
}
/**
 * Derives a workflow name from a schedule name by removing the "-schedule"
 * suffix used by this project's naming convention.
 * NOTE: String.replace removes every occurrence of "-schedule", not only a
 * trailing suffix — identical to the original behavior.
 */
private String extractWorkflowName(String scheduleName) {
    String suffix = "-schedule";
    return scheduleName.replace(suffix, "");
}
// Handle schedule names with special characters
// Schedule names containing spaces or other special characters must be encoded
// before use in a ScheduleId. The exact encoding scheme is not shown here —
// presumably URL encoding; TODO confirm against getEncodedScheduleName docs.
String scheduleNameWithSpaces = "Daily ETL Processing";
String encodedName = ScheduleClient.getEncodedScheduleName(scheduleNameWithSpaces);
System.out.println("Encoded schedule name: " + encodedName);
// Use encoded name for schedule ID
ScheduleId encodedScheduleId = ScheduleId.of(appId, encodedName);

Schedule management operations may throw these exceptions:
// Catch the most specific exceptions first; IOException covers transport errors.
try {
scheduleClient.add(scheduleId, scheduleDetail);
System.out.println("Schedule created successfully");
} catch (ScheduleAlreadyExistsException e) {
// A schedule with this ID already exists — use update() instead.
System.err.println("Schedule already exists: " + scheduleId.getSchedule());
} catch (WorkflowNotFoundException e) {
// The program referenced by the ScheduleDetail does not exist.
System.err.println("Referenced workflow not found: " + e.getMessage());
} catch (InvalidScheduleException e) {
// The schedule definition itself is malformed (e.g. bad cron expression).
System.err.println("Invalid schedule configuration: " + e.getMessage());
} catch (IOException e) {
System.err.println("Network error: " + e.getMessage());
}// Good: Comprehensive schedule management with proper error handling
// Wrapper around ScheduleClient that validates a ScheduleDetail locally, then
// upserts it: updates the schedule if it already exists, otherwise creates it
// and verifies creation by reading the status back.
public class ScheduleManager {
private final ScheduleClient scheduleClient;
public ScheduleManager(ScheduleClient scheduleClient) {
this.scheduleClient = scheduleClient;
}
// Creates or updates the schedule; wraps any failure in a RuntimeException
// (with the original exception preserved as the cause).
public void createScheduleWithValidation(ScheduleId scheduleId, ScheduleDetail scheduleDetail) {
try {
// Validate schedule configuration
validateScheduleDetail(scheduleDetail);
// Check if schedule already exists
// Existence check via getStatus: a ScheduleNotFoundException means "create".
try {
String existingStatus = scheduleClient.getStatus(scheduleId);
System.out.println("Schedule already exists with status: " + existingStatus);
// Update instead of create
scheduleClient.update(scheduleId, scheduleDetail);
System.out.println("Updated existing schedule: " + scheduleId.getSchedule());
return;
} catch (ScheduleNotFoundException e) {
// Schedule doesn't exist, proceed with creation
}
// Create the schedule
scheduleClient.add(scheduleId, scheduleDetail);
System.out.println("Created schedule: " + scheduleId.getSchedule());
// Verify creation
String status = scheduleClient.getStatus(scheduleId);
System.out.println("Schedule status after creation: " + status);
} catch (Exception e) {
System.err.println("Failed to create/update schedule " + scheduleId.getSchedule() + ": " + e.getMessage());
throw new RuntimeException("Schedule operation failed", e);
}
}
// Local precondition checks; throws IllegalArgumentException on the first
// violation (name blank, missing program/trigger, non-positive timeout).
private void validateScheduleDetail(ScheduleDetail scheduleDetail) {
if (scheduleDetail.getName() == null || scheduleDetail.getName().trim().isEmpty()) {
throw new IllegalArgumentException("Schedule name cannot be empty");
}
if (scheduleDetail.getProgram() == null) {
throw new IllegalArgumentException("Schedule must reference a program");
}
if (scheduleDetail.getTrigger() == null) {
throw new IllegalArgumentException("Schedule must have a trigger");
}
// Validate timeout
if (scheduleDetail.getTimeoutMillis() <= 0) {
throw new IllegalArgumentException("Schedule timeout must be positive");
}
}
}

Install with Tessl CLI:

npx tessl i tessl/maven-io-cdap-cdap--cdap-client