or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-operations.md · cluster-management.md · index.md · program-execution.md · rest-client-communication.md

docs/program-execution.md

0

# Program Execution

1

2

The Apache Flink Program Execution module (`org.apache.flink.client.program.*`) provides comprehensive program packaging, execution environments, and job lifecycle management capabilities. This module handles the packaging of user programs, provides cluster client implementations, and manages execution contexts for both batch and streaming applications.

3

4

## Core Program Interfaces

5

6

### ClusterClient { .api }

7

8

Main interface for interacting with Flink clusters, providing job submission and management capabilities.

9

10

```java

11

public interface ClusterClient<T> extends AutoCloseable {

12

// Cluster information

13

T getClusterId();

14

Configuration getFlinkConfiguration();

15

String getWebInterfaceURL();

16

17

// Job listing and status

18

CompletableFuture<Collection<JobStatusMessage>> listJobs();

19

CompletableFuture<JobStatus> getJobStatus(JobID jobId);

20

CompletableFuture<JobResult> requestJobResult(JobID jobId);

21

22

// Job submission and management

23

CompletableFuture<JobID> submitJob(JobGraph jobGraph);

24

CompletableFuture<Acknowledge> cancel(JobID jobId);

25

CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory);

26

CompletableFuture<String> stopWithSavepoint(JobID jobId, boolean advanceToEndOfEventTime, String savepointDirectory);

27

28

// Savepoint operations

29

CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory);

30

CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath);

31

32

// Accumulators and metrics

33

default CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID) { }

34

CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);

35

36

// Coordination requests

37

CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId,

38

OperatorID operatorId,

39

CoordinationRequest request);

40

41

// Cluster management

42

void shutDownCluster();

43

void close();

44

}

45

```

46

47

### PackagedProgram { .api }

48

49

Represents a packaged Flink program with all necessary metadata and dependencies.

50

51

```java

52

public class PackagedProgram implements AutoCloseable {

53

// Constants

54

public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";

55

public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";

56

57

// Program information

58

public String getMainClassName() { }

59

public String[] getArguments() { }

60

public ClassLoader getUserCodeClassLoader() { }

61

public Configuration getConfiguration() { }

62

public SavepointRestoreSettings getSavepointSettings() { }

63

64

// Dependencies and resources

65

public List<URL> getJobJarAndDependencies() { }

66

public static List<URL> getJobJarAndDependencies(File jarFile, String entryPointClassName) { }

67

68

// Program execution

69

public void invokeInteractiveModeForExecution() { }

70

public String getDescription() { }

71

public boolean isPython() { }

72

73

// Resource management

74

public void close() { }

75

76

// Builder pattern

77

public static Builder newBuilder() { }

78

79

public static class Builder {

80

public Builder setJarFile(File jarFile) { }

81

public Builder setUserClassPaths(List<URL> userClassPaths) { }

82

public Builder setEntryPointClassName(String entryPointClassName) { }

83

public Builder setConfiguration(Configuration configuration) { }

84

public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) { }

85

public Builder setArguments(String... arguments) { }

86

public PackagedProgram build() { }

87

}

88

}

89

```

90

91

### ClusterClientProvider { .api }

92

93

Provider interface for obtaining cluster clients.

94

95

```java

96

public interface ClusterClientProvider<T> {

97

ClusterClient<T> getClusterClient();

98

}

99

```

100

101

## Execution Environment Classes

102

103

### ContextEnvironment { .api }

104

105

Execution environment that delegates to a configured pipeline executor.

106

107

```java

108

public class ContextEnvironment extends ExecutionEnvironment {

109

// Static context management

110

public static void setAsContext(PipelineExecutorServiceLoader executorServiceLoader,

111

Configuration configuration,

112

ClassLoader userCodeClassLoader,

113

boolean enforceSingleJobExecution,

114

boolean suppressSysout) { }

115

116

public static void unsetAsContext() { }

117

118

// ExecutionEnvironment implementation with delegation to executor

119

}

120

```

121

122

### StreamContextEnvironment { .api }

123

124

Stream execution environment for context-based execution.

125

126

