or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-handles.md · configuration.md · index.md · launchers.md

docs/application-handles.md

0

# Application Handles

1

2

Comprehensive interface for monitoring and controlling running Spark applications with state-based lifecycle management and event notifications.

3

4

## Capabilities

5

6

### SparkAppHandle Interface

7

8

Primary interface for interacting with launched Spark applications, providing state monitoring, control operations, and event handling.

9

10

```java { .api }

11

/**

12

* Handle to a running Spark application providing runtime information and control actions

13

*/

14

public interface SparkAppHandle {

15

/** Add listener for state and info change notifications */

16

void addListener(Listener l);

17

18

/** Get current application state */

19

State getState();

20

21

/** Get application ID (may return null if not yet known) */

22

String getAppId();

23

24

/** Request application to stop gracefully (best-effort) */

25

void stop();

26

27

/** Force kill the underlying application process */

28

void kill();

29

30

/** Disconnect from application without stopping it */

31

void disconnect();

32

}

33

```

34

35

**Usage Examples:**

36

37

```java

38

import org.apache.spark.launcher.SparkLauncher;

39

import org.apache.spark.launcher.SparkAppHandle;

40

41

// Launch application and get handle

42

SparkAppHandle handle = new SparkLauncher()

43

.setAppResource("/apps/long-running-job.jar")

44

.setMainClass("com.company.LongRunningJob")

45

.setMaster("yarn")

46

.setDeployMode("cluster")

47

.setAppName("Long Running Analytics")

48

.startApplication();

49

50

// Monitor application state

51

System.out.println("Initial state: " + handle.getState());

52

System.out.println("Application ID: " + handle.getAppId());

53

54

// Wait for application to start running

55

while (handle.getState() == SparkAppHandle.State.UNKNOWN ||

56

handle.getState() == SparkAppHandle.State.SUBMITTED) {

57

Thread.sleep(1000);

58

System.out.println("Current state: " + handle.getState());

59

}

60

61

if (handle.getState() == SparkAppHandle.State.RUNNING) {

62

System.out.println("Application is running with ID: " + handle.getAppId());

63

64

// Application control examples

65

// Graceful shutdown after some condition

66

if (shouldStopApplication()) {

67

System.out.println("Requesting application stop...");

68

handle.stop();

69

70

// Wait for graceful shutdown with timeout

71

long timeout = System.currentTimeMillis() + 30000; // 30 seconds

72

while (!handle.getState().isFinal() && System.currentTimeMillis() < timeout) {

73

Thread.sleep(1000);

74

}

75

76

// Force kill if graceful shutdown failed

77

if (!handle.getState().isFinal()) {

78

System.err.println("Graceful shutdown timed out, force killing...");

79

handle.kill();

80

}

81

}

82

} else if (handle.getState() == SparkAppHandle.State.FAILED) {

83

System.err.println("Application failed to start");

84

}

85

86

// Disconnect from application (application continues running)

87

// handle.disconnect();

88

```

89

90

### Application State Management

91

92

Comprehensive state enumeration with final state detection for application lifecycle tracking.

93

94

```java { .api }

95

/**

96

* Application state enumeration with final state indicators

97

*/

98

public enum State {

99

/** Application has not reported back yet */

100

UNKNOWN(false),

101

102

/** Application has connected to the handle */

103

CONNECTED(false),

104

105

/** Application has been submitted to cluster */

106

SUBMITTED(false),

107

108

/** Application is running */

109

RUNNING(false),

110

111

/** Application finished with successful status (final) */

112

FINISHED(true),

113

114

/** Application finished with failed status (final) */

115

FAILED(true),

116

117

/** Application was killed (final) */

118

KILLED(true),

119

120

/** Spark Submit JVM exited with unknown status (final) */

121

LOST(true);

122

123

/** Returns true if this is a final state (application not running anymore) */

124

public boolean isFinal();

125

}

126

```

127

128

**Usage Examples:**

129

130

