# Runtime Job Management

SPI for launching, monitoring, and managing runtime jobs within provisioned clusters. The runtime job system covers the job lifecycle from submission through completion. This includes status tracking, resource requests (CPU and memory), and an abstraction over the execution environment, so that different job execution engines can be plugged in for different compute environments.

## Capabilities

### Runtime Job Manager Interface

Main SPI interface for managing the lifecycle of runtime jobs. Implementations provide platform-specific job management logic for environments like Hadoop YARN, Kubernetes, or cloud-native job services.

```java { .api }
/**
 * Manages runtime job lifecycle with platform-specific implementations.
 * Must be closed to release resources properly.
 */
interface RuntimeJobManager extends Closeable {
  /**
   * Launch a new runtime job.
   * @param jobInfo Information needed to launch the job
   * @throws Exception if job launch fails
   */
  void launch(RuntimeJobInfo jobInfo) throws Exception;

  /**
   * Get details about a running job.
   * @param programRunInfo Program run information to look up
   * @return Job details if found, empty if not found
   * @throws Exception if lookup fails
   */
  Optional<RuntimeJobDetail> getDetail(ProgramRunInfo programRunInfo) throws Exception;

  /**
   * Stop a running job gracefully.
   * @param programRunInfo Program run information
   * @throws Exception if stop operation fails
   */
  void stop(ProgramRunInfo programRunInfo) throws Exception;

  /**
   * Kill a running job immediately.
   * @param runtimeJobDetail Job details including current status
   * @throws Exception if kill operation fails
   */
  void kill(RuntimeJobDetail runtimeJobDetail) throws Exception;

  /** Clean up manager resources. */
  void close() throws IOException;
}
```

**Usage Example:**

```java
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.util.Optional;
// RuntimeJobManager, RuntimeJobInfo, ProgramRunInfo, RuntimeJobDetail and
// RuntimeJobStatus come from the runtime job SPI described in this section.

public class KubernetesJobManager implements RuntimeJobManager {
  private final KubernetesClient k8sClient;

  public KubernetesJobManager(KubernetesClient k8sClient) {
    this.k8sClient = k8sClient;
  }

  @Override
  public void launch(RuntimeJobInfo jobInfo) throws Exception {
    ProgramRunInfo runInfo = jobInfo.getProgramRunInfo();

    // getArguments() returns a Map, so flatten it into container args.
    // The "--key=value" format is an assumption of this example.
    String[] args = jobInfo.getArguments().entrySet().stream()
        .map(e -> "--" + e.getKey() + "=" + e.getValue())
        .toArray(String[]::new);

    // Create the Kubernetes Job spec from jobInfo
    Job job = new JobBuilder()
        .withNewMetadata()
          .withName(generateJobName(runInfo))
          .withLabels(createJobLabels(runInfo))
        .endMetadata()
        .withNewSpec()
          .withParallelism(1)
          .withNewTemplate()
            .withNewSpec()
              // Job pods must use "Never" or "OnFailure"; the default "Always" is rejected
              .withRestartPolicy("Never")
              .addNewContainer()
                .withName("cdap-runtime")
                .withImage("cdap/runtime:latest") // illustrative image name
                .withArgs(args)
                .withNewResources()
                  .addToRequests("cpu", new Quantity(String.valueOf(jobInfo.getVirtualCores())))
                  .addToRequests("memory", new Quantity(jobInfo.getMemoryMb() + "Mi"))
                .endResources()
              .endContainer()
            .endSpec()
          .endTemplate()
        .endSpec()
        .build();

    k8sClient.batch().v1().jobs().create(job);
  }

  @Override
  public Optional<RuntimeJobDetail> getDetail(ProgramRunInfo programRunInfo) throws Exception {
    String jobName = generateJobName(programRunInfo);
    Job job = k8sClient.batch().v1().jobs().withName(jobName).get();

    if (job == null) {
      return Optional.empty();
    }

    RuntimeJobStatus status = mapJobStatus(job.getStatus());
    return Optional.of(new RuntimeJobDetail(programRunInfo, status));
  }

  // stop, kill, close and the helpers generateJobName, createJobLabels
  // and mapJobStatus are omitted from this example.
}
```
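
The example above calls a `mapJobStatus` helper but does not define it. The sketch below shows one possible mapping. It assumes the caller has already extracted two things from the Kubernetes Job status: the types of conditions whose status is `"True"` (Kubernetes uses `Complete` and `Failed` for terminal Jobs), and the active pod count. The `KubernetesStatusMapper` name and the mapping rules are hypothetical, and the enum is a local stand-in for `RuntimeJobStatus`.

```java
import java.util.Set;

/** Local stand-in mirroring the RuntimeJobStatus values documented in this section. */
enum RuntimeJobStatus { STARTING, RUNNING, STOPPING, STOPPED, COMPLETED, FAILED, UNKNOWN }

/** Hypothetical helper: maps a Kubernetes Job's observed state to a RuntimeJobStatus. */
final class KubernetesStatusMapper {
  private KubernetesStatusMapper() {}

  /**
   * @param trueConditionTypes types of Job conditions whose status is "True"
   * @param activePods         the Job's active pod count, or null if not reported yet
   */
  static RuntimeJobStatus map(Set<String> trueConditionTypes, Integer activePods) {
    // Terminal conditions are written by the Job controller once it stops retrying
    if (trueConditionTypes.contains("Complete")) {
      return RuntimeJobStatus.COMPLETED;
    }
    if (trueConditionTypes.contains("Failed")) {
      return RuntimeJobStatus.FAILED;
    }
    // At least one pod is currently running
    if (activePods != null && activePods > 0) {
      return RuntimeJobStatus.RUNNING;
    }
    // No terminal condition and no active pod: created but not running yet (or between retries)
    return RuntimeJobStatus.STARTING;
  }
}
```

With the fabric8 model classes, these inputs could come from `job.getStatus().getConditions()` (each condition has a type and a status) and `job.getStatus().getActive()`. Relying on conditions rather than succeeded or failed pod counters avoids reporting `FAILED` while Kubernetes is still retrying. This mapping never yields `STOPPING` or `STOPPED`. A real manager would need another signal for those states, such as a deletion in progress.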

### Runtime Job Interface

Interface representing an individual job that can be executed in a runtime environment.

```java { .api }
/**
 * Represents a job executed in a runtime environment.
 * Implementations define the actual job logic.
 */
interface RuntimeJob {
  /**
   * Execute the job in the given environment.
   * @param environment Runtime environment providing necessary services
   * @throws Exception if job execution fails
   */
  void run(RuntimeJobEnvironment environment) throws Exception;

  /**
   * Request the job to stop gracefully.
   * Implementations should return promptly and let run() finish asynchronously.
   */
  void requestStop();
}
```
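
Because `requestStop()` is expected to return without waiting, it is typically called from a different thread than the one executing `run()`. The sketch below illustrates that contract. It assumes a cooperative job that checks a `volatile` flag between small units of work. `SteppingJob` and `StopHarness` are hypothetical, and the two interfaces are local stand-ins; the environment stand-in is empty because this example uses no services.

```java
/** Stand-in for RuntimeJobEnvironment; empty because this sketch uses no services. */
interface RuntimeJobEnvironment {}

/** Stand-in mirroring the RuntimeJob contract above. */
interface RuntimeJob {
  void run(RuntimeJobEnvironment environment) throws Exception;
  void requestStop();
}

/** Hypothetical job that works in small steps and checks a stop flag between steps. */
final class SteppingJob implements RuntimeJob {
  // volatile: written by the thread calling requestStop(), read by the thread in run()
  private volatile boolean stopRequested;

  @Override
  public void run(RuntimeJobEnvironment environment) throws InterruptedException {
    while (!stopRequested) {
      Thread.sleep(10); // stands in for one unit of real work
    }
    // Release any resources here before returning
  }

  @Override
  public void requestStop() {
    // Only sets the flag and returns; run() observes it after the current step
    stopRequested = true;
  }
}

/** Hypothetical harness: runs a job on its own thread and stops it from the caller's thread. */
final class StopHarness {
  private StopHarness() {}

  /** Returns true if run() returned within joinTimeoutMillis after requestStop(). */
  static boolean runThenStop(RuntimeJob job, long runMillis, long joinTimeoutMillis) {
    Thread runner = new Thread(() -> {
      try {
        job.run(new RuntimeJobEnvironment() {});
      } catch (Exception e) {
        e.printStackTrace();
      }
    }, "runtime-job");
    runner.start();
    try {
      Thread.sleep(runMillis);
      job.requestStop(); // called from a different thread than run()
      runner.join(joinTimeoutMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return !runner.isAlive();
  }
}
```

A job that blocks for long periods inside one step would need an extra mechanism, such as interrupting the worker thread or closing the resource it is blocked on, to honor the request promptly.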

### Runtime Job Information

Interface defining all information needed to launch a runtime job.

```java { .api }
/**
 * Information needed to launch a runtime job.
 * Provides job configuration, resources, and execution parameters.
 */
interface RuntimeJobInfo {
  /**
   * Get files that need to be localized for the job.
   * @return Collection of LocalFile objects for job execution
   */
  Collection<? extends LocalFile> getLocalizeFiles();

  /**
   * Get the runtime job class name to execute.
   * @return Fully qualified class name implementing RuntimeJob
   */
  String getRuntimeJobClassname();

  /**
   * Get program run information.
   * @return Program run details
   */
  ProgramRunInfo getProgramRunInfo();

  /**
   * Get JVM properties for the job process.
   * @return Map of JVM system properties
   */
  Map<String, String> getJvmProperties();

  /**
   * Get command line arguments for the job process.
   * @return Map of command line arguments
   */
  Map<String, String> getArguments();

  /**
   * Get number of virtual CPU cores requested.
   * @return Number of virtual cores
   */
  int getVirtualCores();

  /**
   * Get memory allocation in megabytes.
   * @return Memory in MB
   */
  int getMemoryMb();
}
```
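
To show how these fields might fit together, the sketch below turns the subset of `RuntimeJobInfo` needed for a child JVM into a command line. Several details are assumptions, not documented behavior:

- `JobLaunchSpec` is a stand-in for that subset of the interface.
- The launcher main class is hypothetical; it would instantiate the `RuntimeJob` class named in its first argument.
- The `--key=value` argument format is assumed.
- Sizing the heap directly from `getMemoryMb()` is a simplification.

`getVirtualCores()` and `getLocalizeFiles()` are left out because they belong to the container request and file localization rather than to the command line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Stand-in for the subset of RuntimeJobInfo used by this sketch. */
interface JobLaunchSpec {
  String getRuntimeJobClassname();
  Map<String, String> getJvmProperties();
  Map<String, String> getArguments();
  int getMemoryMb();
}

/** Hypothetical helper that builds a child-JVM command line from a JobLaunchSpec. */
final class JobCommandBuilder {
  private JobCommandBuilder() {}

  static List<String> build(JobLaunchSpec spec, String classpath, String launcherMainClass) {
    List<String> cmd = new ArrayList<>();
    cmd.add("java");
    // Uses the whole allocation as heap for simplicity; a real launcher would leave
    // headroom for non-heap memory inside the container limit.
    cmd.add("-Xmx" + spec.getMemoryMb() + "m");
    // TreeMap gives a deterministic order; the source maps are unordered.
    new TreeMap<>(spec.getJvmProperties()).forEach((k, v) -> cmd.add("-D" + k + "=" + v));
    cmd.add("-cp");
    cmd.add(classpath);
    cmd.add(launcherMainClass);
    cmd.add(spec.getRuntimeJobClassname());
    new TreeMap<>(spec.getArguments()).forEach((k, v) -> cmd.add("--" + k + "=" + v));
    return cmd;
  }
}
```

The resulting list can be passed to `new ProcessBuilder(cmd)` when the job is launched as a separate local process.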

### Runtime Job Environment

Interface providing the runtime environment and services available to executing jobs.

```java { .api }
/**
 * Runtime environment for job execution.
 * Provides access to services and configuration needed by jobs.
 */
interface RuntimeJobEnvironment {
  /**
   * Get location factory for file system operations.
   * @return Location factory instance
   */
  LocationFactory getLocationFactory();

  /**
   * Get Twill runner for launching distributed applications.
   * @return Twill runner instance
   */
  org.apache.twill.api.TwillRunner getTwillRunner();

  /**
   * Get environment-specific properties.
   * @return Map of environment properties
   */
  Map<String, String> getProperties();

  /**
   * Get launch mode for the job.
   * @return Launch mode (CLIENT or CLUSTER)
   */
  LaunchMode getLaunchMode();
}
```

### Runtime Job Detail

Data class containing runtime job information and current status.

```java { .api }
/**
 * Runtime job details including status information.
 */
class RuntimeJobDetail {
  /**
   * Get program run information for this job.
   * @return Program run info
   */
  ProgramRunInfo getRunInfo();

  /**
   * Get current job status.
   * @return Current runtime job status
   */
  RuntimeJobStatus getStatus();
}
```

### Launch Mode

Enumeration defining how jobs should be launched.

```java { .api }
/**
 * Program launch mode defining execution environment.
 */
enum LaunchMode {
  /**
   * Launch job in-process within the client.
   * Suitable for lightweight jobs or development.
   */
  CLIENT,

  /**
   * Launch job in separate container/process.
   * Suitable for production workloads requiring isolation.
   */
  CLUSTER
}
```

### Runtime Job Status

Status values for runtime jobs. Each value records whether it is terminal: `isTerminated()` returns `true` for `STOPPED`, `COMPLETED`, and `FAILED`, and `false` for the others, including `UNKNOWN`.

```java { .api }
/**
 * Status values for runtime jobs with termination information.
 */
enum RuntimeJobStatus {
  /** Job is starting up */
  STARTING(false),

  /** Job is running normally */
  RUNNING(false),

  /** Job is in the process of stopping */
  STOPPING(false),

  /** Job has stopped (may be restarted) */
  STOPPED(true),

  /** Job completed successfully */
  COMPLETED(true),

  /** Job failed with an error */
  FAILED(true),

  /** Job status is unknown */
  UNKNOWN(false);

  private final boolean terminated;

  RuntimeJobStatus(boolean terminated) {
    this.terminated = terminated;
  }

  /**
   * Check if this status represents a terminated job.
   * @return true if job is in terminal state
   */
  public boolean isTerminated() {
    return terminated;
  }
}
```

**Usage Example:**

```java
Optional<RuntimeJobDetail> detail = jobManager.getDetail(programRunInfo);
if (detail.isPresent()) {
  RuntimeJobStatus status = detail.get().getStatus();

  if (status.isTerminated()) {
    System.out.println("Job has terminated with status: " + status);

    switch (status) {
      case COMPLETED:
        System.out.println("Job completed successfully");
        break;
      case FAILED:
        System.out.println("Job failed - check logs");
        break;
      case STOPPED:
        System.out.println("Job was stopped");
        break;
      default:
        // No other status is terminal
        break;
    }
  } else {
    // STARTING, RUNNING, STOPPING, or UNKNOWN
    System.out.println("Job has not terminated yet; current status: " + status);
  }
} else {
  System.out.println("No job found for this program run");
}
```
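
A single status check is often not enough when shutting a job down. The sketch below shows one way to combine `stop`, polling on `isTerminated()`, and `kill`: request a graceful stop, poll until the job terminates, and escalate to a kill if a grace period elapses. It makes several assumptions:

- `JobControl` is a hypothetical stand-in for the parts of `RuntimeJobManager` used here. It keys jobs by a `String` run id instead of `ProgramRunInfo`, and its `kill(String)` replaces the real `kill(RuntimeJobDetail)`.
- A job that is no longer found is treated as finished.
- The poll interval and grace period are illustrative.

```java
import java.time.Duration;
import java.util.Optional;

/** Local stand-in mirroring the RuntimeJobStatus enum above. */
enum RuntimeJobStatus {
  STARTING(false), RUNNING(false), STOPPING(false),
  STOPPED(true), COMPLETED(true), FAILED(true), UNKNOWN(false);

  private final boolean terminated;

  RuntimeJobStatus(boolean terminated) {
    this.terminated = terminated;
  }

  boolean isTerminated() {
    return terminated;
  }
}

/** Hypothetical stand-in for the parts of RuntimeJobManager used here, keyed by a run id. */
interface JobControl {
  Optional<RuntimeJobStatus> status(String runId) throws Exception;
  void stop(String runId) throws Exception;
  void kill(String runId) throws Exception;
}

final class JobShutdown {
  private JobShutdown() {}

  /** Polls until the job is terminated or no longer found; returns false if the timeout elapses first. */
  static boolean awaitTermination(JobControl jobs, String runId, Duration timeout, Duration pollInterval)
      throws Exception {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      Optional<RuntimeJobStatus> status = jobs.status(runId);
      // A missing job is treated as gone; UNKNOWN is not terminal, so keep polling
      if (status.isEmpty() || status.get().isTerminated()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      Thread.sleep(pollInterval.toMillis());
    }
  }

  /** Requests a graceful stop, then kills the job if it has not terminated in time; returns true if kill was needed. */
  static boolean stopOrKill(JobControl jobs, String runId, Duration grace) throws Exception {
    jobs.stop(runId);
    if (awaitTermination(jobs, runId, grace, Duration.ofMillis(100))) {
      return false;
    }
    jobs.kill(runId);
    return true;
  }
}
```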

### Exception Types

Specialized exceptions for runtime job operations.

```java { .api }
/**
 * Exception for program runs that completed but failed.
 * Indicates the job execution itself failed rather than job management.
 */
class ProgramRunFailureException extends RuntimeException {
  /**
   * Create exception with failure message.
   * @param message Description of the failure
   */
  public ProgramRunFailureException(String message);
}
```

**Usage Example:**

```java
public class MyRuntimeJob implements RuntimeJob {
  // Written by requestStop() on another thread, read while the job runs
  private volatile boolean shouldStop;

  @Override
  public void run(RuntimeJobEnvironment environment) throws Exception {
    try {
      // Job logic here
      boolean success = executeJobLogic(environment);

      if (!success) {
        throw new ProgramRunFailureException("Job logic failed validation");
      }
    } catch (ProgramRunFailureException e) {
      // Already the right type; do not wrap it a second time
      throw e;
    } catch (Exception e) {
      // Convert job-logic errors; re-throw infrastructure failures as-is
      if (isJobLogicFailure(e)) {
        throw new ProgramRunFailureException("Job failed: " + e.getMessage());
      }
      throw e;
    }
  }

  @Override
  public void requestStop() {
    // Implement graceful shutdown: signal the running logic to finish early
    shouldStop = true;
  }

  private boolean executeJobLogic(RuntimeJobEnvironment env) {
    // Implementation-specific logic; should check shouldStop between units of work
    return true;
  }

  private boolean isJobLogicFailure(Exception e) {
    // Hypothetical classification: treat argument and state errors as job-logic failures
    return e instanceof IllegalArgumentException || e instanceof IllegalStateException;
  }
}
```
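
On the calling side, whatever invokes `run()` can use the exception type to tell a failed program run apart from an infrastructure error. The sketch below classifies the three normal outcomes. It assumes the runner knows whether it called `requestStop()`, which it passes in as a `BooleanSupplier`. `RunOutcome` and `JobOutcomes` are hypothetical, and the exception and interfaces are local stand-ins for the types documented above.

```java
import java.util.function.BooleanSupplier;

/** Stand-in for the documented exception type. */
class ProgramRunFailureException extends RuntimeException {
  ProgramRunFailureException(String message) {
    super(message);
  }
}

/** Stand-in for RuntimeJobEnvironment; this sketch needs no services. */
interface RuntimeJobEnvironment {}

/** Stand-in mirroring the RuntimeJob contract. */
interface RuntimeJob {
  void run(RuntimeJobEnvironment environment) throws Exception;
  void requestStop();
}

/** Hypothetical outcome mirroring the terminal RuntimeJobStatus values. */
enum RunOutcome { COMPLETED, STOPPED, FAILED }

final class JobOutcomes {
  private JobOutcomes() {}

  /**
   * Runs the job and classifies how it ended. Exceptions other than
   * ProgramRunFailureException are treated as infrastructure failures and propagate unchanged.
   */
  static RunOutcome runJob(RuntimeJob job, RuntimeJobEnvironment env, BooleanSupplier stopRequested)
      throws Exception {
    try {
      job.run(env);
    } catch (ProgramRunFailureException e) {
      // The program itself failed; a real runner would also record e.getMessage()
      return RunOutcome.FAILED;
    }
    // run() returned normally: distinguish a requested stop from natural completion
    return stopRequested.getAsBoolean() ? RunOutcome.STOPPED : RunOutcome.COMPLETED;
  }
}
```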