
# Distributed Execution

Distributed execution framework built on Apache Twill that provides scalable, fault-tolerant Spark application deployment across YARN clusters with proper resource management, lifecycle control, and integration with CDAP's distributed application infrastructure.

## Capabilities

### Spark Execution Service

Service for managing distributed Spark execution with full lifecycle management and resource allocation across cluster nodes.

```java { .api }
/**
 * Service for managing distributed Spark execution.
 * Provides scalable deployment and management of Spark applications across clusters.
 */
public class SparkExecutionService {
  /**
   * Submits a Spark program for distributed execution.
   * @param programRunId Unique identifier for the program run
   * @param programOptions Configuration options for program execution
   * @return Future containing the program controller for managing execution
   * @throws ExecutionException if submission fails
   */
  public ListenableFuture<ProgramController> submit(ProgramRunId programRunId, ProgramOptions programOptions);

  /**
   * Stops the execution service and all running programs.
   * Gracefully shuts down all managed Spark applications.
   */
  public void stop();

  /**
   * Gets the current state of the execution service.
   * @return ServiceState indicating current service status
   */
  public ServiceState getState();

  /**
   * Gets information about running programs.
   * @return Set of ProgramRunId for currently running programs
   */
  public Set<ProgramRunId> getRunningPrograms();

  /**
   * Gets the program controller for a specific run.
   * @param programRunId Program run identifier
   * @return ProgramController for the specified run, or null if not found
   */
  public ProgramController getProgramController(ProgramRunId programRunId);
}
```

### Spark Twill Runnable

Twill runnable implementation that enables Spark applications to run as distributed applications with proper resource management and fault tolerance.

```java { .api }
/**
 * Twill runnable for distributed Spark execution.
 * Enables Spark applications to run as distributed services with fault tolerance.
 */
public class SparkTwillRunnable implements TwillRunnable {
  /**
   * Main execution method for the runnable.
   * Starts the Spark application and manages its lifecycle.
   */
  public void run();

  /**
   * Stops the running Spark application gracefully.
   * Ensures proper cleanup of resources and state.
   */
  public void stop();

  /**
   * Handles commands sent to the running application.
   * @param command Command to execute
   * @throws Exception if command execution fails
   */
  public void handleCommand(Command command) throws Exception;

  /**
   * Initializes the runnable with context.
   * @param context Twill runtime context
   */
  public void initialize(TwillContext context);

  /**
   * Destroys the runnable and cleans up resources.
   */
  public void destroy();

  /**
   * Gets the Twill context.
   * @return TwillContext for accessing runtime information
   */
  protected TwillContext getContext();
}
```

### Spark Twill Program Controller

Program controller implementation for managing distributed Spark execution through the Twill framework.

```java { .api }
/**
 * Program controller for distributed Spark execution via Twill.
 * Provides lifecycle management and a command interface for distributed Spark programs.
 */
public class SparkTwillProgramController implements ProgramController {
  /**
   * Sends a command to the distributed Spark program.
   * @param command Command name to execute
   * @param args Command arguments
   * @return Future representing the command execution result
   * @throws Exception if command execution fails
   */
  public ListenableFuture<ProgramController> command(String command, Object... args) throws Exception;

  /**
   * Stops the distributed Spark program gracefully.
   * @return Future representing the stop operation
   * @throws Exception if stop operation fails
   */
  public ListenableFuture<ProgramController> stop() throws Exception;

  /**
   * Kills the distributed Spark program forcefully.
   * @return Future representing the kill operation
   */
  public ListenableFuture<ProgramController> kill();

  /**
   * Gets the current state of the program.
   * @return Current program state
   */
  public State getState();

  /**
   * Gets the program run ID.
   * @return ProgramRunId identifying this program run
   */
  public ProgramRunId getProgramRunId();

  /**
   * Gets the Twill controller for low-level operations.
   * @return TwillController for direct Twill operations
   */
  public TwillController getTwillController();

  /**
   * Gets the resource report for the running program.
   * @return ResourceReport containing resource usage information
   */
  public ResourceReport getResourceReport();

  /**
   * Adds a listener for program state changes.
   * @param listener Listener to be notified of state changes
   */
  public void addListener(Listener listener);
}
```

### Distributed Execution Context

Context for distributed execution that provides access to cluster information and resource management.

