or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-deployment.md, cli-interface.md, client-core.md, cluster-management.md, index.md, program-execution.md, rest-client.md

docs/cluster-management.md

0

# Cluster Management

1

2

Cluster deployment and management functionality for various deployment targets including standalone, containerized, and cloud environments. Provides abstractions for deploying, retrieving, and managing Flink clusters.

3

4

## Capabilities

5

6

### Cluster Client Interface

7

8

Main interface for interacting with Flink clusters, providing job management and cluster operations.

9

10

```java { .api }

11

/**

12

* Main interface for interacting with Flink clusters

13

* @param <T> Type of cluster identifier

14

*/

15

public interface ClusterClient<T> extends AutoCloseable {

16

/**

17

* Returns the cluster identifier

18

* @return Cluster ID of generic type T

19

*/

20

T getClusterId();

21

22

/**

23

* Returns the Flink configuration for this cluster

24

* @return Configuration instance

25

*/

26

Configuration getFlinkConfiguration();

27

28

/**

29

* Shuts down the cluster

30

*/

31

void shutDownCluster();

32

33

/**

34

* Returns the web interface URL for the cluster

35

* @return URL string for web interface, may be null

36

*/

37

String getWebInterfaceURL();

38

39

/**

40

* Lists all jobs running on the cluster

41

* @return CompletableFuture containing collection of job status messages

42

* @throws Exception if listing fails

43

*/

44

CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;

45

46

/**

47

* Submits a job to the cluster

48

* @param jobGraph JobGraph to submit

49

* @return CompletableFuture containing assigned JobID

50

*/

51

CompletableFuture<JobID> submitJob(JobGraph jobGraph);

52

53

/**

54

* Gets the status of a specific job

55

* @param jobId ID of the job to check

56

* @return CompletableFuture containing job status

57

*/

58

CompletableFuture<JobStatus> getJobStatus(JobID jobId);

59

60

/**

61

* Requests the result of a job execution

62

* @param jobId ID of the job

63

* @return CompletableFuture containing job result

64

*/

65

CompletableFuture<JobResult> requestJobResult(JobID jobId);

66

67

/**

68

* Gets job accumulators with default classloader

69

* @param jobID ID of the job

70

* @return CompletableFuture containing accumulator map

71

*/

72

CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID);

73

74

/**

75

* Gets job accumulators with specific classloader

76

* @param jobID ID of the job

77

* @param loader ClassLoader for deserialization

78

* @return CompletableFuture containing accumulator map

79

*/

80

CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);

81

82

/**

83

* Cancels a running job

84

* @param jobId ID of the job to cancel

85

* @return CompletableFuture containing acknowledgment

86

*/

87

CompletableFuture<Acknowledge> cancel(JobID jobId);

88

89

/**

90

* Cancels a job with savepoint

91

* @param jobId ID of the job to cancel

92

* @param savepointDirectory Directory to store the savepoint (nullable - uses default if null)

93

* @return CompletableFuture containing savepoint path

94

*/

95

CompletableFuture<String> cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory);

96

97

/**

98

* Stops a job with savepoint

99

* @param jobId ID of the job to stop

100

* @param advanceToEndOfEventTime Whether to advance to end of event time

101

* @param savepointDirectory Directory to store the savepoint (nullable - uses default if null)

102

* @return CompletableFuture containing savepoint path

103

*/

104

CompletableFuture<String> stopWithSavepoint(

105

JobID jobId,

106

boolean advanceToEndOfEventTime,

107

@Nullable String savepointDirectory);

108

109

/**

110

* Triggers a savepoint for a running job

111

* @param jobId ID of the job

112

* @param savepointDirectory Directory to store the savepoint (nullable - uses default if null)

113

* @return CompletableFuture containing savepoint path

114

*/

115

CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);

116

117

/**

118

* Disposes a savepoint

119

* @param savepointPath Path to the savepoint to dispose

120

* @return CompletableFuture containing acknowledgment

121

* @throws FlinkException if disposal fails

122

*/

123

CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;

124

125

/**

126

* Sends a coordination request to an operator

127

* @param jobId ID of the job containing the operator

128

* @param operatorId ID of the operator

129

* @param request Coordination request to send

130

* @return CompletableFuture containing coordination response

131

*/

132

CompletableFuture<CoordinationResponse> sendCoordinationRequest(

133

JobID jobId,

134

OperatorID operatorId,

135

CoordinationRequest request);

136

}

137

```

138

139

**Usage Example:**

140

141

```java

142

import org.apache.flink.client.program.ClusterClient;

143

import org.apache.flink.client.deployment.StandaloneClusterDescriptor;

144

import org.apache.flink.client.deployment.StandaloneClusterId;

145

146

// Connect to standalone cluster

147

StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);

148

try (ClusterClient<StandaloneClusterId> client =

149

descriptor.retrieve(new StandaloneClusterId()).getClusterClient()) {

150

151

// Submit job

152

JobID jobId = client.submitJob(jobGraph).get();

153

154

// Monitor job status

155

JobStatus status = client.getJobStatus(jobId).get();

156

System.out.println("Job status: " + status);

157

158

// Trigger savepoint

159

String savepointPath = client.triggerSavepoint(jobId, "/path/to/savepoints").get();

160

System.out.println("Savepoint created at: " + savepointPath);

161

}

162

```

163

164

### Cluster Descriptor Interface

165

166

Interface for cluster deployment and management operations.

167

168

```java { .api }

169

/**

170

* Descriptor for cluster deployment and management

171

* @param <T> Type of cluster identifier

172

*/

173

public interface ClusterDescriptor<T> extends AutoCloseable {

174

/**

175

* Returns a description of the cluster

176

* @return String description of cluster capabilities

177

*/

178

String getClusterDescription();

179

180

/**

181

* Retrieves an existing cluster

182

* @param clusterId Identifier of the cluster to retrieve

183

* @return ClusterClientProvider for the cluster

184

* @throws ClusterRetrieveException if cluster cannot be retrieved

185

*/

186

ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;

187

188

/**

189

* Deploys a new session cluster

190

* @param clusterSpecification Resource specification for the cluster

191

* @return ClusterClientProvider for the deployed cluster

192

* @throws ClusterDeploymentException if deployment fails

193

*/

194

ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification)

195

throws ClusterDeploymentException;

196

197

/**

198

* Deploys an application cluster

199

* @param clusterSpecification Resource specification for the cluster

200

* @param applicationConfiguration Application-specific configuration

201

* @return ClusterClientProvider for the deployed cluster

202

* @throws ClusterDeploymentException if deployment fails

203

*/

204

ClusterClientProvider<T> deployApplicationCluster(

205

ClusterSpecification clusterSpecification,

206

ApplicationConfiguration applicationConfiguration)

207

throws ClusterDeploymentException;

208

209

/**

210

* Deploys a job-specific cluster

211

* @param clusterSpecification Resource specification for the cluster

212

* @param jobGraph Job graph to deploy

213

* @param detached Whether to run in detached mode

214

* @return ClusterClientProvider for the deployed cluster

215

* @throws ClusterDeploymentException if deployment fails

216

*/

217

ClusterClientProvider<T> deployJobCluster(

218

ClusterSpecification clusterSpecification,

219

JobGraph jobGraph,

220

boolean detached) throws ClusterDeploymentException;

221

222

/**

223

* Kills an existing cluster

224

* @param clusterId Identifier of the cluster to kill

225

* @throws FlinkException if killing fails

226

*/

227

void killCluster(T clusterId) throws FlinkException;

228

}

229

```

230

231

### Cluster Client Factory

232

233

Factory interface for creating cluster clients and descriptors.

234

235

```java { .api }

236

/**

237

* Factory for creating cluster clients and descriptors

238

* @param <ClusterID> Type of cluster identifier

239

*/

240

public interface ClusterClientFactory<ClusterID> {

241

/**

242

* Checks if this factory is compatible with the given configuration

243

* @param configuration Flink configuration to check

244

* @return true if compatible, false otherwise

245

*/

246

boolean isCompatibleWith(Configuration configuration);

247

248

/**

249

* Creates a cluster descriptor for the given configuration

250

* @param configuration Flink configuration

251

* @return ClusterDescriptor instance

252

*/

253

ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration);

254

255

/**

256

* Extracts cluster ID from configuration

257

* @param configuration Flink configuration

258

* @return Cluster ID instance

259

*/

260

ClusterID getClusterId(Configuration configuration);

261

262

/**

263

* Creates cluster specification from configuration

264

* @param configuration Flink configuration

265

* @return ClusterSpecification instance

266

*/

267

ClusterSpecification getClusterSpecification(Configuration configuration);

268

}

269

```

270

271

### Cluster Client Provider

272

273

Provider interface for accessing cluster clients.

274

275

```java { .api }

276

/**

277

* Provider for cluster clients

278

* @param <T> Type of cluster identifier

279

*/

280

public interface ClusterClientProvider<T> {

281

/**

282

* Gets the cluster client instance

283

* @return ClusterClient instance for this cluster

284

*/

285

ClusterClient<T> getClusterClient();

286

287

/**

288

* Closes the provider and releases resources

289

*/

290

void close();

291

}

292

```

293

294

### Cluster Specification

295

296

Specification for cluster resources and configuration.

297

298

```java { .api }

299

/**

300

* Specification for cluster resources

301

*/

302

public class ClusterSpecification {

303

/**

304

* Gets the master/JobManager memory in MB

305

* @return Memory in MB

306

*/

307

public int getMasterMemoryMB();

308

309

/**

310

* Gets the TaskManager memory in MB

311

* @return Memory in MB

312

*/

313

public int getTaskManagerMemoryMB();

314

315

/**

316

* Gets the number of slots per TaskManager

317

* @return Number of slots

318

*/

319

public int getSlotsPerTaskManager();

320

321

/**

322

* Builder for creating ClusterSpecification instances

323

*/

324

public static class ClusterSpecificationBuilder {

325

/**

326

* Sets master memory in MB

327

* @param masterMemoryMB Memory in MB

328

* @return This builder instance

329

*/

330

public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB);

331

332

/**

333

* Sets TaskManager memory in MB

334

* @param taskManagerMemoryMB Memory in MB

335

* @return This builder instance

336

*/

337

public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB);

338

339

/**

340

* Sets slots per TaskManager

341

* @param slotsPerTaskManager Number of slots

342

* @return This builder instance

343

*/

344

public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager);

345

346

/**

347

* Creates the ClusterSpecification

348

* @return ClusterSpecification instance

349

*/

350

public ClusterSpecification createClusterSpecification();

351

}

352

}

353

```

354

355

**Usage Example:**

356

357

```java

358

import org.apache.flink.client.deployment.ClusterSpecification;

359

360

// Create cluster specification

361

ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()

362

.setMasterMemoryMB(1024)

363

.setTaskManagerMemoryMB(2048)

364

.setSlotsPerTaskManager(4)

365

.createClusterSpecification();

366

367

// Deploy session cluster

368

ClusterDescriptor<StandaloneClusterId> descriptor =

369

new StandaloneClusterDescriptor(config);

370

ClusterClientProvider<StandaloneClusterId> provider =

371

descriptor.deploySessionCluster(spec);

372

```

373

374

### Standalone Cluster Support

375

376

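Implementations for connecting to and managing standalone clusters. A standalone cluster is started outside the Flink client. These classes can therefore retrieve a client for an existing cluster, but they cannot deploy or kill one.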

377

378

```java { .api }

379

/**

380

* Factory for standalone cluster clients

381

*/

382

public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {

383

@Override

384

public boolean isCompatibleWith(Configuration configuration);

385

386

@Override

387

public ClusterDescriptor<StandaloneClusterId> createClusterDescriptor(Configuration configuration);

388

389

@Override

390

public StandaloneClusterId getClusterId(Configuration configuration);

391

392

@Override

393

public ClusterSpecification getClusterSpecification(Configuration configuration);

394

}

395

396

/**

397

* Descriptor for standalone clusters

398

*/

399

public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {

400

public StandaloneClusterDescriptor(Configuration flinkConfiguration);

401

402

// Implements all ClusterDescriptor methods

403

}

404

405

/**

406

* Identifier for standalone clusters

407

*/

408

public class StandaloneClusterId {

409

public StandaloneClusterId();

410

}

411

```

412

413

### Mini Cluster Support

414

415

Client implementation for mini clusters used in testing and local development.

416

417

```java { .api }

418

/**

419

* Client for mini clusters (testing/local development)

420

*/

421

public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniClusterId> {

422

423

/**

424

* Identifier for mini clusters

425

*/

426

public static class MiniClusterId {

427

// Mini cluster identifier implementation

428

}

429

430

// Implements all ClusterClient methods for mini cluster

431

}

432

```

433

434

## Types

435

436

```java { .api }

437

public interface ClusterClientServiceLoader {

438

Stream<ClusterClientFactory<ClusterID>> getClusterClientFactories();

439

}

440

441

public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {

442

@Override

443

public Stream<ClusterClientFactory<ClusterID>> getClusterClientFactories();

444

}

445

446

public class JobStatusMessage {

447

public JobID getJobId();

448

public String getJobName();

449

public JobStatus getJobStatus();

450

public long getStartTime();

451

}

452

453

public interface Acknowledge {

454

// Acknowledgment marker interface

455

}

456

457

/**
 * Base interface for requests sent from the client to an operator coordinator. Requests are
 * sent over the network, hence serializable.
 */
public interface CoordinationRequest extends Serializable {}

460

461

/**
 * Base interface for responses returned by an operator coordinator to the client. Responses
 * are sent over the network, hence serializable.
 */
public interface CoordinationResponse extends Serializable {}

464

465

public class OperatorID {

466

public static OperatorID generate();

467

public static OperatorID fromHexString(String hexString);

468

}

469

470

// Exception Types

471

public class ClusterDeploymentException extends FlinkException {

472

public ClusterDeploymentException(String message);

473

public ClusterDeploymentException(String message, Throwable cause);

474

}

475

476

public class ClusterRetrieveException extends FlinkException {

477

public ClusterRetrieveException(String message);

478

public ClusterRetrieveException(String message, Throwable cause);

479

}

480

481

public abstract class AbstractContainerizedClusterClientFactory<ClusterID>

482

implements ClusterClientFactory<ClusterID> {

483

// Base class for containerized cluster client factories

484

}

485

486

public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {

487

public ClusterClientJobClientAdapter(

488

ClusterClientProvider<ClusterID> clusterClientProvider,

489

JobID jobId,

490

ClassLoader userCodeClassloader);

491

492

// Implements JobClient interface by adapting ClusterClient calls

493

}

494

```