or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-mode.mdartifact-management.mdcli-frontend.mdcluster-client.mddeployment-management.mdindex.mdprogram-packaging.md

program-packaging.mddocs/

0

# Program Packaging and Execution

1

2

Functionality for packaging Flink programs in JAR files, managing classpaths, handling user applications, and executing programs with proper isolation and resource management.

3

4

## Capabilities

5

6

### Packaged Program

7

8

Core class representing a Flink program packaged in a JAR file with full support for classpath management, argument handling, and program execution.

9

10

```java { .api }

11

/**

12

* Represents a Flink program packaged in a JAR file

13

*/

14

public class PackagedProgram implements AutoCloseable {

15

/**

16

* Create a new packaged program builder

17

* @return Builder instance for constructing PackagedProgram

18

*/

19

public static Builder newBuilder();

20

21

/**

22

* Get the main class of the packaged program

23

* @return Main class object

24

* @throws ProgramInvocationException if main class cannot be determined

25

*/

26

public Class<?> getMainClass() throws ProgramInvocationException;

27

28

/**

29

* Get program arguments

30

* @return Array of program arguments

31

*/

32

public String[] getArguments();

33

34

/**

35

* Get JAR file URL

36

* @return URL of the JAR file

37

*/

38

public URL getJarFile();

39

40

/**

41

* Get user code class loader

42

* @return Class loader for user code

43

*/

44

public URLClassLoader getUserCodeClassLoader();

45

46

/**

47

* Get savepoint restore settings

48

* @return Savepoint restore settings

49

*/

50

public SavepointRestoreSettings getSavepointRestoreSettings();

51

52

/**

53

* Check if this is a Python program

54

* @return true if Python program, false otherwise

55

*/

56

public boolean isPython();

57

58

/**

59

* Invoke the program's main method for interactive execution

60

* @throws ProgramInvocationException if execution fails

61

*/

62

public void invokeInteractiveModeForExecution() throws ProgramInvocationException;

63

64

/**

65

* Get list of classpath URLs

66

* @return List of classpath URLs

67

*/

68

public List<URL> getClasspaths();

69

70

/**

71

* Check if program has main method

72

* @return true if main method exists

73

*/

74

public boolean hasMainMethod();

75

76

@Override

77

public void close();

78

79

/**

80

* Builder for creating PackagedProgram instances

81

*/

82

public static class Builder {

83

/**

84

* Set JAR file for the program

85

* @param jarFile JAR file containing the program

86

* @return Builder instance

87

*/

88

public Builder setJarFile(File jarFile);

89

90

/**

91

* Set program arguments

92

* @param arguments Array of arguments

93

* @return Builder instance

94

*/

95

public Builder setArguments(String... arguments);

96

97

/**

98

* Set entry point class name

99

* @param entryPointClassName Fully qualified class name

100

* @return Builder instance

101

*/

102

public Builder setEntryPointClassName(@Nullable String entryPointClassName);

103

104

/**

105

* Set savepoint restore settings

106

* @param savepointRestoreSettings Savepoint settings

107

* @return Builder instance

108

*/

109

public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings);

110

111

/**

112

* Set additional classpath URLs

113

* @param classpaths List of classpath URLs

114

* @return Builder instance

115

*/

116

public Builder setClasspaths(List<URL> classpaths);

117

118

/**

119

* Set user code class loader

120

* @param userCodeClassLoader Class loader for user code

121

* @return Builder instance

122

*/

123

public Builder setUserCodeClassLoader(URLClassLoader userCodeClassLoader);

124

125

/**

126

* Build the PackagedProgram instance

127

* @return Configured PackagedProgram

128

* @throws ProgramInvocationException if construction fails

129

*/

130

public PackagedProgram build() throws ProgramInvocationException;

131

}

132

}

133

```

134

135

**Usage Examples:**

136

137

