or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-exchange.md · execution-graph.md · high-availability.md · index.md · job-management.md · message-passing.md · metrics.md · mini-cluster.md · rpc-framework.md · state-management.md · task-execution.md

rpc-framework.mddocs/

0

# RPC Framework

1

2

The RPC (Remote Procedure Call) Framework provides the communication infrastructure for distributed components in the Flink cluster. It enables reliable, asynchronous communication between JobManagers, TaskManagers, and client applications across the cluster network.

3

4

## Core RPC Services

5

6

### RpcService

7

8

The main RPC service interface that manages remote communication endpoints and connections.

9

10

```java { .api }

11

public interface RpcService extends AutoCloseable {

12

<C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);

13

<F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(

14

String address, F fencingToken, Class<C> clazz);

15

16

void stopServer(RpcEndpoint endpoint);

17

18

CompletableFuture<Void> stopService();

19

20

Executor getExecutor();

21

ScheduledExecutor getScheduledExecutor();

22

23

void executeRunnable(Runnable runnable);

24

void execute(Runnable runnable);

25

26

<T> CompletableFuture<T> execute(Callable<T> callable);

27

ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);

28

}

29

```

30

31

### RpcEndpoint

32

33

Abstract base class for all RPC endpoints in the Flink cluster. Components extend this class to expose RPC interfaces.

34

35

```java { .api }

36

public abstract class RpcEndpoint implements AutoCloseable {

37

protected RpcEndpoint(RpcService rpcService);

38

protected RpcEndpoint(RpcService rpcService, String endpointId);

39

40

public void start() throws Exception;

41

public CompletableFuture<Void> closeAsync();

42

43

@Override

44

public final void close() throws Exception;

45

46

protected final String getAddress();

47

protected final String getEndpointId();

48

protected final RpcService getRpcService();

49

50

protected final <C extends RpcGateway> CompletableFuture<C> connectTo(String address, Class<C> clazz);

51

protected final <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connectTo(

52

String address, F fencingToken, Class<C> clazz);

53

54

protected final void scheduleRunAsync(Runnable runnable, long delay, TimeUnit timeUnit);

55

protected final ScheduledFuture<?> scheduleRunAsync(Runnable runnable, Time delay);

56

57

protected final <V> CompletableFuture<V> callAsync(Callable<V> callable, Time timeout);

58

59

protected void onStart() throws Exception;

60

protected CompletableFuture<Void> onStop();

61

62

protected void validateRunsInMainThread();

63

protected void runAsync(Runnable runnable);

64

}

65

```

66

67

## Gateway Interfaces

68

69

### RpcGateway

70

71

Base interface for all RPC gateway implementations. Gateways provide client-side access to remote RPC endpoints.

72

73

```java { .api }

74

public interface RpcGateway {

75

String getAddress();

76

String getHostname();

77

}

78

```

79

80

### FencedRpcGateway

81

82

Extended RPC gateway interface that includes fencing tokens for leader election scenarios.

83

84

```java { .api }

85

public interface FencedRpcGateway<F extends Serializable> extends RpcGateway {

86

F getFencingToken();

87

}

88

```

89

90

### RpcServer

91

92

Interface representing the server-side RPC endpoint that can be stopped and provides address information.

93

94

```java { .api }

95

public interface RpcServer extends AutoCloseable {

96

void start() throws Exception;

97

98

@Override

99

void close() throws Exception;

100

101

String getAddress();

102

int getPort();

103

104

CompletableFuture<Void> getTerminationFuture();

105

}

106

```

107

108

## RPC Method Annotations

109

110

### RpcMethod

111

112

Annotation to mark methods as RPC-callable with timeout specifications.

113

114

```java { .api }

115

@Target(ElementType.METHOD)

116

@Retention(RetentionPolicy.RUNTIME)

117

public @interface RpcMethod {

118

/**

119

* Timeout for the RPC call in milliseconds.

120

*/

121

long timeout() default -1L;

122

}

123

```

124

125

### RpcTimeout

126

127

Marker annotation for the method parameter that carries the timeout of an RPC call (for example `@RpcTimeout Time timeout`).

128

129

