or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-management.md artifact-management.md configuration.md data-operations.md dataset-operations.md index.md metrics-monitoring.md program-control.md schedule-management.md security-administration.md service-management.md

docs/program-control.md

0

# Program Control and Monitoring

1

2

The ProgramClient provides comprehensive program lifecycle control including start, stop, restart operations, status monitoring, instance management, and run history tracking. Programs are the executable components within CDAP applications.

3

4

## ProgramClient

5

6

```java { .api }

7

public class ProgramClient {

8

// Constructors

9

public ProgramClient(ClientConfig config);

10

public ProgramClient(ClientConfig config, RESTClient restClient);

11

public ProgramClient(ClientConfig config, RESTClient restClient, ApplicationClient applicationClient);

12

13

// Program control methods

14

public void start(ProgramId program);

15

public void start(ProgramId program, boolean debug);

16

public void start(ProgramId program, boolean debug, Map<String, String> runtimeArgs);

17

public List<BatchProgramResult> start(NamespaceId namespace, List<BatchProgramStart> programs);

18

public void restart(ApplicationId applicationId, long startTimeSeconds, long endTimeSeconds);

19

public void stop(ProgramId programId);

20

public List<BatchProgramResult> stop(NamespaceId namespace, List<BatchProgram> programs);

21

public void stopAll(NamespaceId namespace);

22

23

// Program status methods

24

public String getStatus(ProgramId programId);

25

public List<BatchProgramStatus> getStatus(NamespaceId namespace, List<BatchProgram> programs);

26

public void waitForStatus(ProgramId program, ProgramStatus status, long timeout, TimeUnit timeoutUnit);

27

public DistributedProgramLiveInfo getLiveInfo(ProgramId program);

28

29

// Instance management methods

30

public int getWorkerInstances(ProgramId worker);

31

public void setWorkerInstances(ProgramId worker, int instances);

32

public int getServiceInstances(ServiceId service);

33

public void setServiceInstances(ServiceId service, int instances);

34

35

// Program data methods

36

public List<RunRecord> getProgramRuns(ProgramId program, String state, long startTime, long endTime, int limit);

37

public List<RunRecord> getAllProgramRuns(ProgramId program, long startTime, long endTime, int limit);

38

public String getProgramLogs(ProgramId program, long start, long stop);

39

public Map<String, String> getRuntimeArgs(ProgramId program);

40

public void setRuntimeArgs(ProgramId program, Map<String, String> runtimeArgs);

41

}

42

```

43

44

## Program Types and Status

45

46

```java { .api }

47

public enum ProgramType {

48

MAPREDUCE, WORKFLOW, SERVICE, SPARK, WORKER

49

}

50

51

public enum ProgramStatus {

52

PENDING, STARTING, RUNNING, SUSPENDED, RESUMING, COMPLETED, FAILED, KILLED, STOPPED

53

}

54

55

public class ProgramId {

56

public static ProgramId of(ApplicationId application, ProgramType type, String program);

57

public ApplicationId getApplication();

58

public ProgramType getType();

59

public String getProgram();

60

}

61

62

public class RunRecord {

63

public String getPid();

64

public long getStartTs();

65

public long getRunTs();

66

public long getStopTs();

67

public long getSuspendTs();

68

public long getResumeTs();

69

public String getStatus();

70

public Map<String, String> getProperties();

71

public ProgramRunCluster getCluster();

72

}

73

74

public class DistributedProgramLiveInfo {

75

public String getStatus();

76

public Map<String, Integer> getContainers();

77

public String getYarnAppId();

78

}

79

```

80

81

## Program Control Operations

82

83

### Basic Program Control

84

85

