or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-mode.mdartifact-management.mdcli-frontend.mdcluster-client.mddeployment-management.mdindex.mdprogram-packaging.md

application-mode.mddocs/

0

# Application Mode Deployment

1

2

Specialized deployment mode for long-running applications with dedicated cluster resources, lifecycle management, and optimized resource utilization for application-centric workloads.

3

4

## Capabilities

5

6

### Application Runner Interface

7

8

Core interface for running applications in application mode with full lifecycle management.

9

10

```java { .api }

11

/**

12

* Interface for running applications in application mode

13

*/

14

public interface ApplicationRunner {

15

/**

16

* Run application with dispatcher gateway and configuration

17

* @param dispatcherGateway Gateway to the dispatcher

18

* @param scheduledExecutor Executor for scheduled operations

19

* @param applicationConfiguration Application configuration

20

* @return Future completing when application finishes

21

*/

22

CompletableFuture<Void> run(

23

DispatcherGateway dispatcherGateway,

24

ScheduledExecutor scheduledExecutor,

25

ApplicationConfiguration applicationConfiguration

26

);

27

}

28

```

29

30

### Detached Application Runner

31

32

Application runner implementation for detached execution where the client doesn't wait for job completion.

33

34

```java { .api }

35

/**

36

* Application runner for detached execution

37

*/

38

public class DetachedApplicationRunner implements ApplicationRunner {

39

/**

40

* Create detached application runner

41

* @param highAvailabilityServices High availability services

42

* @param configuration Flink configuration

43

* @param mainThreadExecutor Main thread executor

44

*/

45

public DetachedApplicationRunner(

46

HighAvailabilityServices highAvailabilityServices,

47

Configuration configuration,

48

Executor mainThreadExecutor

49

);

50

51

@Override

52

public CompletableFuture<Void> run(

53

DispatcherGateway dispatcherGateway,

54

ScheduledExecutor scheduledExecutor,

55

ApplicationConfiguration applicationConfiguration

56

);

57

}

58

```

59

60

### Application Configuration

61

62

Configuration class for application deployments containing program arguments, entry point information, and execution settings.

63

64

```java { .api }

65

/**

66

* Configuration for application deployments

67

*/

68

public class ApplicationConfiguration {

69

/**

70

* Create configuration builder from Flink configuration

71

* @param configuration Flink configuration

72

* @return Configuration builder

73

*/

74

public static Builder fromConfiguration(Configuration configuration);

75

76

/**

77

* Get program arguments

78

* @return Array of program arguments

79

*/

80

public String[] getProgramArguments();

81

82

/**

83

* Get application arguments

84

* @return Array of application arguments

85

*/

86

public String[] getApplicationArgs();

87

88

/**

89

* Get parallelism setting

90

* @return Parallelism or null if not set

91

*/

92

@Nullable

93

public Integer getParallelism();

94

95

/**

96

* Get savepoint restore settings

97

* @return Savepoint restore settings

98

*/

99

public SavepointRestoreSettings getSavepointRestoreSettings();

100

101

/**

102

* Builder for application configuration

103

*/

104

public static class Builder {

105

/**

106

* Set program arguments

107

* @param programArguments Array of arguments

108

* @return Builder instance

109

*/

110

public Builder setProgramArguments(String... programArguments);

111

112

/**

113

* Set application arguments

114

* @param applicationArgs Array of application arguments

115

* @return Builder instance

116

*/

117

public Builder setApplicationArgs(String... applicationArgs);

118

119

/**

120

* Set parallelism

121

* @param parallelism Parallelism setting

122

* @return Builder instance

123

*/

124

public Builder setParallelism(Integer parallelism);

125

126

/**

127

* Set savepoint restore settings

128

* @param savepointRestoreSettings Savepoint settings

129

* @return Builder instance

130

*/

131

public Builder setSavepointRestoreSettings(

132

SavepointRestoreSettings savepointRestoreSettings

133

);

134

135

/**

136

* Build application configuration

137

* @return Configured application configuration

138

*/

139

public ApplicationConfiguration build();

140

}

141

}

142

```

143

144

**Usage Examples:**

145

146

```java

147

import org.apache.flink.client.deployment.application.ApplicationConfiguration;

148

import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

149

150

// Create basic application configuration

151

ApplicationConfiguration config = ApplicationConfiguration

152

.fromConfiguration(flinkConfig)

153

.setProgramArguments("--input", "input.txt", "--output", "output.txt")

154

.setParallelism(4)

155

.build();

156

157

// Create configuration with savepoint restore

158

SavepointRestoreSettings savepointSettings = SavepointRestoreSettings

159

.forPath("/path/to/savepoint", false);

160

161

ApplicationConfiguration configWithSavepoint = ApplicationConfiguration

162

.fromConfiguration(flinkConfig)

163

.setProgramArguments("--mode", "batch")

164

.setSavepointRestoreSettings(savepointSettings)

165

.build();

166

```

