or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actor-system.md concurrent-utilities.md exceptions.md index.md rpc-configuration.md rpc-system.md

docs/rpc-system.md

0

# RPC System Management

1

2

Core RPC system functionality for creating and configuring RPC services in both local and distributed environments. The RPC system provides the foundation for coordination (control-plane) communication between the components of a Flink cluster.

3

4

## Capabilities

5

6

### PekkoRpcSystem

7

8

Main RPC system implementation that serves as the primary entry point for creating RPC services.

9

10

```java { .api }

11

/**

12

* RpcSystem implementation based on Pekko actor system.

13

* Provides methods to create local and remote RPC services.

14

*/

15

public class PekkoRpcSystem implements RpcSystem {

16

17

/**

18

* Creates a builder for local RPC service configuration.

19

* @param configuration Flink configuration object

20

* @return RpcServiceBuilder for local service setup

21

*/

22

public RpcServiceBuilder localServiceBuilder(Configuration configuration);

23

24

/**

25

* Creates a builder for remote RPC service configuration.

26

* @param configuration Flink configuration object

27

* @param externalAddress External address for the RPC service (nullable)

28

* @param externalPortRange External port range specification

29

* @return RpcServiceBuilder for remote service setup

30

*/

31

public RpcServiceBuilder remoteServiceBuilder(

32

Configuration configuration,

33

String externalAddress,

34

String externalPortRange

35

);

36

37

/**

38

* Extracts socket address from RPC URL.

39

* @param url RPC URL string

40

* @return InetSocketAddress extracted from the URL

41

* @throws Exception if URL is malformed or cannot be parsed

42

*/

43

public InetSocketAddress getInetSocketAddressFromRpcUrl(String url) throws Exception;

44

45

/**

46

* Constructs RPC URL from components.

47

* @param hostname Target hostname

48

* @param port Target port number

49

* @param endpointName Name of the RPC endpoint

50

* @param addressResolution Address resolution strategy

51

* @param config Flink configuration

52

* @return Constructed RPC URL string

53

* @throws UnknownHostException if hostname cannot be resolved

54

*/

55

public String getRpcUrl(

56

String hostname,

57

int port,

58

String endpointName,

59

AddressResolution addressResolution,

60

Configuration config

61

) throws UnknownHostException;

62

63

/**

64

* Gets maximum message size from configuration.

65

* @param config Flink configuration object

66

* @return Maximum message size in bytes

67

*/

68

public long getMaximumMessageSizeInBytes(Configuration config);

69

}

70

```

71

72

**Usage Examples:**

73

74

```java

75

import org.apache.flink.configuration.Configuration;

76

import org.apache.flink.runtime.rpc.pekko.PekkoRpcSystem;

77

import org.apache.flink.runtime.rpc.RpcService;

78

79

// Create RPC system

80

PekkoRpcSystem rpcSystem = new PekkoRpcSystem();

81

82

// Create local RPC service for single-node scenarios

83

Configuration localConfig = new Configuration();

84

RpcService localService = rpcSystem.localServiceBuilder(localConfig).createAndStart();

85

86

// Create remote RPC service for distributed clusters

87

Configuration remoteConfig = new Configuration();

88

String externalAddress = "192.168.1.100";

89

String portRange = "6123-6130";

90

RpcService remoteService = rpcSystem.remoteServiceBuilder(

91

remoteConfig, externalAddress, portRange

92

).createAndStart();

93

94

// Parse RPC URL to get socket address (declared to throw Exception if the URL is malformed)

95

String rpcUrl = "pekko.tcp://flink@192.168.1.100:6123/user/rpc/jobmanager";

96

InetSocketAddress address = rpcSystem.getInetSocketAddressFromRpcUrl(rpcUrl);

97

```

98

99

### PekkoRpcService

100

101

Core Pekko-based RPC service implementation that manages connections, endpoints, and the underlying actor system.

102

103

```java { .api }

104

/**

105

* Pekko-based RPC service implementation.

106

* Manages RPC endpoints, connections, and the underlying actor system.

107

*/

108

public class PekkoRpcService implements RpcService {

109

110

/**

111

* Constructor for PekkoRpcService.

112

* @param actorSystem Underlying Pekko actor system

113

* @param configuration Service configuration parameters

114

*/

115

public PekkoRpcService(ActorSystem actorSystem, PekkoRpcServiceConfiguration configuration);

116

117

/**

118

* Returns the underlying Pekko actor system.

119

* @return ActorSystem instance used by this service

120

*/

121

public ActorSystem getActorSystem();

122

123

/**

124

* Gets the address of this RPC service.

125

* @return Service address as string

126

*/

127

public String getAddress();

128

129

/**

130

* Gets the port number of this RPC service.

131

* @return Port number

132

*/

133

public int getPort();

134

135

/**

136

* Creates a self gateway for the given RPC server.

137

* @param selfGatewayType Type of the gateway interface

138

* @param rpcServer RPC server instance

139

* @return Gateway instance for self-communication

140

*/

141

public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType, RpcServer rpcServer);

142

143

/**

144

* Connects to a remote RPC endpoint.

145

* @param address Remote endpoint address

146

* @param clazz Gateway interface class

147

* @return CompletableFuture containing the connected gateway

148

*/

149

public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);

150

151

/**

152

* Connects to a fenced remote RPC endpoint.

153

* @param address Remote endpoint address

154

* @param fencingToken Token for fenced communication

155

* @param clazz Gateway interface class

156

* @return CompletableFuture containing the connected fenced gateway

157

*/

158

public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(

159

String address, F fencingToken, Class<C> clazz

160

);

161

162

/**

163

* Starts an RPC server for the given endpoint.

164

* @param rpcEndpoint RPC endpoint implementation

165

* @param loggingContext Logging context for the server

166

* @return RpcServer instance managing the endpoint

167

*/

168

public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(

169

C rpcEndpoint, Map<String, String> loggingContext

170

);

171

172

/**

173

* Stops the specified RPC server.

174

* @param selfGateway RPC server to stop

175

*/

176

public void stopServer(RpcServer selfGateway);

177

178

/**

179

* Closes the RPC service asynchronously.

180

* @return CompletableFuture indicating completion

181

*/

182

public CompletableFuture<Void> closeAsync();

183

184

/**

185

* Gets the scheduled executor for this service.

186

* @return ScheduledExecutor instance

187

*/

188

public ScheduledExecutor getScheduledExecutor();

189

}

190

```

