or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-mode.mdartifact-management.mdcli-frontend.mdcluster-client.mddeployment-management.mdindex.mdprogram-packaging.md

cluster-client.mddocs/

# Cluster Client Management

Core interface for programmatic cluster interaction, supporting job submission, status monitoring, and cluster lifecycle management across different deployment targets, including standalone, REST-based, and mini clusters.

## Capabilities

### Cluster Client Interface

Main interface for communicating with Flink clusters, providing job management and cluster management capabilities.

```java { .api }
/**
 * Interface for cluster clients that communicate with Flink clusters
 * @param <T> Type of cluster identifier
 */
public interface ClusterClient<T> extends AutoCloseable {
    /**
     * Close the cluster client and release resources
     */
    @Override
    void close();

    /**
     * Get the cluster identifier
     * @return Cluster ID identifying the connected cluster
     */
    T getClusterId();

    /**
     * Get the Flink configuration used by this client
     * @return Flink configuration object
     */
    Configuration getFlinkConfiguration();

    /**
     * Shut down the cluster that this client communicates with
     */
    void shutDownCluster();

    /**
     * Get URL to the cluster web interface
     * @return Web interface URL as string
     */
    String getWebInterfaceURL();

    /**
     * List all jobs on the cluster (running and finished)
     * @return Future collection of job status messages
     * @throws Exception if connection fails
     */
    CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;

    /**
     * Submit an execution plan to the cluster
     * @param executionPlan Execution plan to submit
     * @return Future with assigned job ID
     */
    CompletableFuture<JobID> submitJob(ExecutionPlan executionPlan);

    /**
     * Get status of a specific job
     * @param jobId ID of the job to query
     * @return Future with job status
     */
    CompletableFuture<JobStatus> getJobStatus(JobID jobId);

    /**
     * Cancel a running job
     * @param jobId ID of the job to cancel
     * @return Future with acknowledgment
     */
    CompletableFuture<Acknowledge> cancel(JobID jobId);

    /**
     * Stop a job gracefully, taking a final savepoint
     * @param jobId ID of the job to stop
     * @param advanceToEndOfEventTime Whether to advance to end of event time before stopping
     * @param savepointDir Target directory for the savepoint, or null to use the configured default
     * @param formatType Savepoint format type
     * @return Future with savepoint path
     */
    CompletableFuture<String> stopWithSavepoint(
        JobID jobId,
        boolean advanceToEndOfEventTime,
        @Nullable String savepointDir,
        SavepointFormatType formatType
    );

    /**
     * Trigger a savepoint for a running job
     * @param jobId ID of the job
     * @param savepointDir Target directory for the savepoint, or null to use the configured default
     * @param formatType Savepoint format type
     * @return Future with savepoint path
     */
    CompletableFuture<String> triggerSavepoint(
        JobID jobId,
        @Nullable String savepointDir,
        SavepointFormatType formatType
    );

    /**
     * Dispose a savepoint
     * @param savepointPath Path to the savepoint to dispose
     * @return Future with acknowledgment
     * @throws FlinkException if disposal fails
     */
    CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;

    /**
     * Cancel a job and trigger a savepoint
     * @param jobId ID of the job to cancel
     * @param savepointDirectory Target directory for the savepoint, or null to use the configured default
     * @param formatType Savepoint format type
     * @return Future with savepoint path
     */
    CompletableFuture<String> cancelWithSavepoint(
        JobID jobId,
        @Nullable String savepointDirectory,
        SavepointFormatType formatType
    );

    /**
     * Trigger a detached savepoint (returns quickly with a trigger ID instead of
     * waiting for the savepoint to complete)
     * @param jobId ID of the job
     * @param savepointDirectory Target directory for the savepoint, or null to use the configured default
     * @param formatType Savepoint format type
     * @return Future with savepoint trigger ID
     */
    CompletableFuture<String> triggerDetachedSavepoint(
        JobID jobId,
        @Nullable String savepointDirectory,
        SavepointFormatType formatType
    );

    /**
     * Trigger a checkpoint for a job
     * @param jobId ID of the job
     * @param checkpointType Type of checkpoint (configured/full/incremental)
     * @return Future with checkpoint ID
     */
    CompletableFuture<Long> triggerCheckpoint(JobID jobId, CheckpointType checkpointType);

    /**
     * Get accumulators for a job
     * @param jobID Job identifier
     * @param loader Class loader for deserializing results
     * @return Future with accumulator map
     */
    CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
}
```

