or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-mode.md · artifact-management.md · cli-frontend.md · cluster-client.md · deployment-management.md · index.md · program-packaging.md

deployment-management.md (docs/)

0

# Deployment Management

1

2

A cluster deployment abstraction that supports multiple deployment targets (standalone, YARN, Kubernetes, and other containerized environments) through a pluggable factory pattern and a cluster resource specification.

3

4

## Capabilities

5

6

### Cluster Descriptor Interface

7

8

Core interface for cluster descriptors, which manage the cluster lifecycle: deployment, retrieval, and termination.

9

10

```java { .api }

11

/**

12

* Interface for cluster descriptors that manage cluster lifecycle

13

* @param <T> Type of cluster identifier

14

*/

15

public interface ClusterDescriptor<T> extends AutoCloseable {

16

/**

17

* Retrieve an existing cluster by ID

18

* @param clusterId Cluster identifier

19

* @return Cluster client provider for the existing cluster

20

* @throws ClusterRetrieveException if cluster cannot be retrieved

21

*/

22

ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;

23

24

/**

25

* Deploy a new session cluster

26

* @param clusterSpecification Resource specification for the cluster

27

* @return Cluster client provider for the new cluster

28

* @throws ClusterDeploymentException if deployment fails

29

*/

30

ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification)

31

throws ClusterDeploymentException;

32

33

/**

34

* Deploy an application cluster (application mode)

35

* @param clusterSpecification Resource specification for the cluster

36

* @param applicationConfiguration Application configuration

37

* @return Cluster client provider for the application cluster

38

* @throws ClusterDeploymentException if deployment fails

39

*/

40

default ClusterClientProvider<T> deployApplicationCluster(

41

ClusterSpecification clusterSpecification,

42

ApplicationConfiguration applicationConfiguration

43

) throws ClusterDeploymentException {

44

throw new UnsupportedOperationException(

45

"Application mode is not supported by this cluster descriptor."

46

);

47

}

48

49

/**

50

* Terminate the given cluster

51

* @param clusterId Cluster identifier

52

* @throws FlinkException if termination fails

53

*/

54

void terminateCluster(T clusterId) throws FlinkException;

55

56

/**

57

* Close the cluster descriptor and release resources

58

*/

59

@Override

60

void close() throws Exception;

61

}

62

```

63

64

**Usage Examples:**

65

66

```java

67

import org.apache.flink.client.deployment.ClusterDescriptor;

68

import org.apache.flink.client.deployment.ClusterSpecification;

69

import org.apache.flink.client.deployment.StandaloneClusterDescriptor;

70

import org.apache.flink.client.deployment.StandaloneClusterId;

71

72

// Create cluster specification

73

ClusterSpecification spec = new ClusterSpecification.Builder()

74

.setMasterMemoryMB(1024)

75

.setTaskManagerMemoryMB(2048)

76

.setNumberTaskManagers(2)

77

.createClusterSpecification();

78

79

// Deploy session cluster

80

try (ClusterDescriptor<StandaloneClusterId> descriptor =

81

new StandaloneClusterDescriptor(config, highAvailabilityServices, rpcService)) {

82

83

ClusterClientProvider<StandaloneClusterId> provider =

84

descriptor.deploySessionCluster(spec);

85

86

try (ClusterClient<StandaloneClusterId> client = provider.getClusterClient()) {

87

System.out.println("Cluster deployed: " + client.getClusterId());

88

System.out.println("Web UI: " + client.getWebInterfaceURL());

89

90

// Use cluster...

91

92

} finally {

93

provider.close();

94

}

95

}

96

```

97

98

### Cluster Client Factory

99

100

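Factory interface for creating cluster descriptors and for deriving the cluster ID and cluster specification from configuration. Each factory reports through `isCompatibleWith` whether it matches a given configuration, which is how the deployment target is detected.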

101

102

```java { .api }

103

/**

104

* Factory for creating cluster clients

105

* @param <T> Type of cluster identifier

106

*/

107

public interface ClusterClientFactory<T> {

108

/**

109

* Check if this factory is compatible with the given configuration

110

* @param configuration Flink configuration

111

* @return true if compatible

112

*/

113

boolean isCompatibleWith(Configuration configuration);

114

115

/**

116

* Create cluster descriptor from configuration

117

* @param configuration Flink configuration

118

* @return Cluster descriptor instance

119

*/

120

ClusterDescriptor<T> createClusterDescriptor(Configuration configuration);

121

122

/**

123

* Get cluster ID from configuration

124

* @param configuration Flink configuration

125

* @return Cluster identifier or null if not specified

126

*/

127

@Nullable

128

T getClusterId(Configuration configuration);

129

130

/**

131

* Get cluster specification from configuration

132

* @param configuration Flink configuration

133

* @return Cluster specification

134

*/

135

ClusterSpecification getClusterSpecification(Configuration configuration);

136

}

137

```

138

139

### Cluster Client Service Loader

140

141

Service loader for discovering and loading cluster client factories dynamically.

142

143

```java { .api }

144

/**

145

* Service loader for cluster client factories

146

*/

147

public interface ClusterClientServiceLoader {

148

/**

149

* Get cluster client factory compatible with configuration

150

* @param configuration Flink configuration

151

* @return Compatible cluster client factory

152

* @throws UnsupportedOperationException if no compatible factory found

153

*/

154

<T> ClusterClientFactory<T> getClusterClientFactory(Configuration configuration);

155

}

156

157

/**

158

* Default implementation of cluster client service loader

159

*/

160

public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {

161

/**

162

* Create service loader instance

163

*/

164

public DefaultClusterClientServiceLoader();

165

166

@Override

167

public <T> ClusterClientFactory<T> getClusterClientFactory(Configuration configuration);

168

}

169

```

170

171

### Standalone Deployment

172

173

Implementations for standalone cluster deployment and management.

174

175

```java { .api }

176

/**

177

* Cluster descriptor for standalone clusters

178

*/

179

public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {

180

/**

181

* Create standalone cluster descriptor

182

* @param flinkConfig Flink configuration

183

* @param haServices High availability services

184

* @param rpcService RPC service

185

*/

186

public StandaloneClusterDescriptor(

187

Configuration flinkConfig,

188

HighAvailabilityServices haServices,

189

RpcService rpcService

190

);

191

192

@Override

193

public ClusterClientProvider<StandaloneClusterId> retrieve(StandaloneClusterId clusterId)

194

throws ClusterRetrieveException;

195

196

@Override

197

public ClusterClientProvider<StandaloneClusterId> deploySessionCluster(

198

ClusterSpecification clusterSpecification

199

) throws ClusterDeploymentException;

200

}

201

202

/**

203

* Factory for standalone cluster clients

204

*/

205

public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {

206

@Override

207

public boolean isCompatibleWith(Configuration configuration);

208

209

@Override

210

public ClusterDescriptor<StandaloneClusterId> createClusterDescriptor(

211

Configuration configuration

212

);

213

214

@Override

215

public StandaloneClusterId getClusterId(Configuration configuration);

216

}

217

218

/**

219

* Cluster ID for standalone clusters

220

*/

221

public class StandaloneClusterId {

222

/**

223

* Create standalone cluster ID

224

*/

225

public StandaloneClusterId();

226

227

@Override

228

public String toString();

229

230

@Override

231

public boolean equals(Object obj);

232

233

@Override

234

public int hashCode();

235

}

236

```

237

238

### Abstract Containerized Factory

239

240

Base class for containerized cluster client factories providing common functionality for Docker, Kubernetes, and other container-based deployments.

241

242

```java { .api }

243

/**

244

* Base factory for containerized cluster clients

245

* @param <ClusterID> Type of cluster identifier

246

* @param <ApplicationClusterID> Type of application cluster identifier

247

*/

248

public abstract class AbstractContainerizedClusterClientFactory<

249

ClusterID, ApplicationClusterID> implements ClusterClientFactory<ClusterID> {

250

251

/**

252

* Get deployment target for this factory

253

* @return Deployment target string

254

*/

255

protected abstract String getDeploymentTargetName();

256

257

/**

258

* Check if cluster ID is compatible

259

* @param clusterId Cluster ID to check

260

* @return true if compatible

261

*/

262

protected abstract boolean isCompatibleWith(ClusterID clusterId);

263

264

@Override

265

public boolean isCompatibleWith(Configuration configuration);

266

267

@Override

268

public ClusterSpecification getClusterSpecification(Configuration configuration);

269

}

270

```

271

272

### Cluster Specification

273

274

Resource specification for cluster deployment: master memory, task manager memory, number of task managers, and slots per task manager.

275

276

```java { .api }

277

/**

278

* Specification for cluster resource requirements

279

*/

280

public class ClusterSpecification {

281

/**

282

* Get master memory in MB

283

* @return Master memory size

284

*/

285

public int getMasterMemoryMB();

286

287

/**

288

* Get task manager memory in MB

289

* @return Task manager memory size

290

*/

291

public int getTaskManagerMemoryMB();

292

293

/**

294

* Get number of task managers

295

* @return Number of task managers

296

*/

297

public int getNumberTaskManagers();

298

299

/**

300

* Get slots per task manager

301

* @return Number of slots per task manager

302

*/

303

public int getSlotsPerTaskManager();

304

305

/**

306

* Builder for creating cluster specifications

307

*/

308

public static class Builder {

309

/**

310

* Set master memory size

311

* @param masterMemoryMB Memory in MB

312

* @return Builder instance

313

*/

314

public Builder setMasterMemoryMB(int masterMemoryMB);

315

316

/**

317

* Set task manager memory size

318

* @param taskManagerMemoryMB Memory in MB

319

* @return Builder instance

320

*/

321

public Builder setTaskManagerMemoryMB(int taskManagerMemoryMB);

322

323

/**

324

* Set number of task managers

325

* @param numberTaskManagers Number of task managers

326

* @return Builder instance

327

*/

328

public Builder setNumberTaskManagers(int numberTaskManagers);

329

330

/**

331

* Set slots per task manager

332

* @param slotsPerTaskManager Number of slots

333

* @return Builder instance

334

*/

335

public Builder setSlotsPerTaskManager(int slotsPerTaskManager);

336

337

/**

338

* Create cluster specification

339

* @return Configured cluster specification

340

*/

341

public ClusterSpecification createClusterSpecification();

342

}

343

}

344

```

345

346

## Types

347

348

```java { .api }

349

/**

350

* Provider for cluster clients with resource management

351

* @param <T> Type of cluster identifier

352

*/

353

public interface ClusterClientProvider<T> extends AutoCloseable {

354

/**

355

* Get cluster client instance

356

* @return Cluster client

357

*/

358

ClusterClient<T> getClusterClient();

359

360

/**

361

* Get cluster identifier

362

* @return Cluster ID

363

*/

364

T getClusterId();

365

366

/**

367

* Check if cluster is in per-job mode

368

* @return true if per-job mode

369

*/

370

default boolean isPerJobMode() {

371

return false;

372

}

373

374

@Override

375

void close() throws Exception;

376

}

377

```

378

379

## Exception Handling

380

381

Deployment operations can fail in several ways:

382

383

- **Deployment Errors**: `ClusterDeploymentException` for failed cluster deployments

384

- **Retrieval Errors**: `ClusterRetrieveException` for failed cluster retrieval

385

- **Configuration Errors**: Invalid cluster specifications or missing configuration

386

- **Resource Errors**: Insufficient resources for cluster deployment

387

- **Network Errors**: Communication failures with cluster management systems

388

389

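**Error Handling Examples** (note: `deploySessionCluster` declares only `ClusterDeploymentException`; the `ClusterRetrieveException` branch applies when the `try` block also calls `retrieve`):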

390

391

```java

392

try {

393

ClusterClientProvider<StandaloneClusterId> provider =

394

descriptor.deploySessionCluster(spec);

395

396

// Use cluster...

397

398

} catch (ClusterDeploymentException e) {

399

System.err.println("Failed to deploy cluster: " + e.getMessage());

400

// Handle deployment failure

401

} catch (ClusterRetrieveException e) {

402

System.err.println("Failed to retrieve cluster: " + e.getMessage());

403

// Handle retrieval failure

404

} finally {

405

descriptor.close();

406

}

407

```

408

409

The deployment management system provides a pluggable architecture that allows Flink to support multiple deployment targets through a consistent interface, enabling seamless switching between different cluster types based on configuration.