```java

131

import org.apache.spark.launcher.SparkAppHandle.State;

132

133

// State monitoring and decision making

134

SparkAppHandle handle = launcher.startApplication();

135

136

// Check for specific states

137

if (handle.getState() == State.UNKNOWN) {

138

System.out.println("Application hasn't reported back yet, waiting...");

139

}

140

141

if (handle.getState() == State.RUNNING) {

142

System.out.println("Application is actively running");

143

performRuntimeOperations();

144

}

145

146

// Check for final states

147

if (handle.getState().isFinal()) {

148

System.out.println("Application has completed");

149

150

switch (handle.getState()) {

151

case FINISHED:

152

System.out.println("Application completed successfully");

153

processSuccessfulCompletion();

154

break;

155

case FAILED:

156

System.err.println("Application failed");

157

handleFailure();

158

break;

159

case KILLED:

160

System.out.println("Application was killed");

161

handleKilledApplication();

162

break;

163

case LOST:

164

System.err.println("Lost connection to application");

165

handleLostConnection();

166

break;

167

}

168

}

169

170

// State transition logic

171

State previousState = State.UNKNOWN;

172

while (!handle.getState().isFinal()) {

173

State currentState = handle.getState();

174

175

if (currentState != previousState) {

176

System.out.println("State transition: " + previousState + " -> " + currentState);

177

178

// Handle specific transitions

179

if (previousState == State.SUBMITTED && currentState == State.RUNNING) {

180

System.out.println("Application started successfully");

181

onApplicationStarted();

182

}

183

184

previousState = currentState;

185

}

186

187

Thread.sleep(2000);

188

}

189

190

// Final state handling

191

System.out.println("Final application state: " + handle.getState());

192

if (handle.getState() == State.FINISHED) {

193

generateSuccessReport();

194

} else {

195

generateErrorReport();

196

}

197

```

198

199

### Event Listener Interface

200

201

Callback interface for receiving real-time notifications about application state changes and information updates.

202

203

```java { .api }

204

/**

205

* Listener interface for application handle events

206

*/

207

public interface Listener {

208

/** Called when application state changes */

209

void stateChanged(SparkAppHandle handle);

210

211

/** Called when application information changes (not state) */

212

void infoChanged(SparkAppHandle handle);

213

}

214

```

215

216

**Usage Examples:**

217

218

```java

219

import org.apache.spark.launcher.SparkAppHandle;

220

221

// Custom listener implementation

222

public class ApplicationMonitor implements SparkAppHandle.Listener {

223

private long startTime;

224

private String applicationName;

225

226

public ApplicationMonitor(String applicationName) {

227

this.applicationName = applicationName;

228

this.startTime = System.currentTimeMillis();

229

}

230

231

@Override

232

public void stateChanged(SparkAppHandle handle) {

233

long elapsed = System.currentTimeMillis() - startTime;

234

System.out.printf("[%s] %s - State changed to: %s (elapsed: %d ms)%n",

235

new java.util.Date(), applicationName, handle.getState(), elapsed);

236

237

switch (handle.getState()) {

238

case CONNECTED:

239

System.out.println("Application connected to launcher");

240

break;

241

case SUBMITTED:

242

System.out.println("Application submitted to cluster");

243

break;

244

case RUNNING:

245

System.out.println("Application is now running with ID: " + handle.getAppId());

246

sendNotification("Application started successfully");

247

break;

248

case FINISHED:

249

System.out.println("Application completed successfully");

250

sendNotification("Application finished");

251

break;

252

case FAILED:

253

System.err.println("Application failed!");

254

sendAlert("Application failure detected");

255

break;

256

case KILLED:

257

System.out.println("Application was terminated");

258

sendNotification("Application killed");

259

break;

260

case LOST:

261

System.err.println("Lost connection to application");

262

sendAlert("Connection lost to application");

263

break;

264

}

265

}

266

267

@Override

268

public void infoChanged(SparkAppHandle handle) {

269

System.out.printf("[%s] %s - Info updated for application: %s%n",

270

new java.util.Date(), applicationName, handle.getAppId());

271

}

272

273

private void sendNotification(String message) {

274

// Implementation for notifications

275

System.out.println("NOTIFICATION: " + message);

276

}

277

278

private void sendAlert(String message) {

279

// Implementation for alerts

280

System.err.println("ALERT: " + message);

281

}

282

}

283

284

// Using the custom listener

285

SparkAppHandle handle = new SparkLauncher()

286

.setAppResource("/apps/critical-job.jar")

287

.setMainClass("com.company.CriticalJob")

288

.setMaster("yarn")

289

.setDeployMode("cluster")

290

.setAppName("Critical Production Job")

291

.startApplication(new ApplicationMonitor("Critical Production Job"));

292

293

// Multiple listeners

294

handle.addListener(new SparkAppHandle.Listener() {

295

@Override

296

public void stateChanged(SparkAppHandle handle) {

297

if (handle.getState().isFinal()) {

298

logFinalState(handle);

299

cleanupResources();

300

}

301

}

302

303

@Override

304

public void infoChanged(SparkAppHandle handle) {

305

updateDashboard(handle);

306

}

307

});

308

309

// Anonymous listener for simple cases

310

handle.addListener(new SparkAppHandle.Listener() {

311

@Override

312

public void stateChanged(SparkAppHandle handle) {

313

if (handle.getState() == SparkAppHandle.State.FAILED) {

314

restartApplication();

315

}

316

}

317

318

@Override

319

public void infoChanged(SparkAppHandle handle) {

320

// No-op for info changes

321

}

322

});

323

```