```java

86

// Start a program

87

ApplicationId appId = ApplicationId.of(namespace, "data-processing-app", "1.0.0");

88

ProgramId workflowId = ProgramId.of(appId, ProgramType.WORKFLOW, "data-pipeline");

89

90

programClient.start(workflowId);

91

System.out.println("Started workflow: " + workflowId.getProgram());

92

93

// Start with debug mode enabled

94

programClient.start(workflowId, true);

95

96

// Start with runtime arguments

97

Map<String, String> runtimeArgs = Map.of(

98

"input.path", "/data/input/2023/12/01",

99

"output.path", "/data/output/processed",

100

"batch.size", "1000",

101

"num.partitions", "10"

102

);

103

programClient.start(workflowId, false, runtimeArgs);

104

105

// Stop a program

106

programClient.stop(workflowId);

107

System.out.println("Stopped workflow: " + workflowId.getProgram());

108

```

109

110

### Batch Operations

111

112

```java

113

// Start multiple programs at once

114

List<BatchProgramStart> programsToStart = List.of(

115

new BatchProgramStart(

116

ProgramId.of(appId, ProgramType.SERVICE, "data-service"),

117

Map.of("port", "8080", "threads", "10")

118

),

119

new BatchProgramStart(

120

ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow"),

121

Map.of("schedule", "daily")

122

)

123

);

124

125

List<BatchProgramResult> startResults = programClient.start(namespace, programsToStart);

126

for (BatchProgramResult result : startResults) {

127

if (result.getError() != null) {

128

System.err.println("Failed to start " + result.getProgramId() + ": " + result.getError());

129

} else {

130

System.out.println("Started " + result.getProgramId());

131

}

132

}

133

134

// Stop multiple programs

135

List<BatchProgram> programsToStop = List.of(

136

new BatchProgram(ProgramId.of(appId, ProgramType.SERVICE, "data-service")),

137

new BatchProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow"))

138

);

139

140

List<BatchProgramResult> stopResults = programClient.stop(namespace, programsToStop);

141

142

// Stop all programs in namespace (use with caution!)

143

programClient.stopAll(namespace);

144

```

145

146

### Application-Level Operations

147

148

```java

149

// Restart all programs in application within time range

150

long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(24); // Last 24 hours

151

long endTime = System.currentTimeMillis();

152

153

programClient.restart(appId, startTime / 1000, endTime / 1000);

154

System.out.println("Restarted all programs in application: " + appId.getApplication());

155

```

156

157

## Program Status Monitoring

158

159

### Status Checking

160

161

```java

162

// Get current program status

163

String status = programClient.getStatus(workflowId);

164

System.out.println("Workflow status: " + status);

165

166

// Get status of multiple programs

167

List<BatchProgram> programs = List.of(

168

new BatchProgram(ProgramId.of(appId, ProgramType.SERVICE, "data-service")),

169

new BatchProgram(ProgramId.of(appId, ProgramType.WORKFLOW, "etl-workflow"))

170

);

171

172

List<BatchProgramStatus> statuses = programClient.getStatus(namespace, programs);

173

for (BatchProgramStatus programStatus : statuses) {

174

System.out.println(programStatus.getProgramId() + ": " + programStatus.getStatus());

175

}

176

```

177

178

### Wait for Status

179

180

```java

181

// Wait for program to reach specific status

182

try {
    programClient.start(workflowId);

    // Block until the workflow reports RUNNING, giving up after one minute
    programClient.waitForStatus(workflowId, ProgramStatus.RUNNING, 60, TimeUnit.SECONDS);
    System.out.println("Workflow is now running");

    // Wait for completion
    programClient.waitForStatus(workflowId, ProgramStatus.COMPLETED, 30, TimeUnit.MINUTES);
    System.out.println("Workflow completed successfully");

} catch (TimeoutException e) {
    System.err.println("Timeout waiting for status change");
} catch (InterruptedException e) {
    // Catching InterruptedException clears the thread's interrupt flag; set it again
    // so code further up the call stack can still see that the thread was interrupted.
    Thread.currentThread().interrupt();
    System.err.println("Interrupted while waiting");
}

196

```

197

198

### Live Information

199

200