```java { .api }

130

@Target(ElementType.PARAMETER)

131

@Retention(RetentionPolicy.RUNTIME)

132

public @interface RpcTimeout {

133

// Marker annotation for timeout parameters

134

}

135

```

136

137

## Exception Handling

138

139

### RpcException

140

141

Base exception class for RPC-related failures.

142

143

```java { .api }

144

public class RpcException extends FlinkException {

145

public RpcException(String message);

146

public RpcException(String message, Throwable cause);

147

}

148

```

149

150

### RpcConnectionException

151

152

Exception thrown when RPC connection establishment fails.

153

154

```java { .api }

155

public class RpcConnectionException extends RpcException {

156

public RpcConnectionException(String message);

157

public RpcConnectionException(String message, Throwable cause);

158

159

public RpcConnectionException(String targetAddress, Class<?> rpcGatewayClass, Throwable cause);

160

161

public String getTargetAddress();

162

public Class<?> getRpcGatewayClass();

163

}

164

```

165

166

### RpcRuntimeException

167

168

Runtime exception for RPC failures that don't require explicit handling.

169

170

```java { .api }

171

public class RpcRuntimeException extends FlinkRuntimeException {

172

public RpcRuntimeException(String message);

173

public RpcRuntimeException(String message, Throwable cause);

174

}

175

```

176

177

## Factory and Utils

178

179

### RpcServiceUtils

180

181

Utility class providing factory methods and helper functions for RPC services.

182

183

```java { .api }

184

public class RpcServiceUtils {

185

public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception;

186

187

public static RpcService createRpcService(String hostname, int port, Configuration configuration,

188

HighAvailabilityServices highAvailabilityServices) throws Exception;

189

190

public static int getRandomPort();

191

192

public static String createWildcardAddress();

193

public static String getHostname(RpcService rpcService);

194

195

public static CompletableFuture<Void> terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout);

196

public static CompletableFuture<Void> terminateRpcService(RpcService rpcService, Time timeout);

197

198

public static CompletableFuture<Void> terminateRpcServices(Time timeout, RpcService... rpcServices);

199

200

public static <T extends RpcGateway> T getSynchronousRpcGateway(

201

T rpcGateway,

202

Class<T> rpcGatewayClass,

203

Time timeout

204

);

205

}

206

```

207

208

## Configuration Options

209

210

### RPC Configuration

211

212

Configuration options for customizing RPC behavior and networking.

213

214

```java { .api }

215

public class RpcOptions {

216

public static final ConfigOption<String> RPC_BIND_ADDRESS =

217

key("rpc.bind-address").defaultValue("");

218

219

public static final ConfigOption<Integer> RPC_PORT =

220

key("rpc.port").defaultValue(0);

221

222

public static final ConfigOption<Duration> RPC_ASK_TIMEOUT =

223

key("rpc.ask-timeout").defaultValue(Duration.ofSeconds(10));

224

225

public static final ConfigOption<Duration> RPC_LOOKUP_TIMEOUT =

226

key("rpc.lookup-timeout").defaultValue(Duration.ofSeconds(10));

227

228

public static final ConfigOption<Integer> RPC_CONNECT_RETRIES =

229

key("rpc.connect.retries").defaultValue(5);

230

231

public static final ConfigOption<Duration> RPC_CONNECT_RETRY_DELAY =

232

key("rpc.connect.retry-delay").defaultValue(Duration.ofSeconds(1));

233

234

public static final ConfigOption<Boolean> RPC_SSL_ENABLED =

235

key("rpc.ssl.enabled").defaultValue(false);

236

}

237

```

238

239

## Usage Examples

240

241

### Creating an RPC Endpoint

242

243