324

325

## Advanced Monitoring Patterns

326

327

### Application Lifecycle Manager

328

329

```java

330

public class SparkApplicationManager {

331

private final Map<String, SparkAppHandle> runningApps = new ConcurrentHashMap<>();

332

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

333

334

public void launchAndMonitor(String appName, SparkLauncher launcher) {

335

try {

336

SparkAppHandle handle = launcher.setAppName(appName)

337

.startApplication(new ApplicationLifecycleListener(appName));

338

339

runningApps.put(appName, handle);

340

341

// Schedule periodic health checks

342

scheduler.scheduleWithFixedDelay(() -> {

343

checkApplicationHealth(appName, handle);

344

}, 30, 30, TimeUnit.SECONDS);

345

346

} catch (IOException e) {

347

System.err.println("Failed to launch application " + appName + ": " + e.getMessage());

348

}

349

}

350

351

private void checkApplicationHealth(String appName, SparkAppHandle handle) {

352

if (handle.getState().isFinal()) {

353

runningApps.remove(appName);

354

System.out.println("Removed completed application: " + appName);

355

} else if (handle.getState() == SparkAppHandle.State.UNKNOWN) {

356

// Handle stuck applications

357

System.err.println("Application " + appName + " appears stuck in UNKNOWN state");

358

}

359

}

360

361

public void stopAllApplications() {

362

runningApps.values().forEach(handle -> {

363

if (!handle.getState().isFinal()) {

364

handle.stop();

365

}

366

});

367

}

368

369

public void emergencyKillAll() {

370

runningApps.values().forEach(SparkAppHandle::kill);

371

}

372

373

private class ApplicationLifecycleListener implements SparkAppHandle.Listener {

374

private final String appName;

375

376

public ApplicationLifecycleListener(String appName) {

377

this.appName = appName;

378

}

379

380

@Override

381

public void stateChanged(SparkAppHandle handle) {

382

if (handle.getState().isFinal()) {

383

handleApplicationCompletion(appName, handle);

384

}

385

}

386

387

@Override

388

public void infoChanged(SparkAppHandle handle) {

389

updateApplicationMetrics(appName, handle);

390

}

391

}

392

393

private void handleApplicationCompletion(String appName, SparkAppHandle handle) {

394

if (handle.getState() == SparkAppHandle.State.FAILED) {

395

// Implement retry logic or failure notifications

396

scheduleRetry(appName);

397

}

398

}

399

400

private void updateApplicationMetrics(String appName, SparkAppHandle handle) {

401

// Update monitoring dashboard or metrics system

402

}

403

404

private void scheduleRetry(String appName) {

405

// Implement application retry logic

406

}

407

}

408

```

409

410

### Batch Processing Coordinator

411

412

```java

413

public class BatchProcessingCoordinator {

414

private final List<SparkAppHandle> batchJobs = new ArrayList<>();

415

private final CountDownLatch completionLatch;

416

417

public BatchProcessingCoordinator(int jobCount) {

418

this.completionLatch = new CountDownLatch(jobCount);

419

}

420

421

public void submitBatchJob(SparkLauncher launcher, String jobName) {

422

try {

423

SparkAppHandle handle = launcher.setAppName(jobName)

424

.startApplication(new BatchJobListener(jobName));

425

426

batchJobs.add(handle);

427

428

} catch (IOException e) {

429

System.err.println("Failed to submit batch job " + jobName + ": " + e.getMessage());

430

completionLatch.countDown(); // Count failed jobs as completed

431

}

432

}

433

434

public boolean waitForAllJobs(long timeout, TimeUnit unit) throws InterruptedException {

435

return completionLatch.await(timeout, unit);

436

}

437

438

public BatchResults getResults() {

439

long successful = batchJobs.stream()

440

.mapToLong(handle -> handle.getState() == SparkAppHandle.State.FINISHED ? 1 : 0)

441

.sum();

442

443

long failed = batchJobs.stream()

444

.mapToLong(handle -> handle.getState() == SparkAppHandle.State.FAILED ? 1 : 0)

445

.sum();

446

447

return new BatchResults(successful, failed, batchJobs.size());

448

}

449

450

private class BatchJobListener implements SparkAppHandle.Listener {

451

private final String jobName;

452

453

public BatchJobListener(String jobName) {

454

this.jobName = jobName;

455

}

456

457

@Override

458

public void stateChanged(SparkAppHandle handle) {

459

if (handle.getState().isFinal()) {

460

System.out.println("Batch job " + jobName + " completed with state: " + handle.getState());

461

completionLatch.countDown();

462

}

463

}

464

465

@Override

466

public void infoChanged(SparkAppHandle handle) {

467

// Log info changes for batch tracking

468

}

469

}

470

471

public static class BatchResults {

472

public final long successful;

473

public final long failed;

474

public final long total;

475

476

public BatchResults(long successful, long failed, long total) {

477

this.successful = successful;

478

this.failed = failed;

479

this.total = total;

480

}

481

482

public boolean allSuccessful() {

483

return successful == total;

484

}

485

486

public double successRate() {

487

return total > 0 ? (double) successful / total : 0.0;

488

}

489

}

490

}

491

```

