or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-operations.md cluster-management.md index.md program-execution.md rest-client-communication.md

docs/index.md

0

# Apache Flink Client APIs (flink-clients)

1

2

## Package Overview

3

4

The Apache Flink flink-clients module provides comprehensive client-side APIs and utilities for interacting with Apache Flink clusters. This module serves as the primary programmatic interface for submitting and managing Flink jobs, offering functionality for packaging user programs, translating execution plans, managing cluster connections, and handling job lifecycle operations across different deployment scenarios.

5

6

Key features include:

7

- Command-line interface (CLI) for interactive job management

8

- Cluster deployment and management utilities

9

- Program packaging and execution abstractions

10

- Pipeline translation services for converting high-level Flink programs into executable job graphs

11

- REST client communication for remote cluster interaction

12

- Support for standalone, YARN, Kubernetes, and local execution environments

13

14

## Package Information

15

16

**Maven Coordinates:**

17

```xml

18

<dependency>

19

<groupId>org.apache.flink</groupId>

20

<artifactId>flink-clients_2.11</artifactId>

21

<version>1.14.6</version>

22

</dependency>

23

```

24

25

**Requirements:**

26

- Java 8 or higher

27

- Apache Flink runtime (same version)

28

29

**Documentation:** https://flink.apache.org/

30

31

**License:** Apache License 2.0

32

33

## Core Imports

34

35

### Essential Types

36

```java

37

// Core client utilities

38

import org.apache.flink.client.ClientUtils;

39

import org.apache.flink.client.FlinkPipelineTranslationUtil;

40

import org.apache.flink.client.FlinkPipelineTranslator;

41

42

// CLI and deployment

43

import org.apache.flink.client.cli.CliFrontend;

44

import org.apache.flink.client.cli.CustomCommandLine;

45

import org.apache.flink.client.deployment.ClusterClientServiceLoader;

46

import org.apache.flink.client.deployment.ClusterClientFactory;

47

import org.apache.flink.client.deployment.ClusterDescriptor;

48

49

// Program execution

50

import org.apache.flink.client.program.ClusterClient;

51

import org.apache.flink.client.program.PackagedProgram;

52

import org.apache.flink.client.program.ClusterClientProvider;

53

54

// Application deployment

55

import org.apache.flink.client.deployment.application.ApplicationConfiguration;

56

import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;

57

58

// REST communication

59

import org.apache.flink.client.program.rest.RestClusterClient;

60

import org.apache.flink.client.program.rest.RestClusterClientConfiguration;

61

62

// Configuration and job types

63

import org.apache.flink.configuration.Configuration;

64

import org.apache.flink.api.common.JobID;

65

import org.apache.flink.runtime.jobgraph.JobGraph;

66

```

67

68

### Maven Dependency

69

```xml

70

<dependency>

71

<groupId>org.apache.flink</groupId>

72

<artifactId>flink-clients_2.11</artifactId>

73

<version>1.14.6</version>

74

</dependency>

75

```

76

77

## Basic Usage

78

79

### Simple Job Submission

80

```java

81

// Configure Flink cluster connection

82

Configuration config = new Configuration();

83

config.setString("rest.address", "localhost");

84

config.setInteger("rest.port", 8081);

85

86

// Create packaged program

87

PackagedProgram program = PackagedProgram.newBuilder()

88

.setJarFile(new File("my-flink-job.jar"))

89

.setEntryPointClassName("com.example.MyFlinkJob")

90

.setArguments("--input", "/path/to/input")

91

.build();

92

93

// Execute program

94

PipelineExecutorServiceLoader serviceLoader = new DefaultExecutorServiceLoader();

95

ClientUtils.executeProgram(serviceLoader, config, program, false, false);

96

```

97

98

### CLI Usage

99

```java

100

// Initialize CLI frontend

101

Configuration config = GlobalConfiguration.loadConfiguration();

102

String configDir = CliFrontend.getConfigurationDirectoryFromEnv();
List<CustomCommandLine> customCommandLines = CliFrontend.loadCustomCommandLines(config, configDir);

103

CliFrontend cli = new CliFrontend(config, customCommandLines);

104

105

// Parse and execute command

106

int exitCode = cli.parseAndRun(new String[]{"run", "--parallelism", "4", "my-job.jar"});

107

```