```java { .api }
/**
 * Context for distributed Spark execution.
 * Provides access to cluster information and distributed resources.
 */
public class DistributedExecutionContext {
  /**
   * Gets the number of executor instances.
   * @return Number of Spark executor instances
   */
  public int getExecutorInstances();

  /**
   * Gets executor resource allocation.
   * @return Resources allocated to each executor
   */
  public Resources getExecutorResources();

  /**
   * Gets driver resource allocation.
   * @return Resources allocated to the driver
   */
  public Resources getDriverResources();

  /**
   * Gets the cluster configuration.
   * @return Configuration for the target cluster
   */
  public Configuration getClusterConfiguration();

  /**
   * Gets the YARN application ID (if running on YARN).
   * @return Application ID, or null if not running on YARN
   */
  public ApplicationId getYarnApplicationId();

  /**
   * Gets the list of executor hosts.
   * @return Set of hostnames running executors
   */
  public Set<String> getExecutorHosts();

  /**
   * Scales the number of executors.
   * @param targetExecutors Desired number of executors
   * @return Future indicating completion of the scaling operation
   */
  public ListenableFuture<Boolean> scaleExecutors(int targetExecutors);
}
```

## Usage Examples

**Basic Distributed Execution:**

```java
import co.cask.cdap.app.runtime.spark.distributed.SparkExecutionService;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.proto.id.ProgramRunId;

// Create execution service
SparkExecutionService executionService = new SparkExecutionService(
    cConf, locationFactory, discoveryServiceClient
);

// Submit Spark program for distributed execution
ProgramRunId runId = new ProgramRunId("namespace", "app", ProgramType.SPARK, "program", "run-1");
ListenableFuture<ProgramController> future = executionService.submit(runId, programOptions);

// Get controller when submission completes
ProgramController controller = future.get();

// Monitor program state
System.out.println("Program state: " + controller.getState());

// Send commands to program
controller.command("scale-executors", 10).get();

// Stop program gracefully
controller.stop().get();
```

**Twill Runnable Implementation:**

```java
import co.cask.cdap.app.runtime.spark.distributed.SparkTwillRunnable;
import org.apache.twill.api.TwillContext;
import org.apache.twill.api.Command;

public class MySparkTwillRunnable extends SparkTwillRunnable {

  @Override
  public void initialize(TwillContext context) {
    super.initialize(context);

    // Get instance information
    int instanceId = context.getInstanceId();
    int instanceCount = context.getInstanceCount();

    System.out.println(String.format(
        "Initializing instance %d of %d", instanceId, instanceCount
    ));
  }

  @Override
  public void run() {
    try {
      // Initialize Spark context
      SparkContext sparkContext = createSparkContext();

      // Run Spark application
      runSparkApplication(sparkContext);

      // Keep running until stopped
      while (!Thread.currentThread().isInterrupted()) {
        Thread.sleep(1000);
      }

    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      cleanup();
    }
  }

  @Override
  public void handleCommand(Command command) throws Exception {
    String commandName = command.getCommand();

    switch (commandName) {
      case "scale-executors":
        int targetCount = Integer.parseInt(command.getOptions().get("count"));
        scaleExecutors(targetCount);
        break;

      case "checkpoint":
        checkpointApplication();
        break;

      default:
        super.handleCommand(command);
    }
  }
}
```

**Program Controller Usage:**

```java
import co.cask.cdap.app.runtime.spark.distributed.SparkTwillProgramController;
import co.cask.cdap.app.runtime.ProgramController.Listener;

// Create program controller
SparkTwillProgramController controller = new SparkTwillProgramController(
    twillController, programRunId
);

// Add state change listener
controller.addListener(new Listener() {
  @Override
  public void init(State currentState, Throwable cause) {
    System.out.println("Program initialized with state: " + currentState);
  }

  @Override
  public void stateChanged(State newState, Throwable cause) {
    System.out.println("Program state changed to: " + newState);
    if (cause != null) {
      System.err.println("State change caused by error: " + cause.getMessage());
    }
  }
});

// Monitor resource usage
ResourceReport report = controller.getResourceReport();
for (TwillRunResources resources : report.getResources()) {
  System.out.println(String.format(
      "Instance %d: %d cores, %d MB memory",
      resources.getInstanceId(),
      resources.getVirtualCores(),
      resources.getMemoryMB()
  ));
}

// Send custom commands
controller.command("scale-executors", "count", "20").get();
controller.command("checkpoint").get();
```

356

357

**Cluster Resource Management:**

358

359

