# Job Execution and Management

The job execution and management capabilities provide the core APIs for creating, submitting, monitoring, and controlling Flink dataflow programs. These APIs are essential for any application that needs to execute distributed stream processing or batch jobs on a Flink cluster.

## Core Job Components

### JobGraph

The JobGraph is the central data structure representing a Flink dataflow program at the JobManager level. It defines the topology of operations and the data flow between them.

```java { .api }
public class JobGraph implements Serializable {
    public JobGraph(String jobName);
    public JobGraph(JobID jobId, String jobName);
    public JobGraph(JobVertex... vertices);

    public void addVertex(JobVertex vertex);
    public void addJar(Path jar);
    public void addBlob(BlobKey key);

    public JobVertex[] getVerticesAsArray();
    public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException;
    public Iterable<JobVertex> getVertices();
    public int getNumberOfVertices();

    public String getName();
    public JobID getJobID();

    public Configuration getJobConfiguration();

    public void setScheduleMode(ScheduleMode scheduleMode);
    public ScheduleMode getScheduleMode();

    public void setSessionTimeout(long sessionTimeout);
    public long getSessionTimeout();

    public void setAllowQueuedScheduling(boolean allowQueuedScheduling);
    public boolean getAllowQueuedScheduling();

    public SavepointRestoreSettings getSavepointRestoreSettings();
    public void setSavepointRestoreSettings(SavepointRestoreSettings settings);

    public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException;
    public SerializedValue<ExecutionConfig> getSerializedExecutionConfig();

    public JobCheckpointingSettings getCheckpointingSettings();
    public void setSnapshotSettings(JobCheckpointingSettings settings);

    public List<URL> getClasspaths();
    public void setClasspaths(List<URL> paths);
}
```
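
`getVerticesSortedTopologicallyFromSources` returns vertices in an order where every vertex appears after all of its inputs, and fails on cyclic graphs. The ordering itself can be sketched with Kahn's algorithm over a plain adjacency map — a hypothetical stand-in for Flink's internal logic, not its actual implementation:

```java
import java.util.*;

// Sketch of source-first topological ordering over a job-graph-like
// adjacency map (vertex name -> downstream vertex names).
public class TopoSort {
    static List<String> sortFromSources(Map<String, List<String>> edges) {
        // Count incoming edges per vertex
        Map<String, Integer> inDegree = new HashMap<>();
        for (String v : edges.keySet()) inDegree.putIfAbsent(v, 0);
        for (List<String> targets : edges.values())
            for (String t : targets) inDegree.merge(t, 1, Integer::sum);

        // Sources are the vertices with no incoming edges
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet())
            if (e.getValue() == 0) ready.add(e.getKey());

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            order.add(v);
            for (String t : edges.getOrDefault(v, List.of()))
                if (inDegree.merge(t, -1, Integer::sum) == 0) ready.add(t);
        }
        // A leftover vertex means a cycle; Flink signals this as
        // InvalidProgramException
        if (order.size() != inDegree.size())
            throw new IllegalStateException("cycle detected");
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> edges = new LinkedHashMap<>();
        edges.put("source", List.of("transform"));
        edges.put("transform", List.of("sink"));
        System.out.println(sortFromSources(edges)); // [source, transform, sink]
    }
}
```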

### JobVertex

Represents individual operations or vertices in the job graph, each corresponding to a task in the execution.

```java { .api }
public class JobVertex implements Serializable {
    public JobVertex(String name);
    public JobVertex(String name, JobVertexID id);

    public void setInvokableClass(Class<? extends AbstractInvokable> invokable);
    public Class<? extends AbstractInvokable> getInvokableClass();

    public void setParallelism(int parallelism);
    public int getParallelism();

    public void setMaxParallelism(int maxParallelism);
    public int getMaxParallelism();

    public JobVertexID getID();
    public String getName();

    public Configuration getConfiguration();
    public void setConfiguration(Configuration configuration);

    public List<JobEdge> getInputs();
    public List<IntermediateDataSet> getProducedDataSets();
}
```
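
`setMaxParallelism` bounds how far a vertex can later be rescaled: keyed state is partitioned into `maxParallelism` key groups, which are then distributed over the actual parallelism. The assignment arithmetic can be sketched as follows (a simplified stand-in; Flink's real key-group mapping additionally applies a murmur hash to the key's hash code):

```java
// Sketch of key-group-based scaling math, mirroring the standard
// keyGroup * parallelism / maxParallelism assignment formula.
public class KeyGroups {
    // A key's hash is first mapped to one of maxParallelism key groups...
    static int keyGroupFor(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    // ...and each key group is then owned by exactly one running subtask,
    // so rescaling only moves whole key groups between subtasks.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 4;
        int keyGroup = keyGroupFor("user-42".hashCode(), maxParallelism);
        int subtask = subtaskFor(keyGroup, maxParallelism, parallelism);
        System.out.println("key group " + keyGroup + " -> subtask " + subtask);
    }
}
```

Because the key-group count is fixed at `maxParallelism`, changing `parallelism` later never requires re-hashing individual keys, only reassigning key-group ranges.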

### JobEdge

Defines connections between job vertices, specifying how data flows between operations.

```java { .api }
public class JobEdge implements Serializable {
    public JobEdge(IntermediateDataSet source, JobVertex target, DistributionPattern distributionPattern);

    public IntermediateDataSet getSource();
    public JobVertex getTarget();
    public DistributionPattern getDistributionPattern();

    public void setShipStrategy(ShipStrategyType shipStrategy);
    public ShipStrategyType getShipStrategy();
}
```
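
The `DistributionPattern` on an edge decides which consumer subtasks read from each producer subtask: `ALL_TO_ALL` connects every producer to every consumer, while `POINTWISE` connects local ranges. A simplified sketch of that wiring decision (an illustration, not Flink's actual connection code):

```java
import java.util.*;

// Sketch: which consumer subtask indices read from a given producer subtask.
public class Distribution {
    static List<Integer> consumersOf(int producerIndex, int producers,
                                     int consumers, boolean allToAll) {
        List<Integer> targets = new ArrayList<>();
        if (allToAll) {
            // ALL_TO_ALL: every consumer subtask reads from every producer
            for (int i = 0; i < consumers; i++) targets.add(i);
        } else if (producers == consumers) {
            // POINTWISE with equal parallelism degenerates to a 1:1 wiring
            targets.add(producerIndex);
        } else {
            // POINTWISE with uneven parallelism: map the producer's slot
            // proportionally onto the consumer index range
            int start = producerIndex * consumers / producers;
            int end = (producerIndex + 1) * consumers / producers;
            for (int i = start; i < Math.max(end, start + 1) && i < consumers; i++)
                targets.add(i);
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(consumersOf(1, 4, 2, false)); // POINTWISE, 4 producers -> 2 consumers
        System.out.println(consumersOf(0, 2, 3, true));  // ALL_TO_ALL
    }
}
```

`ALL_TO_ALL` is what shuffles and broadcasts need; `POINTWISE` keeps data local and avoids a full network shuffle when the operations can be chained range-to-range.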

## Job Client and Execution

### JobClient

The main client class providing static methods for job submission, monitoring, and management. JobClient bridges the gap between the JobManager's asynchronous actor messages and the synchronous method calls expected by callers.

```java { .api }
public class JobClient {
    public static ActorSystem startJobClientActorSystem(Configuration config) throws IOException;

    public static JobListeningContext submitJob(
            ActorSystem actorSystem,
            Configuration config,
            HighAvailabilityServices highAvailabilityServices,
            ActorGateway jobManagerGateway,
            JobGraph jobGraph,
            Time timeout,
            boolean sysoutLogUpdates,
            ClassLoader userCodeClassLoader) throws JobExecutionException;

    public static JobListeningContext attachToRunningJob(
            JobID jobID,
            ActorGateway jobManagerGateway,
            Configuration configuration,
            ActorSystem actorSystem,
            HighAvailabilityServices highAvailabilityServices,
            Time timeout,
            boolean sysoutLogUpdates);

    public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext)
            throws JobExecutionException;

    public static JobExecutionResult submitJobAndWait(
            ActorSystem actorSystem,
            Configuration config,
            HighAvailabilityServices highAvailabilityServices,
            ActorGateway jobManagerGateway,
            JobGraph jobGraph,
            Time timeout,
            boolean sysoutLogUpdates,
            ClassLoader userCodeClassLoader) throws JobExecutionException;

    public static void submitJobDetached(
            ActorGateway jobManagerGateway,
            Configuration config,
            JobGraph jobGraph,
            Time timeout,
            ClassLoader userCodeClassLoader) throws JobExecutionException;
}
```
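
The blocking calls above (`submitJobAndWait`, `awaitJobResult`) turn an asynchronous completion into a synchronous return value or exception. That sync-over-async pattern can be sketched with plain `CompletableFuture`s, with no Flink types involved:

```java
import java.util.concurrent.*;

// Sketch of the sync-over-async bridging pattern: an asynchronous result
// is awaited with a timeout and converted into a return value or exception.
public class AwaitResult {
    static String awaitJobResult(CompletableFuture<String> pendingResult, long timeoutMillis)
            throws Exception {
        try {
            // Block the caller until the asynchronous answer arrives
            return pendingResult.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new Exception("Job did not finish within " + timeoutMillis + " ms", e);
        } catch (ExecutionException e) {
            // Unwrap the asynchronous failure into a synchronous exception
            throw new Exception("Job failed: " + e.getCause().getMessage(), e.getCause());
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> "FINISHED");
        System.out.println(awaitJobResult(result, 5000)); // FINISHED
    }
}
```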

### JobExecutionResult

Contains the results and metadata from a completed job execution.

```java { .api }
public class JobExecutionResult implements Serializable {
    public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulatorResults);

    public JobID getJobID();
    public long getNetRuntime();
    public Map<String, Object> getAllAccumulatorResults();
    public <T> T getAccumulatorResult(String accumulatorName);
}
```
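
Accumulator results are stored untyped, so the generic `getAccumulatorResult` amounts to an unchecked cast chosen by the call site. A minimal sketch of that lookup pattern (a hypothetical mirror of the signature above, not the Flink class):

```java
import java.util.*;

// Sketch of a typed lookup over an untyped accumulator-result map.
public class AccumulatorLookup {
    private final Map<String, Object> accumulatorResults;

    AccumulatorLookup(Map<String, Object> results) {
        this.accumulatorResults = results;
    }

    @SuppressWarnings("unchecked")
    <T> T getAccumulatorResult(String name) {
        // Returns null for unknown accumulators; the cast is unchecked,
        // so a wrong type argument only fails later as a ClassCastException
        return (T) accumulatorResults.get(name);
    }

    public static void main(String[] args) {
        AccumulatorLookup r = new AccumulatorLookup(Map.of("records-processed", 1234L));
        Long count = r.getAccumulatorResult("records-processed");
        System.out.println(count); // 1234
    }
}
```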

## Job Status and Lifecycle

### JobStatus

Enumeration of all possible job execution states.

```java { .api }
public enum JobStatus {
    CREATED,     // Job has been created but is not yet running
    RUNNING,     // Job is currently executing
    FAILING,     // Job is in the process of failing
    FAILED,      // Job has failed and is no longer executing
    CANCELLING,  // Job is being cancelled
    CANCELED,    // Job has been cancelled
    FINISHED,    // Job completed successfully
    RESTARTING,  // Job is restarting after a failure
    SUSPENDED;   // Job execution stopped on this JobManager (e.g. on loss of leadership)

    public boolean isGloballyTerminalState();
    public boolean isTerminalState();
}
```
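
The two terminality checks differ: FINISHED, CANCELED, and FAILED are globally terminal (the job is done for good), while SUSPENDED is only locally terminal, since another JobManager may still resume the job. A self-contained sketch of that distinction (a standalone mirror of the enum, not the Flink class itself):

```java
// Sketch of the terminal-state distinction in JobStatus.
public class StatusCheck {
    enum Status {
        CREATED, RUNNING, FAILING, FAILED, CANCELLING,
        CANCELED, FINISHED, RESTARTING, SUSPENDED;

        // Globally terminal: the job cannot be resumed by any JobManager
        boolean isGloballyTerminalState() {
            return this == FINISHED || this == CANCELED || this == FAILED;
        }

        // Locally terminal additionally includes SUSPENDED, which only ends
        // execution on the current JobManager
        boolean isTerminalState() {
            return isGloballyTerminalState() || this == SUSPENDED;
        }
    }

    public static void main(String[] args) {
        System.out.println(Status.SUSPENDED.isTerminalState());         // true
        System.out.println(Status.SUSPENDED.isGloballyTerminalState()); // false
    }
}
```

Monitoring code should usually poll for `isGloballyTerminalState()`, since a suspended job may come back.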

### ScheduleMode

Defines how job vertices are scheduled for execution.

```java { .api }
public enum ScheduleMode {
    LAZY_FROM_SOURCES, // Schedule vertices lazily once their input data is available
    EAGER;             // Schedule all vertices immediately at job start
}
```

## Exception Handling

### JobExecutionException

Base exception for job execution failures.

```java { .api }
public class JobExecutionException extends FlinkException {
    public JobExecutionException(JobID jobId, String msg);
    public JobExecutionException(JobID jobId, String msg, Throwable cause);

    public JobID getJobID();
}
```

### JobSubmissionException

Thrown when job submission fails.

```java { .api }
public class JobSubmissionException extends JobExecutionException {
    public JobSubmissionException(JobID jobId, String msg);
    public JobSubmissionException(JobID jobId, String msg, Throwable cause);
}
```
228
229
### JobException
230
231
General job-related exceptions.
232
233
```java { .api }
public class JobException extends FlinkException {
    public JobException(String message);
    public JobException(String message, Throwable cause);
}
```

## Usage Examples

### Creating and Configuring a JobGraph

```java
import org.apache.flink.runtime.jobgraph.*;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.configuration.Configuration;

// Create a new job
JobGraph jobGraph = new JobGraph("Data Processing Job");

// Configure job-level settings; the job configuration is mutated in place
Configuration jobConfig = jobGraph.getJobConfiguration();
jobConfig.setString("job.checkpoint.dir", "file:///checkpoints");

// Enable checkpointing via the documented snapshot settings
// (JobCheckpointingSettings construction is elided for brevity)
jobGraph.setSnapshotSettings(checkpointingSettings);

// Set scheduling mode
jobGraph.setScheduleMode(ScheduleMode.EAGER);

// Create vertices; the invokable classes are user-defined
// AbstractInvokable implementations
JobVertex sourceVertex = new JobVertex("Data Source");
sourceVertex.setInvokableClass(MySourceTask.class);
sourceVertex.setParallelism(4);

JobVertex transformVertex = new JobVertex("Data Transform");
transformVertex.setInvokableClass(MyTransformTask.class);
transformVertex.setParallelism(8);

JobVertex sinkVertex = new JobVertex("Data Sink");
sinkVertex.setInvokableClass(MySinkTask.class);
sinkVertex.setParallelism(2);

// Add vertices to the job
jobGraph.addVertex(sourceVertex);
jobGraph.addVertex(transformVertex);
jobGraph.addVertex(sinkVertex);

// Connect vertices; connectNewDataSetAsInput creates the intermediate
// data set and the JobEdge and registers both with the vertices involved
transformVertex.connectNewDataSetAsInput(
        sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
sinkVertex.connectNewDataSetAsInput(
        transformVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
```

### Job Submission and Monitoring

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobListeningContext;

// actorSystem, config, highAvailabilityServices, jobManagerGateway, timeout
// and userCodeClassLoader are assumed to come from the surrounding cluster setup
JobListeningContext listeningContext = JobClient.submitJob(
        actorSystem,
        config,
        highAvailabilityServices,
        jobManagerGateway,
        jobGraph,
        timeout,
        true, // print status updates to stdout while the job runs
        userCodeClassLoader);

// Wait for completion and get results
try {
    JobExecutionResult result = JobClient.awaitJobResult(listeningContext);
    System.out.println("Job completed in " + result.getNetRuntime() + " ms");

    // Access accumulator results
    Map<String, Object> accumulators = result.getAllAccumulatorResults();
    Long recordCount = (Long) accumulators.get("records-processed");

} catch (JobExecutionException e) {
    System.err.println("Job failed: " + e.getMessage());
    // Handle job failure
}
```

### Handling Job Cancellation

```java
// Cancel a running job; this sketch assumes a cluster client that exposes
// cancel(jobId) and getJobStatus(jobId) for the submitted job
try {
    clusterClient.cancel(jobId);
    System.out.println("Job cancellation initiated");

    // Poll until the job reaches a globally terminal state
    // (CANCELED, FAILED, or FINISHED), with a deadline instead of
    // looping forever
    long deadline = System.currentTimeMillis() + 60_000;
    while (System.currentTimeMillis() < deadline) {
        JobStatus status = clusterClient.getJobStatus(jobId);
        if (status.isGloballyTerminalState()) {
            System.out.println("Job terminated with status: " + status);
            break;
        }
        Thread.sleep(1000);
    }

} catch (Exception e) {
    System.err.println("Failed to cancel job: " + e.getMessage());
}
```

## Common Patterns

### Configuring Job Parameters

```java
// Set job-level configuration by mutating the job's Configuration in place
Configuration jobConfig = jobGraph.getJobConfiguration();
jobConfig.setString("state.backend", "filesystem");
jobConfig.setString("state.checkpoints.dir", "file:///checkpoints");
jobConfig.setLong("execution.checkpointing.interval", 60000L);
jobConfig.setInteger("parallelism.default", 4);
```

### Setting Up Fault Tolerance

```java
// Enable checkpointing for fault tolerance via the documented snapshot
// settings; constructing JobCheckpointingSettings (trigger/ack/confirm
// vertices, interval, timeout, ...) is version-specific and elided here
jobGraph.setSnapshotSettings(checkpointingSettings);

// Configure checkpoint settings in the job configuration
Configuration jobConfig = jobGraph.getJobConfiguration();
jobConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
jobConfig.setLong("execution.checkpointing.timeout", 600000L); // 10 minutes
jobConfig.setInteger("execution.checkpointing.max-concurrent-checkpoints", 1);
```