```java

127

public class StreamContextEnvironment extends StreamExecutionEnvironment {

128

// Static context management

129

public static void setAsContext(PipelineExecutorServiceLoader executorServiceLoader,

130

Configuration configuration,

131

ClassLoader userCodeClassLoader,

132

boolean enforceSingleJobExecution,

133

boolean suppressSysout) { }

134

135

public static void unsetAsContext() { }

136

137

// StreamExecutionEnvironment implementation with delegation

138

}

139

```

140

141

### OptimizerPlanEnvironment { .api }

142

143

Execution environment for generating execution plans without actually running the job.

144

145

```java

146

public class OptimizerPlanEnvironment extends ExecutionEnvironment {

147

// Constructor

148

public OptimizerPlanEnvironment(Optimizer optimizer, int defaultParallelism) { }

149

150

// Plan generation methods

151

public OptimizedPlan getOptimizedPlan(Program program) { }

152

public OptimizedPlan getOptimizedPlan(List<DataSinkNode> sinks) { }

153

public String getExecutionPlan() { }

154

155

// ExecutionEnvironment overrides for optimization

156

public JobExecutionResult execute() { }

157

public JobExecutionResult execute(String jobName) { }

158

159

// Plan access

160

public Plan createProgramPlan() { }

161

public Plan createProgramPlan(String jobName) { }

162

}

163

```

164

165

### StreamPlanEnvironment { .api }

166

167

Stream execution environment for plan generation.

168

169

```java

170

public class StreamPlanEnvironment extends StreamExecutionEnvironment {

171

// Constructor

172

public StreamPlanEnvironment(PipelineExecutorServiceLoader executorServiceLoader,

173

Configuration configuration,

174

ClassLoader userClassLoader) { }

175

176

// Stream plan generation methods

177

public StreamGraph getStreamGraph() { }

178

public StreamGraph getStreamGraph(boolean clearTransformations) { }

179

public String getExecutionPlan() { }

180

181

// StreamExecutionEnvironment overrides for planning

182

public JobExecutionResult execute() { }

183

public JobExecutionResult execute(String jobName) { }

184

public JobExecutionResult execute(StreamGraph streamGraph) { }

185

186

// Plan conversion utilities

187

public JobGraph getJobGraph() { }

188

public JobGraph getJobGraph(String jobName) { }

189

}

190

```

191

192

## Cluster Client Implementations

193

194

### MiniClusterClient { .api }

195

196

Cluster client implementation for local mini clusters.

197

198

```java

199

public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniClusterId> {

200

// ClusterClient interface implementation for mini clusters

201

202

// Nested cluster ID class

203

public static class MiniClusterId {

204

// Mini cluster identification

205

}

206

}

207

```

208

209

## Program Utilities

210

211

### PackagedProgramRetriever { .api }

212

213

Interface for retrieving packaged programs.

214

215

```java

216

public interface PackagedProgramRetriever {

217

PackagedProgram getPackagedProgram();

218

}

219

```

220

221

### DefaultPackagedProgramRetriever { .api }

222

223

Default implementation of packaged program retriever.

224

225

```java

226

public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {

227

public PackagedProgram getPackagedProgram() { }

228

}

229

```

230

231

### PackagedProgramUtils { .api }

232

233

Utility enum with static methods for working with packaged programs.

234

235

```java

236

public enum PackagedProgramUtils {

237

// Static utility methods

238

public static boolean isPython(String entryPointClassName) { }

239

public static Pipeline getPipelineFromProgram(PackagedProgram program,

240

Configuration configuration,

241

int parallelism,

242

boolean suppressOutput) { }

243

public static URI resolveURI(String path) { }

244

}

245

```

246

247

### PerJobMiniClusterFactory { .api }

248

249

Factory for creating per-job mini clusters.

250

251

