or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-operations.mdcluster-management.mdindex.mdprogram-execution.mdrest-client-communication.md

cluster-management.mddocs/

0

# Cluster Management

1

2

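The Apache Flink cluster management API (`org.apache.flink.client.deployment.*`) deploys Flink clusters and connects to existing ones. Deployment targets are pluggable: each target contributes a `ClusterClientFactory`, which is discovered through Java's `ServiceLoader` and selected by the `execution.target` configuration option. The factories cover the whole cluster lifecycle: deploying a cluster, retrieving a client for a running cluster, and shutting it down. The standalone target lives in this package. YARN and Kubernetes support comes from the separate `flink-yarn` and `flink-kubernetes` modules, which plug in through the same factory interfaces.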

3

4

## Core Deployment Interfaces

5

6

### ClusterClientFactory { .api }

7

8

Factory interface for creating cluster-specific clients and descriptors.

9

10

```java

11

public interface ClusterClientFactory<ClusterID> {

12

// Compatibility and configuration

13

boolean isCompatibleWith(Configuration configuration);

14

ClusterID getClusterId(Configuration configuration);

15

ClusterSpecification getClusterSpecification(Configuration configuration);

16

17

// Cluster descriptor creation

18

ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration);

19

}

20

```

21

22

### ClusterDescriptor { .api }

23

24

Interface for describing and managing cluster operations including deployment and retrieval.

25

26

```java
public interface ClusterDescriptor<T> extends AutoCloseable {
    // Cluster information
    String getClusterDescription();

    // Cluster operations
    ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;
    ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification)
            throws ClusterDeploymentException;
    ClusterClientProvider<T> deployApplicationCluster(ClusterSpecification clusterSpecification,
                                                      ApplicationConfiguration applicationConfiguration)
            throws ClusterDeploymentException;
    // Per-job mode; deprecated since Flink 1.15 in favour of application mode
    ClusterClientProvider<T> deployJobCluster(ClusterSpecification clusterSpecification,
                                              JobGraph jobGraph,
                                              boolean detached)
            throws ClusterDeploymentException;

    // Cluster management
    void killCluster(T clusterId) throws FlinkException;
    @Override
    void close(); // narrows AutoCloseable#close(): declares no checked exception
}
```

45

46

### ClusterClientServiceLoader { .api }

47

48

Service loader interface for discovering and loading cluster client factories.

49

50

```java

51

public interface ClusterClientServiceLoader {

52

<ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(Configuration configuration);

53

}

54

```

55

56

## Implementation Classes

57

58

### DefaultClusterClientServiceLoader { .api }

59

60

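Default implementation of `ClusterClientServiceLoader`. It discovers `ClusterClientFactory` implementations on the classpath with Java's `ServiceLoader` and returns the single factory whose `isCompatibleWith(configuration)` returns `true`. It throws `IllegalStateException` when no factory matches (for example, when `execution.target` is not set) or when more than one matches.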

61

62

```java

63

public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {

64

// Service loading implementation

65

public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(Configuration configuration) { }

66

}

67

```

68

69

### StandaloneClientFactory { .api }

70

71

Factory implementation for standalone Flink clusters.

72

73

```java

74

public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {

75

// ClusterClientFactory interface implementations

76

public boolean isCompatibleWith(Configuration configuration) { }

77

public ClusterDescriptor<StandaloneClusterId> createClusterDescriptor(Configuration configuration) { }

78

public StandaloneClusterId getClusterId(Configuration configuration) { }

79

public ClusterSpecification getClusterSpecification(Configuration configuration) { }

80

}

81

```

82

83

### StandaloneClusterDescriptor { .api }

84

85

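Cluster descriptor for standalone Flink clusters that were started outside the client (for example with `bin/start-cluster.sh`). Only `retrieve` is supported. `deploySessionCluster`, `deployApplicationCluster`, `deployJobCluster`, and `killCluster` throw `UnsupportedOperationException`, because the client cannot start or stop a standalone cluster.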

86

87

```java

88

public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {

89

// ClusterDescriptor interface implementations

90

public String getClusterDescription() { }

91

public ClusterClientProvider<StandaloneClusterId> retrieve(StandaloneClusterId clusterId) { }

92

public ClusterClientProvider<StandaloneClusterId> deploySessionCluster(ClusterSpecification clusterSpecification) { }

93

public ClusterClientProvider<StandaloneClusterId> deployApplicationCluster(ClusterSpecification clusterSpecification,

94

ApplicationConfiguration applicationConfiguration) { }

95

public ClusterClientProvider<StandaloneClusterId> deployJobCluster(ClusterSpecification clusterSpecification,

96

JobGraph jobGraph,

97

boolean detached) { }

98

public void killCluster(StandaloneClusterId clusterId) { }

99

public void close() { }

100

}

101

```

102

103

### AbstractContainerizedClusterClientFactory { .api }

104

105

Abstract base class for containerized cluster client factories (YARN, Kubernetes, etc.).

106

107

```java
public abstract class AbstractContainerizedClusterClientFactory<ClusterID>
        implements ClusterClientFactory<ClusterID> {
    // Derives the master (JobManager) memory, TaskManager memory and slots per TaskManager
    // from the process-memory and task-slot options in the configuration.
    @Override
    public ClusterSpecification getClusterSpecification(Configuration configuration) { }

    // isCompatibleWith, createClusterDescriptor and getClusterId are not implemented here;
    // concrete factories (e.g. YarnClusterClientFactory, KubernetesClusterClientFactory) provide them.
}
```

121

122

## Configuration Classes

123

124

### ClusterSpecification { .api }

125

126

Configuration class defining cluster resource specifications.

127

128

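```java
public class ClusterSpecification {
    // The constructor is private; create instances with ClusterSpecificationBuilder
    private ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int slotsPerTaskManager) { }

    // Resource access methods
    public int getMasterMemoryMB() { }
    public int getTaskManagerMemoryMB() { }
    public int getSlotsPerTaskManager() { }

    // Utility methods
    public String toString() { }

    // Builder
    public static class ClusterSpecificationBuilder {
        public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB) { }
        public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB) { }
        public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager) { }
        public ClusterSpecification createClusterSpecification() { }
    }
}
```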

142

143

### StandaloneClusterId { .api }

144

145

Identifier class for standalone clusters.

146

147

```java
public class StandaloneClusterId {
    // Singleton: the constructor is private and every standalone deployment shares this instance
    public static StandaloneClusterId getInstance() { }
}
```

152

153

## Adapter Classes

154

155

### ClusterClientJobClientAdapter { .api }

156

157

Adapter that bridges cluster clients to job clients, providing job-specific operations.

158

159

```java

160

public class ClusterClientJobClientAdapter<ClusterID> implements JobClient, CoordinationRequestGateway {

161

// Constructor

162

public ClusterClientJobClientAdapter(ClusterClientProvider<ClusterID> clusterClientProvider,

163

JobID jobId,

164

ClassLoader userCodeClassloader) { }

165

166

// JobClient interface implementations

167

public JobID getJobID() { }

168

public CompletableFuture<JobStatus> getJobStatus() { }

169

public CompletableFuture<Void> cancel() { }

170

public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, String savepointDirectory) { }

171

public CompletableFuture<String> triggerSavepoint(String savepointDirectory) { }

172

public CompletableFuture<Map<String, Object>> getAccumulators() { }

173

public CompletableFuture<JobExecutionResult> getJobExecutionResult() { }

174

175

// CoordinationRequestGateway interface implementations

176

public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) { }

177

}

178

```

179

180

## Pipeline Executors

181

182

### AbstractJobClusterExecutor { .api }

183

184

Abstract base class for job cluster pipeline executors.

185

186

```java

187

public abstract class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>

188

implements PipelineExecutor {

189

// Common job cluster execution functionality

190

public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userClassloader) { }

191

}

192

```

193

194

### AbstractSessionClusterExecutor { .api }

195

196

Abstract base class for session cluster pipeline executors.

197

198

```java

199

public abstract class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>

200

implements PipelineExecutor {

201

// Common session cluster execution functionality

202

public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userClassloader) { }

203

}

204

```

205

206

### LocalExecutor { .api }

207

208

Pipeline executor for local execution environment.

209

210

```java

211

public class LocalExecutor implements PipelineExecutor {

212

// Constants

213

public static final String NAME = "local";

214

215

// PipelineExecutor interface implementation

216

public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userClassloader) { }

217

}

218

```

219

220

### RemoteExecutor { .api }

221

222

Pipeline executor for remote cluster execution.

223

224

```java

225

public class RemoteExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> {

226

// Constants

227

public static final String NAME = "remote";

228

229

// Remote execution implementation

230

}

231

```

232

233

## Executor Factories

234

235

### LocalExecutorFactory { .api }

236

237

Factory for creating local pipeline executors.

238

239

```java

240

public class LocalExecutorFactory implements PipelineExecutorFactory {

241

// PipelineExecutorFactory interface implementation

242

public String getName() { }

243

public boolean isCompatibleWith(Configuration configuration) { }

244

public PipelineExecutor getExecutor(Configuration configuration) { }

245

}

246

```

247

248

### RemoteExecutorFactory { .api }

249

250

Factory for creating remote pipeline executors.

251

252

```java

253

public class RemoteExecutorFactory implements PipelineExecutorFactory {

254

// PipelineExecutorFactory interface implementation

255

public String getName() { }

256

public boolean isCompatibleWith(Configuration configuration) { }

257

public PipelineExecutor getExecutor(Configuration configuration) { }

258

}

259

```

260

261

## Executor Utilities

262

263

### PipelineExecutorUtils { .api }

264

265

Utility class for pipeline executor operations.

266

267

```java
public class PipelineExecutorUtils {
    // Translates a Pipeline (e.g. a StreamGraph) into a JobGraph, applying the execution
    // options from the configuration (pipeline jars/classpaths, savepoint restore settings, ...).
    public static JobGraph getJobGraph(Pipeline pipeline,
                                       Configuration configuration,
                                       ClassLoader userClassloader) throws MalformedURLException { }
}
```

276

277

## Exception Classes

278

279

### ClusterDeploymentException { .api }

280

281

Exception thrown during cluster deployment operations.

282

283

```java

284

public class ClusterDeploymentException extends FlinkException {

285

// Constructors

286

public ClusterDeploymentException(String message) { }

287

public ClusterDeploymentException(String message, Throwable cause) { }

288

public ClusterDeploymentException(Throwable cause) { }

289

}

290

```

291

292

### ClusterRetrieveException { .api }

293

294

Exception thrown when retrieving or connecting to existing clusters.

295

296

```java

297

public class ClusterRetrieveException extends FlinkException {

298

// Constructors

299

public ClusterRetrieveException(String message) { }

300

public ClusterRetrieveException(String message, Throwable cause) { }

301

public ClusterRetrieveException(Throwable cause) { }

302

}

303

```

304

305

## Usage Examples

306

307

### Basic Cluster Client Usage

308

309

```java

310

// Load cluster client factory

311

Configuration config = new Configuration();

312

config.setString("execution.target", "remote");

313

314

ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();

315

ClusterClientFactory<StandaloneClusterId> factory = serviceLoader.getClusterClientFactory(config);

316

317

// Create cluster descriptor and retrieve client

318

try (ClusterDescriptor<StandaloneClusterId> descriptor = factory.createClusterDescriptor(config)) {

319

StandaloneClusterId clusterId = factory.getClusterId(config);

320

try (ClusterClient<StandaloneClusterId> client = descriptor.retrieve(clusterId).getClusterClient()) {

321

// Use cluster client for operations

322

CompletableFuture<Collection<JobStatusMessage>> jobs = client.listJobs();

323

}

324

}

325

```

326

327

### Session Cluster Deployment

328

329

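```java
// An execution target must be set: with an empty Configuration no factory matches and
// getClusterClientFactory throws IllegalStateException. Standalone clusters cannot be started by
// the client (StandaloneClusterDescriptor#deploySessionCluster throws UnsupportedOperationException),
// so use a target that provisions resources, e.g. "kubernetes-session" (needs flink-kubernetes).
Configuration config = new Configuration();
config.setString("execution.target", "kubernetes-session");

// Define cluster specification (the constructor is private; use the builder)
ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
        .setMasterMemoryMB(1024)
        .setTaskManagerMemoryMB(2048)
        .setSlotsPerTaskManager(4)
        .createClusterSpecification();

// Resolve the factory for the configured target (Kubernetes cluster IDs are Strings)
ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
ClusterClientFactory<String> factory = serviceLoader.getClusterClientFactory(config);

// Deploy session cluster
try (ClusterDescriptor<String> descriptor = factory.createClusterDescriptor(config)) {
    ClusterClientProvider<String> provider = descriptor.deploySessionCluster(spec);
    try (ClusterClient<String> client = provider.getClusterClient()) {
        // Session cluster is now available for job submission
        System.out.println("Cluster deployed at: " + client.getWebInterfaceURL());
    }
}
```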

346

347

### Application Cluster Deployment

348

349

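```java
// Application mode needs a target that can start clusters, e.g. "kubernetes-application"
// (needs flink-kubernetes); standalone descriptors throw UnsupportedOperationException.
// The application JAR is supplied through the "pipeline.jars" option.
Configuration config = new Configuration();
config.setString("execution.target", "kubernetes-application");
config.setString("pipeline.jars", "local:///opt/flink/usrlib/my-flink-app.jar");

ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
ClusterClientFactory<String> factory = serviceLoader.getClusterClientFactory(config);

// Application configuration: program arguments and main class
ApplicationConfiguration appConfig = new ApplicationConfiguration(
        new String[]{"--input", "/data/input", "--output", "/data/output"},
        "com.example.MyFlinkApp"
);

// Cluster specification (the constructor is private; use the builder)
ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
        .setMasterMemoryMB(1024)
        .setTaskManagerMemoryMB(2048)
        .setSlotsPerTaskManager(4)
        .createClusterSpecification();

// Deploy application cluster
try (ClusterDescriptor<String> descriptor = factory.createClusterDescriptor(config)) {
    ClusterClientProvider<String> provider =
            descriptor.deployApplicationCluster(spec, appConfig);

    try (ClusterClient<String> client = provider.getClusterClient()) {
        // The application's main() runs inside the cluster, so the client does not receive job IDs
        // from the deployment call; list the jobs the application submitted instead.
        Collection<JobStatusMessage> jobs = client.listJobs().get();
    }
}
```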

370

371

### Pipeline Executor Usage

372

373

```java
// Resolve the executor factory for the configured target ("local", "remote", ...)
Configuration config = new Configuration();
config.setString("execution.target", "local");

PipelineExecutorServiceLoader executorLoader = new DefaultExecutorServiceLoader();
PipelineExecutorFactory executorFactory = executorLoader.getExecutorFactory(config);
PipelineExecutor executor = executorFactory.getExecutor(config);

// Execute pipeline
Pipeline pipeline = /* your Flink program pipeline, e.g. a StreamGraph */;
ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();
CompletableFuture<JobClient> jobClientFuture =
        executor.execute(pipeline, config, userClassLoader);

JobClient jobClient = jobClientFuture.get();
```

389

390

## Application Deployment Classes

391

392

### ApplicationConfiguration { .api }

393

394

Configuration class for application cluster deployments containing program arguments and entry point information.

395

396

```java
public class ApplicationConfiguration {
    // Constructor (applicationClassName may be null, in which case the entry class is
    // taken from the JAR manifest)
    public ApplicationConfiguration(String[] programArguments, @Nullable String applicationClassName) { }

    // Property access methods
    public String[] getProgramArguments() { }
    @Nullable
    public String getApplicationClassName() { }

    // Conversion to and from a Flink Configuration
    public void applyToConfiguration(Configuration configuration) { }
    public static ApplicationConfiguration fromConfiguration(Configuration configuration) { }

    // Savepoint restore settings are not part of this class; they are read from the
    // Configuration (e.g. "execution.savepoint.path").
}
```

414

415

### ApplicationClusterEntryPoint { .api }

416

417

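Base entry point for application-mode clusters. It runs the user's application inside the cluster's JobManager. Deployment-specific subclasses (for example the standalone, YARN, and Kubernetes application entry points) supply the `main` method and construct it with the packaged user program.

```java
public class ApplicationClusterEntryPoint extends ClusterEntrypoint {
    // Constructor used by the deployment-specific subclasses
    protected ApplicationClusterEntryPoint(Configuration configuration,
                                           PackagedProgram program,
                                           ResourceManagerFactory<?> resourceManagerFactory) { }

    // Cluster initialization methods
    protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { }

    // Newer releases return ExecutionGraphInfoStore instead of ArchivedExecutionGraphStore
    protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(Configuration configuration,
                                                                                ScheduledExecutor scheduledExecutor) { }

    // Helper for subclasses: prepares the configuration for running the program inside the cluster
    protected static void configureExecution(Configuration configuration, PackagedProgram program) throws Exception { }
}
```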

435

436

### ApplicationRunner { .api }

437

438

Interface for running applications within application clusters.

439

440

```java
public interface ApplicationRunner {
    // Runs the program's main() against the given dispatcher and returns the IDs of the
    // jobs the program submitted
    List<JobID> run(DispatcherGateway dispatcherGateway,
                    PackagedProgram program,
                    Configuration configuration);
}
```

450

451

### DetachedApplicationRunner { .api }

452

453

Implementation of ApplicationRunner for detached application execution.

454

455

```java
public class DetachedApplicationRunner implements ApplicationRunner {
    // Constructor: when enforceSingleJobExecution is true, the program may submit only one job
    public DetachedApplicationRunner(boolean enforceSingleJobExecution) { }

    // ApplicationRunner interface implementation: runs the program in detached mode
    public List<JobID> run(DispatcherGateway dispatcherGateway,
                           PackagedProgram program,
                           Configuration configuration) { }
}
```

472

473

### EmbeddedJobClient { .api }

474

475

Job client implementation for embedded execution within application clusters.

476

477

```java
public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway {
    // Constructor (the timeout is a java.time.Duration in newer releases)
    public EmbeddedJobClient(JobID jobId,
                             DispatcherGateway dispatcherGateway,
                             ScheduledExecutor retryExecutor,
                             Time timeout,
                             ClassLoader classLoader) { }

    // JobClient interface implementation
    public JobID getJobID() { }
    public CompletableFuture<JobStatus> getJobStatus() { }
    public CompletableFuture<Void> cancel() { }
    public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, String savepointDirectory) { }
    public CompletableFuture<String> triggerSavepoint(String savepointDirectory) { }
    public CompletableFuture<Map<String, Object>> getAccumulators() { }
    public CompletableFuture<JobExecutionResult> getJobExecutionResult() { }

    // CoordinationRequestGateway interface implementation
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId,
                                                                          CoordinationRequest request) { }
}
```

499

500

### WebSubmissionJobClient { .api }

501

502

Placeholder job client returned for jobs submitted through the web frontend (the REST "run jar" endpoint) in detached mode. It only exposes the job ID. Every other `JobClient` operation throws a `FlinkRuntimeException`, because the submitting client cannot follow the job when using web submission.

```java
public class WebSubmissionJobClient implements JobClient {
    // Constructor
    public WebSubmissionJobClient(JobID jobId) { }

    // JobClient interface implementation
    public JobID getJobID() { }

    // The following all throw FlinkRuntimeException (not supported in web submission)
    public CompletableFuture<JobStatus> getJobStatus() { }
    public CompletableFuture<Void> cancel() { }
    public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, String savepointDirectory) { }
    public CompletableFuture<String> triggerSavepoint(String savepointDirectory) { }
    public CompletableFuture<Map<String, Object>> getAccumulators() { }
    public CompletableFuture<JobExecutionResult> getJobExecutionResult() { }
}
```

526

527

## Required Imports

528

529

```java
// Deployment API
import org.apache.flink.client.deployment.AbstractContainerizedClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.StandaloneClientFactory;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.deployment.StandaloneClusterId;

// Application mode
import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.ApplicationRunner;
import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
import org.apache.flink.client.deployment.application.EmbeddedJobClient;
import org.apache.flink.client.deployment.application.WebSubmissionJobClient;

// Pipeline executors
import org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor;
import org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor;
import org.apache.flink.client.deployment.executors.LocalExecutor;
import org.apache.flink.client.deployment.executors.LocalExecutorFactory;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.client.deployment.executors.RemoteExecutorFactory;

// Client program classes
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.client.program.PackagedProgram;

// Core API
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.util.FlinkException;
// Flink's own ScheduledExecutor, not a JDK type
// (org.apache.flink.runtime.concurrent.ScheduledExecutor in older releases)
import org.apache.flink.util.concurrent.ScheduledExecutor;

// Runtime classes
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;

// JDK
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
```