```java

201

// Get detailed live information about running program

202

DistributedProgramLiveInfo liveInfo = programClient.getLiveInfo(workflowId);

203

System.out.println("Status: " + liveInfo.getStatus());

204

System.out.println("YARN Application ID: " + liveInfo.getYarnAppId());

205

System.out.println("Containers: " + liveInfo.getContainers());

206

207

// Monitor container information

208

Map<String, Integer> containers = liveInfo.getContainers();

209

for (Map.Entry<String, Integer> entry : containers.entrySet()) {

210

System.out.println("Container " + entry.getKey() + ": " + entry.getValue() + " instances");

211

}

212

```

213

214

## Instance Management

215

216

### Worker Instance Management

217

218

```java

219

// Get current worker instances

220

ProgramId workerId = ProgramId.of(appId, ProgramType.WORKER, "data-processor");

221

int currentInstances = programClient.getWorkerInstances(workerId);

222

System.out.println("Current worker instances: " + currentInstances);

223

224

// Scale worker instances

225

int newInstances = 5;

226

programClient.setWorkerInstances(workerId, newInstances);

227

System.out.println("Scaled worker to " + newInstances + " instances");

228

229

// Dynamic scaling based on load

230

int targetInstances = calculateOptimalInstances(); // Your scaling logic

231

if (currentInstances != targetInstances) {

232

programClient.setWorkerInstances(workerId, targetInstances);

233

System.out.println("Scaled from " + currentInstances + " to " + targetInstances + " instances");

234

}

235

```

236

237

### Service Instance Management

238

239

```java

240

// Get current service instances

241

ServiceId serviceId = ServiceId.of(appId, "api-service");

242

int currentServiceInstances = programClient.getServiceInstances(serviceId);

243

System.out.println("Current service instances: " + currentServiceInstances);

244

245

// Scale service instances

246

int newServiceInstances = 3;

247

programClient.setServiceInstances(serviceId, newServiceInstances);

248

System.out.println("Scaled service to " + newServiceInstances + " instances");

249

```

250

251

## Runtime Arguments

252

253

### Managing Runtime Arguments

254

255

```java

256

// Get current runtime arguments

257

Map<String, String> currentArgs = programClient.getRuntimeArgs(workflowId);

258

System.out.println("Current runtime arguments: " + currentArgs);

259

260

// Set new runtime arguments

261

Map<String, String> newArgs = Map.of(

262

"input.path", "/data/input/latest",

263

"output.path", "/data/output/" + System.currentTimeMillis(),

264

"batch.size", "2000",

265

"enable.compression", "true",

266

"log.level", "INFO"

267

);

268

programClient.setRuntimeArgs(workflowId, newArgs);

269

System.out.println("Updated runtime arguments");

270

271

// Merge with existing arguments

272

Map<String, String> mergedArgs = new HashMap<>(currentArgs);

273

mergedArgs.putAll(Map.of(

274

"new.parameter", "new-value",

275

"updated.parameter", "updated-value"

276

));

277

programClient.setRuntimeArgs(workflowId, mergedArgs);

278

```

279

280

## Run History and Logs

281

282

### Program Run History

283

284

```java

285

// Get recent program runs

286

// The query window is expressed in epoch seconds, the same unit the RunRecord
// timestamps are treated as in the loop below (they are multiplied by 1000 for Date).
long endTime = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
long startTime = endTime - TimeUnit.DAYS.toSeconds(7); // Last 7 days
int limit = 50;

List<RunRecord> runs = programClient.getProgramRuns(workflowId, "ALL", startTime, endTime, limit);
System.out.println("Found " + runs.size() + " runs in the last 7 days");

for (RunRecord run : runs) {
    long startTs = run.getStartTs();
    long stopTs = run.getStopTs();

    System.out.println("Run ID: " + run.getPid());
    System.out.println(" Status: " + run.getStatus());
    // java.util.Date takes milliseconds, so convert from seconds
    System.out.println(" Start: " + new Date(TimeUnit.SECONDS.toMillis(startTs)));
    if (stopTs >= startTs) {
        System.out.println(" Duration: " + (stopTs - startTs) + " seconds");
    } else {
        // NOTE(review): presumably a run that is still in progress has no usable stop
        // time - confirm against RunRecord. Either way, never print a negative duration.
        System.out.println(" Duration: n/a");
    }
    System.out.println(" Properties: " + run.getProperties());
}

// Get runs by status
List<RunRecord> failedRuns = programClient.getProgramRuns(workflowId, "FAILED", startTime, endTime, limit);
List<RunRecord> completedRuns = programClient.getProgramRuns(workflowId, "COMPLETED", startTime, endTime, limit);

// Get all runs regardless of status
List<RunRecord> allRuns = programClient.getAllProgramRuns(workflowId, startTime, endTime, limit);

307

```

