or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · connectors.md · event-time-watermarks.md · execution-jobs.md · functions-and-operators.md · index.md · state-management.md · type-system-serialization.md · utilities.md

docs/execution-jobs.md

0

# Execution and Jobs

1

2

Apache Flink Core provides comprehensive APIs for job execution, runtime contexts, and job lifecycle management. These components enable applications to interact with the Flink runtime, access execution metadata, and manage distributed job execution.

3

4

## Job Execution

5

6

### JobExecutionResult

7

8

Access results and statistics after job completion.

9

10

```java { .api }

11

import org.apache.flink.api.common.JobExecutionResult;

12

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

13

14

public class JobExecutionExample {

15

16

public static void basicJobExecution() throws Exception {

17

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

18

19

// Build your job pipeline

20

env.fromElements(1, 2, 3, 4, 5)

21

.map(x -> x * 2)

22

.print();

23

24

// Execute and get result

25

JobExecutionResult result = env.execute("My Job");

26

27

// Access execution information

28

long netRuntime = result.getNetRuntime();

29

String jobName = result.getJobName();

30

JobID jobId = result.getJobID();

31

32

System.out.println("Job '" + jobName + "' completed in " + netRuntime + "ms");

33

System.out.println("Job ID: " + jobId);

34

35

// Access accumulators (if any were used)

36

Map<String, Object> accumulators = result.getAllAccumulatorResults();

37

for (Map.Entry<String, Object> entry : accumulators.entrySet()) {

38

System.out.println("Accumulator " + entry.getKey() + ": " + entry.getValue());

39

}

40

}

41

42

public static void detachedJobExecution() throws Exception {

43

Configuration config = new Configuration();

44

config.setBoolean(DeploymentOptions.ATTACHED, false); // Detached mode

45

46

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(config);

47

48

// Build pipeline

49

env.fromElements("hello", "world", "flink")

50

.map(String::toUpperCase)

51

.print();

52

53

// Execute in detached mode

54

JobExecutionResult result = env.execute("Detached Job");

55

56

// In detached mode, result is available immediately but job runs asynchronously

57

if (result instanceof DetachedJobExecutionResult) {

58

System.out.println("Job submitted in detached mode");

59

System.out.println("Job ID: " + result.getJobID());

60

}

61

}

62

}

63

```

64

65

### JobClient Interface

66

67

Interact with running jobs programmatically.

68

69

```java { .api }

70

import org.apache.flink.core.execution.JobClient;

71

import org.apache.flink.api.common.JobStatus;

72

73

public class JobClientExample {

74

75

public static void jobClientUsage() throws Exception {

76

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

77

78

// Build pipeline

79

env.fromElements(1, 2, 3, 4, 5)

80

.map(x -> {

81

// Simulate long-running computation

82

Thread.sleep(1000);

83

return x * 2;

84

})

85

.print();

86

87

// Execute and get JobClient

88

JobClient jobClient = env.executeAsync("Long Running Job");

89

90

// Monitor job status

91

CompletableFuture<JobStatus> statusFuture = jobClient.getJobStatus();

92

JobStatus status = statusFuture.get();

93

System.out.println("Initial job status: " + status);

94

95

// Get job execution result asynchronously

96

CompletableFuture<JobExecutionResult> resultFuture = jobClient.getJobExecutionResult();

97

98

// Cancel job if needed

99

if (shouldCancelJob()) {

100

CompletableFuture<Void> cancelFuture = jobClient.cancel();

101

cancelFuture.get(); // Wait for cancellation

102

System.out.println("Job cancelled");

103

} else {

104

// Wait for job completion

105

JobExecutionResult result = resultFuture.get();

106

System.out.println("Job completed: " + result.getJobName());

107

}

108

}

109

110

public static void jobClientWithTimeout() throws Exception {

111

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

112

113

// Configure job

114

env.fromCollection(generateLargeDataset())

115

.keyBy(Record::getKey)

116

.sum("value")

117

.print();

118

119

JobClient jobClient = env.executeAsync("Batch Processing Job");

120

121

try {

122

// Wait for completion with timeout

123

JobExecutionResult result = jobClient.getJobExecutionResult()

124

.get(10, TimeUnit.MINUTES);

125

126

System.out.println("Job completed successfully");

127

128

} catch (TimeoutException e) {

129

System.out.println("Job is taking too long, cancelling...");

130

jobClient.cancel();

131

132

} catch (ExecutionException e) {

133

System.out.println("Job failed: " + e.getCause().getMessage());

134

}

135

}

136

137

private static boolean shouldCancelJob() {

138

// Implementation-specific logic to determine if job should be cancelled

139

return false;

140

}

141

142

private static List<Record> generateLargeDataset() {

143

// Generate test data

144

return IntStream.range(0, 10000)

145

.mapToObj(i -> new Record("key" + (i % 100), i))

146

.collect(Collectors.toList());

147

}

148

}

149

```