```java

244

import org.apache.flink.runtime.rpc.RpcEndpoint;

245

import org.apache.flink.runtime.rpc.RpcService;

246

import org.apache.flink.runtime.rpc.RpcMethod;

247

248

// Define the RPC gateway interface

249

public interface TaskManagerGateway extends RpcGateway {

250

CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, @RpcTimeout Time timeout);

251

CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);

252

CompletableFuture<TaskExecutorInfo> requestTaskExecutorInfo(@RpcTimeout Time timeout);

253

}

254

255

// Implement the RPC endpoint

256

public class TaskManagerRpcEndpoint extends RpcEndpoint implements TaskManagerGateway {

257

private final TaskManager taskManager;

258

259

public TaskManagerRpcEndpoint(RpcService rpcService, TaskManager taskManager) {

260

super(rpcService, "TaskManager");

261

this.taskManager = taskManager;

262

}

263

264

@Override

265

protected void onStart() throws Exception {

266

System.out.println("TaskManager RPC endpoint started at: " + getAddress());

267

}

268

269

@Override

270

@RpcMethod(timeout = 30000L) // 30 second timeout

271

public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {

272

return CompletableFuture.supplyAsync(() -> {

273

try {

274

taskManager.submitTask(tdd);

275

return Acknowledge.get();

276

} catch (Exception e) {

277

throw new CompletionException(e);

278

}

279

}, getRpcService().getExecutor());

280

}

281

282

@Override

283

@RpcMethod(timeout = 10000L) // 10 second timeout

284

public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {

285

return CompletableFuture.supplyAsync(() -> {

286

try {

287

taskManager.cancelTask(executionAttemptID);

288

return Acknowledge.get();

289

} catch (Exception e) {

290

throw new CompletionException(e);

291

}

292

}, getRpcService().getExecutor());

293

}

294

295

@Override

296

@RpcMethod(timeout = 5000L) // 5 second timeout

297

public CompletableFuture<TaskExecutorInfo> requestTaskExecutorInfo(Time timeout) {

298

return CompletableFuture.supplyAsync(() -> {

299

return taskManager.getTaskExecutorInfo();

300

}, getRpcService().getExecutor());

301

}

302

303

@Override

304

protected CompletableFuture<Void> onStop() {

305

System.out.println("TaskManager RPC endpoint stopping");

306

return CompletableFuture.completedFuture(null);

307

}

308

}

309

```

310

311

### Setting Up RPC Service

312

313

```java

314

import org.apache.flink.runtime.rpc.RpcService;

315

import org.apache.flink.runtime.rpc.RpcServiceUtils;

316

import org.apache.flink.configuration.Configuration;

317

318

// Create RPC service configuration

319

Configuration config = new Configuration();

320

config.setString("rpc.bind-address", "localhost");

321

config.setInteger("rpc.port", 6123);

322

config.setString("rpc.ask-timeout", "10 s");

323

324

// Create RPC service

325

RpcService rpcService = RpcServiceUtils.createRpcService("localhost", 6123, config);

326

327

try {

328

// Start TaskManager RPC endpoint

329

TaskManager taskManager = new TaskManager();

330

TaskManagerRpcEndpoint taskManagerEndpoint = new TaskManagerRpcEndpoint(rpcService, taskManager);

331

taskManagerEndpoint.start();

332

333

System.out.println("TaskManager available at: " + taskManagerEndpoint.getAddress());

334

335

// Keep service running

336

Thread.sleep(60000);

337

338

} finally {

339

// Clean shutdown

340

rpcService.stopService().get();

341

}

342

```

343

344

### Connecting to Remote RPC Endpoints

345

346

```java

347

import org.apache.flink.runtime.rpc.RpcService;

348

import org.apache.flink.util.concurrent.FutureUtils;

349

350

public class JobManagerClient {

351

private final RpcService rpcService;

352

353

public JobManagerClient(RpcService rpcService) {

354

this.rpcService = rpcService;

355

}

356

357

public void connectAndSubmitJob(String taskManagerAddress, JobGraph jobGraph) {

358

// Connect to remote TaskManager

359

CompletableFuture<TaskManagerGateway> connectionFuture =

360

rpcService.connect(taskManagerAddress, TaskManagerGateway.class);

361

362

connectionFuture.thenCompose(taskManagerGateway -> {

363

System.out.println("Connected to TaskManager at: " + taskManagerGateway.getAddress());

364

365

// Submit tasks to TaskManager

366

List<CompletableFuture<Acknowledge>> taskFutures = new ArrayList<>();

367

368

for (JobVertex vertex : jobGraph.getVertices()) {

369

TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(vertex);

370

CompletableFuture<Acknowledge> taskFuture = taskManagerGateway.submitTask(

371

tdd,

372

Time.seconds(30)

373

);

374

taskFutures.add(taskFuture);

375

}

376

377

// Wait for all tasks to be submitted

378

return FutureUtils.waitForAll(taskFutures);

379

380

}).whenComplete((result, throwable) -> {

381

if (throwable != null) {

382

System.err.println("Failed to submit tasks: " + throwable.getMessage());

383

} else {

384

System.out.println("All tasks submitted successfully");

385

}

386

});

387

}

388

389

private TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobVertex vertex) {

390

// Create task deployment descriptor from job vertex

391

return new TaskDeploymentDescriptor(/* ... */);

392

}

393

}

394

```