167

168

### Application Cluster Entry Point

169

170

Entry point for application cluster mode providing cluster initialization and application execution coordination.

171

172

```java { .api }

173

/**

174

* Entry point for application cluster mode

175

*/

176

public class ApplicationClusterEntryPoint extends ClusterEntrypoint {

177

/**

178

* Main method for application cluster entry point

179

* @param args Command-line arguments

180

*/

181

public static void main(String[] args);

182

183

/**

184

* Create application cluster entry point

185

* @param configuration Cluster configuration

186

* @param pluginManager Plugin manager

187

*/

188

protected ApplicationClusterEntryPoint(

189

Configuration configuration,

190

PluginManager pluginManager

191

);

192

193

@Override

194

protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(

195

Configuration configuration

196

);

197

}

198

```

199

200

### Application Job Clients

201

202

Specialized job client implementations for application mode execution.

203

204

```java { .api }

205

/**

206

* Job client for embedded application execution

207

*/

208

public class EmbeddedJobClient implements JobClient {

209

/**

210

* Create embedded job client

211

* @param jobID Job identifier

212

* @param dispatcher Dispatcher gateway

213

* @param executorService Executor service

214

* @param classLoader User code class loader

215

*/

216

public EmbeddedJobClient(

217

JobID jobID,

218

DispatcherGateway dispatcher,

219

ScheduledExecutorService executorService,

220

ClassLoader classLoader

221

);

222

223

@Override

224

public JobID getJobID();

225

226

@Override

227

public CompletableFuture<JobStatus> getJobStatus();

228

229

@Override

230

public CompletableFuture<Void> cancel();

231

232

@Override

233

public CompletableFuture<String> stopWithSavepoint(

234

boolean advanceToEndOfEventTime,

235

@Nullable String savepointDir,

236

SavepointFormatType formatType

237

);

238

239

@Override

240

public CompletableFuture<JobExecutionResult> getJobExecutionResult();

241

}

242

243

/**

244

* Job client for web submission application execution

245

*/

246

public class WebSubmissionJobClient implements JobClient {

247

/**

248

* Create web submission job client

249

* @param jobSubmissionResult Job submission result

250

* @param jobStatusSupplier Supplier for job status

251

* @param jobResultSupplier Supplier for job result

252

* @param classLoader User code class loader

253

*/

254

public WebSubmissionJobClient(

255

JobSubmissionResult jobSubmissionResult,

256

Supplier<CompletableFuture<JobStatus>> jobStatusSupplier,

257

Supplier<CompletableFuture<JobResult>> jobResultSupplier,

258

ClassLoader classLoader

259

);

260

261

@Override

262

public JobID getJobID();

263

264

@Override

265

public CompletableFuture<JobStatus> getJobStatus();

266

267

@Override

268

public CompletableFuture<JobExecutionResult> getJobExecutionResult();

269

}

270

```

271

272

### Entry Class Information Providers

273

274

System for discovering and providing entry class information from JARs and classpaths.

275

276

```java { .api }

277

/**

278

* Provider for entry class information

279

*/

280

public interface EntryClassInformationProvider {

281

/**

282

* Get program entry point class name

283

* @return Entry point class name or null

284

*/

285

@Nullable

286

String getJobClassName();

287

288

/**

289

* Get JAR file location

290

* @return JAR file or null

291

*/

292

@Nullable

293

File getJarFile();

294

}

295

296

/**

297

* Entry class provider from JAR manifest

298

*/

299

public class FromJarEntryClassInformationProvider implements EntryClassInformationProvider {

300

/**

301

* Create provider from JAR file

302

* @param jarFile JAR file to analyze

303

* @param programArguments Program arguments

304

*/

305

public FromJarEntryClassInformationProvider(File jarFile, String[] programArguments);

306

307

@Override

308

public String getJobClassName();

309

310

@Override

311

public File getJarFile();

312

}

313

314

/**

315

* Entry class provider from classpath scanning

316

*/

317

public class FromClasspathEntryClassInformationProvider implements EntryClassInformationProvider {

318

/**

319

* Create provider from classpath

320

* @param jobClassName Job class name

321

* @param programArguments Program arguments

322

*/

323

public FromClasspathEntryClassInformationProvider(

324

String jobClassName,

325

String[] programArguments

326

);

327

328

@Override

329

public String getJobClassName();

330

331

@Override

332

public File getJarFile();

333

}

334

```

335

336

### Application Executors

337

338

Executor implementations specifically designed for application mode execution.

339

340