150

151

## Runtime Context

152

153

### Accessing Runtime Information

154

155

```java { .api }

156

import org.apache.flink.api.common.functions.RuntimeContext;

157

import org.apache.flink.api.common.functions.RichMapFunction;

158

import org.apache.flink.api.common.functions.OpenContext;

159

160

public class RuntimeContextExample extends RichMapFunction<String, String> {

161

162

@Override

163

public void open(OpenContext openContext) throws Exception {

164

RuntimeContext ctx = getRuntimeContext();

165

166

// Basic runtime information

167

String taskName = ctx.getTaskName();

168

int subtaskIndex = ctx.getIndexOfThisSubtask();

169

int numberOfParallelSubtasks = ctx.getNumberOfParallelSubtasks();

170

int attemptNumber = ctx.getAttemptNumber();

171

172

System.out.println("Task: " + taskName);

173

System.out.println("Subtask: " + subtaskIndex + "/" + numberOfParallelSubtasks);

174

System.out.println("Attempt: " + attemptNumber);

175

176

// Execution configuration

177

ExecutionConfig execConfig = ctx.getExecutionConfig();

178

int parallelism = execConfig.getParallelism();

179

boolean closureCleanerEnabled = execConfig.isClosureCleanerEnabled();

180

181

// Distributed cache

182

File cachedFile = ctx.getDistributedCache().getFile("my-config-file");

183

if (cachedFile != null && cachedFile.exists()) {

184

// Use cached file

185

loadConfigurationFromFile(cachedFile);

186

}

187

188

// Job information

189

JobInfo jobInfo = ctx.getJobInfo();

190

String jobName = jobInfo.getJobName();

191

JobID jobId = jobInfo.getJobId();

192

193

System.out.println("Job: " + jobName + " (" + jobId + ")");

194

}

195

196

@Override

197

public String map(String value) throws Exception {

198

RuntimeContext ctx = getRuntimeContext();

199

200

// Access runtime context during processing

201

int subtaskIndex = ctx.getIndexOfThisSubtask();

202

203

return "[Subtask-" + subtaskIndex + "] " + value;

204

}

205

206

private void loadConfigurationFromFile(File configFile) {

207

// Load configuration from cached file

208

}

209

}

210

```

211

212

### Metrics and Accumulators

213

214

```java { .api }

215

import org.apache.flink.api.common.accumulators.IntCounter;

216

import org.apache.flink.api.common.accumulators.LongCounter;

217

import org.apache.flink.api.common.functions.OpenContext;

218

import org.apache.flink.metrics.Counter;

219

import org.apache.flink.metrics.Histogram;

220

import org.apache.flink.metrics.Meter;

221

222

public class MetricsAndAccumulatorsFunction extends RichMapFunction<String, String> {

223

224

// Accumulators for job-level statistics

225

private IntCounter processedRecords;

226

private LongCounter totalBytes;

227

228

// Metrics for runtime monitoring

229

private Counter metricsCounter;

230

private Histogram processingLatency;

231

private Meter throughput;

232

233

@Override

234

public void open(OpenContext openContext) throws Exception {

235

RuntimeContext ctx = getRuntimeContext();

236

237

// Initialize accumulators

238

processedRecords = new IntCounter();

239

totalBytes = new LongCounter();

240

241

ctx.addAccumulator("processed-records", processedRecords);

242

ctx.addAccumulator("total-bytes", totalBytes);

243

244

// Initialize metrics

245

MetricGroup metricGroup = ctx.getMetricGroup()

246

.addGroup("my-operator")

247

.addGroup("subtask", String.valueOf(ctx.getIndexOfThisSubtask()));

248

249

metricsCounter = metricGroup.counter("records-processed");

250

processingLatency = metricGroup.histogram("processing-latency");

251

throughput = metricGroup.meter("throughput");

252

253

// Custom gauge

254

metricGroup.gauge("queue-size", () -> getCurrentQueueSize());

255

}

256

257

@Override

258

public String map(String value) throws Exception {

259

long startTime = System.nanoTime();

260

261

try {

262

// Process the value

263

String result = processValue(value);

264

265

// Update accumulators

266

processedRecords.add(1);

267

totalBytes.add(value.getBytes().length);

268

269

// Update metrics

270

metricsCounter.inc();

271

throughput.markEvent();

272

273

return result;

274

275

} finally {

276

// Record processing latency

277

long latency = System.nanoTime() - startTime;

278

processingLatency.update(latency / 1_000_000); // Convert to milliseconds

279

}

280

}

281

282

private String processValue(String value) {

283

// Simulate processing

284

return value.toUpperCase();

285

}

286

287

private int getCurrentQueueSize() {

288

// Return current queue size for gauge metric

289

return 0; // Implementation specific

290

}

291

}

292

```