```java

252

public class PerJobMiniClusterFactory {

253

// Constructor

254

public PerJobMiniClusterFactory(Configuration configuration) { }

255

256

// Mini cluster creation methods for per-job execution

257

public MiniCluster createMiniCluster(JobGraph jobGraph,

258

Configuration configuration) { }

259

260

public MiniClusterConfiguration createMiniClusterConfiguration(JobGraph jobGraph,

261

Configuration configuration) { }

262

263

// Resource calculation methods

264

public int calculateNumberOfTaskManagers(JobGraph jobGraph) { }

265

public int calculateTaskSlotsPerTaskManager(Configuration configuration) { }

266

267

// Configuration setup

268

public Configuration setupConfiguration(Configuration originalConfig,

269

JobGraph jobGraph) { }

270

}

271

```

272

273

## Exception Classes

274

275

### ProgramInvocationException { .api }

276

277

Exception thrown during program invocation.

278

279

```java

280

public class ProgramInvocationException extends Exception {

281

// Constructors

282

public ProgramInvocationException(String message) { }

283

public ProgramInvocationException(String message, Throwable cause) { }

284

public ProgramInvocationException(Throwable cause) { }

285

}

286

```

287

288

### ProgramParametrizationException { .api }

289

290

Runtime exception for program parametrization errors.

291

292

```java

293

public class ProgramParametrizationException extends RuntimeException {

294

// Constructors

295

public ProgramParametrizationException(String message) { }

296

public ProgramParametrizationException(String message, Throwable cause) { }

297

public ProgramParametrizationException(Throwable cause) { }

298

}

299

```

300

301

### ProgramMissingJobException { .api }

302

303

Exception thrown when a program doesn't define any jobs.

304

305

```java

306

public class ProgramMissingJobException extends FlinkException {

307

// Constructors

308

public ProgramMissingJobException(String message) { }

309

public ProgramMissingJobException(String message, Throwable cause) { }

310

}

311

```

312

313

### ProgramAbortException { .api }

314

315

Error thrown to abort program execution.

316

317

```java

318

public class ProgramAbortException extends Error {

319

// Constructors

320

public ProgramAbortException(String message) { }

321

public ProgramAbortException(String message, Throwable cause) { }

322

}

323

```

324

325

## Client Utilities

326

327

### ClientUtils Core Methods { .api }

328

329

Signatures and behavior summaries of the core ClientUtils methods for program execution and job management.

330

331

#### buildUserCodeClassLoader Method

332

333

```java

334

public static URLClassLoader buildUserCodeClassLoader(List<URL> jars,

335

List<URL> classpaths,

336

ClassLoader parent,

337

Configuration configuration) {

338

// Combines JAR files and classpath URLs into a single URLClassLoader

339

// Ensures proper isolation of user code from system classes

340

// Handles parent-first or child-first delegation based on configuration

341

// Returns URLClassLoader configured for user code execution

342

}

343

```

344

345

**Parameters:**

346

- `jars`: List of JAR file URLs containing user program code

347

- `classpaths`: Additional classpath URLs for dependencies

348

- `parent`: Parent ClassLoader for delegation

349

- `configuration`: Flink configuration containing classloader settings

350

351

**Returns:** URLClassLoader configured for user code isolation

352

353

#### executeProgram Method

354

355

```java

356

public static void executeProgram(PipelineExecutorServiceLoader executorServiceLoader,

357

Configuration configuration,

358

PackagedProgram program,

359

boolean enforceSingleJobExecution,

360

boolean suppressSysout) {

361

// Sets up execution environment contexts

362

// Loads and executes the packaged program

363

// Handles both batch and streaming execution modes

364

// Manages job lifecycle including submission and monitoring

365

// Cleans up resources after execution completion

366

}

367

```

368

369

**Parameters:**

370

- `executorServiceLoader`: Service loader for pipeline executors

371

- `configuration`: Execution configuration

372

- `program`: Packaged program to execute

373

- `enforceSingleJobExecution`: Whether to enforce single job execution

374

- `suppressSysout`: Whether to suppress system output during execution

375

376

#### waitUntilJobInitializationFinished Method

377

378

```java

379

public static void waitUntilJobInitializationFinished(SupplierWithException<JobStatus, Exception> jobStatusSupplier,

380

SupplierWithException<JobResult, Exception> jobResultSupplier,

381

ClassLoader userCodeClassloader) {

382

// Polls job status until initialization is complete

383

// Handles various job states during startup phase

384

// Manages timeout and retry logic for status checks

385

// Switches context to user classloader for status operations

386

// Throws appropriate exceptions for initialization failures

387

}

388

```

