# Distributed Execution

A distributed execution framework built on Apache Twill that provides scalable, fault-tolerant deployment of Spark applications across YARN clusters, with resource management, lifecycle control, and integration with CDAP's distributed application infrastructure.

## Capabilities

### Spark Execution Service

Manages distributed Spark execution, providing full lifecycle management and resource allocation across cluster nodes.

```java { .api }
/**
 * Service for managing distributed Spark execution.
 * Provides scalable deployment and management of Spark applications across clusters.
 */
public class SparkExecutionService {
  /**
   * Submits a Spark program for distributed execution
   * @param programRunId Unique identifier for the program run
   * @param programOptions Configuration options for program execution
   * @return Future containing the program controller for managing execution
   * @throws ExecutionException if submission fails
   */
  public ListenableFuture<ProgramController> submit(ProgramRunId programRunId, ProgramOptions programOptions);

  /**
   * Stops the execution service and all running programs.
   * Gracefully shuts down all managed Spark applications.
   */
  public void stop();

  /**
   * Gets the current state of the execution service
   * @return ServiceState indicating current service status
   */
  public ServiceState getState();

  /**
   * Gets information about running programs
   * @return Set of ProgramRunId for currently running programs
   */
  public Set<ProgramRunId> getRunningPrograms();

  /**
   * Gets the program controller for a specific run
   * @param programRunId Program run identifier
   * @return ProgramController for the specified run, or null if not found
   */
  public ProgramController getProgramController(ProgramRunId programRunId);
}
```
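
Since `getState()` only returns a snapshot, callers typically poll it until the service reaches `RUNNING` before submitting programs. A minimal, generic poll-until helper (a sketch using only the JDK; `WaitUtil` is not part of the CDAP API):

```java
import java.util.function.Supplier;

// Generic poll-until helper, e.g. to wait for
// executionService.getState() == ServiceState.RUNNING before calling submit().
// Illustrative sketch only, not part of the CDAP API.
public class WaitUtil {
  /** Polls the condition until it returns true or the timeout elapses. */
  static boolean waitFor(Supplier<Boolean> condition, long timeoutMillis, long intervalMillis)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!condition.get()) {
      if (System.currentTimeMillis() >= deadline) {
        return false;
      }
      Thread.sleep(intervalMillis);
    }
    return true;
  }
}
```

A caller would pass `() -> executionService.getState() == ServiceState.RUNNING` as the condition.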

### Spark Twill Runnable

A `TwillRunnable` implementation that lets Spark applications run as distributed applications with resource management and fault tolerance.

```java { .api }
/**
 * Twill runnable for distributed Spark execution.
 * Enables Spark applications to run as distributed services with fault tolerance.
 */
public class SparkTwillRunnable implements TwillRunnable {
  /**
   * Main execution method for the runnable.
   * Starts the Spark application and manages its lifecycle.
   */
  public void run();

  /**
   * Stops the running Spark application gracefully.
   * Ensures proper cleanup of resources and state.
   */
  public void stop();

  /**
   * Handles commands sent to the running application
   * @param command Command to execute
   * @throws Exception if command execution fails
   */
  public void handleCommand(Command command) throws Exception;

  /**
   * Initializes the runnable with context
   * @param context Twill runtime context
   */
  public void initialize(TwillContext context);

  /**
   * Destroys the runnable and cleans up resources
   */
  public void destroy();

  /**
   * Gets the Twill context
   * @return TwillContext for accessing runtime information
   */
  protected TwillContext getContext();
}
```

### Spark Twill Program Controller

A `ProgramController` implementation that manages distributed Spark execution through the Twill framework.

```java { .api }
/**
 * Program controller for distributed Spark execution via Twill.
 * Provides lifecycle management and a command interface for distributed Spark programs.
 */
public class SparkTwillProgramController implements ProgramController {
  /**
   * Sends a command to the distributed Spark program
   * @param command Command name to execute
   * @param args Command arguments
   * @return Future representing the command execution result
   * @throws Exception if command execution fails
   */
  public ListenableFuture<ProgramController> command(String command, Object... args) throws Exception;

  /**
   * Stops the distributed Spark program gracefully
   * @return Future representing the stop operation
   * @throws Exception if the stop operation fails
   */
  public ListenableFuture<ProgramController> stop() throws Exception;

  /**
   * Kills the distributed Spark program forcefully
   * @return Future representing the kill operation
   */
  public ListenableFuture<ProgramController> kill();

  /**
   * Gets the current state of the program
   * @return Current program state
   */
  public State getState();

  /**
   * Gets the program run ID
   * @return ProgramRunId identifying this program run
   */
  public ProgramRunId getProgramRunId();

  /**
   * Gets the Twill controller for low-level operations
   * @return TwillController for direct Twill operations
   */
  public TwillController getTwillController();

  /**
   * Gets the resource report for the running program
   * @return ResourceReport containing resource usage information
   */
  public ResourceReport getResourceReport();

  /**
   * Adds a listener for program state changes
   * @param listener Listener to be notified of state changes
   */
  public void addListener(Listener listener);
}
```

### Distributed Execution Context

Execution context that exposes cluster information and resource-management operations to distributed Spark programs.

```java { .api }
/**
 * Context for distributed Spark execution.
 * Provides access to cluster information and distributed resources.
 */
public class DistributedExecutionContext {
  /**
   * Gets the number of executor instances
   * @return Number of Spark executor instances
   */
  public int getExecutorInstances();

  /**
   * Gets the executor resource allocation
   * @return Resources allocated to each executor
   */
  public Resources getExecutorResources();

  /**
   * Gets the driver resource allocation
   * @return Resources allocated to the driver
   */
  public Resources getDriverResources();

  /**
   * Gets the cluster configuration
   * @return Configuration for the target cluster
   */
  public Configuration getClusterConfiguration();

  /**
   * Gets the YARN application ID (if running on YARN)
   * @return Application ID, or null if not running on YARN
   */
  public ApplicationId getYarnApplicationId();

  /**
   * Gets the set of executor hosts
   * @return Set of hostnames running executors
   */
  public Set<String> getExecutorHosts();

  /**
   * Scales the number of executors
   * @param targetExecutors Desired number of executors
   * @return Future indicating completion of the scaling operation
   */
  public ListenableFuture<Boolean> scaleExecutors(int targetExecutors);
}
```

## Usage Examples

**Basic Distributed Execution:**

```java
import co.cask.cdap.app.runtime.spark.distributed.SparkExecutionService;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ProgramRunId;
import com.google.common.util.concurrent.ListenableFuture;

// Create the execution service
SparkExecutionService executionService = new SparkExecutionService(
    cConf, locationFactory, discoveryServiceClient
);

// Submit a Spark program for distributed execution
ProgramRunId runId = new ProgramRunId("namespace", "app", ProgramType.SPARK, "program", "run-1");
ListenableFuture<ProgramController> future = executionService.submit(runId, programOptions);

// Get the controller when submission completes
ProgramController controller = future.get();

// Monitor program state
System.out.println("Program state: " + controller.getState());

// Send commands to the program
controller.command("scale-executors", 10).get();

// Stop the program gracefully
controller.stop().get();
```

**Twill Runnable Implementation:**

```java
import co.cask.cdap.app.runtime.spark.distributed.SparkTwillRunnable;
import org.apache.spark.SparkContext;
import org.apache.twill.api.Command;
import org.apache.twill.api.TwillContext;

public class MySparkTwillRunnable extends SparkTwillRunnable {

  @Override
  public void initialize(TwillContext context) {
    super.initialize(context);

    // Get instance information
    int instanceId = context.getInstanceId();
    int instanceCount = context.getInstanceCount();

    System.out.printf("Initializing instance %d of %d%n", instanceId, instanceCount);
  }

  @Override
  public void run() {
    try {
      // Initialize the Spark context
      SparkContext sparkContext = createSparkContext();

      // Run the Spark application
      runSparkApplication(sparkContext);

      // Keep running until stopped
      while (!Thread.currentThread().isInterrupted()) {
        Thread.sleep(1000);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      cleanup();
    }
  }

  @Override
  public void handleCommand(Command command) throws Exception {
    String commandName = command.getCommand();

    switch (commandName) {
      case "scale-executors":
        int targetCount = Integer.parseInt(command.getOptions().get("count"));
        scaleExecutors(targetCount);
        break;

      case "checkpoint":
        checkpointApplication();
        break;

      default:
        super.handleCommand(command);
    }
  }
}
```
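
The `scale-executors` handler above assumes the `count` option is present and well-formed; a malformed command would throw from `Integer.parseInt`. A defensive option parser keeps command handling robust (a sketch; `CommandOptions.intOption` is a hypothetical helper, not part of the Twill API):

```java
import java.util.Map;

// Defensive parsing of a command option such as "count", falling back to a
// default when the option is missing or malformed. Illustrative helper only.
public class CommandOptions {
  static int intOption(Map<String, String> options, String key, int defaultValue) {
    String raw = options.get(key);
    if (raw == null) {
      return defaultValue;
    }
    try {
      return Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
      return defaultValue;
    }
  }
}
```

Inside `handleCommand`, the scale case would then read `intOption(command.getOptions(), "count", currentCount)` instead of parsing directly.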

**Program Controller Usage:**

```java
import co.cask.cdap.app.runtime.spark.distributed.SparkTwillProgramController;
import co.cask.cdap.app.runtime.ProgramController.Listener;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.TwillRunResources;

// Create the program controller
SparkTwillProgramController controller = new SparkTwillProgramController(
    twillController, programRunId
);

// Add a state-change listener
controller.addListener(new Listener() {
  @Override
  public void init(State currentState, Throwable cause) {
    System.out.println("Program initialized with state: " + currentState);
  }

  @Override
  public void stateChanged(State newState, Throwable cause) {
    System.out.println("Program state changed to: " + newState);
    if (cause != null) {
      System.err.println("State change caused by error: " + cause.getMessage());
    }
  }
});

// Monitor resource usage
ResourceReport report = controller.getResourceReport();
for (TwillRunResources resources : report.getResources()) {
  System.out.printf("Instance %d: %d cores, %d MB memory%n",
      resources.getInstanceId(), resources.getVirtualCores(), resources.getMemoryMB());
}

// Send custom commands
controller.command("scale-executors", "count", "20").get();
controller.command("checkpoint").get();
```
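
The per-instance loop above extends naturally to a cluster-wide footprint, which is what you usually want for capacity checks. A self-contained sketch, using a local record as a stand-in for Twill's `TwillRunResources`:

```java
import java.util.List;

// Summing per-instance allocations into a cluster-wide footprint.
// RunResources is a local stand-in for Twill's TwillRunResources interface.
public class ResourceTotals {
  record RunResources(int instanceId, int virtualCores, int memoryMB) {}

  static int totalMemoryMB(List<RunResources> resources) {
    return resources.stream().mapToInt(RunResources::memoryMB).sum();
  }

  static int totalCores(List<RunResources> resources) {
    return resources.stream().mapToInt(RunResources::virtualCores).sum();
  }
}
```

With a real `ResourceReport`, the same streams would run over `report.getResources()`.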

**Cluster Resource Management:**

```java
import co.cask.cdap.app.runtime.spark.distributed.DistributedExecutionContext;

// Create the execution context
DistributedExecutionContext context = new DistributedExecutionContext(
    sparkConf, yarnClient, resourceManager
);

// Get the current resource allocation
int executors = context.getExecutorInstances();
Resources executorResources = context.getExecutorResources();
Resources driverResources = context.getDriverResources();

System.out.printf("Current allocation: %d executors, %d MB memory each, %d cores each%n",
    executors, executorResources.getMemoryMB(), executorResources.getVirtualCores());

// Scale based on workload
if (workloadSize > threshold) {
  int targetExecutors = Math.min(workloadSize / batchSize, maxExecutors);
  context.scaleExecutors(targetExecutors).get();
}

// Monitor executor distribution
Set<String> executorHosts = context.getExecutorHosts();
System.out.println("Executors running on hosts: " + executorHosts);
```
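
Note that `workloadSize / batchSize` in the snippet above truncates, which under-provisions when the workload is not an exact multiple of the batch size. Rounding up and clamping to a range is usually safer; a pure-function sketch (the names and bounds are illustrative, not part of the API):

```java
// Illustrative executor-sizing rule: ceiling-divide the workload into
// batches, then clamp the result to [minExecutors, maxExecutors].
public class ExecutorSizing {
  static int targetExecutors(long workloadSize, long batchSize, int minExecutors, int maxExecutors) {
    long batches = (workloadSize + batchSize - 1) / batchSize; // ceiling division
    return (int) Math.max(minExecutors, Math.min(batches, maxExecutors));
  }
}
```

The result would then be passed to `context.scaleExecutors(...)`.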

## Types

```java { .api }
/**
 * Service state enumeration for execution services
 */
public enum ServiceState {
  STARTING,  // Service is starting up
  RUNNING,   // Service is running and accepting requests
  STOPPING,  // Service is shutting down
  STOPPED,   // Service has stopped
  FAILED     // Service encountered a fatal error
}

/**
 * Resource report containing information about distributed resources
 */
public interface ResourceReport {
  /**
   * Gets resources for all instances
   * @return Collection of TwillRunResources for each instance
   */
  Collection<TwillRunResources> getResources();

  /**
   * Gets the application master resources
   * @return TwillRunResources for the application master
   */
  TwillRunResources getAppMasterResources();

  /**
   * Gets the services information
   * @return Map of service names to their resource information
   */
  Map<String, Collection<TwillRunResources>> getServices();
}

/**
 * Resources for a Twill run instance
 */
public interface TwillRunResources {
  /**
   * Gets the instance ID
   * @return Instance identifier
   */
  int getInstanceId();

  /**
   * Gets allocated virtual cores
   * @return Number of virtual cores
   */
  int getVirtualCores();

  /**
   * Gets allocated memory in MB
   * @return Memory allocation in megabytes
   */
  int getMemoryMB();

  /**
   * Gets the host name
   * @return Host where this instance is running
   */
  String getHost();

  /**
   * Gets the container ID
   * @return Container identifier from the resource manager
   */
  String getContainerId();
}

/**
 * Twill controller interface for low-level operations
 */
public interface TwillController {
  /**
   * Starts the Twill application
   * @return Future indicating start completion
   */
  ListenableFuture<TwillController> start();

  /**
   * Stops the Twill application gracefully
   * @return Future indicating termination completion
   */
  ListenableFuture<TwillController> terminate();

  /**
   * Kills the Twill application
   * @return Future indicating kill completion
   */
  ListenableFuture<TwillController> kill();

  /**
   * Sends a command to the application
   * @param command Command to send
   * @return Future indicating command completion
   */
  ListenableFuture<TwillController> sendCommand(Command command);

  /**
   * Gets the resource report
   * @return ResourceReport containing current resource usage
   */
  ResourceReport getResourceReport();
}

/**
 * Command interface for Twill applications
 */
public interface Command {
  /**
   * Gets the command name
   * @return Command identifier
   */
  String getCommand();

  /**
   * Gets command options
   * @return Map of option key-value pairs
   */
  Map<String, String> getOptions();
}

/**
 * Twill context providing runtime information
 */
public interface TwillContext {
  /**
   * Gets the instance ID
   * @return Instance identifier (0-based)
   */
  int getInstanceId();

  /**
   * Gets the total instance count
   * @return Total number of instances
   */
  int getInstanceCount();

  /**
   * Gets the host information
   * @return Host where this instance is running
   */
  String getHost();

  /**
   * Gets allocated resources
   * @return TwillRunResources for this instance
   */
  TwillRunResources getResourceAllocation();

  /**
   * Announces a service endpoint
   * @param serviceName Name of the service
   * @param port Port number
   */
  void announce(String serviceName, int port);
}

/**
 * YARN application ID wrapper
 */
public class ApplicationId {
  /**
   * Gets the cluster timestamp
   * @return Cluster timestamp component
   */
  public long getClusterTimestamp();

  /**
   * Gets the application ID
   * @return Application ID component
   */
  public int getId();

  /**
   * Gets the string representation
   * @return Full application ID string
   */
  @Override
  public String toString();
}
```
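
For reference, YARN renders application IDs as `application_<clusterTimestamp>_<id>`, with the sequence number zero-padded to four digits (e.g. `application_1526100291229_0007`). A hypothetical re-implementation of `ApplicationId.toString()` illustrating the format:

```java
// Illustrative reconstruction of the YARN application ID string format;
// the real formatting lives in Hadoop's ApplicationId class.
public class ApplicationIdFormat {
  static String format(long clusterTimestamp, int id) {
    // %04d pads the sequence number to four digits but never truncates it.
    return String.format("application_%d_%04d", clusterTimestamp, id);
  }
}
```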