293

294

## Job Lifecycle Management

295

296

### Job Listeners and Hooks

297

298

```java { .api }

299

import org.apache.flink.core.execution.JobListener;

300

import org.apache.flink.core.execution.JobStatusChangedListener;

301

302

// Job listener for execution events

303

public class CustomJobListener implements JobListener {

304

305

@Override

306

public void onJobSubmitted(JobClient jobClient, Throwable throwable) {

307

if (throwable == null) {

308

System.out.println("Job submitted successfully: " + jobClient.getJobID());

309

310

// Start monitoring job

311

startJobMonitoring(jobClient);

312

313

} else {

314

System.err.println("Job submission failed: " + throwable.getMessage());

315

handleJobSubmissionFailure(throwable);

316

}

317

}

318

319

@Override

320

public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {

321

if (throwable == null) {

322

System.out.println("Job executed successfully: " + jobExecutionResult.getJobName());

323

System.out.println("Runtime: " + jobExecutionResult.getNetRuntime() + "ms");

324

325

// Process job results

326

processJobResults(jobExecutionResult);

327

328

} else {

329

System.err.println("Job execution failed: " + throwable.getMessage());

330

handleJobExecutionFailure(throwable);

331

}

332

}

333

334

private void startJobMonitoring(JobClient jobClient) {

335

// Start background monitoring

336

CompletableFuture.runAsync(() -> {

337

try {

338

while (true) {

339

JobStatus status = jobClient.getJobStatus().get();

340

System.out.println("Job status: " + status);

341

342

if (status.isTerminalState()) {

343

break;

344

}

345

346

Thread.sleep(5000); // Check every 5 seconds

347

}

348

} catch (Exception e) {

349

System.err.println("Job monitoring failed: " + e.getMessage());

350

}

351

});

352

}

353

354

private void handleJobSubmissionFailure(Throwable throwable) {

355

// Handle submission failure (e.g., notify admin, retry, etc.)

356

}

357

358

private void processJobResults(JobExecutionResult result) {

359

// Process successful job results

360

Map<String, Object> accumulators = result.getAllAccumulatorResults();

361

for (Map.Entry<String, Object> entry : accumulators.entrySet()) {

362

System.out.println("Final " + entry.getKey() + ": " + entry.getValue());

363

}

364

}

365

366

private void handleJobExecutionFailure(Throwable throwable) {

367

// Handle execution failure

368

}

369

}

370

371

// Job status change listener

372

public class JobStatusChangeListener implements JobStatusChangedListener {

373

374

@Override

375

public void onEvent(JobStatusChangedEvent event) {

376

JobID jobId = event.getJobId();

377

JobStatus oldStatus = event.getOldJobStatus();

378

JobStatus newStatus = event.getNewJobStatus();

379

380

System.out.println("Job " + jobId + " status changed: " + oldStatus + " -> " + newStatus);

381

382

switch (newStatus) {

383

case RUNNING:

384

handleJobStarted(jobId);

385

break;

386

case FINISHED:

387

handleJobCompleted(jobId);

388

break;

389

case FAILED:

390

handleJobFailed(jobId, event.getThrowable());

391

break;

392

case CANCELED:

393

handleJobCancelled(jobId);

394

break;

395

}

396

}

397

398

private void handleJobStarted(JobID jobId) {

399

System.out.println("Job " + jobId + " has started execution");

400

// Start monitoring, notifications, etc.

401

}

402

403

private void handleJobCompleted(JobID jobId) {

404

System.out.println("Job " + jobId + " completed successfully");

405

// Clean up resources, send notifications, etc.

406

}

407

408

private void handleJobFailed(JobID jobId, Throwable throwable) {

409

System.err.println("Job " + jobId + " failed: " +

410

(throwable != null ? throwable.getMessage() : "Unknown error"));

411

// Send alerts, trigger retries, etc.

412

}

413

414

private void handleJobCancelled(JobID jobId) {

415

System.out.println("Job " + jobId + " was cancelled");

416

// Clean up resources, update status, etc.

417

}

418

}

419

```

420

421

### Using Job Listeners

422

423

