tessl/maven-org-apache-flink--flink-rpc-akka

Pekko-based RPC implementation for Apache Flink's distributed computing framework

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-rpc-akka@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-rpc-akka@2.1.0

# Flink RPC Akka

Flink RPC Akka provides a Pekko-based RPC (Remote Procedure Call) implementation for Apache Flink's distributed computing framework. It is the communication layer that lets the components of a Flink cluster talk to each other across network boundaries, using the Pekko actor system (the Apache fork of Akka).

## Package Information

- **Package Name**: flink-rpc-akka
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add the dependency to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-rpc-akka</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.runtime.rpc.pekko.PekkoRpcSystem;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcService;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.runtime.rpc.pekko.ActorSystemBootstrapTools;
import org.apache.flink.runtime.rpc.pekko.ControlMessages;
import org.apache.flink.runtime.rpc.pekko.exceptions.RpcInvalidStateException;
import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;
```

## Basic Usage

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcSystem;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcService;
import org.apache.flink.runtime.rpc.RpcService;

// Create RPC system
PekkoRpcSystem rpcSystem = new PekkoRpcSystem();

// Create local RPC service
Configuration config = new Configuration();
RpcService rpcService = rpcSystem.localServiceBuilder(config).createAndStart();

// Connect to a remote RPC endpoint.
// MyRpcGateway is a placeholder for your own gateway interface.
// The Pekko-based transport uses the pekko.tcp scheme rather than akka.tcp.
String remoteAddress = "pekko.tcp://flink@localhost:6123/user/jobmanager";
MyRpcGateway gateway = rpcService.connect(remoteAddress, MyRpcGateway.class).get();

// Use the gateway to make RPC calls
CompletableFuture<String> result = gateway.someRemoteMethod();

// Clean up
rpcService.closeAsync();
```
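
The usage snippet relies on a gateway interface (`MyRpcGateway`) that this page does not define. The sketch below illustrates the general gateway idea with JDK classes only: an interface whose methods return `CompletableFuture`, served by a dynamic proxy that forwards each call asynchronously. `GreeterGateway` and `GatewayProxySketch` are hypothetical names, the "remote" side is a local object, and none of this is Flink's actual implementation; in a real Flink program the interface would extend `RpcGateway` and be obtained through `RpcService.connect`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;

// Hypothetical gateway interface; in Flink it would extend RpcGateway.
interface GreeterGateway {
    CompletableFuture<String> greet(String name);
}

final class GatewayProxySketch {

    // Returns a proxy that forwards each call to the given local target on
    // another thread, standing in for a message sent to a remote endpoint.
    static GreeterGateway connect(GreeterGateway target) {
        InvocationHandler handler = (proxy, method, args) ->
                CompletableFuture.supplyAsync(() -> {
                    try {
                        // The target itself returns a future; wait for its value here.
                        return ((CompletableFuture<?>) method.invoke(target, args)).join();
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                });
        return (GreeterGateway) Proxy.newProxyInstance(
                GreeterGateway.class.getClassLoader(),
                new Class<?>[] {GreeterGateway.class},
                handler);
    }

    public static void main(String[] args) {
        GreeterGateway local = name -> CompletableFuture.completedFuture("hello " + name);
        GreeterGateway gateway = connect(local);
        System.out.println(gateway.greet("flink").join());
    }
}
```

The caller only sees a future-returning interface, which is why the usage snippet can treat `gateway.someRemoteMethod()` like any other asynchronous Java call.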

## Architecture

Flink RPC Akka is built around several key components:

- **RPC System**: The main entry point (`PekkoRpcSystem`) for creating RPC services
- **RPC Service**: Core service implementation (`PekkoRpcService`) managing connections and endpoints
- **Actor System Management**: Utilities for bootstrapping and configuring Pekko actor systems
- **Configuration**: Comprehensive configuration options for RPC behavior, timeouts, and security
- **Concurrent Utilities**: Adapters and utilities for integrating with Java concurrency APIs
- **Exception Handling**: Specialized exceptions for RPC-specific error conditions

## Capabilities

### RPC System Management

Core RPC system functionality for creating and configuring RPC services in both local and distributed environments.

```java { .api }
public class PekkoRpcSystem implements RpcSystem {
    public RpcServiceBuilder localServiceBuilder(Configuration configuration);
    public RpcServiceBuilder remoteServiceBuilder(
        Configuration configuration,
        String externalAddress,
        String externalPortRange
    );
    public InetSocketAddress getInetSocketAddressFromRpcUrl(String url) throws Exception;
    public String getRpcUrl(
        String hostname,
        int port,
        String endpointName,
        AddressResolution addressResolution,
        Configuration config
    ) throws UnknownHostException;
    public long getMaximumMessageSizeInBytes(Configuration config);
}
```

[RPC System Management](./rpc-system.md)
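
To make the relationship between `getRpcUrl` and `getInetSocketAddressFromRpcUrl` concrete, here is a JDK-only sketch that builds an RPC URL and then recovers the host and port from it. The URL layout (`<scheme>://flink@<host>:<port>/user/rpc/<endpoint>`) is an assumption made for illustration, and `RpcUrlSketch` with its two methods is a hypothetical helper, not part of the package.

```java
import java.net.InetSocketAddress;
import java.net.URI;

final class RpcUrlSketch {

    // Builds a URL in the assumed layout:
    // <scheme>://flink@<host>:<port>/user/rpc/<endpoint>
    static String buildRpcUrl(String scheme, String host, int port, String endpointName) {
        return scheme + "://flink@" + host + ":" + port + "/user/rpc/" + endpointName;
    }

    // Extracts host and port from the URL without performing a DNS lookup.
    static InetSocketAddress socketAddressFromRpcUrl(String url) {
        URI uri = URI.create(url);
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("No host/port in RPC URL: " + url);
        }
        return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
    }

    public static void main(String[] args) {
        String url = buildRpcUrl("pekko.tcp", "localhost", 6123, "jobmanager");
        InetSocketAddress address = socketAddressFromRpcUrl(url);
        System.out.println(url);
        System.out.println(address.getHostString() + ":" + address.getPort());
    }
}
```

A URL without an explicit port, such as one addressing a purely local actor system, is rejected by this sketch. How the real method treats that case is not stated on this page.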

### Actor System Bootstrap

Tools and utilities for starting and configuring Pekko actor systems with proper thread pool configuration and SSL support.

```java { .api }
public class ActorSystemBootstrapTools {
    public static ActorSystem startRemoteActorSystem(
        Configuration configuration,
        String externalAddress,
        String externalPortRange,
        Logger logger
    ) throws Exception;

    public static ActorSystem startLocalActorSystem(
        Configuration configuration,
        String actorSystemName,
        Logger logger,
        Config actorSystemExecutorConfiguration,
        Config customConfig
    ) throws Exception;

    public static RpcSystem.ForkJoinExecutorConfiguration getForkJoinExecutorConfiguration(
        Configuration configuration
    );
}
```

[Actor System Management](./actor-system.md)
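
`startRemoteActorSystem` takes its ports as a string (`externalPortRange`) rather than a single integer. The sketch below shows one way such a string could be expanded into candidate ports, assuming it holds a single port, a `start-end` range, or a comma-separated mix of both. The exact grammar Flink accepts is not stated on this page, and `PortRangeSketch` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

final class PortRangeSketch {

    // Expands a spec such as "6123" or "50100-50102,50200" into candidate ports,
    // in the order they appear in the spec.
    static List<Integer> expand(String portRange) {
        List<Integer> ports = new ArrayList<>();
        for (String part : portRange.split(",")) {
            String[] bounds = part.trim().split("-");
            int start = Integer.parseInt(bounds[0].trim());
            int end = bounds.length == 2 ? Integer.parseInt(bounds[1].trim()) : start;
            if (bounds.length > 2 || start < 0 || end > 65535 || start > end) {
                throw new IllegalArgumentException("Invalid port range: " + part);
            }
            for (int port = start; port <= end; port++) {
                ports.add(port);
            }
        }
        return ports;
    }

    public static void main(String[] args) {
        System.out.println(expand("6123"));
        System.out.println(expand("50100-50102,50200"));
    }
}
```

A bootstrap routine could then try to bind each candidate in turn and keep the first one that succeeds.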

### Concurrent Utilities

Utilities for integrating Pekko actor systems with Java's concurrency APIs and converting between Scala and Java futures.

```java { .api }
public class ScalaFutureUtils {
    public static <T, U extends T> CompletableFuture<T> toJava(Future<U> scalaFuture);
}

public class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {
    public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem, ClassLoader flinkClassLoader);
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
```

[Concurrent Utilities](./concurrent-utilities.md)
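
The `toJava` signature widens the result type from `U` to `T` while converting between future types. A JDK-only sketch of that bridging pattern follows. `CallbackFuture` is a hypothetical stand-in for a callback-style future such as `scala.concurrent.Future`, and `FutureBridgeSketch` is illustrative rather than the library's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a callback-style future such as scala.concurrent.Future.
interface CallbackFuture<T> {
    void onComplete(BiConsumer<? super T, ? super Throwable> callback);
}

final class FutureBridgeSketch {

    // Mirrors the shape of toJava: the result may be widened from U to T.
    static <T, U extends T> CompletableFuture<T> toJava(CallbackFuture<U> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.onComplete((value, failure) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
            } else {
                result.complete(value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CallbackFuture<Integer> done = callback -> callback.accept(42, null);
        // An Integer-producing source is exposed as a future of Number.
        CompletableFuture<Number> widened = toJava(done);
        System.out.println(widened.join());
    }
}
```

The `U extends T` bound is what lets a caller ask for a `CompletableFuture<Number>` from a source that produces `Integer` without an unchecked cast.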

### RPC Configuration

Configuration management for RPC services including timeouts, message sizes, and serialization options.

```java { .api }
public class PekkoRpcServiceConfiguration {
    public static PekkoRpcServiceConfiguration fromConfiguration(Configuration configuration);
    public static PekkoRpcServiceConfiguration defaultConfiguration();
    public Configuration getConfiguration();
    public Duration getTimeout();
    public long getMaximumFramesize();
    public boolean captureAskCallStack();
    public boolean isForceRpcInvocationSerialization();
}
```

[RPC Configuration](./rpc-configuration.md)
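
`getTimeout()` returns a `java.time.Duration`. This page does not show how the service applies it internally, so the following is only a sketch of one way a caller could bound a pending reply with such a value using the JDK (`CompletableFuture.orTimeout`, Java 9+). `AskTimeoutSketch` and `withTimeout` are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class AskTimeoutSketch {

    // Fails the given future with a TimeoutException if no reply arrives in time.
    // Note that orTimeout returns the same future instance it was called on.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> reply, Duration timeout) {
        return reply.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverAnswered = new CompletableFuture<>();
        withTimeout(neverAnswered, Duration.ofMillis(50))
                .exceptionally(error -> "timed out: " + error.getClass().getSimpleName())
                .thenAccept(System.out::println)
                .join();
    }
}
```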

### Exception Handling

Specialized exception classes for RPC-specific error conditions and state management.

```java { .api }
public class RpcInvalidStateException extends FlinkRuntimeException {
    public RpcInvalidStateException(String message);
    public RpcInvalidStateException(Throwable cause);
    public RpcInvalidStateException(String message, Throwable cause);
}

public class UnknownMessageException extends RpcRuntimeException {
    public UnknownMessageException(String message);
    public UnknownMessageException(String message, Throwable cause);
    public UnknownMessageException(Throwable cause);
}
```

[Exception Handling](./exceptions.md)
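
Because RPC calls return `CompletableFuture`, a failure raised on the remote side usually reaches the caller wrapped in a `CompletionException` once it has passed through a dependent stage. The sketch below shows how a caller might strip those wrappers to reach the underlying exception. `UnknownMessage` is a hypothetical stand-in for an RPC-specific unchecked exception, and `RpcFailureSketch` is not part of the package.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class RpcFailureSketch {

    // Hypothetical stand-in for an RPC-specific unchecked exception.
    static final class UnknownMessage extends RuntimeException {
        UnknownMessage(String message) {
            super(message);
        }
    }

    // Strips CompletionException wrappers added by CompletableFuture stages.
    static Throwable unwrap(Throwable error) {
        Throwable current = error;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        CompletableFuture<String> reply = CompletableFuture.<String>supplyAsync(() -> {
            throw new UnknownMessage("unexpected payload");
        });
        String outcome = reply
                .thenApply(String::toUpperCase)
                .handle((value, error) -> error == null ? value : unwrap(error).getMessage())
                .join();
        System.out.println(outcome);
    }
}
```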

## Types

```java { .api }
public interface PekkoBasedEndpoint extends RpcGateway {
    ActorRef getActorRef();
}

public enum ControlMessages {
    START,     // Start processing incoming messages
    STOP,      // Stop processing messages and drop all newly incoming messages
    TERMINATE  // Terminate the RpcActor
}

public class HostAndPort {
    // Host and port pair data structure for network addressing
}

public class RpcSerializedValue {
    // Serialized value wrapper for RPC communication
}

public static class Protocol {
    public static final Protocol TCP = new Protocol("tcp");
    public static final Protocol SSL_TCP = new Protocol("ssl-tcp");
}
```
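
The comments on `ControlMessages` describe a small lifecycle: messages are processed after `START`, dropped after `STOP`, and the actor ends on `TERMINATE`. The toy state machine below sketches that behavior with JDK classes only. It uses its own `Control` enum and a hypothetical `ControlStateSketch` class, and it assumes that messages arriving before `START` are dropped as well, which this page does not confirm.

```java
import java.util.ArrayList;
import java.util.List;

final class ControlStateSketch {

    // Local copy of the three control signals listed in the API block.
    enum Control { START, STOP, TERMINATE }

    private boolean running;
    private boolean terminated;
    private final List<String> handled = new ArrayList<>();

    void onControl(Control control) {
        switch (control) {
            case START:
                // A terminated actor cannot be started again.
                running = !terminated;
                break;
            case STOP:
                running = false;
                break;
            case TERMINATE:
                running = false;
                terminated = true;
                break;
        }
    }

    // Messages are handled only while running; otherwise they are dropped.
    boolean onMessage(String message) {
        if (!running) {
            return false;
        }
        handled.add(message);
        return true;
    }

    List<String> handled() {
        return handled;
    }

    public static void main(String[] args) {
        ControlStateSketch actor = new ControlStateSketch();
        actor.onMessage("early");
        actor.onControl(Control.START);
        actor.onMessage("work");
        actor.onControl(Control.STOP);
        actor.onMessage("late");
        System.out.println(actor.handled());
    }
}
```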