or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-management.md artifact-management.md configuration.md data-operations.md dataset-operations.md index.md metrics-monitoring.md program-control.md schedule-management.md security-administration.md service-management.md

docs/schedule-management.md

0

# Schedule Management

1

2

The ScheduleClient provides comprehensive schedule creation, management, and workflow scheduling operations. Schedules enable time-based and data-driven execution of workflows and other programs in CDAP.

3

4

## ScheduleClient

5

6

```java { .api }

7

public class ScheduleClient {

8

// Constructors

9

public ScheduleClient(ClientConfig config);

10

public ScheduleClient(ClientConfig config, RESTClient restClient);

11

12

// Schedule management methods

13

public void add(ScheduleId scheduleId, ScheduleDetail detail);

14

public void update(ScheduleId scheduleId, ScheduleDetail detail);

15

public List<ScheduleDetail> listSchedules(WorkflowId workflow);

16

public List<ScheduledRuntime> nextRuntimes(WorkflowId workflow);

17

public void suspend(ScheduleId scheduleId);

18

public void resume(ScheduleId scheduleId);

19

public void delete(ScheduleId scheduleId);

20

public String getStatus(ScheduleId scheduleId);

21

public void reEnableSuspendedSchedules(NamespaceId namespaceId, long startTimeMillis, long endTimeMillis);

22

23

// Static utility methods

24

public static String getEncodedScheduleName(String scheduleName);

25

}

26

```

27

28

## Schedule Types and Configuration

29

30

```java { .api }

31

public class ScheduleDetail {

32

public String getName();

33

public String getDescription();

34

public ProgramId getProgram();

35

public Map<String, String> getProperties();

36

public Trigger getTrigger();

37

public List<Constraint> getConstraints();

38

public long getTimeoutMillis();

39

40

public static Builder builder();

41

42

public static class Builder {

43

public Builder setName(String name);

44

public Builder setDescription(String description);

45

public Builder setProgram(ProgramId program);

46

public Builder setProperties(Map<String, String> properties);

47

public Builder setTrigger(Trigger trigger);

48

public Builder setConstraints(List<Constraint> constraints);

49

public Builder setTimeoutMillis(long timeoutMillis);

50

public ScheduleDetail build();

51

}

52

}

53

54

public class ScheduleId {

55

public static ScheduleId of(ApplicationId application, String schedule);

56

public ApplicationId getApplication();

57

public String getSchedule();

58

}

59

60

public class WorkflowId {

61

public static WorkflowId of(ApplicationId application, String workflow);

62

public ApplicationId getApplication();

63

public String getWorkflow();

64

}

65

66

public class ScheduledRuntime {

67

public long getTime();

68

public Map<String, String> getArguments();

69

}

70

```

71

72

## Trigger Types

73

74

```java { .api }

75

// Time-based triggers

76

public class TimeTrigger implements Trigger {

77

public TimeTrigger(String cronExpression);

78

public String getCronExpression();

79

}

80

81

// Data-based triggers

82

public class PartitionTrigger implements Trigger {

83

public PartitionTrigger(DatasetId dataset, int numPartitions);

84

public DatasetId getDataset();

85

public int getNumPartitions();

86

}

87

88

// Program status triggers

89

public class ProgramStatusTrigger implements Trigger {

90

public ProgramStatusTrigger(ProgramId program, ProgramStatus... expectedStatuses);

91

public ProgramId getProgram();

92

public Set<ProgramStatus> getExpectedStatuses();

93

}

94

95

// Composite triggers

96

public class AndTrigger implements Trigger {

97

public AndTrigger(Trigger... triggers);

98

public List<Trigger> getTriggers();

99

}

100

101

public class OrTrigger implements Trigger {

102

public OrTrigger(Trigger... triggers);

103

public List<Trigger> getTriggers();

104

}

105

```

106

107

## Schedule Creation and Management

108

109