308

309

### Program Logs

310

311

```java

312

// Get program logs for a time range

313

// Log time range in epoch seconds (the CDAP logging endpoint takes start/stop in
// seconds since the epoch, not milliseconds)
long logStop = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
long logStart = logStop - TimeUnit.HOURS.toSeconds(1); // Last hour

String logs = programClient.getProgramLogs(workflowId, logStart, logStop);
System.out.println("Program logs:");
System.out.println(logs);

// Parse and filter logs
String[] logLines = logs.split("\n");
for (String line : logLines) {
    if (line.contains("ERROR")) {
        System.err.println("Error found: " + line);
    }
}

327

```

328

329

## Advanced Program Management

330

331

### Status-Based Workflow

332

333

```java

334

// Complete workflow with status monitoring

335

public void runWorkflowWithMonitoring(ProgramId workflowId, Map<String, String> args) {

336

try {

337

// Set runtime arguments

338

programClient.setRuntimeArgs(workflowId, args);

339

340

// Start the workflow

341

programClient.start(workflowId);

342

System.out.println("Started workflow: " + workflowId.getProgram());

343

344

// Wait for running status

345

programClient.waitForStatus(workflowId, ProgramStatus.RUNNING, 2, TimeUnit.MINUTES);

346

System.out.println("Workflow is running");

347

348

// Monitor progress

349

String status;

350

do {

351

Thread.sleep(30000); // Check every 30 seconds

352

status = programClient.getStatus(workflowId);

353

System.out.println("Current status: " + status);

354

355

if (status.equals("RUNNING")) {

356

DistributedProgramLiveInfo liveInfo = programClient.getLiveInfo(workflowId);

357

System.out.println("YARN App ID: " + liveInfo.getYarnAppId());

358

}

359

} while (status.equals("RUNNING") || status.equals("STARTING"));

360

361

// Check final status

362

if (status.equals("COMPLETED")) {

363

System.out.println("Workflow completed successfully");

364

} else {

365

System.err.println("Workflow failed with status: " + status);

366

367

// Get recent logs to diagnose failure

368

long now = System.currentTimeMillis();

369

String errorLogs = programClient.getProgramLogs(workflowId, now - 600000, now);

370

System.err.println("Recent logs:\n" + errorLogs);

371

}

372

373

} catch (Exception e) {

374

System.err.println("Error managing workflow: " + e.getMessage());

375

try {

376

programClient.stop(workflowId);

377

} catch (Exception stopEx) {

378

System.err.println("Error stopping workflow: " + stopEx.getMessage());

379

}

380

}

381

}

382

```

383

384

### Auto-scaling Based on Metrics

385

386