395

396

### Fenced RPC with Leader Election

397

398

```java

399

import org.apache.flink.runtime.rpc.FencedRpcGateway;

400

import java.util.UUID;

401

402

// Define fenced RPC gateway for leader election scenarios

403

public interface ResourceManagerGateway extends FencedRpcGateway<UUID> {

404

CompletableFuture<RegistrationResponse> registerTaskManager(

405

UUID leaderSessionId,

406

TaskManagerRegistration registration,

407

@RpcTimeout Time timeout

408

);

409

}

410

411

// Implement fenced RPC endpoint

412

public class ResourceManagerRpcEndpoint extends RpcEndpoint implements ResourceManagerGateway {

413

private volatile UUID leaderSessionId;

414

private volatile boolean isLeader = false;

415

416

public ResourceManagerRpcEndpoint(RpcService rpcService) {

417

super(rpcService, "ResourceManager");

418

}

419

420

@Override

421

public UUID getFencingToken() {

422

return leaderSessionId;

423

}

424

425

@Override

426

@RpcMethod(timeout = 15000L)

427

public CompletableFuture<RegistrationResponse> registerTaskManager(

428

UUID leaderSessionId,

429

TaskManagerRegistration registration,

430

Time timeout) {

431

432

return CompletableFuture.supplyAsync(() -> {

433

// Verify fencing token

434

if (!isLeader || !Objects.equals(this.leaderSessionId, leaderSessionId)) {

435

throw new CompletionException(new RpcException("Invalid leader session ID"));

436

}

437

438

// Process TaskManager registration

439

return processTaskManagerRegistration(registration);

440

441

}, getRpcService().getExecutor());

442

}

443

444

public void becomeLeader(UUID newLeaderSessionId) {

445

runAsync(() -> {

446

this.leaderSessionId = newLeaderSessionId;

447

this.isLeader = true;

448

System.out.println("Became leader with session ID: " + newLeaderSessionId);

449

});

450

}

451

452

public void revokeLeadership() {

453

runAsync(() -> {

454

this.isLeader = false;

455

this.leaderSessionId = null;

456

System.out.println("Leadership revoked");

457

});

458

}

459

460

private RegistrationResponse processTaskManagerRegistration(TaskManagerRegistration registration) {

461

// Implementation for processing registration

462

return new RegistrationResponse.Success();

463

}

464

}

465

```

466

467

### Error Handling and Retries

468

469

```java

470

public class RobustRpcClient {

471

private final RpcService rpcService;

472

private final ScheduledExecutorService retryExecutor;

473

474

public RobustRpcClient(RpcService rpcService) {

475

this.rpcService = rpcService;

476

this.retryExecutor = Executors.newScheduledThreadPool(1);

477

}

478

479

public <T> CompletableFuture<T> callWithRetry(

480

String address,

481

Class<? extends RpcGateway> gatewayClass,

482

Function<RpcGateway, CompletableFuture<T>> rpcCall,

483

int maxRetries) {

484

485

return callWithRetryInternal(address, gatewayClass, rpcCall, maxRetries, 0);

486

}

487

488

private <T> CompletableFuture<T> callWithRetryInternal(

489

String address,

490

Class<? extends RpcGateway> gatewayClass,

491

Function<RpcGateway, CompletableFuture<T>> rpcCall,

492

int maxRetries,

493

int currentAttempt) {

494

495

return rpcService.connect(address, gatewayClass)

496

.thenCompose(gateway -> {

497

return rpcCall.apply(gateway);

498

})

499

.handle((result, throwable) -> {

500

if (throwable != null && currentAttempt < maxRetries) {

501

System.out.println("RPC call failed (attempt " + (currentAttempt + 1) +

502

"/" + maxRetries + "), retrying...");

503

504

// Exponential backoff

505

long delay = (long) Math.pow(2, currentAttempt) * 1000;

506

507

CompletableFuture<T> retryFuture = new CompletableFuture<>();

508

retryExecutor.schedule(() -> {

509

callWithRetryInternal(address, gatewayClass, rpcCall, maxRetries, currentAttempt + 1)

510

.whenComplete((retryResult, retryThrowable) -> {

511

if (retryThrowable != null) {

512

retryFuture.completeExceptionally(retryThrowable);

513

} else {

514

retryFuture.complete(retryResult);

515

}

516

});

517

}, delay, TimeUnit.MILLISECONDS);

518

519

return retryFuture;

520

521

} else if (throwable != null) {

522

CompletableFuture<T> failedFuture = new CompletableFuture<>();

523

failedFuture.completeExceptionally(throwable);

524

return failedFuture;

525

} else {

526

return CompletableFuture.completedFuture(result);

527

}

528

})

529

.thenCompose(Function.identity());

530

}

531

}

532

```

