or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-operations.md · cluster-management.md · index.md · program-execution.md · rest-client-communication.md

rest-client-communication.md · docs/

0

# REST Client Communication

1

2

The Apache Flink REST Client Communication module (`org.apache.flink.client.program.rest.*`) provides REST-based cluster communication capabilities with comprehensive retry logic, SSL support, and configuration management for remote cluster interaction. This module enables reliable communication with Flink clusters over HTTP/HTTPS protocols.

3

4

## Core REST Client Classes

5

6

### RestClusterClient { .api }

7

8

REST-based implementation of the ClusterClient interface for communicating with remote Flink clusters.

9

10

```java

11

public class RestClusterClient<T> implements ClusterClient<T> {

12

// Constructors

13

public RestClusterClient(Configuration configuration,

14

RestClusterClientConfiguration restClusterClientConfiguration,

15

T clusterId) { }

16

17

public RestClusterClient(Configuration configuration,

18

RestClusterClientConfiguration restClusterClientConfiguration,

19

T clusterId,

20

WaitStrategy waitStrategy) { }

21

22

// ClusterClient interface implementation

23

public T getClusterId() { }

24

public Configuration getFlinkConfiguration() { }

25

public String getWebInterfaceURL() { }

26

27

// Job management via REST

28

public CompletableFuture<Collection<JobStatusMessage>> listJobs() { }

29

public CompletableFuture<JobID> submitJob(JobGraph jobGraph) { }

30

public CompletableFuture<JobStatus> getJobStatus(JobID jobId) { }

31

public CompletableFuture<JobResult> requestJobResult(JobID jobId) { }

32

33

// Job control operations

34

public CompletableFuture<Acknowledge> cancel(JobID jobId) { }

35

public CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory) { }

36

public CompletableFuture<String> stopWithSavepoint(JobID jobId,

37

boolean advanceToEndOfEventTime,

38

String savepointDirectory) { }

39

40

// Savepoint operations

41

public CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory) { }

42

public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) { }

43

44

// Metrics and coordination

45

public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) { }

46

public CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId,

47

OperatorID operatorId,

48

CoordinationRequest request) { }

49

50

// Cluster management

51

public void shutDownCluster() { }

52

public void close() { }

53

}

54

```

55

56

### RestClusterClientConfiguration { .api }

57

58

Configuration class for REST cluster client settings including timeouts, retries, and SSL parameters.

59

60

```java

61

public class RestClusterClientConfiguration {

62

// Factory method

63

public static RestClusterClientConfiguration fromConfiguration(Configuration config) { }

64

65

// Connection configuration

66

public long getConnectionTimeout() { }

67

public long getIdlenessTimeout() { }

68

public AwaitingTime getAwaitLeaderTimeout() { }

69

70

// Retry configuration

71

public int getMaxRetryAttempts() { }

72

public long getRetryDelay() { }

73

74

// SSL configuration

75

public SSLHandlerFactory getSslHandlerFactory() { }

76

public String[] getTrustStore() { }

77

public String getTrustStorePassword() { }

78

public String[] getKeyStore() { }

79

public String getKeyStorePassword() { }

80

public String getSSLProtocol() { }

81

public String[] getSSLAlgorithms() { }

82

}

83

```

84

85

## Retry Strategy Components

86

87

### WaitStrategy { .api }

88

89

Interface for defining wait strategies in retry mechanisms.

90

91

```java

92

public interface WaitStrategy {

93

// Calculate sleep time for a given attempt

94

long sleepTime(long attempt);

95

}

96

```

97

98

### ExponentialWaitStrategy { .api }

99

100

Exponential backoff implementation of wait strategy for retry operations.

101

102

```java

103

public class ExponentialWaitStrategy implements WaitStrategy {

104

// Constructor

105

public ExponentialWaitStrategy(long initialWait, long maxWait) { }

106

107

// WaitStrategy implementation

108

public long sleepTime(long attempt) { }

109

}

110

```

111

112

## Configuration Integration

113

114

The REST client integrates with Flink's configuration system through several key configuration options:

115

116

### Connection Configuration

117

- `rest.address`: REST endpoint hostname

118

- `rest.port`: REST endpoint port

119

- `rest.connection-timeout`: Connection timeout duration

120

- `rest.idleness-timeout`: Idle connection timeout

121

122

### SSL Configuration

123

- `security.ssl.rest.enabled`: Enable SSL for REST connections

124

- `security.ssl.rest.keystore`: Path to SSL keystore

125

- `security.ssl.rest.keystore-password`: Keystore password

126

- `security.ssl.rest.truststore`: Path to SSL truststore

127

- `security.ssl.rest.truststore-password`: Truststore password

128

129

### Retry Configuration

130

- `rest.retry.max-attempts`: Maximum retry attempts

131

- `rest.retry.delay`: Base retry delay

132

133

## Usage Examples

134

135

### Basic REST Client Usage

136

137

