or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-deployment.mdcli-interface.mdclient-core.mdcluster-management.mdindex.mdprogram-execution.mdrest-client.md

program-execution.mddocs/

0

# Program Execution

1

2

Program packaging, classloader management, and execution utilities for submitting user applications to Flink clusters. Provides comprehensive support for JAR-based programs and execution environment management.

3

4

## Capabilities

5

6

### Packaged Program

7

8

Represents a program packaged in a JAR file with all necessary dependencies and configuration.

9

10

```java { .api }

11

/**

12

* Represents a program packaged in a JAR file

13

*/

14

public class PackagedProgram implements AutoCloseable {

15

16

// Manifest attribute constants

17

public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";

18

public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";

19

20

/**

21

* Gets savepoint restore settings for this program

22

* @return SavepointRestoreSettings instance

23

*/

24

public SavepointRestoreSettings getSavepointSettings();

25

26

/**

27

* Gets the program arguments

28

* @return Array of program arguments

29

*/

30

public String[] getArguments();

31

32

/**

33

* Gets the main class name for this program

34

* @return Fully qualified main class name

35

*/

36

public String getMainClassName();

37

38

/**

39

* Gets program description by analyzing the JAR

40

* @return Program description string

41

* @throws ProgramInvocationException if description cannot be retrieved

42

*/

43

public String getDescription() throws ProgramInvocationException;

44

45

/**

46

* Invokes the program in interactive mode for execution

47

* @throws ProgramInvocationException if invocation fails

48

*/

49

public void invokeInteractiveModeForExecution() throws ProgramInvocationException;

50

51

/**

52

* Gets the required classpaths for this program

53

* @return List of classpath URLs

54

*/

55

public List<URL> getClasspaths();

56

57

/**

58

* Gets the user code classloader for this program

59

* @return ClassLoader for user code execution

60

*/

61

public ClassLoader getUserCodeClassLoader();

62

63

/**

64

* Gets the job JAR and all dependencies

65

* @return List of JAR and dependency URLs

66

*/

67

public List<URL> getJobJarAndDependencies();

68

69

/**

70

* Static method to get JAR and dependencies from file

71

* @param jarFile JAR file to analyze

72

* @param entryPointClassName Entry point class name

73

* @return List of JAR and dependency URLs

74

* @throws ProgramInvocationException if analysis fails

75

*/

76

public static List<URL> getJobJarAndDependencies(File jarFile, String entryPointClassName)

77

throws ProgramInvocationException;

78

79

/**

80

* Extracts nested libraries from a JAR file

81

* @param jarFile JAR file URL to extract from

82

* @return List of extracted library files

83

* @throws ProgramInvocationException if extraction fails

84

*/

85

public static List<File> extractContainedLibraries(URL jarFile)

86

throws ProgramInvocationException;

87

88

/**

89

* Creates a new builder for PackagedProgram

90

* @return Builder instance

91

*/

92

public static Builder newBuilder();

93

94

/**

95

* Closes the program and releases resources

96

*/

97

@Override

98

public void close();

99

100

/**

101

* Builder pattern for PackagedProgram construction

102

*/

103

public static class Builder {

104

/**

105

* Sets the JAR file for this program

106

* @param jarFile JAR file containing the program

107

* @return This builder instance

108

*/

109

public Builder setJarFile(File jarFile);

110

111

/**

112

* Sets the entry point class name

113

* @param entryPointClassName Fully qualified class name

114

* @return This builder instance

115

*/

116

public Builder setEntryPointClassName(String entryPointClassName);

117

118

/**

119

* Sets the program arguments

120

* @param args Variable arguments for the program

121

* @return This builder instance

122

*/

123

public Builder setArguments(String... args);

124

125

/**

126

* Sets additional user classpaths

127

* @param userClassPaths List of user classpath URLs

128

* @return This builder instance

129

*/

130

public Builder setUserClassPaths(List<URL> userClassPaths);

131

132

/**

133

* Sets the Flink configuration

134

* @param configuration Flink configuration instance

135

* @return This builder instance

136

*/

137

public Builder setConfiguration(Configuration configuration);

138

139

/**

140

* Sets savepoint restore settings

141

* @param savepointRestoreSettings Savepoint settings

142

* @return This builder instance

143

*/

144

public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings);

145

146

/**

147

* Builds the PackagedProgram instance

148

* @return PackagedProgram instance

149

* @throws ProgramInvocationException if building fails

150

*/

151

public PackagedProgram build() throws ProgramInvocationException;

152

}

153

}

154

```

