docs/job-graph.md

# Job Graph Management

Core APIs for defining and configuring Flink jobs as directed acyclic graphs (DAGs) of operations. The JobGraph represents the logical structure of a Flink job, while JobVertex represents individual operations within the graph.

## Capabilities

### JobGraph

The main container representing a complete Flink job with its configuration, vertices, and execution parameters.

```java { .api }
/**
 * Represents a Flink dataflow program at the low level that the JobManager accepts.
 * All programs from higher level APIs are transformed into JobGraphs.
 */
public class JobGraph implements ExecutionPlan {
    /** Create a new JobGraph with the given name */
    public JobGraph(String jobName);

    /** Create a new JobGraph with specific ID and name */
    public JobGraph(JobID jobId, String jobName);

    /** Add a vertex to this job graph */
    public void addVertex(JobVertex vertex);

    /** Get all vertices in this job graph */
    public Collection<JobVertex> getVertices();

    /** Get specific vertex by ID */
    public JobVertex findVertexByID(JobVertexID id);

    /** Get the job ID */
    public JobID getJobID();

    /** Get the job name */
    public String getName();

    /** Set the job type (BATCH or STREAMING) */
    public void setJobType(JobType jobType);

    /** Get the job type */
    public JobType getJobType();

    /** Set whether this job uses dynamic graph changes */
    public void setDynamic(boolean dynamic);

    /** Check if this job uses dynamic graph changes */
    public boolean isDynamic();

    /** Get the job configuration */
    public Configuration getJobConfiguration();

    /** Set the job configuration */
    public void setJobConfiguration(Configuration jobConfiguration);

    /** Enable checkpointing for this job */
    public void setSnapshotSettings(JobCheckpointingSettings settings);

    /** Get checkpointing settings */
    public SerializedValue<JobCheckpointingSettings> getCheckpointingSettings();

    /** Set savepoint restore settings */
    public void setSavepointRestoreSettings(SavepointRestoreSettings settings);

    /** Get savepoint restore settings */
    public SavepointRestoreSettings getSavepointRestoreSettings();
}
```

**Usage Examples:**

```java
// Create a basic job graph
JobGraph jobGraph = new JobGraph("Data Processing Pipeline");
jobGraph.setJobType(JobType.STREAMING);

// Configure job-level settings
Configuration jobConfig = new Configuration();
jobConfig.setString(ConfigConstants.FLINK_TM_JVM_PARAMS, "-Xmx1024m");
jobGraph.setJobConfiguration(jobConfig);

// Enable checkpointing
JobCheckpointingSettings checkpointSettings = new JobCheckpointingSettings(
    vertexIdsToTrigger,
    vertexIdsToWaitFor,
    vertexIdsToCommitTo,
    new CheckpointCoordinatorConfiguration.Builder()
        .setCheckpointInterval(5000L)
        .setCheckpointTimeout(60000L)
        .setMaxConcurrentCheckpoints(1)
        .build(),
    null
);
jobGraph.setSnapshotSettings(checkpointSettings);
```

### JobVertex

Represents a single operation (vertex) in the job graph, such as a source, transformation, or sink.

```java { .api }
/**
 * The base class for job vertices representing operations in the job graph.
 */
public class JobVertex implements Serializable {
    /** Create a job vertex with default name */
    public JobVertex(String name);

    /** Create a job vertex with specific ID and name */
    public JobVertex(String name, JobVertexID id);

    /** Get the vertex ID */
    public JobVertexID getId();

    /** Get the vertex name */
    public String getName();

    /** Set the name of this vertex */
    public void setName(String name);

    /** Set the parallelism for this vertex */
    public void setParallelism(int parallelism);

    /** Get the parallelism of this vertex */
    public int getParallelism();

    /** Set the maximum parallelism for this vertex */
    public void setMaxParallelism(int maxParallelism);

    /** Get the maximum parallelism of this vertex */
    public int getMaxParallelism();

    /** Set the invokable class that implements the vertex logic */
    public void setInvokableClass(Class<? extends TaskInvokable> invokable);

    /** Get the invokable class */
    public Class<? extends TaskInvokable> getInvokableClass();

    /** Set minimum resource requirements */
    public void setResources(ResourceSpec minResources, ResourceSpec preferredResources);

    /** Get minimum resource requirements */
    public ResourceSpec getMinResources();

    /** Get preferred resource requirements */
    public ResourceSpec getPreferredResources();

    /** Add a new data set as input from another vertex */
    public void connectNewDataSetAsInput(
        JobVertex input,
        DistributionPattern distPattern,
        ResultPartitionType partitionType
    );

    /** Connect to an existing intermediate data set */
    public void connectDataSetAsInput(
        IntermediateDataSet dataSet,
        DistributionPattern distPattern
    );

    /** Get all input edges */
    public List<JobEdge> getInputs();

    /** Get all produced data sets */
    public List<IntermediateDataSet> getProducedDataSets();

    /** Set slot sharing group for co-location */
    public void setSlotSharingGroup(SlotSharingGroup slotSharingGroup);

    /** Get slot sharing group */
    public SlotSharingGroup getSlotSharingGroup();

    /** Set co-location group for strict co-location */
    public void setCoLocationGroup(CoLocationGroup coLocationGroup);

    /** Get co-location group */
    public CoLocationGroup getCoLocationGroup();

    /** Add operator coordinator */
    public void addOperatorCoordinator(SerializedValue<OperatorCoordinator.Provider> coordinator);

    /** Get operator coordinators */
    public List<SerializedValue<OperatorCoordinator.Provider>> getOperatorCoordinators();
}
```