```java

138

// Configure REST connection

139

Configuration config = new Configuration();

140

config.setString("rest.address", "flink-cluster.example.com");

141

config.setInteger("rest.port", 8081);

142

config.setDuration("rest.connection-timeout", Duration.ofSeconds(30));

143

config.setInteger("rest.retry.max-attempts", 3);

144

145

// Create REST client configuration

146

RestClusterClientConfiguration restConfig =

147

RestClusterClientConfiguration.fromConfiguration(config);

148

149

// Create cluster ID (implementation-specific)

150

StandaloneClusterId clusterId = new StandaloneClusterId();

151

152

// Create REST cluster client

153

RestClusterClient<StandaloneClusterId> client =

154

new RestClusterClient<>(config, restConfig, clusterId);

155

156

try {

157

// Use the client for operations

158

String webUrl = client.getWebInterfaceURL();

159

System.out.println("Cluster web interface: " + webUrl);

160

161

// List running jobs

162

CompletableFuture<Collection<JobStatusMessage>> jobsFuture = client.listJobs();

163

Collection<JobStatusMessage> jobs = jobsFuture.get();

164

165

for (JobStatusMessage job : jobs) {

166

System.out.println("Job: " + job.getJobName() + " - " + job.getJobState());

167

}

168

} finally {

169

client.close();

170

}

171

```

172

173

### REST Client with Custom Retry Strategy

174

175

```java

176

// Create custom exponential backoff strategy

177

WaitStrategy waitStrategy = new ExponentialWaitStrategy(

178

1000, // initial wait: 1 second

179

30000 // max wait: 30 seconds

180

);

181

182

// Configuration with retry settings

183

Configuration config = new Configuration();

184

config.setString("rest.address", "localhost");

185

config.setInteger("rest.port", 8081);

186

187

RestClusterClientConfiguration restConfig =

188

RestClusterClientConfiguration.fromConfiguration(config);

189

190

// Create client with custom wait strategy

191

RestClusterClient<StandaloneClusterId> client =

192

new RestClusterClient<>(config, restConfig, new StandaloneClusterId(), waitStrategy);

193

194

try {

195

// Submit job with retry support

196

JobGraph jobGraph = /* create job graph */;

197

CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);

198

JobID jobId = submitFuture.get();

199

200

// Monitor job with automatic retries

201

CompletableFuture<JobStatus> statusFuture = client.getJobStatus(jobId);

202

JobStatus status = statusFuture.get();

203

204

} finally {

205

client.close();

206

}

207

```

208

209

### SSL-Enabled REST Client

210

211

```java

212

// Configure SSL settings

213

Configuration config = new Configuration();

214

config.setString("rest.address", "secure-flink.example.com");

215

config.setInteger("rest.port", 8443);

216

config.setBoolean("security.ssl.rest.enabled", true);

217

config.setString("security.ssl.rest.keystore", "/path/to/keystore.jks");

218

config.setString("security.ssl.rest.keystore-password", "keystorePassword");

219

config.setString("security.ssl.rest.truststore", "/path/to/truststore.jks");

220

config.setString("security.ssl.rest.truststore-password", "truststorePassword");

221

222

// Create SSL-enabled REST client

223

RestClusterClientConfiguration restConfig =

224

RestClusterClientConfiguration.fromConfiguration(config);

225

226

RestClusterClient<StandaloneClusterId> client =

227

new RestClusterClient<>(config, restConfig, new StandaloneClusterId());

228

229

try {

230

// All operations now use HTTPS

231

CompletableFuture<Collection<JobStatusMessage>> jobsFuture = client.listJobs();

232

// ... handle response

233

} finally {

234

client.close();

235

}

236

```

237

238

### Job Management via REST

239

240

```java

241

// REST client setup

242

RestClusterClient<StandaloneClusterId> client = /* create client */;

243

244

try {

245

// Submit job

246

JobGraph jobGraph = /* your job graph */;

247

CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);

248

JobID jobId = submitFuture.get();

249

250

// Poll once per second until the job is RUNNING, FINISHED, or FAILED (other states, e.g. CANCELED, are not checked)

251

CompletableFuture<JobStatus> statusFuture = client.getJobStatus(jobId);

252

JobStatus status = statusFuture.get();

253

254

while (status != JobStatus.RUNNING && status != JobStatus.FINISHED && status != JobStatus.FAILED) {

255

Thread.sleep(1000);

256

statusFuture = client.getJobStatus(jobId);

257

status = statusFuture.get();

258

}

259

260

if (status == JobStatus.RUNNING) {

261

// Trigger savepoint

262

CompletableFuture<String> savepointFuture =

263

client.triggerSavepoint(jobId, "hdfs://namenode:9000/savepoints");

264

String savepointPath = savepointFuture.get();

265

System.out.println("Savepoint created: " + savepointPath);

266

267

// Stop job with savepoint

268

CompletableFuture<String> stopFuture =

269

client.stopWithSavepoint(jobId, false, "hdfs://namenode:9000/final-savepoint");

270

String finalSavepointPath = stopFuture.get();

271

System.out.println("Job stopped with savepoint: " + finalSavepointPath);

272

}

273

274

// Get final job result

275

CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);

276

JobResult result = resultFuture.get();

277

System.out.println("Job finished with state: " + result.getJobExecutionResult().getJobExecutionState());

278

279

} finally {

280

client.close();

281

}

282

```

