# RPC Framework

The RPC (Remote Procedure Call) framework provides the communication infrastructure for the distributed components of a Flink cluster. It enables asynchronous communication between JobManagers, TaskManagers, and client applications across the cluster network: remote calls return a `CompletableFuture` instead of blocking the caller.

## Core RPC Services

### RpcService

The main RPC service interface. It connects to remote endpoints, stops endpoint servers, and runs or schedules work on the service's executors.

```java { .api }
public interface RpcService extends AutoCloseable {
    <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);
    <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
        String address, F fencingToken, Class<C> clazz);

    void stopServer(RpcEndpoint endpoint);

    CompletableFuture<Void> stopService();

    Executor getExecutor();
    ScheduledExecutor getScheduledExecutor();

    void executeRunnable(Runnable runnable);
    void execute(Runnable runnable);

    <T> CompletableFuture<T> execute(Callable<T> callable);
    ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
}
```

### RpcEndpoint

Abstract base class for all RPC endpoints in the Flink cluster. Components extend this class to expose RPC interfaces.

```java { .api }
public abstract class RpcEndpoint implements RpcGateway, AutoCloseable {
    protected RpcEndpoint(RpcService rpcService);
    protected RpcEndpoint(RpcService rpcService, String endpointId);

    public void start() throws Exception;
    public CompletableFuture<Void> closeAsync();

    @Override
    public final void close() throws Exception;

    // Public: these implement RpcGateway and are callable from outside the endpoint
    @Override
    public String getAddress();
    @Override
    public String getHostname();
    protected final String getEndpointId();
    protected final RpcService getRpcService();

    protected final <C extends RpcGateway> CompletableFuture<C> connectTo(String address, Class<C> clazz);
    protected final <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connectTo(
        String address, F fencingToken, Class<C> clazz);

    protected final void scheduleRunAsync(Runnable runnable, long delay, TimeUnit timeUnit);
    protected final ScheduledFuture<?> scheduleRunAsync(Runnable runnable, Time delay);

    protected final <V> CompletableFuture<V> callAsync(Callable<V> callable, Time timeout);

    protected void onStart() throws Exception;
    protected CompletableFuture<Void> onStop();

    protected void validateRunsInMainThread();
    protected void runAsync(Runnable runnable);
}
```
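The methods `runAsync`, `callAsync`, and `validateRunsInMainThread` suggest a model in which an endpoint's state is only touched from one main thread. The following is a minimal sketch of that idea in plain Java, assuming a single-threaded executor stands in for the main thread. `MainThreadSketch` and its methods are hypothetical and are not Flink's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical miniature of the "main thread" idea; not part of Flink.
final class MainThreadSketch implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private int counter; // only touched from mainThread, so no locking is needed

    // Similar in spirit to runAsync: hand a state change to the single main thread.
    void runAsync(Runnable action) {
        mainThread.execute(action);
    }

    // Similar in spirit to callAsync: compute a value on the main thread.
    CompletableFuture<Integer> incrementAndGet() {
        return CompletableFuture.supplyAsync(() -> ++counter, mainThread);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }

    public static void main(String[] args) {
        try (MainThreadSketch endpoint = new MainThreadSketch()) {
            for (int i = 0; i < 1000; i++) {
                endpoint.runAsync(() -> endpoint.counter++);
            }
            // Tasks run in submission order, so this call sees all 1000 increments.
            System.out.println(endpoint.incrementAndGet().join()); // prints 1001
        }
    }
}
```

Because every mutation is funnelled through one thread, the field needs neither `volatile` nor a lock; callers only ever see results through the returned future.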

## Gateway Interfaces

### RpcGateway

Base interface for all RPC gateway implementations. Gateways provide client-side access to remote RPC endpoints.

```java { .api }
public interface RpcGateway {
    String getAddress();
    String getHostname();
}
```

### FencedRpcGateway

Extension of `RpcGateway` that carries a fencing token. In leader election scenarios the token identifies the current leader session, so that calls addressed to a former leader can be rejected.

```java { .api }
public interface FencedRpcGateway<F extends Serializable> extends RpcGateway {
    F getFencingToken();
}
```
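To illustrate what a fencing token is for, here is a minimal, self-contained sketch in plain Java with no Flink dependency. `FencedReceiver`, `elect`, `revoke`, and `handle` are hypothetical names; the sketch only assumes that the receiver compares the caller's token with its current one and rejects mismatches.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for a fenced endpoint; not part of Flink.
final class FencedReceiver {
    private UUID currentToken; // null while no leader session is active

    // Called when this receiver gains leadership under a new session.
    void elect(UUID token) {
        currentToken = Objects.requireNonNull(token);
    }

    // Called when leadership is lost; every later call is rejected.
    void revoke() {
        currentToken = null;
    }

    // Accepts the message only if the caller's token matches the current one.
    String handle(UUID callerToken, String message) {
        if (currentToken == null || !currentToken.equals(callerToken)) {
            throw new IllegalStateException("Fencing token mismatch");
        }
        return "accepted: " + message;
    }

    public static void main(String[] args) {
        FencedReceiver receiver = new FencedReceiver();
        UUID oldSession = UUID.randomUUID();
        UUID newSession = UUID.randomUUID();

        receiver.elect(oldSession);
        System.out.println(receiver.handle(oldSession, "register"));

        // A new election invalidates callers that still hold the old token.
        receiver.elect(newSession);
        try {
            receiver.handle(oldSession, "register");
        } catch (IllegalStateException e) {
            System.out.println("rejected stale caller: " + e.getMessage());
        }
    }
}
```

The point of the check is that a caller holding a token from a previous leader session cannot change the state of the new leader, even if its message arrives late.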

### RpcServer

Interface for the server side of an RPC endpoint. It can be started and closed, exposes its address and port, and provides a future that completes when the server has terminated.

```java { .api }
public interface RpcServer extends AutoCloseable {
    void start() throws Exception;

    @Override
    void close() throws Exception;

    String getAddress();
    int getPort();

    CompletableFuture<Void> getTerminationFuture();
}
```

## RPC Method Annotations

### RpcMethod

Annotation that marks a method as RPC-callable and optionally specifies a timeout for it.

```java { .api }
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcMethod {
    /**
     * Timeout for the RPC call in milliseconds.
     */
    long timeout() default -1L;
}
```

### RpcTimeout

Marker annotation for the method parameter that carries the timeout of an RPC call.

```java { .api }
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcTimeout {
    // Marker annotation for timeout parameters
}
```
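A parameter-level marker like this is typically discovered through reflection when a call is dispatched. The sketch below shows one way that lookup could work, under stated assumptions: `TimeoutParam`, `PingGateway`, and `extractTimeout` are hypothetical stand-ins defined here so the example runs without Flink, and `java.time.Duration` is used in place of the `Time` type.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.time.Duration;

final class TimeoutParameterSketch {
    // Hypothetical marker with the same shape as a parameter-level timeout annotation.
    @Target(ElementType.PARAMETER)
    @Retention(RetentionPolicy.RUNTIME)
    @interface TimeoutParam {}

    // Hypothetical gateway used only for this demonstration.
    interface PingGateway {
        String ping(String payload, @TimeoutParam Duration timeout);
    }

    // Returns the first argument whose parameter carries @TimeoutParam,
    // or the fallback if no parameter is annotated.
    static Duration extractTimeout(Method method, Object[] args, Duration fallback) {
        Parameter[] parameters = method.getParameters();
        for (int i = 0; i < parameters.length; i++) {
            if (parameters[i].isAnnotationPresent(TimeoutParam.class) && args[i] instanceof Duration) {
                return (Duration) args[i];
            }
        }
        return fallback;
    }

    // Looks up the demo method, converting the checked exception for convenience.
    static Method pingMethod() {
        try {
            return PingGateway.class.getMethod("ping", String.class, Duration.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object[] callArgs = {"hello", Duration.ofSeconds(5)};
        Duration timeout = extractTimeout(pingMethod(), callArgs, Duration.ofSeconds(10));
        System.out.println(timeout); // prints PT5S
    }
}
```

The annotation needs `RetentionPolicy.RUNTIME`, as in the definition above; otherwise it would not be visible to reflection.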

## Exception Handling

### RpcException

Base exception class for RPC-related failures.

```java { .api }
public class RpcException extends FlinkException {
    public RpcException(String message);
    public RpcException(String message, Throwable cause);
}
```

### RpcConnectionException

Exception thrown when RPC connection establishment fails.

```java { .api }
public class RpcConnectionException extends RpcException {
    public RpcConnectionException(String message);
    public RpcConnectionException(String message, Throwable cause);

    public RpcConnectionException(String targetAddress, Class<?> rpcGatewayClass, Throwable cause);

    public String getTargetAddress();
    public Class<?> getRpcGatewayClass();
}
```
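Failures delivered through a `CompletableFuture` usually arrive wrapped in a `CompletionException` or `ExecutionException`, so the original exception has to be unwrapped before it can be inspected. The following sketch shows that pattern with JDK classes only. `ConnectionFailure` is a hypothetical stand-in for a connection exception, and `unwrap` and `classify` are illustrative helpers, not Flink API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class UnwrapSketch {
    // Hypothetical stand-in for a connection failure; not the Flink class.
    static final class ConnectionFailure extends Exception {
        ConnectionFailure(String message) {
            super(message);
        }
    }

    // Strips CompletionException and ExecutionException wrappers to reach the root failure.
    static Throwable unwrap(Throwable throwable) {
        Throwable current = throwable;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Waits for the call and reports which kind of failure occurred, if any.
    static String classify(CompletableFuture<?> call) {
        try {
            call.join();
            return "ok";
        } catch (CompletionException e) {
            Throwable cause = unwrap(e);
            if (cause instanceof ConnectionFailure) {
                return "connection failed: " + cause.getMessage();
            }
            return "call failed: " + cause;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new ConnectionFailure("host unreachable"));
        System.out.println(classify(failed)); // prints "connection failed: host unreachable"
    }
}
```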

### RpcRuntimeException

Unchecked exception for RPC failures that callers are not required to catch.

```java { .api }
public class RpcRuntimeException extends FlinkRuntimeException {
    public RpcRuntimeException(String message);
    public RpcRuntimeException(String message, Throwable cause);
}
```

## Factory and Utils

### RpcServiceUtils

Utility class providing factory methods and helper functions for RPC services.

```java { .api }
public class RpcServiceUtils {
    public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception;

    public static RpcService createRpcService(String hostname, int port, Configuration configuration,
        HighAvailabilityServices highAvailabilityServices) throws Exception;

    public static int getRandomPort();

    public static String createWildcardAddress();
    public static String getHostname(RpcService rpcService);

    public static CompletableFuture<Void> terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout);
    public static CompletableFuture<Void> terminateRpcService(RpcService rpcService, Time timeout);

    public static CompletableFuture<Void> terminateRpcServices(Time timeout, RpcService... rpcServices);

    public static <T extends RpcGateway> T getSynchronousRpcGateway(
        T rpcGateway,
        Class<T> rpcGatewayClass,
        Time timeout
    );
}
```

## Configuration Options

### RPC Configuration

Configuration options for customizing RPC behavior and networking.

```java { .api }
public class RpcOptions {
    public static final ConfigOption<String> RPC_BIND_ADDRESS =
        key("rpc.bind-address").defaultValue("");

    public static final ConfigOption<Integer> RPC_PORT =
        key("rpc.port").defaultValue(0);

    public static final ConfigOption<Duration> RPC_ASK_TIMEOUT =
        key("rpc.ask-timeout").defaultValue(Duration.ofSeconds(10));

    public static final ConfigOption<Duration> RPC_LOOKUP_TIMEOUT =
        key("rpc.lookup-timeout").defaultValue(Duration.ofSeconds(10));

    public static final ConfigOption<Integer> RPC_CONNECT_RETRIES =
        key("rpc.connect.retries").defaultValue(5);

    public static final ConfigOption<Duration> RPC_CONNECT_RETRY_DELAY =
        key("rpc.connect.retry-delay").defaultValue(Duration.ofSeconds(1));

    public static final ConfigOption<Boolean> RPC_SSL_ENABLED =
        key("rpc.ssl.enabled").defaultValue(false);
}
```

## Usage Examples

### Creating an RPC Endpoint

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;

// Define the RPC gateway interface
public interface TaskManagerGateway extends RpcGateway {
    CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, @RpcTimeout Time timeout);
    CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
    CompletableFuture<TaskExecutorInfo> requestTaskExecutorInfo(@RpcTimeout Time timeout);
}

// Implement the RPC endpoint
public class TaskManagerRpcEndpoint extends RpcEndpoint implements TaskManagerGateway {
    private final TaskManager taskManager;

    public TaskManagerRpcEndpoint(RpcService rpcService, TaskManager taskManager) {
        super(rpcService, "TaskManager");
        this.taskManager = taskManager;
    }

    @Override
    protected void onStart() throws Exception {
        System.out.println("TaskManager RPC endpoint started at: " + getAddress());
    }

    @Override
    @RpcMethod(timeout = 30000L) // 30 second timeout
    public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
        // Run the submission on the RPC service's executor and surface failures through the future
        return CompletableFuture.supplyAsync(() -> {
            try {
                taskManager.submitTask(tdd);
                return Acknowledge.get();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, getRpcService().getExecutor());
    }

    @Override
    @RpcMethod(timeout = 10000L) // 10 second timeout
    public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                taskManager.cancelTask(executionAttemptID);
                return Acknowledge.get();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, getRpcService().getExecutor());
    }

    @Override
    @RpcMethod(timeout = 5000L) // 5 second timeout
    public CompletableFuture<TaskExecutorInfo> requestTaskExecutorInfo(Time timeout) {
        return CompletableFuture.supplyAsync(
            taskManager::getTaskExecutorInfo, getRpcService().getExecutor());
    }

    @Override
    protected CompletableFuture<Void> onStop() {
        System.out.println("TaskManager RPC endpoint stopping");
        return CompletableFuture.completedFuture(null);
    }
}
```

### Setting Up RPC Service

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;

// Create RPC service configuration
Configuration config = new Configuration();
config.setString("rpc.bind-address", "localhost");
config.setInteger("rpc.port", 6123);
config.setString("rpc.ask-timeout", "10 s");

// Create RPC service
RpcService rpcService = RpcServiceUtils.createRpcService("localhost", 6123, config);

try {
    // Start TaskManager RPC endpoint
    TaskManager taskManager = new TaskManager();
    TaskManagerRpcEndpoint taskManagerEndpoint = new TaskManagerRpcEndpoint(rpcService, taskManager);
    taskManagerEndpoint.start();

    System.out.println("TaskManager available at: " + taskManagerEndpoint.getAddress());

    // Keep the service running for 60 seconds
    Thread.sleep(60000);

    // Stop the endpoint before stopping the service that hosts it
    taskManagerEndpoint.closeAsync().get();
} finally {
    // Clean shutdown; get() blocks until the service has stopped
    rpcService.stopService().get();
}
```

### Connecting to Remote RPC Endpoints

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.concurrent.FutureUtils;

public class JobManagerClient {
    private final RpcService rpcService;

    public JobManagerClient(RpcService rpcService) {
        this.rpcService = rpcService;
    }

    // Returns a future that completes once every task submission has been acknowledged,
    // so callers can wait for or react to the outcome.
    public CompletableFuture<Void> connectAndSubmitJob(String taskManagerAddress, JobGraph jobGraph) {
        // Connect to remote TaskManager
        CompletableFuture<TaskManagerGateway> connectionFuture =
            rpcService.connect(taskManagerAddress, TaskManagerGateway.class);

        return connectionFuture.thenCompose(taskManagerGateway -> {
            System.out.println("Connected to TaskManager at: " + taskManagerGateway.getAddress());

            // Submit tasks to TaskManager
            List<CompletableFuture<Acknowledge>> taskFutures = new ArrayList<>();

            for (JobVertex vertex : jobGraph.getVertices()) {
                TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(vertex);
                CompletableFuture<Acknowledge> taskFuture = taskManagerGateway.submitTask(
                    tdd,
                    Time.seconds(30)
                );
                taskFutures.add(taskFuture);
            }

            // Wait for all tasks to be submitted
            return FutureUtils.waitForAll(taskFutures);

        }).whenComplete((result, throwable) -> {
            if (throwable != null) {
                System.err.println("Failed to submit tasks: " + throwable.getMessage());
            } else {
                System.out.println("All tasks submitted successfully");
            }
        });
    }

    private TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobVertex vertex) {
        // Create task deployment descriptor from job vertex
        return new TaskDeploymentDescriptor(/* ... */);
    }
}
```

### Fenced RPC with Leader Election

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;

// Define fenced RPC gateway for leader election scenarios
public interface ResourceManagerGateway extends FencedRpcGateway<UUID> {
    CompletableFuture<RegistrationResponse> registerTaskManager(
        UUID leaderSessionId,
        TaskManagerRegistration registration,
        @RpcTimeout Time timeout
    );
}

// Implement fenced RPC endpoint
public class ResourceManagerRpcEndpoint extends RpcEndpoint implements ResourceManagerGateway {
    // volatile: written on the endpoint's main thread, read on the RPC service executor
    private volatile UUID leaderSessionId;
    private volatile boolean isLeader = false;

    public ResourceManagerRpcEndpoint(RpcService rpcService) {
        super(rpcService, "ResourceManager");
    }

    @Override
    public UUID getFencingToken() {
        return leaderSessionId;
    }

    @Override
    @RpcMethod(timeout = 15000L)
    public CompletableFuture<RegistrationResponse> registerTaskManager(
            UUID leaderSessionId,
            TaskManagerRegistration registration,
            Time timeout) {

        return CompletableFuture.supplyAsync(() -> {
            // Verify fencing token: reject callers that address a different leader session
            if (!isLeader || !Objects.equals(this.leaderSessionId, leaderSessionId)) {
                throw new CompletionException(new RpcException("Invalid leader session ID"));
            }

            // Process TaskManager registration
            return processTaskManagerRegistration(registration);

        }, getRpcService().getExecutor());
    }

    public void becomeLeader(UUID newLeaderSessionId) {
        runAsync(() -> {
            this.leaderSessionId = newLeaderSessionId;
            this.isLeader = true;
            System.out.println("Became leader with session ID: " + newLeaderSessionId);
        });
    }

    public void revokeLeadership() {
        runAsync(() -> {
            this.isLeader = false;
            this.leaderSessionId = null;
            System.out.println("Leadership revoked");
        });
    }

    private RegistrationResponse processTaskManagerRegistration(TaskManagerRegistration registration) {
        // Implementation for processing registration
        return new RegistrationResponse.Success();
    }
}
```

### Error Handling and Retries

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;

public class RobustRpcClient implements AutoCloseable {
    private final RpcService rpcService;
    private final ScheduledExecutorService retryExecutor;

    public RobustRpcClient(RpcService rpcService) {
        this.rpcService = rpcService;
        this.retryExecutor = Executors.newScheduledThreadPool(1);
    }

    // maxRetries counts retries after the first attempt, so at most maxRetries + 1 calls are made.
    // The gateway type G is generic so that rpcCall can use the concrete gateway without casting.
    public <G extends RpcGateway, T> CompletableFuture<T> callWithRetry(
            String address,
            Class<G> gatewayClass,
            Function<G, CompletableFuture<T>> rpcCall,
            int maxRetries) {

        return callWithRetryInternal(address, gatewayClass, rpcCall, maxRetries, 0);
    }

    private <G extends RpcGateway, T> CompletableFuture<T> callWithRetryInternal(
            String address,
            Class<G> gatewayClass,
            Function<G, CompletableFuture<T>> rpcCall,
            int maxRetries,
            int currentAttempt) {

        return rpcService.connect(address, gatewayClass)
            .thenCompose(rpcCall)
            .<CompletableFuture<T>>handle((result, throwable) -> {
                if (throwable != null && currentAttempt < maxRetries) {
                    System.out.println("RPC call failed (attempt " + (currentAttempt + 1) +
                        "/" + maxRetries + "), retrying...");

                    // Exponential backoff: 1 s, 2 s, 4 s, ...
                    long delay = 1000L << currentAttempt;

                    CompletableFuture<T> retryFuture = new CompletableFuture<>();
                    retryExecutor.schedule(() -> {
                        callWithRetryInternal(address, gatewayClass, rpcCall, maxRetries, currentAttempt + 1)
                            .whenComplete((retryResult, retryThrowable) -> {
                                if (retryThrowable != null) {
                                    retryFuture.completeExceptionally(retryThrowable);
                                } else {
                                    retryFuture.complete(retryResult);
                                }
                            });
                    }, delay, TimeUnit.MILLISECONDS);

                    return retryFuture;

                } else if (throwable != null) {
                    // Retries exhausted: propagate the last failure
                    CompletableFuture<T> failedFuture = new CompletableFuture<>();
                    failedFuture.completeExceptionally(throwable);
                    return failedFuture;
                } else {
                    return CompletableFuture.completedFuture(result);
                }
            })
            // Flatten the nested future returned by handle
            .thenCompose(Function.identity());
    }

    // Release the retry scheduler's thread when the client is no longer needed
    @Override
    public void close() {
        retryExecutor.shutdown();
    }
}
```

## Common Patterns

### RPC Service Lifecycle Management

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;

public class ClusterRpcManager {
    private final List<RpcService> rpcServices = new ArrayList<>();
    private final List<RpcEndpoint> rpcEndpoints = new ArrayList<>();

    public RpcService createRpcService(String hostname, int port, Configuration config) throws Exception {
        RpcService rpcService = RpcServiceUtils.createRpcService(hostname, port, config);
        rpcServices.add(rpcService);
        return rpcService;
    }

    public <T extends RpcEndpoint> T startRpcEndpoint(T endpoint) throws Exception {
        endpoint.start();
        rpcEndpoints.add(endpoint);
        return endpoint;
    }

    public void shutdown() throws Exception {
        // Stop all endpoints first
        List<CompletableFuture<Void>> endpointFutures = rpcEndpoints.stream()
            .map(RpcEndpoint::closeAsync)
            .collect(Collectors.toList());

        CompletableFuture.allOf(endpointFutures.toArray(new CompletableFuture[0])).get();

        // Then stop all services
        List<CompletableFuture<Void>> serviceFutures = rpcServices.stream()
            .map(RpcService::stopService)
            .collect(Collectors.toList());

        CompletableFuture.allOf(serviceFutures.toArray(new CompletableFuture[0])).get();
    }
}
```

### Timeout Handling

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutAwareRpcClient {

    public <T> CompletableFuture<T> callWithTimeout(
            CompletableFuture<T> rpcCall,
            Duration timeout,
            ScheduledExecutorService timeoutExecutor) {

        CompletableFuture<T> timeoutFuture = new CompletableFuture<>();

        // Set up timeout
        ScheduledFuture<?> timeoutTask = timeoutExecutor.schedule(() -> {
            timeoutFuture.completeExceptionally(
                new TimeoutException("RPC call timed out after " + timeout)
            );
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);

        // Race between RPC call completion and timeout: whichever completes
        // timeoutFuture first wins, later completion attempts are ignored
        rpcCall.whenComplete((result, throwable) -> {
            timeoutTask.cancel(false);
            if (throwable != null) {
                timeoutFuture.completeExceptionally(throwable);
            } else {
                timeoutFuture.complete(result);
            }
        });

        return timeoutFuture;
    }
}
```
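On Java 9 and later, the JDK's own `CompletableFuture.orTimeout` offers a shorter alternative to the hand-written race above. The sketch below assumes Java 9+ and uses only JDK classes; `OrTimeoutSketch` and `withTimeout` are hypothetical names. One difference to keep in mind: `orTimeout` completes the future it is called on, whereas the wrapper above leaves the original `rpcCall` untouched.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class OrTimeoutSketch {
    // Fails the given future with a TimeoutException if it is still
    // incomplete after the given number of milliseconds (Java 9+).
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> call, long millis) {
        return call.orTimeout(millis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        try {
            withTimeout(neverCompletes, 100).join();
        } catch (CompletionException e) {
            // join() wraps the TimeoutException in a CompletionException
            System.out.println(e.getCause() instanceof TimeoutException); // prints true
        }
    }
}
```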