108

109

## Architecture

110

111

The flink-clients module is organized into several key architectural layers:

112

113

### 1. Core Client Layer (`org.apache.flink.client`)

114

The foundation layer providing essential client utilities, pipeline translation interfaces, and common abstractions for interacting with Flink clusters.

115

116

### 2. Command Line Interface (`org.apache.flink.client.cli`)

117

Interactive command-line tools and parsers for job management operations including run, list, cancel, stop, and savepoint commands.

118

119

### 3. Cluster Management (`org.apache.flink.client.deployment.*`)

120

Deployment and cluster interaction services supporting multiple deployment targets through pluggable factory patterns.

121

122

### 4. Program Execution (`org.apache.flink.client.program.*`)

123

Program packaging, execution environments, and cluster client implementations for job lifecycle management.

124

125

### 5. REST Communication (`org.apache.flink.client.program.rest.*`)

126

REST-based cluster communication with retry logic and SSL support for remote cluster interaction.

127

128

## Capabilities

129

130

### CLI Operations { .api }

131

132

Command-line interface functionality for interactive job management.

133

134

```java

135

// Main CLI entry point

136

public class CliFrontend {

137

public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) { }

138

public int parseAndRun(String[] args) { }

139

public static void main(String[] args) { }

140

}

141

142

// Custom command line interface

143

public interface CustomCommandLine {

144

boolean isActive(CommandLine commandLine);

145

String getId();

146

void addRunOptions(Options baseOptions);

147

Configuration toConfiguration(CommandLine commandLine);

148

}

149

150

// Program execution options

151

public class ProgramOptions extends CommandLineOptions {

152

public ProgramOptions(CommandLine line) { }

153

public String getJarFilePath() { }

154

public String getEntryPointClassName() { }

155

public String[] getProgramArgs() { }

156

public int getParallelism() { }

157

}

158

```

159

160

**Required imports:**

161

```java

162

import org.apache.flink.client.cli.CliFrontend;

163

import org.apache.flink.client.cli.CustomCommandLine;

164

import org.apache.flink.client.cli.ProgramOptions;

165

import org.apache.flink.configuration.Configuration;

166

import org.apache.commons.cli.CommandLine;

167

import org.apache.commons.cli.Options;

168

```

169

170

[CLI Operations Documentation](./cli-operations.md)

171

172

### Cluster Management { .api }

173

174

Deployment and cluster interaction services supporting multiple deployment targets.

175

176

```java

177

// Cluster client factory interface

178

public interface ClusterClientFactory<ClusterID> {

179

boolean isCompatibleWith(Configuration configuration);

180

ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration);

181

ClusterID getClusterId(Configuration configuration);

182

ClusterSpecification getClusterSpecification(Configuration configuration);

183

}

184

185

// Cluster descriptor for deployment operations

186

public interface ClusterDescriptor<T> extends AutoCloseable {

187

String getClusterDescription();

188

ClusterClientProvider<T> retrieve(T clusterId);

189

ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification);

190

ClusterClientProvider<T> deployApplicationCluster(ClusterSpecification clusterSpecification, ApplicationConfiguration applicationConfiguration);

191

}

192

193

// Service loader for cluster clients

194

public interface ClusterClientServiceLoader {

195

<ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(Configuration configuration);

196

}

197

198

// Cluster specification

199

public class ClusterSpecification {

200

public ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int slotsPerTaskManager) { }

201

public int getMasterMemoryMB() { }

202

public int getTaskManagerMemoryMB() { }

203

public int getSlotsPerTaskManager() { }

204

}

205

```

206

207

**Required imports:**

208

```java

209

import org.apache.flink.client.deployment.ClusterClientFactory;

210

import org.apache.flink.client.deployment.ClusterDescriptor;

211

import org.apache.flink.client.deployment.ClusterClientServiceLoader;

212

import org.apache.flink.client.deployment.ClusterSpecification;

213

import org.apache.flink.client.deployment.application.ApplicationConfiguration;

214

import org.apache.flink.client.program.ClusterClientProvider;

215

import org.apache.flink.configuration.Configuration;

216

```