```java { .api }

424

public class JobWithListeners {

425

426

public static void executeWithListeners() throws Exception {

427

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

428

429

// Register job listener

430

env.registerJobListener(new CustomJobListener());

431

432

// Build pipeline

433

env.fromElements("hello", "world", "flink")

434

.map(new MetricsAndAccumulatorsFunction())

435

.print();

436

437

// Execute job (listener will be notified)

438

env.execute("Job with Listeners");

439

}

440

441

public static void manualJobLifecycleManagement() throws Exception {

442

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

443

444

// Build pipeline

445

DataStream<String> stream = env.fromElements("data1", "data2", "data3")

446

.map(new ProcessingFunction());

447

448

stream.print();

449

450

// Execute asynchronously

451

JobClient jobClient = env.executeAsync("Manual Lifecycle Job");

452

453

// Manual lifecycle management

454

try {

455

// Monitor job progress

456

CompletableFuture<JobStatus> statusFuture = jobClient.getJobStatus();

457

JobStatus initialStatus = statusFuture.get();

458

459

if (initialStatus == JobStatus.RUNNING) {

460

System.out.println("Job is running, monitoring progress...");

461

462

// Set up periodic status checks

463

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

464

ScheduledFuture<?> monitoring = scheduler.scheduleWithFixedDelay(() -> {

465

try {

466

JobStatus currentStatus = jobClient.getJobStatus().get();

467

System.out.println("Current status: " + currentStatus);

468

} catch (Exception e) {

469

System.err.println("Failed to get job status: " + e.getMessage());

470

}

471

}, 0, 5, TimeUnit.SECONDS);

472

473

// Wait for completion

474

JobExecutionResult result = jobClient.getJobExecutionResult().get();

475

monitoring.cancel(true);

476

scheduler.shutdown();

477

478

System.out.println("Job completed: " + result.getJobName());

479

480

} else {

481

System.err.println("Job failed to start, status: " + initialStatus);

482

}

483

484

} catch (Exception e) {

485

System.err.println("Job management failed: " + e.getMessage());

486

487

// Attempt to cancel job on error

488

try {

489

jobClient.cancel().get(30, TimeUnit.SECONDS);

490

System.out.println("Job cancelled due to management failure");

491

} catch (Exception cancelException) {

492

System.err.println("Failed to cancel job: " + cancelException.getMessage());

493

}

494

}

495

}

496

497

private static class ProcessingFunction extends RichMapFunction<String, String> {

498

499

@Override

500

public String map(String value) throws Exception {

501

// Simulate processing time

502

Thread.sleep(1000);

503

return "Processed: " + value;

504

}

505

}

506

}

507

```

508

509

## Pipeline Execution

510

511

### Custom Pipeline Executors

512

513

```java { .api }

514

import org.apache.flink.core.execution.PipelineExecutor;

515

import org.apache.flink.core.execution.PipelineExecutorFactory;

516

517

// Custom pipeline executor factory

518

public class CustomPipelineExecutorFactory implements PipelineExecutorFactory {

519

520

public static final String CUSTOM_EXECUTOR_NAME = "custom";

521

522

@Override

523

public String getName() {

524

return CUSTOM_EXECUTOR_NAME;

525

}

526

527

@Override

528

public boolean isCompatibleWith(Configuration configuration) {

529

// Check if this executor is compatible with the configuration

530

String executorName = configuration.getString(DeploymentOptions.TARGET);

531

return CUSTOM_EXECUTOR_NAME.equals(executorName);

532

}

533

534

@Override

535

public PipelineExecutor getExecutor(Configuration configuration) {

536

return new CustomPipelineExecutor(configuration);

537

}

538

}

539

540

// Custom pipeline executor implementation

541

public class CustomPipelineExecutor implements PipelineExecutor {

542

543

private final Configuration configuration;

544

545

public CustomPipelineExecutor(Configuration configuration) {

546

this.configuration = configuration;

547

}

548

549

@Override

550

public CompletableFuture<JobClient> execute(Pipeline pipeline,

551

Configuration configuration,

552

ClassLoader userClassloader) throws Exception {

553

554

System.out.println("Executing pipeline with custom executor");

555

556

// Custom execution logic

557

JobGraph jobGraph = buildJobGraph(pipeline, configuration);

558

559

// Submit job to custom runtime

560

return submitJob(jobGraph);

561

}

562

563

private JobGraph buildJobGraph(Pipeline pipeline, Configuration config) {

564

// Convert pipeline to JobGraph

565

// This would typically involve more complex logic

566

return new JobGraph("Custom Job");

567

}

568

569

private CompletableFuture<JobClient> submitJob(JobGraph jobGraph) {

570

// Submit job to execution runtime

571

return CompletableFuture.supplyAsync(() -> {

572

// Simulate job submission

573

JobID jobId = JobID.generate();

574

return new CustomJobClient(jobId);

575

});

576

}

577

578

private static class CustomJobClient implements JobClient {

579

private final JobID jobId;

580

581

public CustomJobClient(JobID jobId) {

582

this.jobId = jobId;

583

}

584

585

@Override

586

public JobID getJobID() {

587

return jobId;

588

}

589

590

@Override

591

public CompletableFuture<JobStatus> getJobStatus() {

592

return CompletableFuture.completedFuture(JobStatus.RUNNING);

593

}

594

595

@Override

596

public CompletableFuture<Void> cancel() {

597

return CompletableFuture.completedFuture(null);

598

}

599

600

@Override

601

public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime,

602

String savepointDirectory,

603

SavepointFormatType formatType) {

604

return CompletableFuture.completedFuture("savepoint-path");

605

}

606

607

@Override

608

public CompletableFuture<String> triggerSavepoint(String savepointDirectory,

609

SavepointFormatType formatType) {

610

return CompletableFuture.completedFuture("savepoint-path");

611

}

612

613

@Override

614

public CompletableFuture<Map<String, Object>> getAccumulators() {

615

return CompletableFuture.completedFuture(Collections.emptyMap());

616

}

617

618

@Override

619

public CompletableFuture<JobExecutionResult> getJobExecutionResult() {

620

// Simulate job completion

621

return CompletableFuture.supplyAsync(() -> {

622

try {

623

Thread.sleep(5000); // Simulate execution time

624

return new JobExecutionResult(jobId, 5000, Collections.emptyMap());

625

} catch (InterruptedException e) {

626

throw new RuntimeException(e);

627

}

628

});

629

}

630

}

631

}

632

```