533

534

## Common Patterns

535

536

### RPC Service Lifecycle Management

537

538

```java

539

public class ClusterRpcManager {

540

private final List<RpcService> rpcServices = new ArrayList<>();

541

private final List<RpcEndpoint> rpcEndpoints = new ArrayList<>();

542

543

public RpcService createRpcService(String hostname, int port, Configuration config) throws Exception {

544

RpcService rpcService = RpcServiceUtils.createRpcService(hostname, port, config);

545

rpcServices.add(rpcService);

546

return rpcService;

547

}

548

549

public <T extends RpcEndpoint> T startRpcEndpoint(T endpoint) throws Exception {

550

endpoint.start();

551

rpcEndpoints.add(endpoint);

552

return endpoint;

553

}

554

555

public void shutdown() throws Exception {

556

// Stop all endpoints first

557

List<CompletableFuture<Void>> endpointFutures = rpcEndpoints.stream()

558

.map(RpcEndpoint::closeAsync)

559

.collect(Collectors.toList());

560

561

CompletableFuture.allOf(endpointFutures.toArray(new CompletableFuture[0])).get();

562

563

// Then stop all services

564

List<CompletableFuture<Void>> serviceFutures = rpcServices.stream()

565

.map(RpcService::stopService)

566

.collect(Collectors.toList());

567

568

CompletableFuture.allOf(serviceFutures.toArray(new CompletableFuture[0])).get();

569

}

570

}

571

```

572

573

### Timeout Handling

574

575

```java

576

/**
 * Helper that bounds the time a caller waits for an RPC future.
 */
public class TimeoutAwareRpcClient {

    /**
     * Returns a future that mirrors {@code rpcCall} but fails with a {@link TimeoutException}
     * if {@code rpcCall} has not completed within {@code timeout}.
     *
     * <p>The underlying {@code rpcCall} is not cancelled when the timeout fires; only the
     * returned future is completed.
     *
     * @param rpcCall the in-flight call to guard
     * @param timeout maximum time to wait for {@code rpcCall}
     * @param timeoutExecutor scheduler on which the timeout is armed
     * @param <T> result type of the call
     * @return a future completed by whichever happens first: the call or the timeout
     */
    public <T> CompletableFuture<T> callWithTimeout(
            CompletableFuture<T> rpcCall,
            Duration timeout,
            ScheduledExecutorService timeoutExecutor) {

        final CompletableFuture<T> guarded = new CompletableFuture<>();

        // Arm the timeout: fails the guarded future unless the call wins the race.
        final Runnable expire = () ->
                guarded.completeExceptionally(
                        new TimeoutException("RPC call timed out after " + timeout));
        final ScheduledFuture<?> pendingExpiry =
                timeoutExecutor.schedule(expire, timeout.toMillis(), TimeUnit.MILLISECONDS);

        // Forward the call's outcome and disarm the timeout.
        rpcCall.whenComplete((value, failure) -> {
            pendingExpiry.cancel(false);
            if (failure == null) {
                guarded.complete(value);
            } else {
                guarded.completeExceptionally(failure);
            }
        });

        return guarded;
    }
}

605

```