or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-deployment.md cli-interface.md client-core.md cluster-management.md index.md program-execution.md rest-client.md

rest-client.mddocs/

0

# REST Client

1

2

REST client implementations for communicating with Flink clusters through HTTP APIs, including retry strategies, configuration management, and comprehensive cluster operation support.

3

4

## Capabilities

5

6

### REST Cluster Client

7

8

REST-based implementation of the ClusterClient interface for HTTP communication with Flink clusters.

9

10

```java { .api }

11

/**

12

* REST-based cluster client implementation

13

* @param <T> Type of cluster identifier

14

*/

15

public class RestClusterClient<T> implements ClusterClient<T> {

16

/**

17

* Creates REST cluster client with basic configuration

18

* @param config Flink configuration

19

* @param clusterId Cluster identifier

20

*/

21

public RestClusterClient(Configuration config, T clusterId);

22

23

/**

24

* Creates REST cluster client with high availability services factory

25

* @param config Flink configuration

26

* @param clusterId Cluster identifier

27

* @param factory High availability services factory

28

*/

29

public RestClusterClient(

30

Configuration config,

31

T clusterId,

32

ClientHighAvailabilityServicesFactory factory);

33

34

// Implements all ClusterClient methods with REST-specific implementations

35

@Override

36

public T getClusterId();

37

38

@Override

39

public Configuration getFlinkConfiguration();

40

41

@Override

42

public void shutDownCluster();

43

44

@Override

45

public String getWebInterfaceURL();

46

47

@Override

48

public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;

49

50

@Override

51

public CompletableFuture<JobID> submitJob(JobGraph jobGraph);

52

53

@Override

54

public CompletableFuture<JobStatus> getJobStatus(JobID jobId);

55

56

@Override

57

public CompletableFuture<JobResult> requestJobResult(JobID jobId);

58

59

@Override

60

public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID);

61

62

@Override

63

public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);

64

65

@Override

66

public CompletableFuture<Acknowledge> cancel(JobID jobId);

67

68

@Override

69

public CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory);

70

71

@Override

72

public CompletableFuture<String> stopWithSavepoint(

73

JobID jobId,

74

boolean advanceToEndOfEventTime,

75

String savepointDirectory);

76

77

@Override

78

public CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory);

79

80

@Override

81

public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;

82

83

@Override

84

public CompletableFuture<CoordinationResponse> sendCoordinationRequest(

85

JobID jobId,

86

OperatorID operatorId,

87

CoordinationRequest request);

88

89

@Override

90

public void close();

91

}

92

```

93

94

**Usage Example:**

95

96

```java

97

import org.apache.flink.client.program.rest.RestClusterClient;

98

import org.apache.flink.client.program.rest.RestClusterClientConfiguration;

99

import org.apache.flink.configuration.Configuration;

100

import org.apache.flink.configuration.JobManagerOptions;

101

import org.apache.flink.configuration.RestOptions;

102

103

// Configure REST client

104

Configuration config = new Configuration();

105

config.setString(JobManagerOptions.ADDRESS, "localhost");

106

config.setInteger(JobManagerOptions.PORT, 6123);

107

config.setInteger(RestOptions.PORT, 8081);

108

109

// Create REST client configuration

110

RestClusterClientConfiguration clientConfig = new RestClusterClientConfiguration.RestClusterClientConfigurationBuilder()

111

.setRetryMaxAttempts(10)

112

.setRetryDelay(1000)

113

.setAwaitLeaderTimeout(30000)

114

.build();

115

116

// Create REST cluster client

117

try (RestClusterClient<StandaloneClusterId> client = new RestClusterClient<>(

118

config, clientConfig, new StandaloneClusterId(), new ExponentialWaitStrategy(1000, 10000))) {

119

120

// Submit job via REST

121

JobGraph jobGraph = createJobGraph();

122

JobID jobId = client.submitJob(jobGraph).get();

123

System.out.println("Job submitted via REST: " + jobId);

124

125

// Monitor job status

126

JobStatus status = client.getJobStatus(jobId).get();

127

System.out.println("Job status: " + status);

128

129

// Create savepoint via REST

130

String savepointPath = client.triggerSavepoint(jobId, "/path/to/savepoints").get();

131

System.out.println("Savepoint created: " + savepointPath);

132

}

133

```

134

135

### REST Client Configuration

136

137

Configuration class for REST cluster clients with connection and retry settings.

138

139

```java { .api }

140

/**

141

* Configuration for REST cluster clients

142

*/

143

public final class RestClusterClientConfiguration {

144

/**

145

* Gets the REST client configuration

146

* @return RestClientConfiguration instance

147

*/

148

public RestClientConfiguration getRestClientConfiguration();

149

150

/**

151

* Gets the await leader timeout in milliseconds

152

* @return Timeout for awaiting cluster leader

153

*/

154

public long getAwaitLeaderTimeout();

155

156

/**

157

* Gets the maximum number of retry attempts

158

* @return Maximum retry attempts

159

*/

160

public int getRetryMaxAttempts();

161

162

/**

163

* Gets the retry delay in milliseconds

164

* @return Delay between retry attempts

165

*/

166

public long getRetryDelay();

167

168

/**

169

* Creates RestClusterClientConfiguration from Flink configuration

170

* @param config Flink configuration containing REST options

171

* @return RestClusterClientConfiguration instance

172

* @throws ConfigurationException If configuration is invalid

173

*/

174

public static RestClusterClientConfiguration fromConfiguration(Configuration config)

175

throws ConfigurationException;

176

}

177

```

178

179

**Usage Example:**

180

181

```java

182

import org.apache.flink.client.program.rest.RestClusterClientConfiguration;

183

184

// Create Flink configuration with REST options

185

Configuration flinkConfig = new Configuration();

186

flinkConfig.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 60000L); // 60 seconds

187

flinkConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 5); // 5 retry attempts

188

flinkConfig.setLong(RestOptions.RETRY_DELAY, 2000L); // 2 second delay

189

190

// Create REST client configuration from Flink configuration

191

RestClusterClientConfiguration config = RestClusterClientConfiguration.fromConfiguration(flinkConfig);

192

193

System.out.println("Leader timeout: " + config.getAwaitLeaderTimeout());

194

System.out.println("Max retries: " + config.getRetryMaxAttempts());

195

System.out.println("Retry delay: " + config.getRetryDelay());

196

```

197

198

### Retry Strategies

199

200

Strategy implementations for handling retry logic in REST client operations.

201

202

```java { .api }

203

/**

204

* Strategy for waiting between retry attempts

205

*/

206

public interface WaitStrategy {

207

/**

208

* Calculates sleep time for the given attempt count

209

* @param attemptCount Number of attempts made (starting from 0)

210

* @return Sleep time in milliseconds

211

*/

212

long sleepTime(long attemptCount);

213

}

214

215

/**

216

* Configuration options for REST client settings

217

*/

218

public class RestOptions {

219

public static final ConfigOption<Long> AWAIT_LEADER_TIMEOUT;

220

public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;

221

public static final ConfigOption<Long> RETRY_DELAY;

222

}

223

224

/**

225

* Exponential backoff wait strategy

226

*/

227

public class ExponentialWaitStrategy implements WaitStrategy {

228

/**

229

* Creates exponential wait strategy

230

* @param initialDelay Initial delay in milliseconds

231

* @param maxDelay Maximum delay in milliseconds

232

*/

233

public ExponentialWaitStrategy(long initialDelay, long maxDelay);

234

235

/**

236

* Calculates exponential backoff sleep time

237

* @param attemptCount Number of attempts made

238

* @return Sleep time with exponential backoff

239

*/

240

@Override

241

public long sleepTime(long attemptCount);

242

}

243

```

244

245

**Usage Example:**

246

247

```java

248

import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;

249

import org.apache.flink.client.program.rest.retry.WaitStrategy;

250

251

// Create exponential backoff strategy

252

WaitStrategy waitStrategy = new ExponentialWaitStrategy(1000, 30000); // 1s initial, 30s max

253

254

// Calculate wait times for different attempts

255

for (int attempt = 0; attempt < 5; attempt++) {

256

long waitTime = waitStrategy.sleepTime(attempt);

257

System.out.println("Attempt " + attempt + ": wait " + waitTime + "ms");

258

}

259

// Output:

260

// Attempt 0: wait 1000ms

261

// Attempt 1: wait 2000ms

262

// Attempt 2: wait 4000ms

263

// Attempt 3: wait 8000ms

264

// Attempt 4: wait 16000ms

265

```

266

267

### REST Client Integration

268

269

The REST client integrates with the broader Flink client ecosystem through standard interfaces and provides HTTP-based communication with Flink clusters.

270

271

**Key Integration Points:**

272

273

1. **ClusterClient Interface**: RestClusterClient implements the standard ClusterClient interface, making it interchangeable with other client implementations

274

2. **Configuration Integration**: Uses standard Flink Configuration class for setup

275

3. **Retry Strategy**: Pluggable retry strategies for handling network failures and temporary unavailability

276

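4. **Async Operations**: Job-related operations (submit, status, result, accumulators, cancel, savepoints, coordination requests) return CompletableFuture for non-blocking execution; simple accessors such as getClusterId() and getWebInterfaceURL() return their values directly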

277

5. **Resource Management**: Implements AutoCloseable for proper resource cleanup

278

279

**REST Endpoints Used:**

280

281

```java { .api }

282

// Common REST endpoints accessed by RestClusterClient (for reference):

283

// GET /jobs - List jobs

284

// POST /jobs - Submit job

285

// GET /jobs/{jobid} - Get job details

286

// PATCH /jobs/{jobid} - Cancel job

287

// POST /jobs/{jobid}/savepoints - Trigger savepoint

288

// DELETE /jobs/{jobid}/savepoints/{savepointId} - Dispose savepoint

289

// POST /jobs/{jobid}/stop - Stop job with savepoint

290

// GET /jobs/{jobid}/accumulators - Get job accumulators

291

// POST /jobs/{jobid}/coordination/{operatorId} - Send coordination request

292

```

293

294

**Configuration Options:**

295

296

```java { .api }

297

// Important configuration keys for REST client (from Flink configuration):

298

// rest.address - REST server address

299

// rest.port - REST server port

300

// rest.connection-timeout - Connection timeout

301

// rest.idleness-timeout - Idleness timeout

302

// rest.await-leader-timeout - Timeout for awaiting the cluster leader

303

// rest.retry.max-attempts - Maximum retry attempts

304

// rest.retry.delay - Delay between retries

305

```

306

307

**Usage in Different Deployment Modes:**

308

309

```java

310

import org.apache.flink.client.program.rest.RestClusterClient;

311

312

// For standalone cluster

313

Configuration standaloneConfig = new Configuration();

314

standaloneConfig.setString("rest.address", "flink-master");

315

standaloneConfig.setInteger("rest.port", 8081);

316

317

RestClusterClient<StandaloneClusterId> standaloneClient =

318

new RestClusterClient<>(standaloneConfig, new StandaloneClusterId());

319

320

// For session cluster (e.g., YARN session)

321

Configuration sessionConfig = new Configuration();

322

sessionConfig.setString("yarn.application.id", "application_123456_0001");

323

sessionConfig.setString("rest.address", "yarn-session-master");

324

325

RestClusterClient<YarnClusterId> yarnClient =

326

new RestClusterClient<>(sessionConfig, yarnClusterId);

327

```

328

329

## Types

330

331

```java { .api }

332

/**

333

* REST client configuration for low-level HTTP settings

334

*/

335

public class RestClientConfiguration {

336

public long getConnectionTimeout();

337

public long getIdlenessTimeout();

338

public int getMaxContentLength();

339

public SSLHandlerFactory getSslHandlerFactory();

340

}

341

342

/**

343

* Retry strategy interface

344

*/

345

public interface RetryStrategy {

346

boolean canRetry(int attemptCount);

347

long getRetryDelay(int attemptCount);

348

}

349

350

/**

351

* HTTP-specific exceptions (typically from underlying REST framework)

352

*/

353

public class RestClientException extends Exception {

354

public RestClientException(String message);

355

public RestClientException(String message, Throwable cause);

356

}

357

358

/**

359

* Connection timeout exception

360

*/

361

public class ConnectionTimeoutException extends RestClientException {

362

public ConnectionTimeoutException(String message);

363

}

364

365

/**

366

* Leader retrieval exception for REST clients

367

*/

368

public class LeaderRetrievalException extends RestClientException {

369

public LeaderRetrievalException(String message);

370

public LeaderRetrievalException(String message, Throwable cause);

371

}

372

```

373

374

## Advanced Usage Patterns

375

376

### Custom Retry Strategy

377

378

```java

379

import org.apache.flink.client.program.rest.retry.WaitStrategy;

380

381

// Implement custom retry strategy

382

public class LinearWaitStrategy implements WaitStrategy {

383

private final long baseDelay;

384

private final long increment;

385

386

public LinearWaitStrategy(long baseDelay, long increment) {

387

this.baseDelay = baseDelay;

388

this.increment = increment;

389

}

390

391

@Override

392

public long sleepTime(long attemptCount) {

393

return baseDelay + (attemptCount * increment);

394

}

395

}

396

397

// Use custom strategy

398

WaitStrategy customStrategy = new LinearWaitStrategy(1000, 500);

399

RestClusterClient<T> client = new RestClusterClient<>(

400

config, clientConfig, clusterId, customStrategy);

401

```

402

403

### Monitoring and Error Handling

404

405

```java

406

import org.apache.flink.client.program.rest.RestClusterClient;

407

import java.util.concurrent.CompletionException;

408

409

try (RestClusterClient<StandaloneClusterId> client = createRestClient()) {

410

// Submit job with error handling

411

CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);

412

413

JobID jobId = submitFuture.handle((result, throwable) -> {

414

if (throwable != null) {

415

if (throwable instanceof CompletionException) {

416

Throwable cause = throwable.getCause();

417

if (cause instanceof RestClientException) {

418

System.err.println("REST communication failed: " + cause.getMessage());

419

} else {

420

System.err.println("Job submission failed: " + cause.getMessage());

421

}

422

}

423

return null;

424

}

425

return result;

426

}).get();

427

428

if (jobId != null) {

429

System.out.println("Job submitted successfully: " + jobId);

430

431

// Wait for the job to finish and retrieve its result (blocks until completion)

432

CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);

433

JobResult result = resultFuture.get();

434

System.out.println("Job completed with status: " + result.getJobExecutionResult());

435

}

436

}

437

```