```java

138

import org.apache.flink.client.program.PackagedProgram;

139

import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

140

141

// Create a simple packaged program

142

PackagedProgram program = PackagedProgram.newBuilder()

143

.setJarFile(new File("/path/to/flink-job.jar"))

144

.setArguments("--input", "input.txt", "--output", "output.txt")

145

.build();

146

147

// Create program with custom entry point

148

PackagedProgram programWithEntryPoint = PackagedProgram.newBuilder()

149

.setJarFile(new File("/path/to/job.jar"))

150

.setEntryPointClassName("com.mycompany.MyFlinkJob")

151

.setArguments("--parallelism", "4")

152

.build();

153

154

// Create program with savepoint restore

155

SavepointRestoreSettings savepointSettings = SavepointRestoreSettings

156

.forPath("/path/to/savepoint", false);

157

158

PackagedProgram programWithSavepoint = PackagedProgram.newBuilder()

159

.setJarFile(new File("/path/to/job.jar"))

160

.setSavepointRestoreSettings(savepointSettings)

161

.build();

162

163

// Use the program

164

System.out.println("Main class: " + program.getMainClass().getName());

165

System.out.println("Arguments: " + Arrays.toString(program.getArguments()));

166

167

// Clean up resources

168

program.close();

169

```

170

171

### Packaged Program Utilities

172

173

Utility functions for working with packaged programs, including Python program detection and program creation helpers.

174

175

```java { .api }

176

/**

177

* Utilities for working with packaged programs

178

*/

179

public class PackagedProgramUtils {

180

/**

181

* Check if the program is a Python program

182

* @param program Packaged program to check

183

* @return true if Python program

184

*/

185

public static boolean isPython(PackagedProgram program);

186

187

/**

188

* Check if file path represents a Python program

189

* @param file File to check

190

* @return true if Python file

191

*/

192

public static boolean isPython(File file);

193

194

/**

195

* Get program description from packaged program

196

* @param program Packaged program

197

* @return Program description or null

198

*/

199

@Nullable

200

public static String getProgramDescription(PackagedProgram program);

201

202

/**

203

* Extract nested libraries from JAR file

204

* @param jarFile JAR file to extract from

205

* @param extractionDir Target directory for extraction

206

* @return List of extracted library files

207

* @throws IOException if extraction fails

208

*/

209

public static List<File> extractNestedLibraries(File jarFile, File extractionDir)

210

throws IOException;

211

}

212

```

213

214

### Packaged Program Retriever

215

216

Interface and implementation for retrieving packaged programs, enabling flexible program loading strategies.

217

218

```java { .api }

219

/**

220

* Interface for retrieving packaged programs

221

*/

222

public interface PackagedProgramRetriever {

223

/**

224

* Retrieve a packaged program

225

* @return PackagedProgram instance

226

* @throws ProgramInvocationException if retrieval fails

227

*/

228

PackagedProgram getPackagedProgram() throws ProgramInvocationException;

229

}

230

231

/**

232

* Default implementation of packaged program retriever

233

*/

234

public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {

235

/**

236

* Create retriever with JAR file and arguments

237

* @param jarFile JAR file containing the program

238

* @param entryPointClassName Optional entry point class name

239

* @param programArguments Program arguments

240

* @param savepointRestoreSettings Savepoint restore settings

241

* @param classpaths Additional classpath URLs

242

*/

243

public DefaultPackagedProgramRetriever(

244

File jarFile,

245

@Nullable String entryPointClassName,

246

String[] programArguments,

247

SavepointRestoreSettings savepointRestoreSettings,

248

List<URL> classpaths

249

);

250

251

@Override

252

public PackagedProgram getPackagedProgram() throws ProgramInvocationException;

253

}

254

```

255

256

### Stream Environment Classes

257

258

Specialized execution environments for stream processing contexts and plan generation.

259

260