```java { .api }

341

/**

342

* Executor for embedded application execution

343

*/

344

public class EmbeddedExecutor implements PipelineExecutor {

345

/**

346

* Create embedded executor

347

* @param dispatcherGateway Dispatcher gateway

348

* @param executorService Executor service

349

* @param configuration Flink configuration

350

* @param userCodeClassLoader User code class loader

351

*/

352

public EmbeddedExecutor(

353

DispatcherGateway dispatcherGateway,

354

ScheduledExecutorService executorService,

355

Configuration configuration,

356

ClassLoader userCodeClassLoader

357

);

358

359

@Override

360

public CompletableFuture<JobClient> execute(

361

Pipeline pipeline,

362

Configuration configuration,

363

ClassLoader userCodeClassloader

364

) throws Exception;

365

}

366

367

/**

368

* Factory for embedded executors

369

*/

370

public class EmbeddedExecutorFactory implements PipelineExecutorFactory {

371

@Override

372

public String getName();

373

374

@Override

375

public boolean isCompatibleWith(Configuration configuration);

376

377

@Override

378

public PipelineExecutor getExecutor(Configuration configuration);

379

}

380

```

381

382

### Utility Classes

383

384

Utility classes for application mode operations and status monitoring.

385

386

```java { .api }

387

/**

388

* Utilities for polling job status in application mode

389

*/

390

public class JobStatusPollingUtils {

391

/**

392

* Poll job status until completion

393

* @param jobClient Job client

394

* @param scheduledExecutor Scheduled executor

395

* @param timeout Polling timeout

396

* @return Future with final job result

397

*/

398

public static CompletableFuture<JobExecutionResult> pollJobStatusUntilFinished(

399

JobClient jobClient,

400

ScheduledExecutorService scheduledExecutor,

401

Duration timeout

402

);

403

404

/**

405

* Poll job status with custom polling interval

406

* @param jobStatusSupplier Job status supplier

407

* @param scheduledExecutor Scheduled executor

408

* @param pollingInterval Polling interval

409

* @param timeout Total timeout

410

* @return Future with final job status

411

*/

412

public static CompletableFuture<JobStatus> pollJobStatus(

413

Supplier<CompletableFuture<JobStatus>> jobStatusSupplier,

414

ScheduledExecutorService scheduledExecutor,

415

Duration pollingInterval,

416

Duration timeout

417

);

418

}

419

420

/**

421

* Parser for JAR manifest files

422

*/

423

public class JarManifestParser {

424

/**

425

* Find entry class from JAR manifest

426

* @param jarFile JAR file to analyze

427

* @return Entry class name or null

428

*/

429

@Nullable

430

public static String findEntryClass(File jarFile);

431

432

/**

433

* Check if JAR contains main class

434

* @param jarFile JAR file to check

435

* @return true if main class found

436

*/

437

public static boolean hasMainClass(File jarFile);

438

}

439

```

440

441

## Types

442

443

```java { .api }

444

/**

445

* Job submission result for application mode

446

*/

447

public class JobSubmissionResult {

448

/**

449

* Get submitted job ID

450

* @return Job identifier

451

*/

452

public JobID getJobID();

453

454

/**

455

* Check if submission was successful

456

* @return true if successful

457

*/

458

public boolean isSuccess();

459

}

460

461

/**

462

* Embedded job client creator

463

*/

464

public class EmbeddedJobClientCreator {

465

/**

466

* Create embedded job client

467

* @param dispatcherGateway Dispatcher gateway

468

* @param executorService Executor service

469

* @param archivedExecutionGraph Archived execution graph

470

* @param userCodeClassLoader User code class loader

471

* @return Embedded job client

472

*/

473

public static EmbeddedJobClient create(

474

DispatcherGateway dispatcherGateway,

475

ScheduledExecutorService executorService,

476

ArchivedExecutionGraph archivedExecutionGraph,

477

ClassLoader userCodeClassLoader

478

);

479

}

480

```

481

482

## Exception Handling

483

484

Application mode operations handle specific error conditions:

485

486

- **Application Execution Errors**: `ApplicationExecutionException` for application runtime failures

487

- **Unsuccessful Execution**: `UnsuccessfulExecutionException` for failed job executions

488

- **Configuration Errors**: Invalid application configurations or missing parameters

489

- **Resource Errors**: Insufficient resources for application cluster deployment

490

- **Job Client Errors**: Communication failures with job management systems

491

492

**Error Handling Examples:**

493

494

```java

495

try {

496

ApplicationConfiguration config = ApplicationConfiguration

497

.fromConfiguration(flinkConfig)

498

.setProgramArguments("--input", "data.txt")

499

.build();

500

501

CompletableFuture<Void> applicationResult = runner.run(

502

dispatcherGateway,

503

scheduledExecutor,

504

config

505

);

506

507

applicationResult.get(); // Wait for completion

508

509

} catch (ApplicationExecutionException e) {

510

System.err.println("Application execution failed: " + e.getMessage());

511

} catch (UnsuccessfulExecutionException e) {

512

System.err.println("Job execution unsuccessful: " + e.getMessage());

513

}

514

```

515

516

Application mode provides dedicated cluster resources for long-running applications, enabling better resource isolation, simplified deployment, and optimized lifecycle management compared to session mode deployments.