0
# Schedule Management
1
2
The ScheduleClient provides comprehensive schedule creation, management, and workflow scheduling operations. Schedules enable time-based and data-driven execution of workflows and other programs in CDAP.
3
4
## ScheduleClient
5
6
```java { .api }
7
public class ScheduleClient {
8
// Constructors
9
public ScheduleClient(ClientConfig config);
10
public ScheduleClient(ClientConfig config, RESTClient restClient);
11
12
// Schedule management methods
13
public void add(ScheduleId scheduleId, ScheduleDetail detail);
14
public void update(ScheduleId scheduleId, ScheduleDetail detail);
15
public List<ScheduleDetail> listSchedules(WorkflowId workflow);
16
public List<ScheduledRuntime> nextRuntimes(WorkflowId workflow);
17
public void suspend(ScheduleId scheduleId);
18
public void resume(ScheduleId scheduleId);
19
public void delete(ScheduleId scheduleId);
20
public String getStatus(ScheduleId scheduleId);
21
public void reEnableSuspendedSchedules(NamespaceId namespaceId, long startTimeMillis, long endTimeMillis);
22
23
// Static utility methods
24
public static String getEncodedScheduleName(String scheduleName);
25
}
26
```
27
28
## Schedule Types and Configuration
29
30
```java { .api }
31
public class ScheduleDetail {
32
public String getName();
33
public String getDescription();
34
public ProgramId getProgram();
35
public Map<String, String> getProperties();
36
public Trigger getTrigger();
37
public List<Constraint> getConstraints();
38
public long getTimeoutMillis();
39
40
public static Builder builder();
41
42
public static class Builder {
43
public Builder setName(String name);
44
public Builder setDescription(String description);
45
public Builder setProgram(ProgramId program);
46
public Builder setProperties(Map<String, String> properties);
47
public Builder setTrigger(Trigger trigger);
48
public Builder setConstraints(List<Constraint> constraints);
49
public Builder setTimeoutMillis(long timeoutMillis);
50
public ScheduleDetail build();
51
}
52
}
53
54
public class ScheduleId {
55
public static ScheduleId of(ApplicationId application, String schedule);
56
public ApplicationId getApplication();
57
public String getSchedule();
58
}
59
60
public class WorkflowId {
61
public static WorkflowId of(ApplicationId application, String workflow);
62
public ApplicationId getApplication();
63
public String getWorkflow();
64
}
65
66
public class ScheduledRuntime {
67
public long getTime();
68
public Map<String, String> getArguments();
69
}
70
```
71
72
## Trigger Types
73
74
```java { .api }
75
// Time-based triggers
76
public class TimeTrigger implements Trigger {
77
public TimeTrigger(String cronExpression);
78
public String getCronExpression();
79
}
80
81
// Data-based triggers
82
public class PartitionTrigger implements Trigger {
83
public PartitionTrigger(DatasetId dataset, int numPartitions);
84
public DatasetId getDataset();
85
public int getNumPartitions();
86
}
87
88
// Program status triggers
89
public class ProgramStatusTrigger implements Trigger {
90
public ProgramStatusTrigger(ProgramId program, ProgramStatus... expectedStatuses);
91
public ProgramId getProgram();
92
public Set<ProgramStatus> getExpectedStatuses();
93
}
94
95
// Composite triggers
96
public class AndTrigger implements Trigger {
97
public AndTrigger(Trigger... triggers);
98
public List<Trigger> getTriggers();
99
}
100
101
public class OrTrigger implements Trigger {
102
public OrTrigger(Trigger... triggers);
103
public List<Trigger> getTriggers();
104
}
105
```
106
107
## Schedule Creation and Management
108
109
### Time-Based Schedules
110
111
```java
112
// Create daily schedule
113
ApplicationId appId = ApplicationId.of(namespace, "data-processing", "1.0.0");
114
WorkflowId workflowId = WorkflowId.of(appId, "daily-etl");
115
ScheduleId scheduleId = ScheduleId.of(appId, "daily-etl-schedule");
116
117
// Cron expression for daily execution at 2 AM
118
TimeTrigger dailyTrigger = new TimeTrigger("0 2 * * *");
119
120
ScheduleDetail dailySchedule = ScheduleDetail.builder()
121
.setName("daily-etl-schedule")
122
.setDescription("Daily ETL processing at 2 AM")
123
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "daily-etl"))
124
.setTrigger(dailyTrigger)
125
.setProperties(Map.of(
126
"input.path", "/data/daily/",
127
"output.path", "/processed/daily/",
128
"retention.days", "30"
129
))
130
.setTimeoutMillis(TimeUnit.HOURS.toMillis(4)) // 4 hour timeout
131
.build();
132
133
scheduleClient.add(scheduleId, dailySchedule);
134
System.out.println("Created daily schedule: " + scheduleId.getSchedule());
135
136
// Create hourly schedule
137
TimeTrigger hourlyTrigger = new TimeTrigger("0 * * * *"); // Every hour
138
ScheduleId hourlyScheduleId = ScheduleId.of(appId, "hourly-aggregation");
139
140
ScheduleDetail hourlySchedule = ScheduleDetail.builder()
141
.setName("hourly-aggregation")
142
.setDescription("Hourly data aggregation")
143
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "aggregation-workflow"))
144
.setTrigger(hourlyTrigger)
145
.setTimeoutMillis(TimeUnit.MINUTES.toMillis(30))
146
.build();
147
148
scheduleClient.add(hourlyScheduleId, hourlySchedule);
149
```
150
151
### Advanced Cron Schedules
152
153
```java
154
// Weekly schedule (Sundays at 3 AM)
155
TimeTrigger weeklyTrigger = new TimeTrigger("0 3 * * 0");
156
157
// Monthly schedule (1st day of month at midnight)
158
TimeTrigger monthlyTrigger = new TimeTrigger("0 0 1 * *");
159
160
// Business hours only (9 AM to 5 PM, Monday to Friday)
161
TimeTrigger businessHoursTrigger = new TimeTrigger("0 9-17 * * 1-5");
162
163
// Multiple times per day (6 AM, 12 PM, 6 PM)
164
TimeTrigger multipleTrigger = new TimeTrigger("0 6,12,18 * * *");
165
166
// Custom schedule with complex cron expression
167
ScheduleDetail customSchedule = ScheduleDetail.builder()
168
.setName("business-hours-processing")
169
.setDescription("Process data every 2 hours during business days")
170
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "business-workflow"))
171
.setTrigger(new TimeTrigger("0 */2 9-17 * * 1-5")) // Every 2 hours, 9-5, Mon-Fri
172
.setProperties(Map.of("environment", "production"))
173
.build();
174
175
ScheduleId customScheduleId = ScheduleId.of(appId, "business-hours-schedule");
176
scheduleClient.add(customScheduleId, customSchedule);
177
```
178
179
### Data-Driven Schedules
180
181
```java
182
// Partition-based trigger
183
DatasetId inputDataset = DatasetId.of(namespace, "raw-events");
184
PartitionTrigger partitionTrigger = new PartitionTrigger(inputDataset, 5); // Wait for 5 partitions
185
186
ScheduleDetail partitionSchedule = ScheduleDetail.builder()
187
.setName("partition-driven-processing")
188
.setDescription("Process data when 5 new partitions are available")
189
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "batch-processor"))
190
.setTrigger(partitionTrigger)
191
.setProperties(Map.of(
192
"source.dataset", "raw-events",
193
"batch.size", "5"
194
))
195
.build();
196
197
ScheduleId partitionScheduleId = ScheduleId.of(appId, "partition-driven-schedule");
198
scheduleClient.add(partitionScheduleId, partitionSchedule);
199
200
// Program status trigger
201
ProgramId upstreamProgram = ProgramId.of(appId, ProgramType.WORKFLOW, "data-ingestion");
202
ProgramStatusTrigger statusTrigger = new ProgramStatusTrigger(upstreamProgram, ProgramStatus.COMPLETED);
203
204
ScheduleDetail statusSchedule = ScheduleDetail.builder()
205
.setName("downstream-processing")
206
.setDescription("Start when upstream data ingestion completes")
207
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "data-transformation"))
208
.setTrigger(statusTrigger)
209
.build();
210
211
ScheduleId statusScheduleId = ScheduleId.of(appId, "downstream-schedule");
212
scheduleClient.add(statusScheduleId, statusSchedule);
213
```
214
215
### Composite Triggers
216
217
```java
218
// AND trigger - both conditions must be met
219
TimeTrigger nightlyTrigger = new TimeTrigger("0 1 * * *"); // 1 AM daily
220
PartitionTrigger dataTrigger = new PartitionTrigger(inputDataset, 1); // At least 1 partition
221
222
AndTrigger compositeTrigger = new AndTrigger(nightlyTrigger, dataTrigger);
223
224
ScheduleDetail compositeSchedule = ScheduleDetail.builder()
225
.setName("nightly-data-processing")
226
.setDescription("Process daily at 1 AM if data is available")
227
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "nightly-batch"))
228
.setTrigger(compositeTrigger)
229
.build();
230
231
ScheduleId compositeScheduleId = ScheduleId.of(appId, "composite-schedule");
232
scheduleClient.add(compositeScheduleId, compositeSchedule);
233
234
// OR trigger - either condition can trigger execution
235
ProgramId program1 = ProgramId.of(appId, ProgramType.WORKFLOW, "source-a");
236
ProgramId program2 = ProgramId.of(appId, ProgramType.WORKFLOW, "source-b");
237
238
ProgramStatusTrigger trigger1 = new ProgramStatusTrigger(program1, ProgramStatus.COMPLETED);
239
ProgramStatusTrigger trigger2 = new ProgramStatusTrigger(program2, ProgramStatus.COMPLETED);
240
241
OrTrigger orTrigger = new OrTrigger(trigger1, trigger2);
242
243
ScheduleDetail orSchedule = ScheduleDetail.builder()
244
.setName("multi-source-processing")
245
.setDescription("Process when either source A or B completes")
246
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "merge-processor"))
247
.setTrigger(orTrigger)
248
.build();
249
```
250
251
## Schedule Control Operations
252
253
### Schedule Lifecycle
254
255
```java
256
// Suspend schedule (pause execution)
257
scheduleClient.suspend(scheduleId);
258
System.out.println("Schedule suspended: " + scheduleId.getSchedule());
259
260
// Resume schedule
261
scheduleClient.resume(scheduleId);
262
System.out.println("Schedule resumed: " + scheduleId.getSchedule());
263
264
// Check schedule status
265
String status = scheduleClient.getStatus(scheduleId);
266
System.out.println("Schedule status: " + status);
267
268
// Delete schedule
269
scheduleClient.delete(scheduleId);
270
System.out.println("Schedule deleted: " + scheduleId.getSchedule());
271
```
272
273
### Schedule Updates
274
275
```java
276
// Update existing schedule
277
ScheduleDetail updatedSchedule = ScheduleDetail.builder()
278
.setName("daily-etl-schedule")
279
.setDescription("Updated: Daily ETL processing at 3 AM") // Changed time
280
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "daily-etl"))
281
.setTrigger(new TimeTrigger("0 3 * * *")) // Changed from 2 AM to 3 AM
282
.setProperties(Map.of(
283
"input.path", "/data/daily/",
284
"output.path", "/processed/daily/",
285
"retention.days", "45", // Extended retention
286
"compression", "true" // Added compression
287
))
288
.setTimeoutMillis(TimeUnit.HOURS.toMillis(6)) // Extended timeout
289
.build();
290
291
scheduleClient.update(scheduleId, updatedSchedule);
292
System.out.println("Schedule updated: " + scheduleId.getSchedule());
293
```
294
295
## Schedule Information and Monitoring
296
297
### Schedule Listing
298
299
```java
300
// List all schedules for a workflow
301
List<ScheduleDetail> schedules = scheduleClient.listSchedules(workflowId);
302
System.out.println("Schedules for workflow " + workflowId.getWorkflow() + ":");
303
304
for (ScheduleDetail schedule : schedules) {
305
System.out.println("- " + schedule.getName());
306
System.out.println(" Description: " + schedule.getDescription());
307
System.out.println(" Trigger: " + schedule.getTrigger().getClass().getSimpleName());
308
System.out.println(" Properties: " + schedule.getProperties());
309
System.out.println(" Timeout: " + schedule.getTimeoutMillis() + " ms");
310
311
if (!schedule.getConstraints().isEmpty()) {
312
System.out.println(" Constraints: " + schedule.getConstraints().size());
313
}
314
}
315
```
316
317
### Next Runtime Prediction
318
319
```java
320
// Get next scheduled runtimes
321
List<ScheduledRuntime> nextRuntimes = scheduleClient.nextRuntimes(workflowId);
322
System.out.println("Next scheduled runs for " + workflowId.getWorkflow() + ":");
323
324
for (ScheduledRuntime runtime : nextRuntimes) {
325
Date nextRun = new Date(runtime.getTime());
326
System.out.println("- " + nextRun + " with arguments: " + runtime.getArguments());
327
}
328
329
// Display next runs in a readable format
330
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
331
for (int i = 0; i < Math.min(5, nextRuntimes.size()); i++) {
332
ScheduledRuntime runtime = nextRuntimes.get(i);
333
System.out.println((i + 1) + ". " + formatter.format(new Date(runtime.getTime())));
334
}
335
```
336
337
## Advanced Schedule Management
338
339
### Bulk Schedule Operations
340
341
```java
342
// Create multiple related schedules
343
public void createDataPipelineSchedules(ApplicationId appId) {
344
List<ScheduleCreationRequest> schedules = List.of(
345
new ScheduleCreationRequest("data-ingestion", "0 1 * * *", "Nightly data ingestion"),
346
new ScheduleCreationRequest("data-validation", "0 2 * * *", "Data quality validation"),
347
new ScheduleCreationRequest("data-transformation", "0 3 * * *", "Data transformation"),
348
new ScheduleCreationRequest("data-export", "0 5 * * *", "Export processed data")
349
);
350
351
for (ScheduleCreationRequest request : schedules) {
352
try {
353
ScheduleId scheduleId = ScheduleId.of(appId, request.name);
354
WorkflowId workflowId = WorkflowId.of(appId, request.name);
355
356
ScheduleDetail schedule = ScheduleDetail.builder()
357
.setName(request.name)
358
.setDescription(request.description)
359
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, request.name))
360
.setTrigger(new TimeTrigger(request.cronExpression))
361
.setTimeoutMillis(TimeUnit.HOURS.toMillis(2))
362
.build();
363
364
scheduleClient.add(scheduleId, schedule);
365
System.out.println("Created schedule: " + request.name);
366
367
} catch (Exception e) {
368
System.err.println("Failed to create schedule " + request.name + ": " + e.getMessage());
369
}
370
}
371
}
372
373
private static class ScheduleCreationRequest {
374
String name, cronExpression, description;
375
376
ScheduleCreationRequest(String name, String cronExpression, String description) {
377
this.name = name;
378
this.cronExpression = cronExpression;
379
this.description = description;
380
}
381
}
382
```
383
384
### Schedule Constraints and Policies
385
386
```java
387
// Schedule with constraints
388
List<Constraint> constraints = List.of(
389
new ConcurrencyConstraint(1), // Only one instance can run at a time
390
new DelayConstraint(TimeUnit.MINUTES.toMillis(5)), // 5 minute delay between runs
391
new LastRunConstraint(TimeUnit.HOURS.toMillis(23)) // Don't run if last run was within 23 hours
392
);
393
394
ScheduleDetail constrainedSchedule = ScheduleDetail.builder()
395
.setName("constrained-processing")
396
.setDescription("Processing with execution constraints")
397
.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "heavy-processing"))
398
.setTrigger(new TimeTrigger("0 */6 * * *")) // Every 6 hours
399
.setConstraints(constraints)
400
.setTimeoutMillis(TimeUnit.HOURS.toMillis(5))
401
.build();
402
403
ScheduleId constrainedScheduleId = ScheduleId.of(appId, "constrained-schedule");
404
scheduleClient.add(constrainedScheduleId, constrainedSchedule);
405
```
406
407
### Schedule Recovery and Maintenance
408
409
```java
410
// Re-enable suspended schedules after maintenance window
411
public void performScheduleMaintenance(NamespaceId namespace) {
412
long maintenanceStart = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2);
413
long maintenanceEnd = System.currentTimeMillis();
414
415
try {
416
// Re-enable schedules that were suspended during maintenance
417
scheduleClient.reEnableSuspendedSchedules(namespace, maintenanceStart, maintenanceEnd);
418
System.out.println("Re-enabled schedules suspended during maintenance window");
419
420
} catch (Exception e) {
421
System.err.println("Error during schedule maintenance: " + e.getMessage());
422
}
423
}
424
425
// Schedule health check
426
public void checkScheduleHealth(List<ScheduleId> criticalSchedules) {
427
System.out.println("=== Schedule Health Check ===");
428
429
for (ScheduleId scheduleId : criticalSchedules) {
430
try {
431
String status = scheduleClient.getStatus(scheduleId);
432
System.out.println(scheduleId.getSchedule() + ": " + status);
433
434
if ("SUSPENDED".equals(status)) {
435
System.out.println(" WARNING: Critical schedule is suspended!");
436
}
437
438
// Get next runtime information
439
WorkflowId workflowId = WorkflowId.of(scheduleId.getApplication(),
440
extractWorkflowName(scheduleId.getSchedule()));
441
List<ScheduledRuntime> nextRuns = scheduleClient.nextRuntimes(workflowId);
442
443
if (!nextRuns.isEmpty()) {
444
Date nextRun = new Date(nextRuns.get(0).getTime());
445
System.out.println(" Next run: " + nextRun);
446
} else {
447
System.out.println(" WARNING: No upcoming runs scheduled!");
448
}
449
450
} catch (Exception e) {
451
System.err.println(scheduleId.getSchedule() + ": ERROR - " + e.getMessage());
452
}
453
}
454
}
455
456
private String extractWorkflowName(String scheduleName) {
457
// Extract workflow name from schedule name (implement based on naming convention)
458
return scheduleName.replace("-schedule", "");
459
}
460
```
461
462
### Schedule URL Encoding
463
464
```java
465
// Handle schedule names with special characters
466
String scheduleNameWithSpaces = "Daily ETL Processing";
467
String encodedName = ScheduleClient.getEncodedScheduleName(scheduleNameWithSpaces);
468
System.out.println("Encoded schedule name: " + encodedName);
469
470
// Use encoded name for schedule ID
471
ScheduleId encodedScheduleId = ScheduleId.of(appId, encodedName);
472
```
473
474
## Error Handling
475
476
Schedule management operations may throw these exceptions:
477
478
- **ScheduleNotFoundException**: Schedule does not exist
479
- **ScheduleAlreadyExistsException**: Schedule already exists during creation
480
- **InvalidScheduleException**: Invalid schedule configuration
481
- **WorkflowNotFoundException**: Referenced workflow does not exist
482
- **BadRequestException**: Invalid schedule parameters
483
- **UnauthenticatedException**: Authentication required
484
- **UnauthorizedException**: Insufficient permissions
485
486
```java
487
try {
488
scheduleClient.add(scheduleId, scheduleDetail);
489
System.out.println("Schedule created successfully");
490
} catch (ScheduleAlreadyExistsException e) {
491
System.err.println("Schedule already exists: " + scheduleId.getSchedule());
492
} catch (WorkflowNotFoundException e) {
493
System.err.println("Referenced workflow not found: " + e.getMessage());
494
} catch (InvalidScheduleException e) {
495
System.err.println("Invalid schedule configuration: " + e.getMessage());
496
} catch (IOException e) {
497
System.err.println("Network error: " + e.getMessage());
498
}
499
```
500
501
## Best Practices
502
503
1. **Naming Conventions**: Use clear, descriptive names for schedules
504
2. **Resource Management**: Set appropriate timeouts for long-running workflows
505
3. **Dependency Management**: Use program status triggers for workflow dependencies
506
4. **Error Handling**: Implement proper error handling and retry logic
507
5. **Monitoring**: Regularly monitor schedule status and execution history
508
6. **Maintenance**: Plan for schedule suspension during maintenance windows
509
510
```java
511
// Good: Comprehensive schedule management with proper error handling
512
public class ScheduleManager {
513
private final ScheduleClient scheduleClient;
514
515
public ScheduleManager(ScheduleClient scheduleClient) {
516
this.scheduleClient = scheduleClient;
517
}
518
519
public void createScheduleWithValidation(ScheduleId scheduleId, ScheduleDetail scheduleDetail) {
520
try {
521
// Validate schedule configuration
522
validateScheduleDetail(scheduleDetail);
523
524
// Check if schedule already exists
525
try {
526
String existingStatus = scheduleClient.getStatus(scheduleId);
527
System.out.println("Schedule already exists with status: " + existingStatus);
528
529
// Update instead of create
530
scheduleClient.update(scheduleId, scheduleDetail);
531
System.out.println("Updated existing schedule: " + scheduleId.getSchedule());
532
return;
533
534
} catch (ScheduleNotFoundException e) {
535
// Schedule doesn't exist, proceed with creation
536
}
537
538
// Create the schedule
539
scheduleClient.add(scheduleId, scheduleDetail);
540
System.out.println("Created schedule: " + scheduleId.getSchedule());
541
542
// Verify creation
543
String status = scheduleClient.getStatus(scheduleId);
544
System.out.println("Schedule status after creation: " + status);
545
546
} catch (Exception e) {
547
System.err.println("Failed to create/update schedule " + scheduleId.getSchedule() + ": " + e.getMessage());
548
throw new RuntimeException("Schedule operation failed", e);
549
}
550
}
551
552
private void validateScheduleDetail(ScheduleDetail scheduleDetail) {
553
if (scheduleDetail.getName() == null || scheduleDetail.getName().trim().isEmpty()) {
554
throw new IllegalArgumentException("Schedule name cannot be empty");
555
}
556
557
if (scheduleDetail.getProgram() == null) {
558
throw new IllegalArgumentException("Schedule must reference a program");
559
}
560
561
if (scheduleDetail.getTrigger() == null) {
562
throw new IllegalArgumentException("Schedule must have a trigger");
563
}
564
565
// Validate timeout
566
if (scheduleDetail.getTimeoutMillis() <= 0) {
567
throw new IllegalArgumentException("Schedule timeout must be positive");
568
}
569
}
570
}
571
```