or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-exchange.md, execution-graph.md, high-availability.md, index.md, job-management.md, message-passing.md, metrics.md, mini-cluster.md, rpc-framework.md, state-management.md, task-execution.md

docs/job-management.md

0

# Job Execution and Management

1

2

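The job execution and management capabilities provide the core runtime APIs for creating, submitting, monitoring, and controlling Flink dataflow programs. These are low-level APIs: most applications build jobs through Flink's higher-level DataStream/DataSet APIs, which generate a JobGraph internally. The classes below are what is ultimately submitted to and executed by a Flink cluster, whether the job is streaming or batch.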

3

4

## Core Job Components

5

6

### JobGraph

7

8

The JobGraph is the central data structure representing a Flink dataflow program at the JobManager level. It defines the topology of operations and data flow between them.

9

10

```java { .api }

11

public class JobGraph implements Serializable {

12

public JobGraph(String jobName);

13

public JobGraph(JobID jobId, String jobName);

14

public JobGraph(JobVertex... vertices);

15

16

public void addVertex(JobVertex vertex);

17

public void addJar(Path jar);

18

public void addBlob(BlobKey key);

19

20

public JobVertex[] getVerticesAsArray();

21

public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException;

22

public Iterable<JobVertex> getVertices();

23

public int getNumberOfVertices();

24

25

public String getName();

26

public JobID getJobID();

27

28

public Configuration getJobConfiguration();

29

30

public void setScheduleMode(ScheduleMode scheduleMode);

31

public ScheduleMode getScheduleMode();

32

33

public void setSessionTimeout(long sessionTimeout);

34

public long getSessionTimeout();

35

36

public void setAllowQueuedScheduling(boolean allowQueuedScheduling);

37

public boolean getAllowQueuedScheduling();

38

39

public SavepointRestoreSettings getSavepointRestoreSettings();

40

public void setSavepointRestoreSettings(SavepointRestoreSettings settings);

41

42

public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException;

43

public SerializedValue<ExecutionConfig> getSerializedExecutionConfig();

44

45

public JobCheckpointingSettings getCheckpointingSettings();

46

public void setSnapshotSettings(JobCheckpointingSettings settings);

47

48

public List<URL> getClasspaths();

49

public void setClasspaths(List<URL> paths);

50

}

51

```

52

53

### JobVertex

54

55

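Represents a single vertex of the job graph, typically one operator or a chain of operators. At runtime, each vertex is executed as `parallelism` parallel subtasks, each an instance of the vertex's invokable class.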

56

57

```java { .api }

58

public class JobVertex implements Serializable {

59

public JobVertex(String name);

60

public JobVertex(String name, JobVertexID id);

61

62

public void setInvokableClass(Class<? extends AbstractInvokable> invokable);

63

public Class<? extends AbstractInvokable> getInvokableClass();

64

65

public void setParallelism(int parallelism);

66

public int getParallelism();

67

68

public void setMaxParallelism(int maxParallelism);

69

public int getMaxParallelism();

70

71

public JobVertexID getID();

72

public String getName();

73

74

public Configuration getConfiguration();

75

public void setConfiguration(Configuration configuration);

76

77

public List<JobEdge> getInputs();

78

public List<IntermediateDataSet> getProducedDataSets();

79

}

80

```

81

82

### JobEdge

83

84

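Defines how data flows between job vertices. A JobEdge links an intermediate data set, produced by an upstream vertex, to the downstream vertex that consumes it. It also records the distribution pattern that decides which producer subtasks feed which consumer subtasks.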

85

86

```java { .api }

87

public class JobEdge implements Serializable {

88

public JobEdge(IntermediateDataSet source, JobVertex target, DistributionPattern distributionPattern);

89

90

public IntermediateDataSet getSource();

91

public JobVertex getTarget();

92

public DistributionPattern getDistributionPattern();

93

94

public void setShipStrategy(ShipStrategyType shipStrategy);

95

public ShipStrategyType getShipStrategy();

96

}

97

```

98

99

## Job Client and Execution

100

101

### JobClient

102

103

A utility class of static methods for submitting jobs, attaching to running jobs, and waiting for their results. JobClient bridges the JobManager's asynchronous, actor-based messaging and the blocking, synchronous method calls a client program expects.

104

105

```java { .api }

106

public class JobClient {

107

public static ActorSystem startJobClientActorSystem(Configuration config) throws IOException;

108

109

public static JobListeningContext submitJob(

110

ActorSystem actorSystem,

111

Configuration config,

112

HighAvailabilityServices highAvailabilityServices,

113

ActorGateway jobManagerGateway,

114

JobGraph jobGraph,

115

Time timeout,

116

boolean sysoutLogUpdates,

117

ClassLoader userCodeClassLoader

118

) throws JobExecutionException;

119

120

public static JobListeningContext attachToRunningJob(

121

JobID jobID,

122

ActorGateway jobManagerGateway,

123

Configuration configuration,

124

ActorSystem actorSystem,

125

HighAvailabilityServices highAvailabilityServices,

126

Time timeout,

127

boolean sysoutLogUpdates

128

);

129

130

public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext)

131

throws JobExecutionException;

132

133

public static JobExecutionResult submitJobAndWait(

134

ActorSystem actorSystem,

135

Configuration config,

136

HighAvailabilityServices highAvailabilityServices,

137

ActorGateway jobManagerGateway,

138

JobGraph jobGraph,

139

Time timeout,

140

boolean sysoutLogUpdates,

141

ClassLoader userCodeClassLoader

142

) throws JobExecutionException;

143

144

public static void submitJobDetached(

145

ActorGateway jobManagerGateway,

146

Configuration config,

147

JobGraph jobGraph,

148

Time timeout,

149

ClassLoader userCodeClassLoader

150

) throws JobExecutionException;

151

}

152

```

153

154

### JobExecutionResult

155

156

Contains the results and metadata from a completed job execution.

157

158

```java { .api }

159

public class JobExecutionResult implements Serializable {

160

public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulatorResults);

161

162

public JobID getJobID();

163

public long getNetRuntime();

164

public Map<String, Object> getAllAccumulatorResults();

165

public <T> T getAccumulatorResult(String accumulatorName);

166

}

167

```

168

169

## Job Status and Lifecycle

170

171

### JobStatus

172

173

Enumeration of all possible job execution states.

174

175

```java { .api }

176

public enum JobStatus {

177

CREATED, // Job has been created but not yet submitted

178

RUNNING, // Job is currently executing

179

FAILING, // Job is in the process of failing

180

FAILED, // Job has failed and is no longer executing

181

CANCELLING, // Job is being cancelled

182

CANCELED, // Job has been cancelled

183

FINISHED, // Job completed successfully

184

RESTARTING, // Job is restarting from a failure

185

SUSPENDED; // Job has been suspended (for savepoints)

186

187

public boolean isGloballyTerminalState();

188

public boolean isTerminalState();

189

}

190

```

191

192

### ScheduleMode

193

194

Defines how job vertices are scheduled for execution.

195

196

```java { .api }

197

/**
 * Defines when the vertices of a job are scheduled for execution. Later Flink versions may
 * define additional modes — confirm against the version in use.
 */
public enum ScheduleMode {
    LAZY_FROM_SOURCES, // Start with the sources; schedule downstream vertices once their input data is available
    EAGER; // Schedule all vertices immediately when the job starts
}

201

```

202

203

## Exception Handling

204

205

### JobExecutionException

206

207

Base exception for job execution failures.

208

209

```java { .api }

210

public class JobExecutionException extends FlinkException {

211

public JobExecutionException(JobID jobId, String msg);

212

public JobExecutionException(JobID jobId, String msg, Throwable cause);

213

214

public JobID getJobID();

215

}

216

```

217

218

### JobSubmissionException

219

220

Thrown when job submission fails.

221

222

```java { .api }

223

public class JobSubmissionException extends FlinkException {

224

public JobSubmissionException(String message);

225

public JobSubmissionException(String message, Throwable cause);

226

}

227

```

228

229

### JobException

230

231

General job-related exceptions.

232

233

```java { .api }

234

public class JobException extends FlinkException {

235

public JobException(String message);

236

public JobException(String message, Throwable cause);

237

}

238

```

239

240

## Usage Examples

241

242

### Creating and Configuring a JobGraph

243

244

```java

245

import org.apache.flink.runtime.jobgraph.*;

246

import org.apache.flink.configuration.Configuration;

247

248

// Create a new job

249

JobGraph jobGraph = new JobGraph("Data Processing Job");

250

251

// Configure job-level settings

252

Configuration jobConfig = new Configuration();

253

jobConfig.setString("job.checkpoint.dir", "file:///checkpoints");

254

jobGraph.setJobConfiguration(jobConfig);

255

256

// Enable checkpointing

257

jobGraph.setCheckpointingEnabled(true);

258

jobGraph.setCheckpointingInterval(30000); // 30 seconds

259

260

// Set scheduling mode

261

jobGraph.setScheduleMode(ScheduleMode.EAGER);

262

263

// Create vertices

264

JobVertex sourceVertex = new JobVertex("Data Source");

265

sourceVertex.setInvokableClass(MySourceTask.class);

266

sourceVertex.setParallelism(4);

267

268

JobVertex transformVertex = new JobVertex("Data Transform");

269

transformVertex.setInvokableClass(MyTransformTask.class);

270

transformVertex.setParallelism(8);

271

272

JobVertex sinkVertex = new JobVertex("Data Sink");

273

sinkVertex.setInvokableClass(MySinkTask.class);

274

sinkVertex.setParallelism(2);

275

276

// Add vertices to job

277

jobGraph.addVertex(sourceVertex);

278

jobGraph.addVertex(transformVertex);

279

jobGraph.addVertex(sinkVertex);

280

281

// Connect vertices with edges

282

IntermediateDataSet sourceOutput = sourceVertex.createAndAddResultDataSet(ResultPartitionType.PIPELINED);

283

JobEdge sourceToTransform = new JobEdge(sourceOutput, transformVertex, DistributionPattern.ALL_TO_ALL);

284

jobGraph.addEdge(sourceToTransform);

285

286

IntermediateDataSet transformOutput = transformVertex.createAndAddResultDataSet(ResultPartitionType.PIPELINED);

287

JobEdge transformToSink = new JobEdge(transformOutput, sinkVertex, DistributionPattern.FORWARD);

288

jobGraph.addEdge(transformToSink);

289

```

290

291

### Job Submission and Monitoring

292

293

```java

294

import org.apache.flink.runtime.client.JobClient;

295

import org.apache.flink.runtime.jobgraph.JobStatus;

296

297

// Submit job (using a cluster client or mini cluster)

298

JobClient jobClient = clusterClient.submitJob(jobGraph);

299

300

// Monitor job status

301

JobStatus status = jobClient.getJobStatus();

302

System.out.println("Job status: " + status);

303

304

// Wait for completion and get results

305

try {

306

JobExecutionResult result = jobClient.getJobExecutionResult();

307

System.out.println("Job completed in " + result.getNetRuntime() + " ms");

308

309

// Access accumulator results

310

Map<String, Object> accumulators = result.getAllAccumulatorResults();

311

Long recordCount = (Long) accumulators.get("records-processed");

312

313

} catch (JobExecutionException e) {

314

System.err.println("Job failed: " + e.getMessage());

315

// Handle job failure

316

}

317

```

318

319

### Handling Job Cancellation

320

321

```java

322

// Cancel a running job

323

try {

324

jobClient.cancel();

325

System.out.println("Job cancellation initiated");

326

327

// Wait for cancellation to complete

328

while (true) {

329

JobStatus status = jobClient.getJobStatus();

330

if (status == JobStatus.CANCELED || status == JobStatus.FAILED) {

331

System.out.println("Job terminated with status: " + status);

332

break;

333

}

334

Thread.sleep(1000);

335

}

336

337

} catch (JobExecutionException e) {

338

System.err.println("Failed to cancel job: " + e.getMessage());

339

}

340

```

341

342

## Common Patterns

343

344

### Configuring Job Parameters

345

346

```java

347

// Set job-level configuration

348

Configuration jobConfig = new Configuration();

349

jobConfig.setString("state.backend", "filesystem");

350

jobConfig.setString("state.checkpoints.dir", "file:///checkpoints");

351

jobConfig.setLong("execution.checkpointing.interval", 60000L);

352

jobConfig.setInteger("parallelism.default", 4);

353

354

jobGraph.setJobConfiguration(jobConfig);

355

```

356

357

### Setting Up Fault Tolerance

358

359

```java

360

// Enable checkpointing for fault tolerance

361

jobGraph.setCheckpointingEnabled(true);

362

jobGraph.setCheckpointingInterval(30000); // Checkpoint every 30 seconds

363

364

// Configure checkpoint settings in job configuration

365

Configuration jobConfig = jobGraph.getJobConfiguration();

366

jobConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");

367

jobConfig.setLong("execution.checkpointing.timeout", 600000L); // 10 minutes

368

jobConfig.setInteger("execution.checkpointing.max-concurrent-checkpoints", 1);

369

```