389

390

**Parameters:**

391

- `jobStatusSupplier`: Supplier for retrieving current job status

392

- `jobResultSupplier`: Supplier for retrieving job result when available

393

- `userCodeClassloader`: User code classloader for context switching

394

395

## Usage Examples

396

397

### Basic Program Execution

398

399

```java

400

// Create packaged program

401

PackagedProgram program = PackagedProgram.newBuilder()

402

.setJarFile(new File("my-flink-job.jar"))

403

.setEntryPointClassName("com.example.MyFlinkJob")

404

.setArguments("--input", "/data/input", "--output", "/data/output")

405

.setConfiguration(new Configuration())

406

.build();

407

408

// Execute program using ClientUtils

409

Configuration config = new Configuration();

410

config.setString("execution.target", "local");

411

412

PipelineExecutorServiceLoader executorLoader =

413

PipelineExecutorServiceLoader.fromConfiguration(config);

414

415

try {

416

ClientUtils.executeProgram(executorLoader, config, program, false, false);

417

} finally {

418

program.close();

419

}

420

```

421

422

### Cluster Client Usage

423

424

```java

425

// Get cluster client through factory

426

Configuration config = new Configuration();

427

config.setString("rest.address", "localhost");

428

config.setInteger("rest.port", 8081);

429

430

ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();

431

ClusterClientFactory<StandaloneClusterId> factory = serviceLoader.getClusterClientFactory(config);

432

433

try (ClusterDescriptor<StandaloneClusterId> descriptor = factory.createClusterDescriptor(config)) {

434

StandaloneClusterId clusterId = factory.getClusterId(config);

435

436

try (ClusterClient<StandaloneClusterId> client = descriptor.retrieve(clusterId).getClusterClient()) {

437

// Submit job

438

JobGraph jobGraph = /* create job graph */;

439

CompletableFuture<JobID> jobIdFuture = client.submitJob(jobGraph);

440

JobID jobId = jobIdFuture.get();

441

442

// Monitor job status

443

CompletableFuture<JobStatus> statusFuture = client.getJobStatus(jobId);

444

JobStatus status = statusFuture.get();

445

System.out.println("Job status: " + status);

446

447

// Get job result

448

CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);

449

JobResult result = resultFuture.get();

450

}

451

}

452

```

453

454

### Context Environment Usage

455

456

```java

457

// Set up context environment

458

Configuration config = new Configuration();

459

PipelineExecutorServiceLoader executorLoader =

460

PipelineExecutorServiceLoader.fromConfiguration(config);

461

ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();

462

463

ContextEnvironment.setAsContext(executorLoader, config, userClassLoader, false, false);

464

StreamContextEnvironment.setAsContext(executorLoader, config, userClassLoader, false, false);

465

466

try {

467

// Your Flink program code here

468

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

469

// ... define your job ...

470

env.execute("My Flink Job");

471

} finally {

472

// Clean up context

473

ContextEnvironment.unsetAsContext();

474

StreamContextEnvironment.unsetAsContext();

475

}

476

```

477

478

### Mini Cluster Usage

479

480

```java

481

// Create mini cluster configuration

482

Configuration config = new Configuration();

483

config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);

484

485

// Create and start mini cluster

486

MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration(

487

config,

488

1, // number of task managers

489

RpcServiceUtils.createRemoteRpcService(/* configuration */),

490

"localhost"

491

));

492

493

miniCluster.start();

494

495

try {

496

// Create mini cluster client

497

MiniClusterClient client = new MiniClusterClient(config, miniCluster);

498

499

// Use client for job operations

500

CompletableFuture<Collection<JobStatusMessage>> jobs = client.listJobs();

501

502

} finally {

503

miniCluster.close();

504

}

505

```

506

507

### Savepoint Operations

508

509

