0
# RPC System Management
1
2
Core RPC system functionality for creating and configuring RPC services in both local and distributed environments. The RPC system provides the foundation for all distributed communication in Flink clusters.
3
4
## Capabilities
5
6
### PekkoRpcSystem
7
8
Main RPC system implementation that serves as the primary entry point for creating RPC services.
9
10
```java { .api }
11
/**
12
* RpcSystem implementation based on Pekko actor system.
13
* Provides methods to create local and remote RPC services.
14
*/
15
public class PekkoRpcSystem implements RpcSystem {
16
17
/**
18
* Creates a builder for local RPC service configuration.
19
* @param configuration Flink configuration object
20
* @return RpcServiceBuilder for local service setup
21
*/
22
public RpcServiceBuilder localServiceBuilder(Configuration configuration);
23
24
/**
25
* Creates a builder for remote RPC service configuration.
26
* @param configuration Flink configuration object
27
* @param externalAddress External address for the RPC service (nullable)
28
* @param externalPortRange External port range specification
29
* @return RpcServiceBuilder for remote service setup
30
*/
31
public RpcServiceBuilder remoteServiceBuilder(
32
Configuration configuration,
33
String externalAddress,
34
String externalPortRange
35
);
36
37
/**
38
* Extracts socket address from RPC URL.
39
* @param url RPC URL string
40
* @return InetSocketAddress extracted from the URL
41
* @throws Exception if URL is malformed or cannot be parsed
42
*/
43
public InetSocketAddress getInetSocketAddressFromRpcUrl(String url) throws Exception;
44
45
/**
46
* Constructs RPC URL from components.
47
* @param hostname Target hostname
48
* @param port Target port number
49
* @param endpointName Name of the RPC endpoint
50
* @param addressResolution Address resolution strategy
51
* @param config Flink configuration
52
* @return Constructed RPC URL string
53
* @throws UnknownHostException if hostname cannot be resolved
54
*/
55
public String getRpcUrl(
56
String hostname,
57
int port,
58
String endpointName,
59
AddressResolution addressResolution,
60
Configuration config
61
) throws UnknownHostException;
62
63
/**
64
* Gets maximum message size from configuration.
65
* @param config Flink configuration object
66
* @return Maximum message size in bytes
67
*/
68
public long getMaximumMessageSizeInBytes(Configuration config);
69
}
70
```
71
72
**Usage Examples:**
73
74
```java
75
import org.apache.flink.configuration.Configuration;
76
import org.apache.flink.runtime.rpc.pekko.PekkoRpcSystem;
77
import org.apache.flink.runtime.rpc.RpcService;
78
79
// Create RPC system
80
PekkoRpcSystem rpcSystem = new PekkoRpcSystem();
81
82
// Create local RPC service for single-node scenarios
83
Configuration localConfig = new Configuration();
84
RpcService localService = rpcSystem.localServiceBuilder(localConfig).createAndStart();
85
86
// Create remote RPC service for distributed clusters
87
Configuration remoteConfig = new Configuration();
88
String externalAddress = "192.168.1.100";
89
String portRange = "6123-6130";
90
RpcService remoteService = rpcSystem.remoteServiceBuilder(
91
remoteConfig, externalAddress, portRange
92
).createAndStart();
93
94
// Parse RPC URL to get socket address
95
String rpcUrl = "pekko://flink@192.168.1.100:6123/user/jobmanager";
96
InetSocketAddress address = rpcSystem.getInetSocketAddressFromRpcUrl(rpcUrl);
97
```
98
99
### PekkoRpcService
100
101
Core Pekko-based RPC service implementation that manages connections, endpoints, and the underlying actor system.
102
103
```java { .api }
104
/**
105
* Pekko-based RPC service implementation.
106
* Manages RPC endpoints, connections, and the underlying actor system.
107
*/
108
public class PekkoRpcService implements RpcService {
109
110
/**
111
* Constructor for PekkoRpcService.
112
* @param actorSystem Underlying Pekko actor system
113
* @param configuration Service configuration parameters
114
*/
115
public PekkoRpcService(ActorSystem actorSystem, PekkoRpcServiceConfiguration configuration);
116
117
/**
118
* Returns the underlying Pekko actor system.
119
* @return ActorSystem instance used by this service
120
*/
121
public ActorSystem getActorSystem();
122
123
/**
124
* Gets the address of this RPC service.
125
* @return Service address as string
126
*/
127
public String getAddress();
128
129
/**
130
* Gets the port number of this RPC service.
131
* @return Port number
132
*/
133
public int getPort();
134
135
/**
136
* Creates a self gateway for the given RPC server.
137
* @param selfGatewayType Type of the gateway interface
138
* @param rpcServer RPC server instance
139
* @return Gateway instance for self-communication
140
*/
141
public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType, RpcServer rpcServer);
142
143
/**
144
* Connects to a remote RPC endpoint.
145
* @param address Remote endpoint address
146
* @param clazz Gateway interface class
147
* @return CompletableFuture containing the connected gateway
148
*/
149
public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);
150
151
/**
152
* Connects to a fenced remote RPC endpoint.
153
* @param address Remote endpoint address
154
* @param fencingToken Token for fenced communication
155
* @param clazz Gateway interface class
156
* @return CompletableFuture containing the connected fenced gateway
157
*/
158
public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
159
String address, F fencingToken, Class<C> clazz
160
);
161
162
/**
163
* Starts an RPC server for the given endpoint.
164
* @param rpcEndpoint RPC endpoint implementation
165
* @param loggingContext Logging context for the server
166
* @return RpcServer instance managing the endpoint
167
*/
168
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
169
C rpcEndpoint, Map<String, String> loggingContext
170
);
171
172
/**
173
* Stops the specified RPC server.
174
* @param selfGateway RPC server to stop
175
*/
176
public void stopServer(RpcServer selfGateway);
177
178
/**
179
* Closes the RPC service asynchronously.
180
* @return CompletableFuture indicating completion
181
*/
182
public CompletableFuture<Void> closeAsync();
183
184
/**
185
* Gets the scheduled executor for this service.
186
* @return ScheduledExecutor instance
187
*/
188
public ScheduledExecutor getScheduledExecutor();
189
}
190
```
191
192
### PekkoRpcServiceUtils
193
194
Utility class providing helper methods for RPC URL construction and service configuration.
195
196
```java { .api }
197
/**
198
* Utility methods for RPC service operations and URL handling.
199
*/
200
public class PekkoRpcServiceUtils {
201
202
/**
203
* Constructs RPC URL from components.
204
* @param hostname Target hostname
205
* @param port Target port number
206
* @param endpointName Name of the RPC endpoint
207
* @param addressResolution Address resolution strategy
208
* @param config Flink configuration
209
* @return Constructed RPC URL string
210
* @throws UnknownHostException if hostname cannot be resolved
211
*/
212
public static String getRpcUrl(
213
String hostname,
214
int port,
215
String endpointName,
216
AddressResolution addressResolution,
217
Configuration config
218
) throws UnknownHostException;
219
220
/**
221
* Constructs RPC URL with explicit protocol specification.
222
* @param hostname Target hostname
223
* @param port Target port number
224
* @param endpointName Name of the RPC endpoint
225
* @param addressResolution Address resolution strategy
226
* @param protocol Communication protocol (TCP, SSL_TCP)
227
* @return Constructed RPC URL string
228
* @throws UnknownHostException if hostname cannot be resolved
229
*/
230
public static String getRpcUrl(
231
String hostname,
232
int port,
233
String endpointName,
234
AddressResolution addressResolution,
235
Protocol protocol
236
) throws UnknownHostException;
237
238
/**
239
* Gets local RPC URL for the specified endpoint.
240
* @param endpointName Name of the endpoint
241
* @return Local RPC URL string
242
*/
243
public static String getLocalRpcUrl(String endpointName);
244
245
/**
246
* Checks if exception indicates recipient termination.
247
* @param exception Exception to check
248
* @return true if exception is recipient terminated
249
*/
250
public static boolean isRecipientTerminatedException(Throwable exception);
251
252
/**
253
* Extracts maximum frame size from configuration.
254
* @param configuration Flink configuration object
255
* @return Maximum frame size in bytes
256
*/
257
public static long extractMaximumFramesize(Configuration configuration);
258
}
259
260
/**
261
* Communication protocol enumeration for RPC connections.
262
*/
263
public enum Protocol {
264
TCP, // Standard TCP communication
265
SSL_TCP // SSL-encrypted TCP communication
266
}
267
```
268
269
**Advanced Usage Examples:**
270
271
```java
272
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils;
273
import org.apache.flink.runtime.rpc.AddressResolution;
274
275
// Create RPC URLs for different scenarios
276
String localUrl = PekkoRpcServiceUtils.getLocalRpcUrl("taskmanager");
277
278
String remoteUrl = PekkoRpcServiceUtils.getRpcUrl(
279
"cluster-node-1",
280
6123,
281
"jobmanager",
282
AddressResolution.TRY_ADDRESS_RESOLUTION,
283
config
284
);
285
286
String secureUrl = PekkoRpcServiceUtils.getRpcUrl(
287
"secure-cluster-node",
288
6124,
289
"jobmanager",
290
AddressResolution.TRY_ADDRESS_RESOLUTION,
291
PekkoRpcServiceUtils.Protocol.SSL_TCP
292
);
293
294
// Check for connection issues
295
try {
296
// ... RPC call
297
} catch (Exception e) {
298
if (PekkoRpcServiceUtils.isRecipientTerminatedException(e)) {
299
// Handle recipient termination
300
logger.warn("RPC recipient has terminated");
301
}
302
}
303
304
// Configure service with custom settings
305
PekkoRpcServiceConfiguration config = PekkoRpcServiceConfiguration
306
.fromConfiguration(flinkConfig)
307
.withTimeout(Duration.ofSeconds(30))
308
.withMaximumFramesize(16 * 1024 * 1024); // 16MB
309
```