217

218

[Cluster Management Documentation](./cluster-management.md)

219

220

### Program Execution { .api }

221

222

Program packaging, execution environments, and job lifecycle management.

223

224

```java

225

// Main cluster client interface

226

public interface ClusterClient<T> extends AutoCloseable {

227

T getClusterId();

228

Configuration getFlinkConfiguration();

229

String getWebInterfaceURL();

230

CompletableFuture<Collection<JobStatusMessage>> listJobs();

231

CompletableFuture<JobID> submitJob(JobGraph jobGraph);

232

CompletableFuture<JobStatus> getJobStatus(JobID jobId);

233

CompletableFuture<JobResult> requestJobResult(JobID jobId);

234

CompletableFuture<Acknowledge> cancel(JobID jobId);

235

CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory);

236

}

237

238

// Packaged program representation

239

public class PackagedProgram implements AutoCloseable {

240

public String getMainClassName() { }

241

public String[] getArguments() { }

242

public ClassLoader getUserCodeClassLoader() { }

243

public Configuration getConfiguration() { }

244

245

public static Builder newBuilder() { }

246

247

public static class Builder {

248

public Builder setJarFile(File jarFile) { }

249

public Builder setEntryPointClassName(String entryPointClassName) { }

250

public Builder setArguments(String... arguments) { }

251

public Builder setConfiguration(Configuration configuration) { }

252

public PackagedProgram build() { }

253

}

254

}

255

256

// Client utilities

257

public enum ClientUtils {

258

public static void executeProgram(PipelineExecutorServiceLoader executorServiceLoader,

259

Configuration configuration,

260

PackagedProgram program,

261

boolean enforceSingleJobExecution,

262

boolean suppressSysout);

263

264

public static URLClassLoader buildUserCodeClassLoader(List<URL> jars,

265

List<URL> classpaths,

266

ClassLoader parent,

267

Configuration configuration);

268

}

269

```

270

271

**Required imports:**

272

```java

273

import org.apache.flink.client.program.ClusterClient;

274

import org.apache.flink.client.program.PackagedProgram;

275

import org.apache.flink.client.ClientUtils;

276

import org.apache.flink.api.common.JobID;

277

import org.apache.flink.runtime.jobgraph.JobGraph;

278

import org.apache.flink.runtime.jobmaster.JobResult;

279

import org.apache.flink.api.common.JobStatus;

280

import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.client.JobStatusMessage;

281

import org.apache.flink.core.execution.PipelineExecutorServiceLoader;

282

import org.apache.flink.configuration.Configuration;

283

import java.util.concurrent.CompletableFuture;

284

import java.util.Collection;

285

import java.io.File;

286

import java.net.URL;
import java.net.URLClassLoader;

287

import java.util.List;

288

```

289

290

[Program Execution Documentation](./program-execution.md)

291

292

### REST Client Communication { .api }

293

294

REST-based cluster communication with retry logic and SSL support.

295

296

```java

297

// REST cluster client

298

public class RestClusterClient<T> implements ClusterClient<T> {

299

public RestClusterClient(Configuration configuration,

300

RestClusterClientConfiguration restClusterClientConfiguration,

301

T clusterId) { }

302

303

public RestClusterClient(Configuration configuration,

304

RestClusterClientConfiguration restClusterClientConfiguration,

305

T clusterId,

306

WaitStrategy waitStrategy) { }

307

}

308

309

// REST client configuration

310

public class RestClusterClientConfiguration {

311

public static RestClusterClientConfiguration fromConfiguration(Configuration config) { }

312

public long getConnectionTimeout() { }

313

public long getIdlenessTimeout() { }

314

public int getMaxRetryAttempts() { }

315

public long getRetryDelay() { }

316

public long getAwaitLeaderTimeout() { }

317

}

318

319

// Wait strategy for retry logic

320

public interface WaitStrategy {

321

long sleepTime(long attempt);

322

}

323

324

// Exponential backoff implementation

325

public class ExponentialWaitStrategy implements WaitStrategy {

326

public ExponentialWaitStrategy(long initialWait, long maxWait) { }

327

public long sleepTime(long attempt) { }

328

}

329

```