### Time-Based Schedules

110

111

```java

112

// Create daily schedule

113

ApplicationId appId = ApplicationId.of(namespace, "data-processing", "1.0.0");

114

WorkflowId workflowId = WorkflowId.of(appId, "daily-etl");

115

ScheduleId scheduleId = ScheduleId.of(appId, "daily-etl-schedule");

116

117

// Cron expression for daily execution at 2 AM

118

TimeTrigger dailyTrigger = new TimeTrigger("0 2 * * *");

119

120

ScheduleDetail dailySchedule = ScheduleDetail.builder()

121

.setName("daily-etl-schedule")

122

.setDescription("Daily ETL processing at 2 AM")

123

.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "daily-etl"))

124

.setTrigger(dailyTrigger)

125

.setProperties(Map.of(

126

"input.path", "/data/daily/",

127

"output.path", "/processed/daily/",

128

"retention.days", "30"

129

))

130

.setTimeoutMillis(TimeUnit.HOURS.toMillis(4)) // 4 hour timeout

131

.build();

132

133

scheduleClient.add(scheduleId, dailySchedule);

134

System.out.println("Created daily schedule: " + scheduleId.getSchedule());

135

136

// Create hourly schedule

137

TimeTrigger hourlyTrigger = new TimeTrigger("0 * * * *"); // Every hour

138

ScheduleId hourlyScheduleId = ScheduleId.of(appId, "hourly-aggregation");

139

140

ScheduleDetail hourlySchedule = ScheduleDetail.builder()

141

.setName("hourly-aggregation")

142

.setDescription("Hourly data aggregation")

143

.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "aggregation-workflow"))

144

.setTrigger(hourlyTrigger)

145

.setTimeoutMillis(TimeUnit.MINUTES.toMillis(30))

146

.build();

147

148

scheduleClient.add(hourlyScheduleId, hourlySchedule);

149

```

150

151

### Advanced Cron Schedules

152

153

```java

154

// Weekly schedule (Sundays at 3 AM)

155

TimeTrigger weeklyTrigger = new TimeTrigger("0 3 * * 0");

156

157

// Monthly schedule (1st day of month at midnight)

158

TimeTrigger monthlyTrigger = new TimeTrigger("0 0 1 * *");

159

160

// Business hours only (9 AM to 5 PM, Monday to Friday)

161

TimeTrigger businessHoursTrigger = new TimeTrigger("0 9-17 * * 1-5");

162

163

// Multiple times per day (6 AM, 12 PM, 6 PM)

164

TimeTrigger multipleTrigger = new TimeTrigger("0 6,12,18 * * *");

165

166

// Custom schedule with complex cron expression

167

ScheduleDetail customSchedule = ScheduleDetail.builder()

168

.setName("business-hours-processing")

169

.setDescription("Process data every 2 hours during business days")

170

.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "business-workflow"))

171

.setTrigger(new TimeTrigger("0 9-17/2 * * 1-5")) // Every 2 hours from 9 AM to 5 PM, Mon-Fri

172

.setProperties(Map.of("environment", "production"))

173

.build();

174

175

ScheduleId customScheduleId = ScheduleId.of(appId, "business-hours-schedule");

176

scheduleClient.add(customScheduleId, customSchedule);

177

```

178

179

### Data-Driven Schedules

180

181