```java

360

import co.cask.cdap.app.runtime.spark.distributed.DistributedExecutionContext;

361

362

// Create execution context

363

DistributedExecutionContext context = new DistributedExecutionContext(

364

sparkConf, yarnClient, resourceManager

365

);

366

367

// Get current resource allocation

368

int executors = context.getExecutorInstances();

369

Resources executorResources = context.getExecutorResources();

370

Resources driverResources = context.getDriverResources();

371

372

System.out.println(String.format(

373

"Current allocation: %d executors, %d MB memory each, %d cores each",

374

executors,

375

executorResources.getMemoryMB(),

376

executorResources.getVirtualCores()

377

));

378

379

// Scale based on workload

380

if (workloadSize > threshold) {

381

int targetExecutors = Math.min(workloadSize / batchSize, maxExecutors);

382

context.scaleExecutors(targetExecutors).get();

383

}

384

385

// Monitor executor distribution

386

Set<String> executorHosts = context.getExecutorHosts();

387

System.out.println("Executors running on hosts: " + executorHosts);

388

```

## Types

```java { .api }
/**
 * Service state enumeration for execution services.
 */
public enum ServiceState {
  STARTING,  // Service is starting up
  RUNNING,   // Service is running and accepting requests
  STOPPING,  // Service is shutting down
  STOPPED,   // Service has stopped
  FAILED     // Service encountered a fatal error
}

/**
 * Resource report containing information about distributed resources.
 */
public interface ResourceReport {
  /**
   * Gets resources for all instances.
   * @return Collection of TwillRunResources for each instance
   */
  Collection<TwillRunResources> getResources();

  /**
   * Gets the application master resources.
   * @return TwillRunResources for the application master
   */
  TwillRunResources getAppMasterResources();

  /**
   * Gets the services information.
   * @return Map of service names to their resource information
   */
  Map<String, Collection<TwillRunResources>> getServices();
}

/**
 * Resources for a Twill run instance.
 */
public interface TwillRunResources {
  /**
   * Gets the instance ID.
   * @return Instance identifier
   */
  int getInstanceId();

  /**
   * Gets allocated virtual cores.
   * @return Number of virtual cores
   */
  int getVirtualCores();

  /**
   * Gets allocated memory in MB.
   * @return Memory allocation in megabytes
   */
  int getMemoryMB();

  /**
   * Gets the host name.
   * @return Host where this instance is running
   */
  String getHost();

  /**
   * Gets the container ID.
   * @return Container identifier from the resource manager
   */
  String getContainerId();
}

/**
 * Twill controller interface for low-level operations.
 */
public interface TwillController {
  /**
   * Starts the Twill application.
   * @return Future indicating start completion
   */
  ListenableFuture<TwillController> start();

  /**
   * Stops the Twill application.
   * @return Future indicating stop completion
   */
  ListenableFuture<TwillController> terminate();

  /**
   * Kills the Twill application.
   * @return Future indicating kill completion
   */
  ListenableFuture<TwillController> kill();

  /**
   * Sends a command to the application.
   * @param command Command to send
   * @return Future indicating command completion
   */
  ListenableFuture<TwillController> sendCommand(Command command);

  /**
   * Gets the resource report.
   * @return ResourceReport containing current resource usage
   */
  ResourceReport getResourceReport();
}

/**
 * Command interface for Twill applications.
 */
public interface Command {
  /**
   * Gets the command name.
   * @return Command identifier
   */
  String getCommand();

  /**
   * Gets command options.
   * @return Map of option key-value pairs
   */
  Map<String, String> getOptions();
}

/**
 * Twill context providing runtime information.
 */
public interface TwillContext {
  /**
   * Gets the instance ID.
   * @return Instance identifier (0-based)
   */
  int getInstanceId();

  /**
   * Gets the total instance count.
   * @return Total number of instances
   */
  int getInstanceCount();

  /**
   * Gets the host information.
   * @return Host where this instance is running
   */
  String getHost();

  /**
   * Gets allocated resources.
   * @return TwillRunResources for this instance
   */
  TwillRunResources getResourceAllocation();

  /**
   * Announces a service endpoint.
   * @param serviceName Name of the service
   * @param port Port number
   */
  void announce(String serviceName, int port);
}

/**
 * YARN application ID wrapper.
 */
public class ApplicationId {
  /**
   * Gets the cluster timestamp.
   * @return Cluster timestamp component
   */
  public long getClusterTimestamp();

  /**
   * Gets the application ID.
   * @return Application ID component
   */
  public int getId();

  /**
   * Gets the string representation.
   * @return Full application ID string
   */
  @Override
  public String toString();
}
```