# Job Graph Management

Core APIs for defining and configuring Flink jobs as directed acyclic graphs (DAGs) of operations. The JobGraph represents the logical structure of a Flink job, while JobVertex represents individual operations within the graph.

## Capabilities

### JobGraph

The main container representing a complete Flink job with its configuration, vertices, and execution parameters.

```java { .api }
/**
 * Represents a Flink dataflow program at the low level that the JobManager accepts.
 * All programs from higher level APIs are transformed into JobGraphs.
 */
public class JobGraph implements ExecutionPlan {
    /** Create a new JobGraph with the given name */
    public JobGraph(String jobName);

    /** Create a new JobGraph with specific ID and name */
    public JobGraph(JobID jobId, String jobName);

    /** Add a vertex to this job graph */
    public void addVertex(JobVertex vertex);

    /** Get all vertices in this job graph */
    public Collection<JobVertex> getVertices();

    /** Get specific vertex by ID */
    public JobVertex findVertexByID(JobVertexID id);

    /** Get the job ID */
    public JobID getJobID();

    /** Get the job name */
    public String getName();

    /** Set the job type (BATCH or STREAMING) */
    public void setJobType(JobType jobType);

    /** Get the job type */
    public JobType getJobType();

    /** Set whether this job uses dynamic graph changes */
    public void setDynamic(boolean dynamic);

    /** Check if this job uses dynamic graph changes */
    public boolean isDynamic();

    /** Get the job configuration */
    public Configuration getJobConfiguration();

    /** Set the job configuration */
    public void setJobConfiguration(Configuration jobConfiguration);

    /** Enable checkpointing for this job */
    public void setSnapshotSettings(JobCheckpointingSettings settings);

    /** Get checkpointing settings */
    public SerializedValue<JobCheckpointingSettings> getCheckpointingSettings();

    /** Set savepoint restore settings */
    public void setSavepointRestoreSettings(SavepointRestoreSettings settings);

    /** Get savepoint restore settings */
    public SavepointRestoreSettings getSavepointRestoreSettings();
}
```

**Usage Examples:**

```java
// Create a basic job graph
JobGraph jobGraph = new JobGraph("Data Processing Pipeline");
jobGraph.setJobType(JobType.STREAMING);

// Configure job-level settings ("env.java.opts.taskmanager" is the TaskManager JVM options key)
Configuration jobConfig = new Configuration();
jobConfig.setString("env.java.opts.taskmanager", "-Xmx1024m");
jobGraph.setJobConfiguration(jobConfig);

// Enable checkpointing; the three JobVertexID lists identify the vertices that
// trigger, acknowledge, and commit checkpoints and are assumed to be collected
// from this graph's vertices
JobCheckpointingSettings checkpointSettings = new JobCheckpointingSettings(
    vertexIdsToTrigger,
    vertexIdsToWaitFor,
    vertexIdsToCommitTo,
    new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
        .setCheckpointInterval(5000L)
        .setCheckpointTimeout(60000L)
        .setMaxConcurrentCheckpoints(1)
        .build(),
    null  // no custom state backend
);
jobGraph.setSnapshotSettings(checkpointSettings);
```

### JobVertex

Represents a single operation (vertex) in the job graph, such as a source, transformation, or sink.

```java { .api }
/**
 * The base class for job vertices representing operations in the job graph.
 */
public class JobVertex implements Serializable {
    /** Create a job vertex with the given name */
    public JobVertex(String name);

    /** Create a job vertex with specific ID and name */
    public JobVertex(String name, JobVertexID id);

    /** Get the vertex ID */
    public JobVertexID getId();

    /** Get the vertex name */
    public String getName();

    /** Set the name of this vertex */
    public void setName(String name);

    /** Set the parallelism for this vertex */
    public void setParallelism(int parallelism);

    /** Get the parallelism of this vertex */
    public int getParallelism();

    /** Set the maximum parallelism for this vertex */
    public void setMaxParallelism(int maxParallelism);

    /** Get the maximum parallelism of this vertex */
    public int getMaxParallelism();

    /** Set the invokable class that implements the vertex logic */
    public void setInvokableClass(Class<? extends TaskInvokable> invokable);

    /** Get the invokable class */
    public Class<? extends TaskInvokable> getInvokableClass();

    /** Set minimum and preferred resource requirements */
    public void setResources(ResourceSpec minResources, ResourceSpec preferredResources);

    /** Get minimum resource requirements */
    public ResourceSpec getMinResources();

    /** Get preferred resource requirements */
    public ResourceSpec getPreferredResources();

    /** Add a new data set as input from another vertex */
    public void connectNewDataSetAsInput(
        JobVertex input,
        DistributionPattern distPattern,
        ResultPartitionType partitionType
    );

    /** Connect to an existing intermediate data set */
    public void connectDataSetAsInput(
        IntermediateDataSet dataSet,
        DistributionPattern distPattern
    );

    /** Get all input edges */
    public List<JobEdge> getInputs();

    /** Set the slot sharing group whose tasks may share task slots */
    public void setSlotSharingGroup(SlotSharingGroup slotSharingGroup);

    /** Get slot sharing group */
    public SlotSharingGroup getSlotSharingGroup();

    /** Get all produced data sets */
    public List<IntermediateDataSet> getProducedDataSets();

    /** Set co-location group for strict co-location */
    public void setCoLocationGroup(CoLocationGroup coLocationGroup);

    /** Get co-location group */
    public CoLocationGroup getCoLocationGroup();

    /** Add operator coordinator */
    public void addOperatorCoordinator(SerializedValue<OperatorCoordinator.Provider> coordinator);

    /** Get operator coordinators */
    public List<SerializedValue<OperatorCoordinator.Provider>> getOperatorCoordinators();
}
```

**Usage Examples:**

```java
// Create source vertex; SourceOperatorStreamTask is a runtime TaskInvokable
// implementation used for source vertices (normally chosen by the higher-level APIs)
JobVertex sourceVertex = new JobVertex("Kafka Source");
sourceVertex.setParallelism(4);
sourceVertex.setInvokableClass(SourceOperatorStreamTask.class);

// Set resource requirements
ResourceSpec minResources = ResourceSpec.newBuilder()
    .setCpuCores(1.0)
    .setHeapMemoryInMB(512)
    .build();
ResourceSpec preferredResources = ResourceSpec.newBuilder()
    .setCpuCores(2.0)
    .setHeapMemoryInMB(1024)
    .build();
sourceVertex.setResources(minResources, preferredResources);

// Create transformation vertex executed by the one-input streaming task
JobVertex mapVertex = new JobVertex("Data Transformation");
mapVertex.setParallelism(8);
mapVertex.setInvokableClass(OneInputStreamTask.class);

// Connect vertices
mapVertex.connectNewDataSetAsInput(
    sourceVertex,
    DistributionPattern.ALL_TO_ALL,
    ResultPartitionType.PIPELINED
);

// Set up slot sharing for efficient resource usage
SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
sourceVertex.setSlotSharingGroup(slotSharingGroup);
mapVertex.setSlotSharingGroup(slotSharingGroup);

// Add vertices to job graph
jobGraph.addVertex(sourceVertex);
jobGraph.addVertex(mapVertex);
```

### JobVertex Configuration

Additional configuration options for job vertices, including input split sources, per-vertex configuration, and user-code dependencies (jars and classpaths).

```java { .api }
/**
 * Set the input split source for vertices that read from external sources
 */
public void setInputSplitSource(InputSplitSource<?> inputSplitSource);

/**
 * Get input split source
 */
public InputSplitSource<?> getInputSplitSource();

/**
 * Set configuration for this vertex
 */
public void setConfiguration(Configuration configuration);

/**
 * Get vertex configuration
 */
public Configuration getConfiguration();

/**
 * Add a jar file needed by this vertex
 */
public void addJar(Path jarPath);

/**
 * Get jar files
 */
public List<Path> getJarFiles();

/**
 * Add classpath entries for this vertex
 */
public void addClasspaths(Collection<URL> classpaths);

/**
 * Get classpaths
 */
public Collection<URL> getUserClasspaths();
```
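
**Usage Example:**

A minimal sketch exercising the vertex-level configuration methods listed above, building on the `sourceVertex` from the JobVertex example. The configuration keys, file paths, and the use of `TextInputFormat` as the split source are illustrative assumptions, not predefined parts of this API.

```java
// Per-vertex parameters, available to the task at runtime via getConfiguration()
Configuration vertexConfig = new Configuration();
vertexConfig.setString("source.topic", "events");
vertexConfig.setInteger("source.fetch.size", 500);
sourceVertex.setConfiguration(vertexConfig);

// Provide input splits for a vertex that reads external data; an InputFormat
// such as TextInputFormat can act as the InputSplitSource
sourceVertex.setInputSplitSource(new TextInputFormat(new Path("hdfs:///data/input")));

// Attach user-code dependencies (paths are placeholders)
sourceVertex.addJar(new Path("file:///opt/jobs/my-udfs.jar"));
sourceVertex.addClasspaths(Collections.singletonList(new URL("file:///opt/jobs/conf/")));
```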

### JobGraph Utilities

Utility methods for job graph manipulation and validation.

```java { .api }
/**
 * Utility class for job graph operations
 */
public class JobGraphUtils {
    /** Validate job graph structure and configuration */
    public static void validateJobGraph(JobGraph jobGraph);

    /** Get topologically sorted vertices */
    public static List<JobVertex> getVerticesTopologically(JobGraph jobGraph);

    /** Check if job graph contains cycles */
    public static boolean hasCycles(JobGraph jobGraph);

    /** Calculate total resource requirements */
    public static ResourceProfile calculateTotalResources(JobGraph jobGraph);
}
```
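
**Usage Example:**

A sketch of a pre-submission sanity check, assuming the `JobGraphUtils` methods exactly as listed above; the error handling and logging are illustrative.

```java
// Fail fast on structural problems (e.g. missing invokable classes or dangling edges)
JobGraphUtils.validateJobGraph(jobGraph);

if (JobGraphUtils.hasCycles(jobGraph)) {
    throw new IllegalStateException("Job graph must be a DAG, but a cycle was detected");
}

// Walk vertices in topological order, e.g. to log the pipeline layout
for (JobVertex vertex : JobGraphUtils.getVerticesTopologically(jobGraph)) {
    System.out.println(vertex.getName() + " (parallelism " + vertex.getParallelism() + ")");
}

// Aggregate declared resources across all vertices for capacity planning
ResourceProfile totalResources = JobGraphUtils.calculateTotalResources(jobGraph);
```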

## Types

```java { .api }
// Job graph identifiers
public class JobVertexID implements Serializable {
    public JobVertexID();
    public JobVertexID(byte[] bytes);
    public static JobVertexID generate();
    public byte[] getBytes();
}

public class IntermediateDataSetID implements Serializable {
    public IntermediateDataSetID();
    public IntermediateDataSetID(byte[] bytes);
    public static IntermediateDataSetID generate();
}

// Distribution patterns
public enum DistributionPattern {
    /** Each producing task sends data to all consuming tasks */
    ALL_TO_ALL,
    /** Each producing task sends data to one or more consuming tasks, depending on the parallelism ratio */
    POINTWISE
}

// Result partition types
public enum ResultPartitionType {
    /** Pipelined partitions for streaming data exchange */
    PIPELINED,
    /** Pipelined partitions with bounded buffer */
    PIPELINED_BOUNDED,
    /** Blocking partitions for batch processing */
    BLOCKING,
    /** Blocking partitions that can be consumed multiple times */
    BLOCKING_PERSISTENT
}

// Job types
public enum JobType {
    /** Batch processing job with finite input */
    BATCH("BATCH"),
    /** Streaming job with continuous data processing */
    STREAMING("STREAMING");

    private final String name;

    JobType(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

// Savepoint restore settings
public class SavepointRestoreSettings implements Serializable {
    public static SavepointRestoreSettings none();
    public static SavepointRestoreSettings forPath(String restorePath);
    public static SavepointRestoreSettings forPath(String restorePath, boolean allowNonRestoredState);

    public boolean restoreSavepoint();
    public String getRestorePath();
    public boolean allowNonRestoredState();
}
```
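
**Usage Example:**

A brief sketch tying the types together: restoring the job from a savepoint and wiring a blocking, pointwise exchange for a batch variant of the pipeline. The savepoint path is a placeholder, and `reduceVertex` is a hypothetical downstream vertex added here for illustration; `mapVertex` is the one created in the JobVertex example.

```java
// Restore from an existing savepoint, allowing state that no longer maps to any operator
SavepointRestoreSettings restoreSettings =
    SavepointRestoreSettings.forPath("hdfs:///savepoints/savepoint-abc123", true);
jobGraph.setSavepointRestoreSettings(restoreSettings);

// For a batch job, prefer blocking exchanges; POINTWISE keeps producer and consumer
// subtasks locally connected when their parallelisms line up
jobGraph.setJobType(JobType.BATCH);
JobVertex reduceVertex = new JobVertex("Aggregation");  // hypothetical downstream vertex
reduceVertex.setParallelism(8);
reduceVertex.connectNewDataSetAsInput(
    mapVertex,
    DistributionPattern.POINTWISE,
    ResultPartitionType.BLOCKING
);
jobGraph.addVertex(reduceVertex);
```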