**Usage Examples:**

```java
import java.util.Collection;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.client.JobStatusMessage;

// Create and use a REST cluster client (the constructor declares `throws Exception`)
Configuration config = new Configuration();
config.set(RestOptions.ADDRESS, "localhost");
config.set(RestOptions.PORT, 8081);

try (ClusterClient<?> client = new RestClusterClient<>(config, "default")) {
    // List all jobs
    Collection<JobStatusMessage> jobs = client.listJobs().get();
    System.out.println("Found " + jobs.size() + " jobs");

    // Get job status. A JobID is 16 bytes, so its hex form must be exactly 32 characters.
    JobID jobId = JobID.fromHexString("a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6");
    JobStatus status = client.getJobStatus(jobId).get();
    System.out.println("Job status: " + status);

    // Trigger savepoint
    String savepointPath = client.triggerSavepoint(
        jobId,
        "/path/to/savepoints",
        SavepointFormatType.CANONICAL
    ).get();
    System.out.println("Savepoint created: " + savepointPath);
}
```

### REST Cluster Client

REST-based implementation of the cluster client that communicates with Flink clusters via the REST API. The REST endpoint and retry behavior are taken from the Flink `Configuration` passed to the constructor (for example `rest.address`, `rest.port`, `rest.retry.max-attempts`, `rest.retry.delay`).

```java { .api }
/**
 * REST-based cluster client implementation
 * @param <T> Type of cluster identifier
 */
public class RestClusterClient<T> implements ClusterClient<T> {
    /**
     * Create REST cluster client from a Flink configuration and cluster ID
     * @param config Flink configuration (REST address, port, retry settings, ...)
     * @param clusterId Cluster identifier
     * @throws Exception if the client cannot be created
     */
    public RestClusterClient(Configuration config, T clusterId) throws Exception;

    /**
     * Create REST cluster client with a custom client high-availability services factory
     * @param config Flink configuration
     * @param clusterId Cluster identifier
     * @param clientHAServicesFactory Factory for the client high-availability services used for leader retrieval
     * @throws Exception if the client cannot be created
     */
    public RestClusterClient(
        Configuration config,
        T clusterId,
        ClientHighAvailabilityServicesFactory clientHAServicesFactory
    ) throws Exception;
}

/**
 * Configuration for REST cluster clients, derived from a Flink configuration
 */
public final class RestClusterClientConfiguration {
    /**
     * Create configuration from Flink configuration
     * @param config Flink configuration
     * @return REST client configuration
     * @throws ConfigurationException if the configuration is invalid
     */
    public static RestClusterClientConfiguration fromConfiguration(Configuration config)
        throws ConfigurationException;

    /**
     * Get the configuration of the underlying REST client
     * @return REST client configuration
     */
    public RestClientConfiguration getRestClientConfiguration();

    /**
     * Get the timeout for retrieving the cluster leader
     * @return Timeout in milliseconds
     */
    public long getAwaitLeaderTimeout();

    /**
     * Get the maximum number of retry attempts for REST requests
     * @return Maximum retry attempts
     */
    public int getRetryMaxAttempts();

    /**
     * Get the delay between retry attempts
     * @return Retry delay in milliseconds
     */
    public long getRetryDelay();
}
```

### Mini Cluster Client

Cluster client implementation for mini clusters used in testing and local development.

```java { .api }
/**
 * Cluster client for mini clusters
 */
public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniClusterId> {
    /**
     * Create mini cluster client
     * @param configuration Flink configuration
     * @param miniCluster Mini cluster instance
     */
    public MiniClusterClient(Configuration configuration, MiniCluster miniCluster);

    /**
     * Cluster ID type for mini clusters (a single-value enum)
     */
    public enum MiniClusterId {
        INSTANCE
    }
}
```

### Cluster Client Provider

Factory for cluster clients. Each call to `getClusterClient()` creates a new client, which the caller is responsible for closing after use (for example with try-with-resources).