330

331

**Required imports:**

332

```java

333

import org.apache.flink.client.program.rest.RestClusterClient;

334

import org.apache.flink.client.program.rest.RestClusterClientConfiguration;

335

import org.apache.flink.client.program.rest.retry.WaitStrategy;

336

import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;

337

import org.apache.flink.configuration.Configuration;

338

import org.apache.flink.util.concurrent.ScheduledExecutor;

339

```

340

341

[REST Client Communication Documentation](./rest-client-communication.md)

342

343

### Application Deployment { .api }

344

345

Specialized deployment utilities for application cluster mode.

346

347

```java

348

// Application configuration

349

public class ApplicationConfiguration {

350

public ApplicationConfiguration(String[] programArguments, String entryPointClassName) { }

351

public ApplicationConfiguration(String[] programArguments,

352

String entryPointClassName,

353

SavepointRestoreSettings savepointRestoreSettings) { }

354

355

public String[] getProgramArguments() { }

356

public String getEntryPointClassName() { }

357

public SavepointRestoreSettings getSavepointRestoreSettings() { }

358

}

359

360

// Application deployer interface

361

public interface ApplicationDeployer {

362

void run(Configuration effectiveConfiguration, ApplicationConfiguration applicationConfiguration);

363

}

364

365

// Application cluster deployer

366

public class ApplicationClusterDeployer implements ApplicationDeployer {

367

public ApplicationClusterDeployer(ClusterClientServiceLoader clusterClientServiceLoader) { }

368

public void run(Configuration configuration, ApplicationConfiguration applicationConfiguration) { }

369

}

370

371

// Application cluster entry point

372

public class ApplicationClusterEntryPoint extends ClusterEntrypoint {

373

public static void main(String[] args) { }

374

}

375

```

376

377

**Required imports:**

378

```java

379

import org.apache.flink.client.deployment.application.ApplicationConfiguration;

380

import org.apache.flink.client.deployment.application.ApplicationClusterDeployer;

381

import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;

382

import org.apache.flink.client.cli.ApplicationDeployer;

383

import org.apache.flink.client.deployment.ClusterClientServiceLoader;

384

import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

385

import org.apache.flink.configuration.Configuration;

386

```

387

388

### Pipeline Translation { .api }

389

390

Pipeline translation services for converting high-level Flink programs into executable job graphs.

391

392

```java

393

// Pipeline translator interface

394

public interface FlinkPipelineTranslator {

395

JobGraph translateToJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism);

396

String translateToJSONExecutionPlan(Pipeline pipeline);

397

boolean canTranslate(Pipeline pipeline);

398

}

399

400

// Pipeline translation utilities

401

public final class FlinkPipelineTranslationUtil {

402

public static JobGraph getJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) { }

403

public static JobGraph getJobGraphUnderUserClassLoader(ClassLoader userClassloader,

404

Pipeline pipeline,

405

Configuration configuration,

406

int defaultParallelism) { }

407

public static String translateToJSONExecutionPlan(Pipeline pipeline) { }

408

}

409

410

// Stream graph translator

411

public class StreamGraphTranslator implements FlinkPipelineTranslator {

412

public JobGraph translateToJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) { }

413

public String translateToJSONExecutionPlan(Pipeline pipeline) { }

414

public boolean canTranslate(Pipeline pipeline) { }

415

}

416

```

417

418

**Required imports:**

419

```java

420

import org.apache.flink.client.FlinkPipelineTranslator;

421

import org.apache.flink.client.FlinkPipelineTranslationUtil;

422

import org.apache.flink.client.StreamGraphTranslator;

423

import org.apache.flink.api.dag.Pipeline;

424

import org.apache.flink.runtime.jobgraph.JobGraph;

425

import org.apache.flink.configuration.Configuration;

426

```