155

156

**Usage Example:**

157

158

```java

159

import org.apache.flink.client.program.PackagedProgram;

160

import java.io.File;

161

162

// Create a packaged program from JAR

163

PackagedProgram program = PackagedProgram.newBuilder()

164

.setJarFile(new File("/path/to/my-flink-job.jar"))

165

.setEntryPointClassName("com.example.MyFlinkJob")

166

.setArguments("--input", "/data/input", "--output", "/data/output")

167

.build();

168

169

try {

170

System.out.println("Program: " + program.getDescription());

171

System.out.println("Main class: " + program.getMainClassName());

172

173

// Get classpath information

174

List<URL> dependencies = program.getJobJarAndDependencies();

175

System.out.println("Dependencies: " + dependencies.size());

176

177

// Execute the program

178

program.invokeInteractiveModeForExecution();

179

180

} finally {

181

program.close();

182

}

183

```

184

185

### Packaged Program Utilities

186

187

Utility functions for working with packaged programs.

188

189

```java { .api }

190

/**

191

* Utilities for packaged programs

192

*/

193

public class PackagedProgramUtils {

194

/**

195

* Checks if the program is a Python program

196

* @param entryPointClassName Entry point class name to check

197

* @return true if Python program, false otherwise

198

*/

199

public static boolean isPython(String entryPointClassName);

200

201

/**

202

* Gets the Python JAR URL for Python programs

203

* @return URL of the Python JAR

204

*/

205

public static URL getPythonJar();

206

}

207

```

208

209

### Program Retriever Interface

210

211

Interface for retrieving packaged programs from various sources.

212

213

```java { .api }

214

/**

215

* Interface for retrieving packaged programs

216

*/

217

public interface PackagedProgramRetriever {

218

/**

219

* Retrieves a packaged program

220

* @return PackagedProgram instance

221

* @throws ProgramRetrievalException if retrieval fails

222

*/

223

PackagedProgram getPackagedProgram() throws ProgramRetrievalException;

224

}

225

226

/**

227

* Default implementation for retrieving packaged programs

228

*/

229

public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {

230

@Override

231

public PackagedProgram getPackagedProgram() throws ProgramRetrievalException;

232

}

233

```

234

235

### Execution Environments

236

237

Specialized execution environments that provide context for program execution.

238

239

```java { .api }

240

/**

241

* Execution environment with context for DataSet programs

242

*/

243

public class ContextEnvironment extends ExecutionEnvironment {

244

// Provides execution context for DataSet API programs

245

// Automatically configured when running within Flink client

246

}

247

248

/**

249

* Execution environment for plan optimization

250

*/

251

public class OptimizerPlanEnvironment extends ExecutionEnvironment {

252

// Used for generating execution plans without actual execution

253

// Useful for plan analysis and optimization

254

}

255

256

/**

257

* Stream execution environment with context for DataStream programs

258

*/

259

public class StreamContextEnvironment extends StreamExecutionEnvironment {

260

// Provides execution context for DataStream API programs

261

// Automatically configured when running within Flink client

262

}

263

264

/**

265

* Environment for stream execution plans

266

*/

267

public class StreamPlanEnvironment extends StreamExecutionEnvironment {

268

// Used for generating stream execution plans

269

// Enables plan inspection without execution

270

}

271

```

272

273

**Usage Example:**

274

275

```java

276

import org.apache.flink.client.program.ContextEnvironment;

277

import org.apache.flink.api.java.ExecutionEnvironment;

278

279

// The environment is automatically set up when running through client

280

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

281

282

// If running in client context, this will be a ContextEnvironment

283

if (env instanceof ContextEnvironment) {

284

System.out.println("Running in client context");

285

}

286

287

// Create and execute DataSet program

288

DataSet<String> input = env.readTextFile("/path/to/input");

289

input.flatMap(new Tokenizer())

290

.groupBy(0)

291

.sum(1)

292

.writeAsCsv("/path/to/output");

293

294

env.execute("Word Count Example");

295

```

296

297

### Mini Cluster Factory

298

299

Factory for creating per-job mini clusters for testing and development.

300

301

```java { .api }

302

/**

303

* Factory for per-job mini clusters

304

*/

305

public class PerJobMiniClusterFactory {

306

/**

307

* Creates a mini cluster for running a single job

308

* @param configuration Cluster configuration

309

* @return MiniCluster instance configured for single job execution

310

*/

311

public static MiniCluster createMiniCluster(Configuration configuration);

312

}

313

```

314

315

## Exception Types

316