633

634

## Distributed Cache

635

636

### Using Distributed Cache

637

638

```java { .api }

639

import org.apache.flink.api.common.cache.DistributedCache;

640

import org.apache.flink.api.common.functions.OpenContext;

641

642

public class DistributedCacheExample extends RichMapFunction<String, String> {

643

644

private Properties configProperties;

645

private List<String> referenceData;

646

647

@Override

648

public void open(OpenContext openContext) throws Exception {

649

RuntimeContext ctx = getRuntimeContext();

650

DistributedCache cache = ctx.getDistributedCache();

651

652

// Access cached files

653

File configFile = cache.getFile("config.properties");

654

if (configFile != null && configFile.exists()) {

655

configProperties = new Properties();

656

try (FileInputStream fis = new FileInputStream(configFile)) {

657

configProperties.load(fis);

658

}

659

}

660

661

// Access cached directory

662

File dataDir = cache.getFile("reference-data");

663

if (dataDir != null && dataDir.isDirectory()) {

664

referenceData = loadReferenceData(dataDir);

665

}

666

}

667

668

@Override

669

public String map(String value) throws Exception {

670

// Use cached configuration and data

671

String prefix = configProperties.getProperty("output.prefix", "");

672

673

// Lookup reference data

674

boolean isValid = referenceData.contains(value);

675

676

return prefix + value + (isValid ? " [VALID]" : " [INVALID]");

677

}

678

679

private List<String> loadReferenceData(File dataDir) {

680

List<String> data = new ArrayList<>();

681

682

File[] files = dataDir.listFiles((dir, name) -> name.endsWith(".txt"));

683

if (files != null) {

684

for (File file : files) {

685

try (BufferedReader reader = Files.newBufferedReader(file.toPath())) {

686

String line;

687

while ((line = reader.readLine()) != null) {

688

data.add(line.trim());

689

}

690

} catch (IOException e) {

691

System.err.println("Error reading reference data file: " + e.getMessage());

692

}

693

}

694

}

695

696

return data;

697

}

698

699

public static void registerCachedFiles() throws Exception {

700

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

701

702

// Register files with distributed cache

703

env.registerCachedFile("/path/to/config.properties", "config.properties");

704

env.registerCachedFile("/path/to/reference-data/", "reference-data", true); // true for directory

705

706

// Use the function that accesses cached files

707

env.fromElements("item1", "item2", "item3")

708

.map(new DistributedCacheExample())

709

.print();

710

711

env.execute("Distributed Cache Example");

712

}

713

}

714

```

715

716

Apache Flink's execution and job management APIs provide comprehensive control over job lifecycle, runtime information access, and execution monitoring. By leveraging these capabilities, you can build robust applications with proper monitoring, error handling, and resource management.