```java

182

// Partition-based trigger

183

DatasetId inputDataset = DatasetId.of(namespace, "raw-events");

184

PartitionTrigger partitionTrigger = new PartitionTrigger(inputDataset, 5); // Wait for 5 partitions

185

186

ScheduleDetail partitionSchedule = ScheduleDetail.builder()

187

.setName("partition-driven-processing")

188

.setDescription("Process data when 5 new partitions are available")

189

.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "batch-processor"))

190

.setTrigger(partitionTrigger)

191

.setProperties(Map.of(

192

"source.dataset", "raw-events",

193

"batch.size", "5"

194

))

195

.build();

196

197

ScheduleId partitionScheduleId = ScheduleId.of(appId, "partition-driven-schedule");

198

scheduleClient.add(partitionScheduleId, partitionSchedule);

199

200

// Program status trigger

201

ProgramId upstreamProgram = ProgramId.of(appId, ProgramType.WORKFLOW, "data-ingestion");

202

ProgramStatusTrigger statusTrigger = new ProgramStatusTrigger(upstreamProgram, ProgramStatus.COMPLETED);

203

204

ScheduleDetail statusSchedule = ScheduleDetail.builder()

205

.setName("downstream-processing")

206

.setDescription("Start when upstream data ingestion completes")

207

.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "data-transformation"))

208

.setTrigger(statusTrigger)

209

.build();

210

211

ScheduleId statusScheduleId = ScheduleId.of(appId, "downstream-schedule");

212

scheduleClient.add(statusScheduleId, statusSchedule);

213

```

214

215

### Composite Triggers

216

217

```java

218

// AND trigger - both conditions must be met

219

TimeTrigger nightlyTrigger = new TimeTrigger("0 1 * * *"); // 1 AM daily

220

PartitionTrigger dataTrigger = new PartitionTrigger(inputDataset, 1); // At least 1 partition

221

222

AndTrigger compositeTrigger = new AndTrigger(nightlyTrigger, dataTrigger);

223

224

ScheduleDetail compositeSchedule = ScheduleDetail.builder()

225

.setName("nightly-data-processing")

226

.setDescription("Process daily at 1 AM if data is available")

227

.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "nightly-batch"))

228

.setTrigger(compositeTrigger)

229

.build();

230

231

ScheduleId compositeScheduleId = ScheduleId.of(appId, "composite-schedule");

232

scheduleClient.add(compositeScheduleId, compositeSchedule);

233

234

// OR trigger - either condition can trigger execution

235

ProgramId program1 = ProgramId.of(appId, ProgramType.WORKFLOW, "source-a");

236

ProgramId program2 = ProgramId.of(appId, ProgramType.WORKFLOW, "source-b");

237

238

ProgramStatusTrigger trigger1 = new ProgramStatusTrigger(program1, ProgramStatus.COMPLETED);

239

ProgramStatusTrigger trigger2 = new ProgramStatusTrigger(program2, ProgramStatus.COMPLETED);

240

241

OrTrigger orTrigger = new OrTrigger(trigger1, trigger2);

242

243

ScheduleDetail orSchedule = ScheduleDetail.builder()

244

.setName("multi-source-processing")

245

.setDescription("Process when either source A or B completes")

246

.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "merge-processor"))

247

.setTrigger(orTrigger)

248

.build();

249

```

250

251

## Schedule Control Operations

252

253

### Schedule Lifecycle

254

255

```java

256

// Suspend schedule (pause execution)

257

scheduleClient.suspend(scheduleId);

258

System.out.println("Schedule suspended: " + scheduleId.getSchedule());

259

260

// Resume schedule

261

scheduleClient.resume(scheduleId);

262

System.out.println("Schedule resumed: " + scheduleId.getSchedule());

263

264

// Check schedule status

265

String status = scheduleClient.getStatus(scheduleId);

266

System.out.println("Schedule status: " + status);

267

268

// Delete schedule

269

scheduleClient.delete(scheduleId);

270

System.out.println("Schedule deleted: " + scheduleId.getSchedule());

271

```

272

273

### Schedule Updates

274

275