```java

510

// Trigger savepoint

511

try (ClusterClient<StandaloneClusterId> client = /* get cluster client */) {

512

JobID jobId = /* your job ID */;

513

String savepointDirectory = "hdfs://namenode:port/savepoints";

514

515

CompletableFuture<String> savepointFuture =

516

client.triggerSavepoint(jobId, savepointDirectory);

517

String savepointPath = savepointFuture.get();

518

519

System.out.println("Savepoint created at: " + savepointPath);

520

521

// Later, dispose the savepoint if no longer needed

522

CompletableFuture<Acknowledge> disposeFuture = client.disposeSavepoint(savepointPath);

523

disposeFuture.get();

524

}

525

```

526

527

### Pipeline from Program

528

529

```java

530

// Extract pipeline from packaged program

531

PackagedProgram program = /* create packaged program */;

532

Configuration config = new Configuration();

533

int parallelism = 4;

534

boolean suppressOutput = true;

535

536

Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(

537

program, config, parallelism, suppressOutput);

538

539

// Use pipeline with executor

540

PipelineExecutor executor = /* get executor */;

541

CompletableFuture<JobClient> jobClientFuture =

542

executor.execute(pipeline, config, program.getUserCodeClassLoader());

543

```

544

545

## Required Imports

546

547

```java

548

import org.apache.flink.client.program.ClusterClient;

549

import org.apache.flink.client.program.PackagedProgram;

550

import org.apache.flink.client.program.ClusterClientProvider;

551

import org.apache.flink.client.program.ContextEnvironment;

552

import org.apache.flink.client.program.StreamContextEnvironment;

553

import org.apache.flink.client.program.OptimizerPlanEnvironment;

554

import org.apache.flink.client.program.StreamPlanEnvironment;

555

import org.apache.flink.client.program.MiniClusterClient;

556

import org.apache.flink.client.program.PackagedProgramRetriever;

557

import org.apache.flink.client.program.DefaultPackagedProgramRetriever;

558

import org.apache.flink.client.program.PackagedProgramUtils;

559

import org.apache.flink.client.program.PerJobMiniClusterFactory;

560

import org.apache.flink.client.program.ProgramInvocationException;

561

import org.apache.flink.client.program.ProgramParametrizationException;

562

import org.apache.flink.client.program.ProgramMissingJobException;

563

import org.apache.flink.client.program.ProgramAbortException;

564

import org.apache.flink.client.ClientUtils;

565

import org.apache.flink.optimizer.Optimizer;

566

import org.apache.flink.optimizer.plan.OptimizedPlan;

567

import org.apache.flink.optimizer.dag.DataSinkNode;

568

import org.apache.flink.api.common.Plan;

569

import org.apache.flink.streaming.api.graph.StreamGraph;

570

import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

571

import org.apache.flink.util.function.SupplierWithException;

572

import org.apache.flink.api.common.ExecutionConfig;

573

import org.apache.flink.api.java.ExecutionEnvironment;

574

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

575

import org.apache.flink.api.common.JobID;

576

import org.apache.flink.api.common.JobStatus;

577

import org.apache.flink.runtime.jobgraph.JobGraph;

578

import org.apache.flink.runtime.jobmaster.JobResult;

579

import org.apache.flink.runtime.messages.Acknowledge;

580

import org.apache.flink.runtime.jobgraph.OperatorID;

581

import org.apache.flink.runtime.operators.coordination.CoordinationRequest;

582

import org.apache.flink.runtime.operators.coordination.CoordinationResponse;

583

import org.apache.flink.core.execution.JobClient;

584

import org.apache.flink.core.execution.PipelineExecutor;

585

import org.apache.flink.core.execution.PipelineExecutorServiceLoader;

586

import org.apache.flink.api.dag.Pipeline;

587

import org.apache.flink.configuration.Configuration;

588

import org.apache.flink.runtime.state.StateBackend;

589

import org.apache.flink.runtime.minicluster.MiniCluster;

590

import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

591

import org.apache.flink.util.FlinkException;

592

import java.util.concurrent.CompletableFuture;

593

import java.util.Collection;

594

import java.util.Map;

595

import java.util.List;

596

import java.io.File;

597

import java.net.URL;

598

import java.net.URI;

599

import java.net.URLClassLoader;

600

```