```java

387

// Auto-scaling example for worker programs

388

public void autoScaleWorker(ProgramId workerId) {

389

try {

390

int currentInstances = programClient.getWorkerInstances(workerId);

391

392

// Get program status and live info

393

String status = programClient.getStatus(workerId);

394

if (!status.equals("RUNNING")) {

395

return; // Don't scale if not running

396

}

397

398

DistributedProgramLiveInfo liveInfo = programClient.getLiveInfo(workerId);

399

400

// Your scaling logic based on metrics

401

// This is a simplified example

402

int optimalInstances = calculateOptimalInstances(liveInfo);

403

404

if (optimalInstances != currentInstances) {

405

System.out.println("Scaling worker from " + currentInstances + " to " + optimalInstances);

406

programClient.setWorkerInstances(workerId, optimalInstances);

407

408

// Wait for scaling to take effect

409

Thread.sleep(30000);

410

411

// Verify scaling

412

int newInstances = programClient.getWorkerInstances(workerId);

413

System.out.println("Scaling completed. New instance count: " + newInstances);

414

}

415

} catch (Exception e) {

416

System.err.println("Error during auto-scaling: " + e.getMessage());

417

}

418

}

419

420

private int calculateOptimalInstances(DistributedProgramLiveInfo liveInfo) {

421

// Implement your scaling logic based on:

422

// - Container utilization

423

// - Queue depths

424

// - Processing rates

425

// - Time of day

426

// etc.

427

return 3; // Simplified example

428

}

429

```

430

431

## Error Handling

432

433

Program control operations may throw these exceptions:

434

435

- **ProgramNotFoundException**: Program does not exist

436

- **NotRunningException**: Program is not currently running (for stop operations)

437

- **AlreadyRunningException**: Program is already running (for start operations)

438

- **BadRequestException**: Invalid program state or parameters

439

- **UnauthenticatedException**: Authentication required

440

- **UnauthorizedException**: Insufficient permissions

441

442

```java

443

try {

444

programClient.start(workflowId);

445

} catch (AlreadyRunningException e) {

446

System.out.println("Program is already running");

447

} catch (ProgramNotFoundException e) {

448

System.err.println("Program not found: " + workflowId);

449

} catch (UnauthorizedException e) {

450

System.err.println("No permission to start program: " + e.getMessage());

451

} catch (IOException e) {

452

System.err.println("Network error: " + e.getMessage());

453

}

454

```

455

456

## Best Practices

457

458

1. **Status Monitoring**: Always check program status before performing operations

459

2. **Timeout Handling**: Use appropriate timeouts for waitForStatus operations

460

3. **Resource Management**: Monitor and manage instance counts based on workload

461

4. **Error Handling**: Implement proper error handling and cleanup procedures

462

5. **Logging**: Regularly check program logs for issues and performance insights

463

6. **Batch Operations**: Use batch operations for managing multiple programs efficiently

464

465

```java

466

// Good: Comprehensive program management with error handling

467

public class ProgramManager {

468

private final ProgramClient programClient;

469

470

public void safeStartProgram(ProgramId programId, Map<String, String> args) {

471

try {

472

// Check current status

473

String status = programClient.getStatus(programId);

474

if ("RUNNING".equals(status)) {

475

System.out.println("Program already running: " + programId.getProgram());

476

return;

477

}

478

479

// Set runtime arguments

480

if (args != null && !args.isEmpty()) {

481

programClient.setRuntimeArgs(programId, args);

482

}

483

484

// Start program

485

programClient.start(programId);

486

487

// Wait for startup with timeout

488

programClient.waitForStatus(programId, ProgramStatus.RUNNING, 5, TimeUnit.MINUTES);

489

490

System.out.println("Successfully started program: " + programId.getProgram());

491

492

} catch (TimeoutException e) {

493

System.err.println("Timeout starting program: " + programId.getProgram());

494

safeStopProgram(programId); // Cleanup on timeout

495

} catch (Exception e) {

496

System.err.println("Error starting program: " + e.getMessage());

497

}

498

}

499

500

public void safeStopProgram(ProgramId programId) {

501

try {

502

String status = programClient.getStatus(programId);

503

if ("STOPPED".equals(status) || "COMPLETED".equals(status)) {

504

System.out.println("Program already stopped: " + programId.getProgram());

505

return;

506

}

507

508

programClient.stop(programId);

509

programClient.waitForStatus(programId, ProgramStatus.STOPPED, 2, TimeUnit.MINUTES);

510

511

System.out.println("Successfully stopped program: " + programId.getProgram());

512

513

} catch (Exception e) {

514

System.err.println("Error stopping program: " + e.getMessage());

515

}

516

}

517

}

518

```