```java { .api }

261

/**

262

* Execution environment for stream processing contexts

263

*/

264

public class StreamContextEnvironment extends StreamExecutionEnvironment {

265

/**

266

* Create stream context environment

267

* @param client Cluster client for job submission

268

* @param parallelism Default parallelism

269

* @param userCodeClassLoader User code class loader

270

* @param savepointRestoreSettings Savepoint restore settings

271

*/

272

public StreamContextEnvironment(

273

ClusterClient<?> client,

274

int parallelism,

275

ClassLoader userCodeClassLoader,

276

SavepointRestoreSettings savepointRestoreSettings

277

);

278

279

@Override

280

public JobExecutionResult execute(String jobName) throws Exception;

281

282

@Override

283

public JobClient executeAsync(String jobName) throws Exception;

284

}

285

286

/**

287

* Environment for stream plan generation

288

*/

289

public class StreamPlanEnvironment extends StreamExecutionEnvironment {

290

/**

291

* Create stream plan environment

292

* @param executorServiceLoader Executor service loader

293

* @param configuration Flink configuration

294

* @param userCodeClassLoader User code class loader

295

* @param savepointRestoreSettings Savepoint restore settings

296

*/

297

public StreamPlanEnvironment(

298

PipelineExecutorServiceLoader executorServiceLoader,

299

Configuration configuration,

300

ClassLoader userCodeClassLoader,

301

SavepointRestoreSettings savepointRestoreSettings

302

);

303

304

@Override

305

public JobExecutionResult execute(String jobName) throws Exception;

306

307

/**

308

* Get the execution plan as JSON

309

* @return Execution plan JSON string

310

*/

311

public String getExecutionPlan();

312

}

313

```

314

315

### Mini Cluster Factory

316

317

Factory for creating per-job mini clusters for testing and local development.

318

319

```java { .api }

320

/**

321

* Factory for per-job mini clusters

322

*/

323

public class PerJobMiniClusterFactory {

324

/**

325

* Create per-job mini cluster factory

326

* @param miniClusterConfiguration Mini cluster configuration

327

*/

328

public PerJobMiniClusterFactory(MiniClusterConfiguration miniClusterConfiguration);

329

330

/**

331

* Create mini cluster for job execution

332

* @param jobGraph Job graph to execute

333

* @return Mini cluster instance

334

* @throws Exception if creation fails

335

*/

336

public MiniCluster create(JobGraph jobGraph) throws Exception;

337

}

338

```

339

340

## Types

341

342

```java { .api }

343

/**

344

* Settings for savepoint restoration

345

*/

346

public class SavepointRestoreSettings {

347

/**

348

* Create settings for restoring from savepoint path

349

* @param savepointPath Path to savepoint

350

* @param allowNonRestoredState Whether to allow non-restored state

351

* @return Savepoint restore settings

352

*/

353

public static SavepointRestoreSettings forPath(

354

String savepointPath,

355

boolean allowNonRestoredState

356

);

357

358

/**

359

* Create settings with no savepoint restoration

360

* @return Settings for no restoration

361

*/

362

public static SavepointRestoreSettings none();

363

364

/**

365

* Check if restoration is enabled

366

* @return true if restoration enabled

367

*/

368

public boolean restoreSavepoint();

369

370

/**

371

* Get savepoint path

372

* @return Savepoint path or null

373

*/

374

@Nullable

375

public String getRestorePath();

376

377

/**

378

* Check if non-restored state is allowed

379

* @return true if allowed

380

*/

381

public boolean allowNonRestoredState();

382

}

383

```

384

385

## Exception Handling

386

387

Program packaging operations handle various error conditions:

388

389

- **File Errors**: `FileNotFoundException` for missing JAR files, `IOException` for file access issues

390

- **Class Loading Errors**: `ClassNotFoundException` for missing main classes, `NoClassDefFoundError` for dependencies

391

- **Program Errors**: `ProgramInvocationException` for program execution failures

392

- **Configuration Errors**: `ProgramParametrizationException` for invalid program parameters

393

- **Missing Job Errors**: `ProgramMissingJobException` when program doesn't define a Flink job

394

395

**Common Error Patterns:**

396

397

```java

398

try {

399

PackagedProgram program = PackagedProgram.newBuilder()

400

.setJarFile(new File("job.jar"))

401

.build();

402

403

// Use program...

404

program.close();

405

406

} catch (ProgramInvocationException e) {

407

// Handle program construction/execution errors

408

System.err.println("Program error: " + e.getMessage());

409

} catch (FileNotFoundException e) {

410

// Handle missing JAR file

411

System.err.println("JAR file not found: " + e.getMessage());

412

}

413

```