283

284

### Coordination Request Example

285

286

```java

287

// Send coordination request to operator

288

RestClusterClient<StandaloneClusterId> client = /* create client */;

289

290

try {

291

JobID jobId = /* your job ID */;

292

OperatorID operatorId = /* target operator ID */;

293

294

// Create custom coordination request

295

CoordinationRequest request = new CoordinationRequest() {

296

// Implement coordination request logic

297

};

298

299

// Send request to operator

300

CompletableFuture<CoordinationResponse> responseFuture =

301

client.sendCoordinationRequest(jobId, operatorId, request);

302

303

CoordinationResponse response = responseFuture.get();

304

// Handle coordination response

305

306

} finally {

307

client.close();

308

}

309

```

310

311

### Error Handling and Timeouts

312

313

```java

314

// Configure timeout and retry settings

315

Configuration config = new Configuration();

316

config.setString("rest.address", "localhost");

317

config.setInteger("rest.port", 8081);

318

config.setDuration("rest.connection-timeout", Duration.ofSeconds(10));

319

config.setDuration("rest.idleness-timeout", Duration.ofMinutes(5));

320

config.setInteger("rest.retry.max-attempts", 5);

321

config.setDuration("rest.retry.delay", Duration.ofSeconds(2));

322

323

RestClusterClientConfiguration restConfig =

324

RestClusterClientConfiguration.fromConfiguration(config);

325

326

RestClusterClient<StandaloneClusterId> client =

327

new RestClusterClient<>(config, restConfig, new StandaloneClusterId());

328

329

try {

330

// Operation with timeout handling

331

CompletableFuture<Collection<JobStatusMessage>> jobsFuture = client.listJobs();

332

333

Collection<JobStatusMessage> jobs = jobsFuture

334

.orTimeout(30, TimeUnit.SECONDS)

335

.exceptionally(throwable -> {

336

System.err.println("Failed to list jobs: " + throwable.getMessage());

337

return Collections.emptyList();

338

})

339

.get();

340

341

} catch (Exception e) {

342

System.err.println("REST operation failed: " + e.getMessage());

343

} finally {

344

client.close();

345

}

346

```

347

348

## Required Imports

349

350

```java

351

import org.apache.flink.client.program.rest.RestClusterClient;

352

import org.apache.flink.client.program.rest.RestClusterClientConfiguration;

353

import org.apache.flink.client.program.rest.retry.WaitStrategy;

354

import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;

355

import org.apache.flink.client.program.ClusterClient;

356

import org.apache.flink.client.deployment.StandaloneClusterId;

357

import org.apache.flink.api.common.JobID;

358

import org.apache.flink.api.common.JobStatus;

359

import org.apache.flink.runtime.jobgraph.JobGraph;

360

import org.apache.flink.runtime.jobmaster.JobResult;

361

import org.apache.flink.runtime.messages.Acknowledge;

362

import org.apache.flink.runtime.jobgraph.OperatorID;

363

import org.apache.flink.runtime.operators.coordination.CoordinationRequest;

364

import org.apache.flink.runtime.operators.coordination.CoordinationResponse;

365

import org.apache.flink.runtime.client.JobStatusMessage;

366

import org.apache.flink.configuration.Configuration;

367

import org.apache.flink.runtime.security.contexts.SecurityContext;

368

import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverview;

369

import org.apache.flink.util.concurrent.ScheduledExecutor;

370

import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;

371

import java.util.concurrent.CompletableFuture;

372

import java.util.concurrent.TimeUnit;

373

import java.util.Collection;

374

import java.util.Collections;

375

import java.util.Map;

376

import java.time.Duration;

377

```

378

379

## Configuration Options Reference

380

381

### Essential REST Configuration

382

```java

383

// Basic connection settings

384

config.setString("rest.address", "localhost");

385

config.setInteger("rest.port", 8081);

386

config.setDuration("rest.connection-timeout", Duration.ofSeconds(30));

387

config.setDuration("rest.idleness-timeout", Duration.ofMinutes(5));

388

389

// Retry configuration

390

config.setInteger("rest.retry.max-attempts", 3);

391

config.setDuration("rest.retry.delay", Duration.ofSeconds(1));

392

393

// SSL configuration (when needed)

394

config.setBoolean("security.ssl.rest.enabled", true);

395

config.setString("security.ssl.rest.keystore", "/path/to/keystore.jks");

396

config.setString("security.ssl.rest.keystore-password", "password");

397

config.setString("security.ssl.rest.truststore", "/path/to/truststore.jks");

398

config.setString("security.ssl.rest.truststore-password", "password");

399

```

400

401

In summary, the REST Client Communication module provides HTTP/HTTPS communication with Flink clusters, with configurable retries (`rest.retry.*`), SSL encryption (`security.ssl.rest.*`), and connection and idleness timeouts (`rest.connection-timeout`, `rest.idleness-timeout`).