```java

276

// Update existing schedule

277

ScheduleDetail updatedSchedule = ScheduleDetail.builder()

278

.setName("daily-etl-schedule")

279

.setDescription("Updated: Daily ETL processing at 3 AM") // Changed time

280

.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "daily-etl"))

281

.setTrigger(new TimeTrigger("0 3 * * *")) // Changed from 2 AM to 3 AM

282

.setProperties(Map.of(

283

"input.path", "/data/daily/",

284

"output.path", "/processed/daily/",

285

"retention.days", "45", // Extended retention

286

"compression", "true" // Added compression

287

))

288

.setTimeoutMillis(TimeUnit.HOURS.toMillis(6)) // Extended timeout

289

.build();

290

291

scheduleClient.update(scheduleId, updatedSchedule);

292

System.out.println("Schedule updated: " + scheduleId.getSchedule());

293

```

294

295

## Schedule Information and Monitoring

296

297

### Schedule Listing

298

299

```java

300

// List all schedules for a workflow

301

List<ScheduleDetail> schedules = scheduleClient.listSchedules(workflowId);

302

System.out.println("Schedules for workflow " + workflowId.getWorkflow() + ":");

303

304

for (ScheduleDetail schedule : schedules) {

305

System.out.println("- " + schedule.getName());

306

System.out.println(" Description: " + schedule.getDescription());

307

System.out.println(" Trigger: " + schedule.getTrigger().getClass().getSimpleName());

308

System.out.println(" Properties: " + schedule.getProperties());

309

System.out.println(" Timeout: " + schedule.getTimeoutMillis() + " ms");

310

311

if (!schedule.getConstraints().isEmpty()) {

312

System.out.println(" Constraints: " + schedule.getConstraints().size());

313

}

314

}

315

```

316

317

### Next Runtime Prediction

318

319

```java

320

// Get next scheduled runtimes

321

List<ScheduledRuntime> nextRuntimes = scheduleClient.nextRuntimes(workflowId);

322

System.out.println("Next scheduled runs for " + workflowId.getWorkflow() + ":");

323

324

for (ScheduledRuntime runtime : nextRuntimes) {

325

Date nextRun = new Date(runtime.getTime());

326

System.out.println("- " + nextRun + " with arguments: " + runtime.getArguments());

327

}

328

329

// Display next runs in a readable format

330

SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

331

for (int i = 0; i < Math.min(5, nextRuntimes.size()); i++) {

332

ScheduledRuntime runtime = nextRuntimes.get(i);

333

System.out.println((i + 1) + ". " + formatter.format(new Date(runtime.getTime())));

334

}

335

```

336

337

## Advanced Schedule Management

338

339

### Bulk Schedule Operations

340

341

```java

342

// Create multiple related schedules

343

public void createDataPipelineSchedules(ApplicationId appId) {

344

List<ScheduleCreationRequest> schedules = List.of(

345

new ScheduleCreationRequest("data-ingestion", "0 1 * * *", "Nightly data ingestion"),

346

new ScheduleCreationRequest("data-validation", "0 2 * * *", "Data quality validation"),

347

new ScheduleCreationRequest("data-transformation", "0 3 * * *", "Data transformation"),

348

new ScheduleCreationRequest("data-export", "0 5 * * *", "Export processed data")

349

);

350

351

for (ScheduleCreationRequest request : schedules) {

352

try {

353

ScheduleId scheduleId = ScheduleId.of(appId, request.name);

354

WorkflowId workflowId = WorkflowId.of(appId, request.name);

355

356

ScheduleDetail schedule = ScheduleDetail.builder()

357

.setName(request.name)

358

.setDescription(request.description)

359

.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, request.name))

360

.setTrigger(new TimeTrigger(request.cronExpression))

361

.setTimeoutMillis(TimeUnit.HOURS.toMillis(2))

362

.build();

363

364

scheduleClient.add(scheduleId, schedule);

365

System.out.println("Created schedule: " + request.name);

366

367

} catch (Exception e) {

368

System.err.println("Failed to create schedule " + request.name + ": " + e.getMessage());

369

}

370

}

371

}

372

373

private static class ScheduleCreationRequest {

374

String name, cronExpression, description;

375

376

ScheduleCreationRequest(String name, String cronExpression, String description) {

377

this.name = name;

378

this.cronExpression = cronExpression;

379

this.description = description;

380

}

381

}

382

```

