Pekko-based RPC implementation for Apache Flink's distributed computing framework
npx @tessl/cli install tessl/maven-org-apache-flink--flink-rpc-akka@2.1.00
# Flink RPC Akka

Flink RPC Akka provides a Pekko-based RPC (Remote Procedure Call) implementation for Apache Flink's distributed computing framework. It serves as a critical communication layer that enables different components of Flink clusters to communicate across network boundaries using the Pekko actor system (Apache's fork of Akka).

## Package Information

- **Package Name**: flink-rpc-akka
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add dependency to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-rpc-akka</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.runtime.rpc.pekko.PekkoRpcSystem;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcService;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.runtime.rpc.pekko.ActorSystemBootstrapTools;
import org.apache.flink.runtime.rpc.pekko.ControlMessages;
import org.apache.flink.runtime.rpc.pekko.exceptions.RpcInvalidStateException;
import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;
```

## Basic Usage

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcSystem;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcService;
import org.apache.flink.runtime.rpc.RpcService;

// Create RPC system
PekkoRpcSystem rpcSystem = new PekkoRpcSystem();

// Create local RPC service
Configuration config = new Configuration();
RpcService rpcService = rpcSystem.localServiceBuilder(config).createAndStart();

// Connect to a remote RPC endpoint.
// MyRpcGateway and someRemoteMethod() are placeholders for your own RpcGateway interface.
// Pekko-based Flink versions address endpoints with the pekko.tcp scheme.
String remoteAddress = "pekko.tcp://flink@localhost:6123/user/rpc/jobmanager";
MyRpcGateway gateway = rpcService.connect(remoteAddress, MyRpcGateway.class).get();

// Use the gateway to make RPC calls
CompletableFuture<String> result = gateway.someRemoteMethod();

// Clean up (closeAsync() returns a future that completes once the service has shut down)
rpcService.closeAsync();
```
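
The gateway pattern used above (an interface whose methods return futures, backed by something that forwards each call to an endpoint) can be sketched with nothing but the JDK. The following is a minimal in-process illustration, not how Flink implements it: `GreeterGateway`, `GreeterEndpoint`, and `connectLocal` are hypothetical names, and a single-threaded executor stands in for the endpoint's main thread.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class GatewayProxySketch {

    /** Hypothetical gateway interface; every method returns a future. */
    interface GreeterGateway {
        CompletableFuture<String> greet(String name);
    }

    /** Hypothetical endpoint that holds the actual logic. */
    static final class GreeterEndpoint {
        String greet(String name) {
            return "Hello, " + name;
        }
    }

    /** Builds a proxy that forwards each gateway call to the endpoint's executor. */
    static GreeterGateway connectLocal(GreeterEndpoint endpoint, ExecutorService mainThread) {
        InvocationHandler handler = (proxy, method, methodArgs) -> {
            // Only the single gateway method is routed in this sketch.
            if (method.getName().equals("greet")) {
                String name = (String) methodArgs[0];
                return CompletableFuture.supplyAsync(() -> endpoint.greet(name), mainThread);
            }
            throw new UnsupportedOperationException(method.getName());
        };
        return (GreeterGateway) Proxy.newProxyInstance(
                GreeterGateway.class.getClassLoader(),
                new Class<?>[] {GreeterGateway.class},
                handler);
    }

    public static void main(String[] args) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            GreeterGateway gateway = connectLocal(new GreeterEndpoint(), mainThread);
            // The caller only sees the interface and a future, as with a real gateway.
            System.out.println(gateway.greet("Flink").join());
        } finally {
            mainThread.shutdown();
        }
    }
}
```

Running all endpoint logic on one executor thread mirrors the actor-style guarantee that an endpoint handles one message at a time, so the endpoint itself needs no locking.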

## Architecture

Flink RPC Akka is built around several key components:

- **RPC System**: The main entry point (`PekkoRpcSystem`) for creating RPC services
- **RPC Service**: Core service implementation (`PekkoRpcService`) managing connections and endpoints
- **Actor System Management**: Utilities for bootstrapping and configuring Pekko actor systems
- **Configuration**: Comprehensive configuration options for RPC behavior, timeouts, and security
- **Concurrent Utilities**: Adapters and utilities for integrating with Java concurrency APIs
- **Exception Handling**: Specialized exceptions for RPC-specific error conditions

## Capabilities

### RPC System Management

Core RPC system functionality for creating and configuring RPC services in both local and distributed environments.

```java { .api }
public class PekkoRpcSystem implements RpcSystem {
    public RpcServiceBuilder localServiceBuilder(Configuration configuration);
    public RpcServiceBuilder remoteServiceBuilder(
        Configuration configuration,
        String externalAddress,
        String externalPortRange
    );
    public InetSocketAddress getInetSocketAddressFromRpcUrl(String url) throws Exception;
    public String getRpcUrl(
        String hostname,
        int port,
        String endpointName,
        AddressResolution addressResolution,
        Configuration config
    ) throws UnknownHostException;
    public long getMaximumMessageSizeInBytes(Configuration config);
}
```

[RPC System Management](./rpc-system.md)
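
To make the relationship between `getRpcUrl` and `getInetSocketAddressFromRpcUrl` concrete, here is a small JDK-only sketch of building an RPC URL and recovering the host and port from it. It does not call the Flink API. The URL layout `<protocol>://flink@<host>:<port>/user/rpc/<endpoint>` is an assumption for illustration, and `buildUrl` and `parseAddress` are hypothetical helpers.

```java
import java.net.InetSocketAddress;
import java.net.URI;

final class RpcUrlSketch {

    /** Assumed layout: <protocol>://flink@<host>:<port>/user/rpc/<endpoint>. */
    static String buildUrl(String protocol, String host, int port, String endpointName) {
        return String.format("%s://flink@%s:%d/user/rpc/%s", protocol, host, port, endpointName);
    }

    /** Extracts host and port from the URL without resolving the host name. */
    static InetSocketAddress parseAddress(String url) {
        URI uri = URI.create(url);
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("No host/port in RPC URL: " + url);
        }
        return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
    }

    public static void main(String[] args) {
        String url = buildUrl("pekko.tcp", "localhost", 6123, "jobmanager");
        InetSocketAddress address = parseAddress(url);
        System.out.println(url);
        System.out.println(address.getHostString() + ":" + address.getPort());
    }
}
```

A URL without a host and port, such as one for a purely local endpoint, is rejected here with an `IllegalArgumentException`; how the real method reports that case should be checked against the Flink Javadoc.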

### Actor System Bootstrap

Tools and utilities for starting and configuring Pekko actor systems with proper thread pool configuration and SSL support.

```java { .api }
public class ActorSystemBootstrapTools {
    public static ActorSystem startRemoteActorSystem(
        Configuration configuration,
        String externalAddress,
        String externalPortRange,
        Logger logger
    ) throws Exception;

    public static ActorSystem startLocalActorSystem(
        Configuration configuration,
        String actorSystemName,
        Logger logger,
        Config actorSystemExecutorConfiguration,
        Config customConfig
    ) throws Exception;

    public static RpcSystem.ForkJoinExecutorConfiguration getForkJoinExecutorConfiguration(
        Configuration configuration
    );
}
```

[Actor System Management](./actor-system.md)

### Concurrent Utilities

Utilities for integrating Pekko actor systems with Java's concurrency APIs and converting between Scala and Java futures.

```java { .api }
public class ScalaFutureUtils {
    public static <T, U extends T> CompletableFuture<T> toJava(Future<U> scalaFuture);
}

public class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {
    public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem, ClassLoader flinkClassLoader);
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
```

[Concurrent Utilities](./concurrent-utilities.md)
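
The general idea behind a converter like `toJava` is to register a completion callback on the foreign future and use it to complete a `CompletableFuture`. The sketch below shows that bridging technique with the JDK only. `CallbackFuture` is a hypothetical stand-in for a callback-based future type (it is not the Scala `Future` API), and this is not a copy of `ScalaFutureUtils`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class FutureBridgeSketch {

    /** Hypothetical foreign future that reports its outcome through a callback. */
    interface CallbackFuture<T> {
        void onComplete(BiConsumer<? super T, ? super Throwable> callback);
    }

    /** Completes a CompletableFuture from the callback; U extends T lets the result type widen. */
    static <T, U extends T> CompletableFuture<T> toJava(CallbackFuture<U> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.onComplete((value, failure) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
            } else {
                result.complete(value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // A source that completes immediately with an Integer.
        CallbackFuture<Integer> done = callback -> callback.accept(42, null);
        // The same bound as in the signature above: Integer is widened to Number.
        CompletableFuture<Number> asJava = toJava(done);
        System.out.println(asJava.join());
    }
}
```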

### RPC Configuration

Configuration management for RPC services including timeouts, message sizes, and serialization options.

```java { .api }
public class PekkoRpcServiceConfiguration {
    public static PekkoRpcServiceConfiguration fromConfiguration(Configuration configuration);
    public static PekkoRpcServiceConfiguration defaultConfiguration();
    public Configuration getConfiguration();
    public Duration getTimeout();
    public long getMaximumFramesize();
    public boolean captureAskCallStack();
    public boolean isForceRpcInvocationSerialization();
}
```

[RPC Configuration](./rpc-configuration.md)
164
165
### Exception Handling
166
167
Specialized exception classes for RPC-specific error conditions and state management.
168
169
```java { .api }
170
public class RpcInvalidStateException extends FlinkRuntimeException {
171
public RpcInvalidStateException(String message);
172
public RpcInvalidStateException(Throwable cause);
173
public RpcInvalidStateException(String message, Throwable cause);
174
}
175
176
public class UnknownMessageException extends RpcRuntimeException {
177
public UnknownMessageException(String message);
178
public UnknownMessageException(String message, Throwable cause);
179
public UnknownMessageException(Throwable cause);
180
}
181
```
182
183
[Exception Handling](./exceptions.md)
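
Because RPC calls return futures, exceptions like these usually reach the caller as a failed future rather than a thrown exception. The sketch below shows one way a caller might handle that, using only the JDK. The nested `UnknownMessageException` is a hypothetical stand-in (it is not the Flink class), and `ask` and `askOrFallback` are illustrative helpers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class RpcFailureSketch {

    /** Hypothetical stand-in for an RPC-specific unchecked exception. */
    static final class UnknownMessageException extends RuntimeException {
        UnknownMessageException(String message) {
            super(message);
        }
    }

    /** Hypothetical receiver: answers String messages and rejects everything else. */
    static CompletableFuture<String> ask(Object message) {
        if (message instanceof String) {
            return CompletableFuture.completedFuture("ack: " + message);
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(
                new UnknownMessageException("Unknown message type " + message.getClass().getName()));
        return failed;
    }

    /** Maps a failed call to a fallback value, unwrapping CompletionException if present. */
    static String askOrFallback(Object message) {
        return ask(message)
                .exceptionally(error -> {
                    Throwable cause =
                            error instanceof CompletionException && error.getCause() != null
                                    ? error.getCause()
                                    : error;
                    return "failed: " + cause.getClass().getSimpleName();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(askOrFallback("ping"));
        System.out.println(askOrFallback(42));
    }
}
```

Unwrapping `CompletionException` matters once stages are chained: dependent stages see the original failure wrapped, so checking only the outer type can miss the RPC-specific cause.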

## Types

```java { .api }
public interface PekkoBasedEndpoint extends RpcGateway {
    ActorRef getActorRef();
}

public enum ControlMessages {
    START,     // Start processing incoming messages
    STOP,      // Stop processing messages and drop all newly incoming messages
    TERMINATE  // Terminate the RpcActor
}

public class HostAndPort {
    // Host and port pair data structure for network addressing
}

public class RpcSerializedValue {
    // Serialized value wrapper for RPC communication
}

public static class Protocol {
    public static final Protocol TCP = new Protocol("tcp");
    public static final Protocol SSL_TCP = new Protocol("ssl-tcp");
}
```
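
The `ControlMessages` comments describe a small lifecycle: messages are processed after `START`, dropped after `STOP`, and the actor ends on `TERMINATE`. The sketch below models that lifecycle with the JDK only. `Receiver` is a hypothetical class (it is not Flink's RpcActor), and two behaviours are assumptions of this sketch rather than documented facts: messages arriving before the first `START` are dropped, and a terminated receiver cannot be restarted.

```java
import java.util.ArrayList;
import java.util.List;

final class ControlMessageSketch {

    /** Mirrors the three control messages from the Types listing. */
    enum ControlMessages { START, STOP, TERMINATE }

    /** Hypothetical actor-like receiver that gates messages on its lifecycle state. */
    static final class Receiver {
        private boolean running = false;
        private boolean terminated = false;
        private final List<String> processed = new ArrayList<>();

        void onControl(ControlMessages message) {
            switch (message) {
                case START:
                    running = true;
                    break;
                case STOP:
                    running = false;
                    break;
                case TERMINATE:
                    running = false;
                    terminated = true; // assumption: termination is final
                    break;
            }
        }

        /** Returns true if the message was processed, false if it was dropped. */
        boolean onMessage(String payload) {
            if (terminated || !running) {
                return false;
            }
            processed.add(payload);
            return true;
        }

        List<String> processed() {
            return processed;
        }
    }

    public static void main(String[] args) {
        Receiver receiver = new Receiver();
        receiver.onMessage("early");                  // dropped: not started yet
        receiver.onControl(ControlMessages.START);
        receiver.onMessage("work");                   // processed
        receiver.onControl(ControlMessages.STOP);
        receiver.onMessage("late");                   // dropped: stopped
        System.out.println(receiver.processed());
    }
}
```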