Pekko-based RPC implementation for Apache Flink's distributed computing framework
npx @tessl/cli install tessl/maven-org-apache-flink--flink-rpc-akka@2.1.0

Flink RPC Akka provides a Pekko-based RPC (Remote Procedure Call) implementation for Apache Flink's distributed computing framework. It serves as a critical communication layer that enables the different components of a Flink cluster to communicate across network boundaries using the Pekko actor system (Apache's fork of Akka).
pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-rpc-akka</artifactId>
<version>2.1.0</version>
</dependency>

import org.apache.flink.runtime.rpc.pekko.PekkoRpcSystem;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcService;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.runtime.rpc.pekko.ActorSystemBootstrapTools;
import org.apache.flink.runtime.rpc.pekko.ControlMessages;
import org.apache.flink.runtime.rpc.pekko.exceptions.RpcInvalidStateException;
import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcSystem;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcService;
import org.apache.flink.runtime.rpc.RpcService;
// Create RPC system
PekkoRpcSystem rpcSystem = new PekkoRpcSystem();
// Create local RPC service
Configuration config = new Configuration();
RpcService rpcService = rpcSystem.localServiceBuilder(config).createAndStart();
// Connect to a remote RPC endpoint
String remoteAddress = "pekko.tcp://flink@localhost:6123/user/jobmanager";
MyRpcGateway gateway = rpcService.connect(remoteAddress, MyRpcGateway.class).get();
// Use the gateway to make RPC calls
CompletableFuture<String> result = gateway.someRemoteMethod();
// Clean up
rpcService.closeAsync();

Flink RPC Akka is built around several key components:
- The RPC system (PekkoRpcSystem) for creating RPC services
- The RPC service (PekkoRpcService) managing connections and endpoints

Core RPC system functionality for creating and configuring RPC services in both local and distributed environments.
public class PekkoRpcSystem implements RpcSystem {
public RpcServiceBuilder localServiceBuilder(Configuration configuration);
public RpcServiceBuilder remoteServiceBuilder(
Configuration configuration,
String externalAddress,
String externalPortRange
);
public InetSocketAddress getInetSocketAddressFromRpcUrl(String url) throws Exception;
public String getRpcUrl(
String hostname,
int port,
String endpointName,
AddressResolution addressResolution,
Configuration config
) throws UnknownHostException;
public long getMaximumMessageSizeInBytes(Configuration config);
}

Tools and utilities for starting and configuring Pekko actor systems with proper thread pool configuration and SSL support.
public class ActorSystemBootstrapTools {
public static ActorSystem startRemoteActorSystem(
Configuration configuration,
String externalAddress,
String externalPortRange,
Logger logger
) throws Exception;
public static ActorSystem startLocalActorSystem(
Configuration configuration,
String actorSystemName,
Logger logger,
Config actorSystemExecutorConfiguration,
Config customConfig
) throws Exception;
public static RpcSystem.ForkJoinExecutorConfiguration getForkJoinExecutorConfiguration(
Configuration configuration
);
}

Utilities for integrating Pekko actor systems with Java's concurrency APIs and converting between Scala and Java futures.
public class ScalaFutureUtils {
public static <T, U extends T> CompletableFuture<T> toJava(Future<U> scalaFuture);
}
public class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {
public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem, ClassLoader flinkClassLoader);
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

Configuration management for RPC services, including timeouts, message sizes, and serialization options.
public class PekkoRpcServiceConfiguration {
public static PekkoRpcServiceConfiguration fromConfiguration(Configuration configuration);
public static PekkoRpcServiceConfiguration defaultConfiguration();
public Configuration getConfiguration();
public Duration getTimeout();
public long getMaximumFramesize();
public boolean captureAskCallStack();
public boolean isForceRpcInvocationSerialization();
}

Specialized exception classes for RPC-specific error conditions and state management.
public class RpcInvalidStateException extends FlinkRuntimeException {
public RpcInvalidStateException(String message);
public RpcInvalidStateException(Throwable cause);
public RpcInvalidStateException(String message, Throwable cause);
}
public class UnknownMessageException extends RpcRuntimeException {
public UnknownMessageException(String message);
public UnknownMessageException(String message, Throwable cause);
public UnknownMessageException(Throwable cause);
}

public interface PekkoBasedEndpoint extends RpcGateway {
ActorRef getActorRef();
}
public enum ControlMessages {
START, // Start processing incoming messages
STOP, // Stop processing messages and drop all newly incoming messages
TERMINATE // Terminate the RpcActor
}
public class HostAndPort {
// Host and port pair data structure for network addressing
}
public class RpcSerializedValue {
// Serialized value wrapper for RPC communication
}
public static class Protocol {
public static final Protocol TCP = new Protocol("tcp");
public static final Protocol SSL_TCP = new Protocol("ssl-tcp");
}
public enum ControlMessages {
START, // Start processing incoming messages
STOP, // Stop processing messages and drop all newly incoming messages
TERMINATE // Terminate the RpcActor
}