# Program Control and Monitoring

The `ProgramClient` provides program lifecycle control (start, stop, and restart operations), along with status monitoring, instance management, and run-history tracking. Programs are the executable components of CDAP applications.

## ProgramClient

```java { .api }
7
public class ProgramClient {
8
// Constructors
9
public ProgramClient(ClientConfig config);
10
public ProgramClient(ClientConfig config, RESTClient restClient);
11
public ProgramClient(ClientConfig config, RESTClient restClient, ApplicationClient applicationClient);
12
13
// Program control methods
14
public void start(ProgramId program);
15
public void start(ProgramId program, boolean debug);
16
public void start(ProgramId program, boolean debug, Map<String, String> runtimeArgs);
17
public List<BatchProgramResult> start(NamespaceId namespace, List<BatchProgramStart> programs);
18
public void restart(ApplicationId applicationId, long startTimeSeconds, long endTimeSeconds);
19
public void stop(ProgramId programId);
20
public List<BatchProgramResult> stop(NamespaceId namespace, List<BatchProgram> programs);
21
public void stopAll(NamespaceId namespace);
22
23
// Program status methods
24
public String getStatus(ProgramId programId);
25
public List<BatchProgramStatus> getStatus(NamespaceId namespace, List<BatchProgram> programs);
26
public void waitForStatus(ProgramId program, ProgramStatus status, long timeout, TimeUnit timeoutUnit);
27
public DistributedProgramLiveInfo getLiveInfo(ProgramId program);
28
29
// Instance management methods
30
public int getWorkerInstances(ProgramId worker);
31
public void setWorkerInstances(ProgramId worker, int instances);
32
public int getServiceInstances(ServiceId service);
33
public void setServiceInstances(ServiceId service, int instances);
34
35
// Program data methods
36
public List<RunRecord> getProgramRuns(ProgramId program, String state, long startTime, long endTime, int limit);
37
public List<RunRecord> getAllProgramRuns(ProgramId program, long startTime, long endTime, int limit);
38
public String getProgramLogs(ProgramId program, long start, long stop);
39
public Map<String, String> getRuntimeArgs(ProgramId program);
40
public void setRuntimeArgs(ProgramId program, Map<String, String> runtimeArgs);
41
}
42
```
43
44
## Program Types and Status
45
46
```java { .api }
47
public enum ProgramType {
48
MAPREDUCE, WORKFLOW, SERVICE, SPARK, WORKER
49
}
50
51
public enum ProgramStatus {
52
PENDING, STARTING, RUNNING, SUSPENDED, RESUMING, COMPLETED, FAILED, KILLED, STOPPED
53
}
54
55
public class ProgramId {
56
public static ProgramId of(ApplicationId application, ProgramType type, String program);
57
public ApplicationId getApplication();
58
public ProgramType getType();
59
public String getProgram();
60
}
61
62
public class RunRecord {
63
public String getPid();
64
public long getStartTs();
65
public long getRunTs();
66
public long getStopTs();
67
public long getSuspendTs();
68
public long getResumeTs();
69
public String getStatus();
70
public Map<String, String> getProperties();
71
public ProgramRunCluster getCluster();
72
}
73
74
public class DistributedProgramLiveInfo {
75
public String getStatus();
76
public Map<String, Integer> getContainers();
77
public String getYarnAppId();
78
}
79
```
80
81
## Program Control Operations
82
83
### Basic Program Control
84
85
```java
86
// Start a program
87
ApplicationId appId = ApplicationId.of(namespace, "data-processing-app", "1.0.0");
88
ProgramId workflowId = ProgramId.of(appId, ProgramType.WORKFLOW, "data-pipeline");
89
90
programClient.start(workflowId);
91
System.out.println("Started workflow: " + workflowId.getProgram());
92
93
// Start with debug mode enabled
94
programClient.start(workflowId, true);
95
96
// Start with runtime arguments
97
Map<String, String> runtimeArgs = Map.of(
98
"input.path", "/data/input/2023/12/01",
99
"output.path", "/data/output/processed",
100
"batch.size", "1000",
101
"num.partitions", "10"
102
);
103
programClient.start(workflowId, false, runtimeArgs);
104
105
// Stop a program
106
programClient.stop(workflowId);
107
System.out.println("Stopped workflow: " + workflowId.getProgram());
108
```
109
110
### Batch Operations
111
112
```java
113
// Start multiple programs at once
114
List<BatchProgramStart> programsToStart = List.of(
115
new BatchProgramStart(
116
ProgramId.of(appId, ProgramType.SERVICE, "data-service"),
117
Map.of("port", "8080", "threads", "10")
118
),
119
new BatchProgramStart(
120
ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow"),
121
Map.of("schedule", "daily")
122
)
123
);
124
125
List<BatchProgramResult> startResults = programClient.start(namespace, programsToStart);
126
for (BatchProgramResult result : startResults) {
127
if (result.getError() != null) {
128
System.err.println("Failed to start " + result.getProgramId() + ": " + result.getError());
129
} else {
130
System.out.println("Started " + result.getProgramId());
131
}
132
}
133
134
// Stop multiple programs
135
List<BatchProgram> programsToStop = List.of(
136
new BatchProgram(ProgramId.of(appId, ProgramType.SERVICE, "data-service")),
137
new BatchProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow"))
138
);
139
140
List<BatchProgramResult> stopResults = programClient.stop(namespace, programsToStop);
141
142
// Stop all programs in namespace (use with caution!)
143
programClient.stopAll(namespace);
144
```
145
146
### Application-Level Operations
147
148
```java
149
// Restart all programs in application within time range
150
long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(24); // Last 24 hours
151
long endTime = System.currentTimeMillis();
152
153
programClient.restart(appId, startTime / 1000, endTime / 1000);
154
System.out.println("Restarted all programs in application: " + appId.getApplication());
155
```
156
157
## Program Status Monitoring
158
159
### Status Checking
160
161
```java
162
// Get current program status
163
String status = programClient.getStatus(workflowId);
164
System.out.println("Workflow status: " + status);
165
166
// Get status of multiple programs
167
List<BatchProgram> programs = List.of(
168
new BatchProgram(ProgramId.of(appId, ProgramType.SERVICE, "data-service")),
169
new BatchProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow"))
170
);
171
172
List<BatchProgramStatus> statuses = programClient.getStatus(namespace, programs);
173
for (BatchProgramStatus programStatus : statuses) {
174
System.out.println(programStatus.getProgramId() + ": " + programStatus.getStatus());
175
}
176
```
177
178
### Wait for Status
179
180
```java
// Wait for program to reach specific status
try {
  programClient.start(workflowId);
  programClient.waitForStatus(workflowId, ProgramStatus.RUNNING, 60, TimeUnit.SECONDS);
  System.out.println("Workflow is now running");

  // Wait for completion
  programClient.waitForStatus(workflowId, ProgramStatus.COMPLETED, 30, TimeUnit.MINUTES);
  System.out.println("Workflow completed successfully");
} catch (TimeoutException e) {
  System.err.println("Timeout waiting for status change");
} catch (InterruptedException e) {
  // Restore the interrupt flag so code further up the stack can see the interruption.
  Thread.currentThread().interrupt();
  System.err.println("Interrupted while waiting");
}
```
197
198
### Live Information
199
200
```java
201
// Get detailed live information about running program
202
DistributedProgramLiveInfo liveInfo = programClient.getLiveInfo(workflowId);
203
System.out.println("Status: " + liveInfo.getStatus());
204
System.out.println("YARN Application ID: " + liveInfo.getYarnAppId());
205
System.out.println("Containers: " + liveInfo.getContainers());
206
207
// Monitor container information
208
Map<String, Integer> containers = liveInfo.getContainers();
209
for (Map.Entry<String, Integer> entry : containers.entrySet()) {
210
System.out.println("Container " + entry.getKey() + ": " + entry.getValue() + " instances");
211
}
212
```
213
214
## Instance Management
215
216
### Worker Instance Management
217
218
```java
219
// Get current worker instances
220
ProgramId workerId = ProgramId.of(appId, ProgramType.WORKER, "data-processor");
221
int currentInstances = programClient.getWorkerInstances(workerId);
222
System.out.println("Current worker instances: " + currentInstances);
223
224
// Scale worker instances
225
int newInstances = 5;
226
programClient.setWorkerInstances(workerId, newInstances);
227
System.out.println("Scaled worker to " + newInstances + " instances");
228
229
// Dynamic scaling based on load
230
int targetInstances = calculateOptimalInstances(); // Your scaling logic
231
if (currentInstances != targetInstances) {
232
programClient.setWorkerInstances(workerId, targetInstances);
233
System.out.println("Scaled from " + currentInstances + " to " + targetInstances + " instances");
234
}
235
```
236
237
### Service Instance Management
238
239
```java
240
// Get current service instances
241
ServiceId serviceId = ServiceId.of(appId, "api-service");
242
int currentServiceInstances = programClient.getServiceInstances(serviceId);
243
System.out.println("Current service instances: " + currentServiceInstances);
244
245
// Scale service instances
246
int newServiceInstances = 3;
247
programClient.setServiceInstances(serviceId, newServiceInstances);
248
System.out.println("Scaled service to " + newServiceInstances + " instances");
249
```
250
251
## Runtime Arguments
252
253
### Managing Runtime Arguments
254
255
```java
256
// Get current runtime arguments
257
Map<String, String> currentArgs = programClient.getRuntimeArgs(workflowId);
258
System.out.println("Current runtime arguments: " + currentArgs);
259
260
// Set new runtime arguments
261
Map<String, String> newArgs = Map.of(
262
"input.path", "/data/input/latest",
263
"output.path", "/data/output/" + System.currentTimeMillis(),
264
"batch.size", "2000",
265
"enable.compression", "true",
266
"log.level", "INFO"
267
);
268
programClient.setRuntimeArgs(workflowId, newArgs);
269
System.out.println("Updated runtime arguments");
270
271
// Merge with existing arguments
272
Map<String, String> mergedArgs = new HashMap<>(currentArgs);
273
mergedArgs.putAll(Map.of(
274
"new.parameter", "new-value",
275
"updated.parameter", "updated-value"
276
));
277
programClient.setRuntimeArgs(workflowId, mergedArgs);
278
```
279
280
## Run History and Logs
281
282
### Program Run History
283
284
```java
285
// Get recent program runs
286
long endTime = System.currentTimeMillis();
287
long startTime = endTime - TimeUnit.DAYS.toMillis(7); // Last 7 days
288
int limit = 50;
289
290
List<RunRecord> runs = programClient.getProgramRuns(workflowId, "ALL", startTime, endTime, limit);
291
System.out.println("Found " + runs.size() + " runs in the last 7 days");
292
293
for (RunRecord run : runs) {
294
System.out.println("Run ID: " + run.getPid());
295
System.out.println(" Status: " + run.getStatus());
296
System.out.println(" Start: " + new Date(run.getStartTs() * 1000));
297
System.out.println(" Duration: " + (run.getStopTs() - run.getStartTs()) + " seconds");
298
System.out.println(" Properties: " + run.getProperties());
299
}
300
301
// Get runs by status
302
List<RunRecord> failedRuns = programClient.getProgramRuns(workflowId, "FAILED", startTime, endTime, limit);
303
List<RunRecord> completedRuns = programClient.getProgramRuns(workflowId, "COMPLETED", startTime, endTime, limit);
304
305
// Get all runs regardless of status
306
List<RunRecord> allRuns = programClient.getAllProgramRuns(workflowId, startTime, endTime, limit);
307
```
308
309
### Program Logs
310
311
```java
312
// Get program logs for a time range
313
long logStart = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1); // Last hour
314
long logStop = System.currentTimeMillis();
315
316
String logs = programClient.getProgramLogs(workflowId, logStart, logStop);
317
System.out.println("Program logs:");
318
System.out.println(logs);
319
320
// Parse and filter logs
321
String[] logLines = logs.split("\n");
322
for (String line : logLines) {
323
if (line.contains("ERROR")) {
324
System.err.println("Error found: " + line);
325
}
326
}
327
```
328
329
## Advanced Program Management
330
331
### Status-Based Workflow
332
333
```java
334
// Complete workflow with status monitoring
335
public void runWorkflowWithMonitoring(ProgramId workflowId, Map<String, String> args) {
336
try {
337
// Set runtime arguments
338
programClient.setRuntimeArgs(workflowId, args);
339
340
// Start the workflow
341
programClient.start(workflowId);
342
System.out.println("Started workflow: " + workflowId.getProgram());
343
344
// Wait for running status
345
programClient.waitForStatus(workflowId, ProgramStatus.RUNNING, 2, TimeUnit.MINUTES);
346
System.out.println("Workflow is running");
347
348
// Monitor progress
349
String status;
350
do {
351
Thread.sleep(30000); // Check every 30 seconds
352
status = programClient.getStatus(workflowId);
353
System.out.println("Current status: " + status);
354
355
if (status.equals("RUNNING")) {
356
DistributedProgramLiveInfo liveInfo = programClient.getLiveInfo(workflowId);
357
System.out.println("YARN App ID: " + liveInfo.getYarnAppId());
358
}
359
} while (status.equals("RUNNING") || status.equals("STARTING"));
360
361
// Check final status
362
if (status.equals("COMPLETED")) {
363
System.out.println("Workflow completed successfully");
364
} else {
365
System.err.println("Workflow failed with status: " + status);
366
367
// Get recent logs to diagnose failure
368
long now = System.currentTimeMillis();
369
String errorLogs = programClient.getProgramLogs(workflowId, now - 600000, now);
370
System.err.println("Recent logs:\n" + errorLogs);
371
}
372
373
} catch (Exception e) {
374
System.err.println("Error managing workflow: " + e.getMessage());
375
try {
376
programClient.stop(workflowId);
377
} catch (Exception stopEx) {
378
System.err.println("Error stopping workflow: " + stopEx.getMessage());
379
}
380
}
381
}
382
```
383
384
### Auto-scaling Based on Metrics
385
386
```java
387
// Auto-scaling example for worker programs
388
public void autoScaleWorker(ProgramId workerId) {
389
try {
390
int currentInstances = programClient.getWorkerInstances(workerId);
391
392
// Get program status and live info
393
String status = programClient.getStatus(workerId);
394
if (!status.equals("RUNNING")) {
395
return; // Don't scale if not running
396
}
397
398
DistributedProgramLiveInfo liveInfo = programClient.getLiveInfo(workerId);
399
400
// Your scaling logic based on metrics
401
// This is a simplified example
402
int optimalInstances = calculateOptimalInstances(liveInfo);
403
404
if (optimalInstances != currentInstances) {
405
System.out.println("Scaling worker from " + currentInstances + " to " + optimalInstances);
406
programClient.setWorkerInstances(workerId, optimalInstances);
407
408
// Wait for scaling to take effect
409
Thread.sleep(30000);
410
411
// Verify scaling
412
int newInstances = programClient.getWorkerInstances(workerId);
413
System.out.println("Scaling completed. New instance count: " + newInstances);
414
}
415
} catch (Exception e) {
416
System.err.println("Error during auto-scaling: " + e.getMessage());
417
}
418
}
419
420
private int calculateOptimalInstances(DistributedProgramLiveInfo liveInfo) {
421
// Implement your scaling logic based on:
422
// - Container utilization
423
// - Queue depths
424
// - Processing rates
425
// - Time of day
426
// etc.
427
return 3; // Simplified example
428
}
429
```
430
431
## Error Handling

Program control operations may throw the following exceptions:

- **ProgramNotFoundException**: The program does not exist.
- **NotRunningException**: The program is not currently running (stop operations).
- **AlreadyRunningException**: The program is already running (start operations).
- **BadRequestException**: The program state or the request parameters are invalid.
- **UnauthenticatedException**: Authentication is required.
- **UnauthorizedException**: The caller lacks the required permissions.

```java
// Handle each documented failure mode of start() separately.
try {
  programClient.start(workflowId);
} catch (AlreadyRunningException e) {
  System.out.println("Program is already running");
} catch (ProgramNotFoundException e) {
  System.err.println("Program not found: " + workflowId);
} catch (UnauthenticatedException e) {
  System.err.println("Authentication required to start program: " + e.getMessage());
} catch (UnauthorizedException e) {
  System.err.println("No permission to start program: " + e.getMessage());
} catch (IOException e) {
  System.err.println("Network error: " + e.getMessage());
}
```
455
456
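## Best Practices

1. **Status Monitoring**: Check a program's current status before starting or stopping it.
2. **Timeout Handling**: Give `waitForStatus` timeouts that match the program's expected startup and run times.
3. **Resource Management**: Monitor instance counts and adjust them to the workload.
4. **Error Handling**: Handle failures explicitly and clean up afterwards (for example, stop a program that failed to start).
5. **Logging**: Review program logs regularly for errors and performance insights.
6. **Batch Operations**: Use the batch `start`, `stop`, and `getStatus` calls to manage multiple programs efficiently, and check every result for errors.
7. **Time Units**: Pass epoch seconds for `restart`, run-history, and log time ranges; convert `System.currentTimeMillis()` values with `TimeUnit.MILLISECONDS.toSeconds(...)`.
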
```java
466
// Good: Comprehensive program management with error handling
467
public class ProgramManager {
468
private final ProgramClient programClient;
469
470
public void safeStartProgram(ProgramId programId, Map<String, String> args) {
471
try {
472
// Check current status
473
String status = programClient.getStatus(programId);
474
if ("RUNNING".equals(status)) {
475
System.out.println("Program already running: " + programId.getProgram());
476
return;
477
}
478
479
// Set runtime arguments
480
if (args != null && !args.isEmpty()) {
481
programClient.setRuntimeArgs(programId, args);
482
}
483
484
// Start program
485
programClient.start(programId);
486
487
// Wait for startup with timeout
488
programClient.waitForStatus(programId, ProgramStatus.RUNNING, 5, TimeUnit.MINUTES);
489
490
System.out.println("Successfully started program: " + programId.getProgram());
491
492
} catch (TimeoutException e) {
493
System.err.println("Timeout starting program: " + programId.getProgram());
494
safeStopProgram(programId); // Cleanup on timeout
495
} catch (Exception e) {
496
System.err.println("Error starting program: " + e.getMessage());
497
}
498
}
499
500
public void safeStopProgram(ProgramId programId) {
501
try {
502
String status = programClient.getStatus(programId);
503
if ("STOPPED".equals(status) || "COMPLETED".equals(status)) {
504
System.out.println("Program already stopped: " + programId.getProgram());
505
return;
506
}
507
508
programClient.stop(programId);
509
programClient.waitForStatus(programId, ProgramStatus.STOPPED, 2, TimeUnit.MINUTES);
510
511
System.out.println("Successfully stopped program: " + programId.getProgram());
512
513
} catch (Exception e) {
514
System.err.println("Error stopping program: " + e.getMessage());
515
}
516
}
517
}
518
```