492

493

## Error Handling and Recovery

494

495

### State-Based Error Detection

496

497

```java

498

public class ApplicationErrorHandler {

499

500

public void handleApplicationWithRecovery(SparkLauncher launcher, String appName) {

501

int maxRetries = 3;

502

int retryCount = 0;

503

504

while (retryCount < maxRetries) {

505

try {

506

SparkAppHandle handle = launcher.setAppName(appName + "-attempt-" + (retryCount + 1))

507

.startApplication(new RetryListener(appName, retryCount));

508

509

// Wait for completion

510

while (!handle.getState().isFinal()) {

511

Thread.sleep(5000);

512

513

// Check for stuck states

514

if (isApplicationStuck(handle)) {

515

System.err.println("Application appears stuck, killing and retrying...");

516

handle.kill();

517

break;

518

}

519

}

520

521

if (handle.getState() == SparkAppHandle.State.FINISHED) {

522

System.out.println("Application completed successfully");

523

return; // Success, exit retry loop

524

} else {

525

System.err.println("Application failed with state: " + handle.getState());

526

}

527

528

} catch (IOException e) {

529

System.err.println("Failed to launch application: " + e.getMessage());

530

} catch (InterruptedException e) {

531

Thread.currentThread().interrupt();

532

return;

533

}

534

535

retryCount++;

536

if (retryCount < maxRetries) {

537

System.out.println("Retrying application launch (" + retryCount + "/" + maxRetries + ")");

538

try {

539

Thread.sleep(10000); // Wait before retry

540

} catch (InterruptedException e) {

541

Thread.currentThread().interrupt();

542

return;

543

}

544

}

545

}

546

547

System.err.println("Application failed after " + maxRetries + " attempts");

548

}

549

550

private boolean isApplicationStuck(SparkAppHandle handle) {

551

// Implement logic to detect stuck applications

552

// e.g., application in SUBMITTED state for too long

553

return false;

554

}

555

556

private class RetryListener implements SparkAppHandle.Listener {

557

private final String appName;

558

private final int attempt;

559

560

public RetryListener(String appName, int attempt) {

561

this.appName = appName;

562

this.attempt = attempt;

563

}

564

565

@Override

566

public void stateChanged(SparkAppHandle handle) {

567

System.out.println(String.format("[%s-attempt-%d] State: %s",

568

appName, attempt + 1, handle.getState()));

569

}

570

571

@Override

572

public void infoChanged(SparkAppHandle handle) {

573

System.out.println(String.format("[%s-attempt-%d] Info updated: %s",

574

appName, attempt + 1, handle.getAppId()));

575

}

576

}

577

}

578

```

579

580

## Performance Considerations

581

582

### Listener Thread Safety

583

- Listeners are called from background threads processing application updates

584

- Avoid blocking operations in listener callbacks

585

- Use thread-safe data structures for shared state

586

- Consider using executor services for heavy processing in listeners

587

588

### State Polling vs Event-Driven

589

- Use listeners for reactive programming patterns

590

- Avoid busy-waiting on `getState()` calls

591

- Combine listeners with periodic health checks for robust monitoring

592

- Handle listener exceptions to prevent callback chain failures

593

594

### Resource Management

595

- Always handle final states to clean up resources

596

- Use `disconnect()` when monitoring is no longer needed

597

- Implement timeouts for long-running operations

598

- Consider using `kill()` as last resort for cleanup