```java { .api }
/**
 * Factory for cluster clients
 * @param <T> Type of cluster identifier
 */
@FunctionalInterface
public interface ClusterClientProvider<T> {
    /**
     * Create and return a new cluster client. The returned client must be
     * closed via ClusterClient#close() after use.
     * @return Cluster client
     */
    ClusterClient<T> getClusterClient();
}
```

### Cluster Client Job Client Adapter

Adapter that exposes a single job on a cluster through the `JobClient` interface. It delegates each operation to a cluster client obtained from a `ClusterClientProvider`.

```java { .api }
/**
 * Adapter between cluster client and job client interfaces
 * @param <T> Type of cluster identifier
 */
public class ClusterClientJobClientAdapter<T> implements JobClient {
    /**
     * Create adapter for a job on the cluster
     * @param clusterClientProvider Provider used to obtain cluster clients
     * @param jobID Job identifier
     * @param userCodeClassLoader User code class loader
     */
    public ClusterClientJobClientAdapter(
        ClusterClientProvider<T> clusterClientProvider,
        JobID jobID,
        ClassLoader userCodeClassLoader
    );

    @Override
    public JobID getJobID();

    @Override
    public CompletableFuture<JobStatus> getJobStatus();

    @Override
    public CompletableFuture<Void> cancel();

    @Override
    public CompletableFuture<String> stopWithSavepoint(
        boolean advanceToEndOfEventTime,
        @Nullable String savepointDir,
        SavepointFormatType formatType
    );

    @Override
    public CompletableFuture<String> triggerSavepoint(
        @Nullable String savepointDir,
        SavepointFormatType formatType
    );

    @Override
    public CompletableFuture<JobExecutionResult> getJobExecutionResult();
}
```

### Retry Strategies

Wait strategies that determine how long the REST cluster client waits between retried or polled requests, including exponential backoff.

```java { .api }
/**
 * Strategy that determines the wait time between attempts
 */
@FunctionalInterface
public interface WaitStrategy {
    /**
     * Calculate the time to wait before the next attempt
     * @param attempt Number of the last attempt (starting from 0)
     * @return Wait time in milliseconds
     */
    long sleepTime(long attempt);
}

/**
 * Exponential backoff wait strategy: the wait time doubles with each attempt,
 * capped at maxWait
 */
public class ExponentialWaitStrategy implements WaitStrategy {
    /**
     * Create exponential wait strategy
     * @param initialWait Initial wait time in milliseconds (must be positive)
     * @param maxWait Maximum wait time in milliseconds (must be at least initialWait)
     */
    public ExponentialWaitStrategy(long initialWait, long maxWait);

    @Override
    public long sleepTime(long attempt);
}
```

## Types

```java { .api }
/**
 * Job status message containing job information
 */
public class JobStatusMessage {
    /**
     * Get job ID
     * @return Job identifier
     */
    public JobID getJobId();

    /**
     * Get job name
     * @return Job name
     */
    public String getJobName();

    /**
     * Get current job state
     * @return Job status enum
     */
    public JobStatus getJobState();

    /**
     * Get job start time
     * @return Start time as an epoch timestamp in milliseconds
     */
    public long getStartTime();
}

/**
 * Job execution result containing final job information
 */
public class JobExecutionResult {
    /**
     * Get the ID of the executed job
     * @return Job identifier
     */
    public JobID getJobID();

    /**
     * Get the net execution time of the job
     * @return Net runtime in milliseconds
     */
    public long getNetRuntime();

    /**
     * Get all accumulator results of the job
     * @return Accumulator results keyed by accumulator name
     */
    public Map<String, Object> getAllAccumulatorResults();
}
```

## Error Handling

Cluster client operations can fail due to various error conditions:

- **Connection Errors**: Network failures, cluster unavailability
- **Authentication Errors**: Security configuration issues
- **Job Errors**: Invalid job submissions, job not found
- **Resource Errors**: Insufficient cluster resources
- **Timeout Errors**: Long-running operations exceeding timeouts

Asynchronous operations return `CompletableFuture` instances. Failures surface as exceptional completion of the future and can be handled with `handle()`, `exceptionally()`, or `whenComplete()`. Some methods also declare checked exceptions that can be thrown synchronously, before any future is returned: `listJobs()` declares `throws Exception` and `disposeSavepoint()` declares `throws FlinkException`. Callers must handle those as well.