383

384

### Schedule Constraints and Policies

385

386

```java

387

// Schedule with constraints

388

List<Constraint> constraints = List.of(

389

new ConcurrencyConstraint(1), // Only one instance can run at a time

390

new DelayConstraint(TimeUnit.MINUTES.toMillis(5)), // 5 minute delay between runs

391

new LastRunConstraint(TimeUnit.HOURS.toMillis(23)) // Don't run if last run was within 23 hours

392

);

393

394

ScheduleDetail constrainedSchedule = ScheduleDetail.builder()

395

.setName("constrained-processing")

396

.setDescription("Processing with execution constraints")

397

.setProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "heavy-processing"))

398

.setTrigger(new TimeTrigger("0 */6 * * *")) // Every 6 hours

399

.setConstraints(constraints)

400

.setTimeoutMillis(TimeUnit.HOURS.toMillis(5))

401

.build();

402

403

ScheduleId constrainedScheduleId = ScheduleId.of(appId, "constrained-schedule");

404

scheduleClient.add(constrainedScheduleId, constrainedSchedule);

405

```

406

407

### Schedule Recovery and Maintenance

408

409

```java

410

// Re-enable suspended schedules after maintenance window

411

public void performScheduleMaintenance(NamespaceId namespace) {

412

long maintenanceStart = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2);

413

long maintenanceEnd = System.currentTimeMillis();

414

415

try {

416

// Re-enable schedules that were suspended during maintenance

417

scheduleClient.reEnableSuspendedSchedules(namespace, maintenanceStart, maintenanceEnd);

418

System.out.println("Re-enabled schedules suspended during maintenance window");

419

420

} catch (Exception e) {

421

System.err.println("Error during schedule maintenance: " + e.getMessage());

422

}

423

}

424

425

// Schedule health check

426

public void checkScheduleHealth(List<ScheduleId> criticalSchedules) {

427

System.out.println("=== Schedule Health Check ===");

428

429

for (ScheduleId scheduleId : criticalSchedules) {

430

try {

431

String status = scheduleClient.getStatus(scheduleId);

432

System.out.println(scheduleId.getSchedule() + ": " + status);

433

434

if ("SUSPENDED".equals(status)) {

435

System.out.println(" WARNING: Critical schedule is suspended!");

436

}

437

438

// Get next runtime information

439

WorkflowId workflowId = WorkflowId.of(scheduleId.getApplication(),

440

extractWorkflowName(scheduleId.getSchedule()));

441

List<ScheduledRuntime> nextRuns = scheduleClient.nextRuntimes(workflowId);

442

443

if (!nextRuns.isEmpty()) {

444

Date nextRun = new Date(nextRuns.get(0).getTime());

445

System.out.println(" Next run: " + nextRun);

446

} else {

447

System.out.println(" WARNING: No upcoming runs scheduled!");

448

}

449

450

} catch (Exception e) {

451

System.err.println(scheduleId.getSchedule() + ": ERROR - " + e.getMessage());

452

}

453

}

454

}

455

456

private String extractWorkflowName(String scheduleName) {

457

// Extract workflow name from schedule name (implement based on naming convention)

458

return scheduleName.replace("-schedule", "");

459

}

460

```

461

462

### Schedule URL Encoding

463

464

```java

465

// Handle schedule names with special characters

466

String scheduleNameWithSpaces = "Daily ETL Processing";

467

String encodedName = ScheduleClient.getEncodedScheduleName(scheduleNameWithSpaces);

468

System.out.println("Encoded schedule name: " + encodedName);

469

470

// Use encoded name for schedule ID

471

ScheduleId encodedScheduleId = ScheduleId.of(appId, encodedName);

472

```

473

474

## Error Handling

475

476

Schedule management operations may throw these exceptions:

477

478

- **ScheduleNotFoundException**: Schedule does not exist

479

- **ScheduleAlreadyExistsException**: Schedule already exists during creation

480

- **InvalidScheduleException**: Invalid schedule configuration

481

- **WorkflowNotFoundException**: Referenced workflow does not exist

482

- **BadRequestException**: Invalid schedule parameters

483

- **UnauthenticatedException**: Authentication required

484

- **UnauthorizedException**: Insufficient permissions

485

486

```java

487

try {

488

scheduleClient.add(scheduleId, scheduleDetail);

489

System.out.println("Schedule created successfully");

490

} catch (ScheduleAlreadyExistsException e) {

491

System.err.println("Schedule already exists: " + scheduleId.getSchedule());

492

} catch (WorkflowNotFoundException e) {

493

System.err.println("Referenced workflow not found: " + e.getMessage());

494

} catch (InvalidScheduleException e) {

495

System.err.println("Invalid schedule configuration: " + e.getMessage());

496

} catch (IOException e) {

497

System.err.println("Network error: " + e.getMessage());

498

}

499

```

500

501

## Best Practices

502

503

1. **Naming Conventions**: Use clear, descriptive names for schedules

504

2. **Resource Management**: Set appropriate timeouts for long-running workflows

505

3. **Dependency Management**: Use program status triggers for workflow dependencies

506

4. **Error Handling**: Implement proper error handling and retry logic

507

5. **Monitoring**: Regularly monitor schedule status and execution history

508

6. **Maintenance**: Plan for schedule suspension during maintenance windows

509

510

```java

511

// Good: Comprehensive schedule management with proper error handling

512

public class ScheduleManager {

513

private final ScheduleClient scheduleClient;

514

515

public ScheduleManager(ScheduleClient scheduleClient) {

516

this.scheduleClient = scheduleClient;

517

}

518

519

public void createScheduleWithValidation(ScheduleId scheduleId, ScheduleDetail scheduleDetail) {

520

try {

521

// Validate schedule configuration

522

validateScheduleDetail(scheduleDetail);

523

524

// Check if schedule already exists

525

try {

526

String existingStatus = scheduleClient.getStatus(scheduleId);

527

System.out.println("Schedule already exists with status: " + existingStatus);

528

529

// Update instead of create

530

scheduleClient.update(scheduleId, scheduleDetail);

531

System.out.println("Updated existing schedule: " + scheduleId.getSchedule());

532

return;

533

534

} catch (ScheduleNotFoundException e) {

535

// Schedule doesn't exist, proceed with creation

536

}

537

538

// Create the schedule

539

scheduleClient.add(scheduleId, scheduleDetail);

540

System.out.println("Created schedule: " + scheduleId.getSchedule());

541

542

// Verify creation

543

String status = scheduleClient.getStatus(scheduleId);

544

System.out.println("Schedule status after creation: " + status);

545

546

} catch (Exception e) {

547

System.err.println("Failed to create/update schedule " + scheduleId.getSchedule() + ": " + e.getMessage());

548

throw new RuntimeException("Schedule operation failed", e);

549

}

550

}

551

552

private void validateScheduleDetail(ScheduleDetail scheduleDetail) {

553

if (scheduleDetail.getName() == null || scheduleDetail.getName().trim().isEmpty()) {

554

throw new IllegalArgumentException("Schedule name cannot be empty");

555

}

556

557

if (scheduleDetail.getProgram() == null) {

558

throw new IllegalArgumentException("Schedule must reference a program");

559

}

560

561

if (scheduleDetail.getTrigger() == null) {

562

throw new IllegalArgumentException("Schedule must have a trigger");

563

}

564

565

// Validate timeout

566

if (scheduleDetail.getTimeoutMillis() <= 0) {

567

throw new IllegalArgumentException("Schedule timeout must be positive");

568

}

569

}

570

}

571

```