191

192

### PekkoRpcServiceUtils

193

194

Utility class providing helper methods for RPC URL construction and service configuration.

195

196

```java { .api }

197

/**

198

* Utility methods for RPC service operations and URL handling.

199

*/

200

public class PekkoRpcServiceUtils {

201

202

/**

203

* Constructs RPC URL from components.

204

* @param hostname Target hostname

205

* @param port Target port number

206

* @param endpointName Name of the RPC endpoint

207

* @param addressResolution Address resolution strategy

208

* @param config Flink configuration

209

* @return Constructed RPC URL string

210

* @throws UnknownHostException if hostname cannot be resolved

211

*/

212

public static String getRpcUrl(

213

String hostname,

214

int port,

215

String endpointName,

216

AddressResolution addressResolution,

217

Configuration config

218

) throws UnknownHostException;

219

220

/**

221

* Constructs RPC URL with explicit protocol specification.

222

* @param hostname Target hostname

223

* @param port Target port number

224

* @param endpointName Name of the RPC endpoint

225

* @param addressResolution Address resolution strategy

226

* @param protocol Communication protocol (TCP, SSL_TCP)

227

* @return Constructed RPC URL string

228

* @throws UnknownHostException if hostname cannot be resolved

229

*/

230

public static String getRpcUrl(

231

String hostname,

232

int port,

233

String endpointName,

234

AddressResolution addressResolution,

235

Protocol protocol

236

) throws UnknownHostException;

237

238

/**

239

* Gets local RPC URL for the specified endpoint.

240

* @param endpointName Name of the endpoint

241

* @return Local RPC URL string

242

*/

243

public static String getLocalRpcUrl(String endpointName);

244

245

/**

246

* Checks if exception indicates recipient termination.

247

* @param exception Exception to check

248

* @return true if exception is recipient terminated

249

*/

250

public static boolean isRecipientTerminatedException(Throwable exception);

251

252

/**

253

* Extracts maximum frame size from configuration.

254

* @param configuration Flink configuration object

255

* @return Maximum frame size in bytes

256

*/

257

public static long extractMaximumFramesize(Configuration configuration);

258

}

259

260

/**

261

* Communication protocol enumeration for RPC connections.
* Nested in PekkoRpcServiceUtils; reference it as PekkoRpcServiceUtils.Protocol.

262

*/

263

public enum Protocol {

264

TCP, // Standard TCP communication

265

SSL_TCP // SSL-encrypted TCP communication

266

}

267

```

268

269

**Advanced Usage Examples:**

270

271

```java

272

import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils;

273

import org.apache.flink.runtime.rpc.AddressResolution;

274

275

// Create RPC URLs for different scenarios (`config` below is an existing Flink Configuration instance)

276

String localUrl = PekkoRpcServiceUtils.getLocalRpcUrl("taskmanager");

277

278

String remoteUrl = PekkoRpcServiceUtils.getRpcUrl(

279

"cluster-node-1",

280

6123,

281

"jobmanager",

282

AddressResolution.TRY_ADDRESS_RESOLUTION,

283

config

284

);

285

286

String secureUrl = PekkoRpcServiceUtils.getRpcUrl(

287

"secure-cluster-node",

288

6124,

289

"jobmanager",

290

AddressResolution.TRY_ADDRESS_RESOLUTION,

291

PekkoRpcServiceUtils.Protocol.SSL_TCP

292

);

293

294

// Check for connection issues

295

try {

296

// ... RPC call

297

} catch (Exception e) {

298

if (PekkoRpcServiceUtils.isRecipientTerminatedException(e)) {

299

// Handle recipient termination

300

logger.warn("RPC recipient has terminated");

301

}

302

}

303

304

// Configure service with custom settings

305

PekkoRpcServiceConfiguration serviceConfig = PekkoRpcServiceConfiguration

306

.fromConfiguration(flinkConfig)

307

.withTimeout(Duration.ofSeconds(30))

308

.withMaximumFramesize(16 * 1024 * 1024); // 16MB

309

```