317

```java { .api }

318

/**

319

* Exception for program invocation errors

320

*/

321

public class ProgramInvocationException extends Exception {

322

/**

323

* Creates exception with message

324

* @param message Error message

325

*/

326

public ProgramInvocationException(String message);

327

328

/**

329

* Creates exception with message and job ID

330

* @param message Error message

331

* @param jobID Associated job ID

332

*/

333

public ProgramInvocationException(String message, JobID jobID);

334

335

/**

336

* Creates exception with cause

337

* @param cause Root cause throwable

338

*/

339

public ProgramInvocationException(Throwable cause);

340

341

/**

342

* Creates exception with message and cause

343

* @param message Error message

344

* @param cause Root cause throwable

345

*/

346

public ProgramInvocationException(String message, Throwable cause);

347

348

/**

349

* Creates exception with message, job ID and cause

350

* @param message Error message

351

* @param jobID Associated job ID

352

* @param cause Root cause throwable

353

*/

354

public ProgramInvocationException(String message, JobID jobID, Throwable cause);

355

}

356

357

/**

358

* Exception when program doesn't contain a job

359

*/

360

public class ProgramMissingJobException extends FlinkException {

361

/**

362

* Creates exception with message

363

* @param message Error message describing missing job

364

*/

365

public ProgramMissingJobException(String message);

366

}

367

368

/**

369

* Exception for program parameterization errors

370

*/

371

public class ProgramParametrizationException extends RuntimeException {

372

/**

373

* Creates exception with message

374

* @param message Error message

375

*/

376

public ProgramParametrizationException(String message);

377

378

/**

379

* Creates exception with message and cause

380

* @param message Error message

381

* @param cause Root cause throwable

382

*/

383

public ProgramParametrizationException(String message, Throwable cause);

384

}

385

386

/**

387

* Exception for program abortion (extends Error for immediate termination)

388

*/

389

public class ProgramAbortException extends Error {

390

/**

391

* Creates default abort exception

392

*/

393

public ProgramAbortException();

394

395

/**

396

* Creates abort exception with message

397

* @param message Abort message

398

*/

399

public ProgramAbortException(String message);

400

401

/**

402

* Creates abort exception with message and cause

403

* @param message Abort message

404

* @param cause Root cause throwable

405

*/

406

public ProgramAbortException(String message, Throwable cause);

407

}

408

409

/**

410

* Exception for program retrieval errors

411

*/

412

public class ProgramRetrievalException extends Exception {

413

public ProgramRetrievalException(String message);

414

public ProgramRetrievalException(String message, Throwable cause);

415

}

416

```

417

418

## Types

419

420

```java { .api }

421

public class SavepointRestoreSettings {

422

public static SavepointRestoreSettings none();

423

public static SavepointRestoreSettings forPath(String savepointPath);

424

public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState);

425

426

public boolean restoreSavepoint();

427

public String getRestorePath();

428

public boolean allowNonRestoredState();

429

}

430

431

public abstract class ExecutionEnvironment {

432

public static ExecutionEnvironment getExecutionEnvironment();

433

public abstract JobExecutionResult execute(String jobName) throws Exception;

434

435

// DataSet API methods

436

public <X> DataSet<X> fromCollection(Collection<X> data);

437

public DataSet<String> readTextFile(String filePath);

438

}

439

440

public abstract class StreamExecutionEnvironment {

441

public static StreamExecutionEnvironment getExecutionEnvironment();

442

public JobExecutionResult execute(String jobName) throws Exception;

443

444

// DataStream API methods

445

public <T> DataStreamSource<T> fromCollection(Collection<T> data);

446

public DataStreamSource<String> socketTextStream(String hostname, int port);

447

}

448

449

public interface DataSet<T> {

450

<R> DataSet<R> map(MapFunction<T, R> mapper);

451

<R> DataSet<R> flatMap(FlatMapFunction<T, R> flatMapper);

452

DataSet<T> filter(FilterFunction<T> filter);

453

UnsortedGrouping<T> groupBy(int... fields);

454

DataSink<T> writeAsCsv(String filePath);

455

}

456

457

public interface DataStream<T> {

458

<R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper);

459

<R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper);

460

SingleOutputStreamOperator<T> filter(FilterFunction<T> filter);

461

DataStreamSink<T> print();

462

}

463

464

public class MiniCluster implements AutoCloseable {

465

public void start() throws Exception;

466

public void close() throws Exception;

467

public CompletableFuture<JobResult> executeJobBlocking(JobGraph job);

468

}

469

```