**Usage Examples:**

```java
// Create source vertex
JobVertex sourceVertex = new JobVertex("Kafka Source");
sourceVertex.setParallelism(4);
sourceVertex.setInvokableClass(KafkaSourceTask.class);

// Set resource requirements
ResourceSpec minResources = ResourceSpec.newBuilder()
    .setCpuCores(1.0)
    .setHeapMemoryInMB(512)
    .build();
ResourceSpec preferredResources = ResourceSpec.newBuilder()
    .setCpuCores(2.0)
    .setHeapMemoryInMB(1024)
    .build();
sourceVertex.setResources(minResources, preferredResources);

// Create transformation vertex
JobVertex mapVertex = new JobVertex("Data Transformation");
mapVertex.setParallelism(8);
mapVertex.setInvokableClass(MapTask.class);

// Connect vertices
mapVertex.connectNewDataSetAsInput(
    sourceVertex,
    DistributionPattern.ALL_TO_ALL,
    ResultPartitionType.PIPELINED
);

// Set up slot sharing for efficient resource usage
SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
sourceVertex.setSlotSharingGroup(slotSharingGroup);
mapVertex.setSlotSharingGroup(slotSharingGroup);

// Add vertices to job graph
jobGraph.addVertex(sourceVertex);
jobGraph.addVertex(mapVertex);
```

### JobVertex Configuration

Additional configuration options for job vertices including input/output formats and custom parameters.

```java { .api }
/**
 * Set input format for vertices that read from external sources
 */
public void setInputSplitSource(InputSplitSource<?> inputSplitSource);

/**
 * Get input split source
 */
public InputSplitSource<?> getInputSplitSource();

/**
 * Set configuration for this vertex
 */
public void setConfiguration(Configuration configuration);

/**
 * Get vertex configuration
 */
public Configuration getConfiguration();

/**
 * Set jar files needed by this vertex
 */
public void addJar(Path jarPath);

/**
 * Get jar files
 */
public List<Path> getJarFiles();

/**
 * Set classpath for this vertex
 */
public void addClasspaths(Collection<URL> classpaths);

/**
 * Get classpaths
 */
public Collection<URL> getUserClasspaths();
```

### JobGraph Utilities

Utility methods for job graph manipulation and validation.

```java { .api }
/**
 * Utility class for job graph operations
 */
public class JobGraphUtils {
    /** Validate job graph structure and configuration */
    public static void validateJobGraph(JobGraph jobGraph);

    /** Get topologically sorted vertices */
    public static List<JobVertex> getVerticesTopologically(JobGraph jobGraph);

    /** Check if job graph contains cycles */
    public static boolean hasCycles(JobGraph jobGraph);

    /** Calculate total resource requirements */
    public static ResourceProfile calculateTotalResources(JobGraph jobGraph);
}
```

## Types

```java { .api }
// Job graph identifiers
public class JobVertexID implements Serializable {
    public JobVertexID();
    public JobVertexID(byte[] bytes);
    public static JobVertexID generate();
    public byte[] getBytes();
}

public class IntermediateDataSetID implements Serializable {
    public IntermediateDataSetID();
    public IntermediateDataSetID(byte[] bytes);
    public static IntermediateDataSetID generate();
}

// Distribution patterns
public enum DistributionPattern {
    /** Each producing task sends data to all consuming tasks */
    ALL_TO_ALL,
    /** Each producing task sends data to exactly one consuming task */
    POINTWISE
}

// Result partition types
public enum ResultPartitionType {
    /** Pipelined partitions for streaming data exchange */
    PIPELINED,
    /** Pipelined partitions with bounded buffer */
    PIPELINED_BOUNDED,
    /** Blocking partitions for batch processing */
    BLOCKING,
    /** Blocking partitions that can be consumed multiple times */
    BLOCKING_PERSISTENT
}

// Job types
public enum JobType {
    /** Batch processing job with finite input */
    BATCH("BATCH"),
    /** Streaming job with continuous data processing */
    STREAMING("STREAMING");

    private final String name;

    JobType(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

// Savepoint restore settings
public class SavepointRestoreSettings implements Serializable {
    public static SavepointRestoreSettings none();
    public static SavepointRestoreSettings forPath(String restorePath);
    public static SavepointRestoreSettings forPath(String restorePath, boolean allowNonRestoredState);

    public boolean restoreSavepoint();
    public String getRestorePath();
    public boolean allowNonRestoredState();
}
```