Pekko-based RPC implementation for Apache Flink's distributed computing framework
—
Tools and utilities for starting, configuring, and managing Pekko actor systems with proper thread pool configuration, SSL support, and Flink-specific optimizations.
Provides factory methods for creating and configuring Pekko actor systems in various deployment scenarios.
/**
* Tools for starting and configuring Pekko actor systems with Flink-specific settings.
*/
public class ActorSystemBootstrapTools {
/**
* Starts remote actor system with external address binding.
* @param configuration Flink configuration object
* @param externalAddress External address to bind to
* @param externalPortRange Port range specification (e.g., "6123-6130")
* @param logger Logger instance for bootstrap messages
* @return Configured ActorSystem instance
* @throws Exception if actor system creation fails
*/
public static ActorSystem startRemoteActorSystem(
Configuration configuration,
String externalAddress,
String externalPortRange,
Logger logger
) throws Exception;
/**
* Starts remote actor system with full configuration options.
* @param configuration Flink configuration object
* @param actorSystemName Name for the actor system
* @param externalAddress External address to bind to
* @param externalPortRange Port range specification
* @param bindAddress Internal bind address (can be different from external)
* @param bindPort Optional specific bind port
* @param logger Logger instance
* @param actorSystemExecutorConfiguration Executor configuration
* @param customConfig Additional Pekko configuration
* @return Configured ActorSystem instance
* @throws Exception if actor system creation fails
*/
public static ActorSystem startRemoteActorSystem(
Configuration configuration,
String actorSystemName,
String externalAddress,
String externalPortRange,
String bindAddress,
Optional<Integer> bindPort,
Logger logger,
Config actorSystemExecutorConfiguration,
Config customConfig
) throws Exception;
/**
* Starts local actor system for single-node scenarios.
* @param configuration Flink configuration object
* @param actorSystemName Name for the actor system
* @param logger Logger instance
* @param actorSystemExecutorConfiguration Executor configuration
* @param customConfig Additional Pekko configuration
* @return Configured ActorSystem instance
* @throws Exception if actor system creation fails
*/
public static ActorSystem startLocalActorSystem(
Configuration configuration,
String actorSystemName,
Logger logger,
Config actorSystemExecutorConfiguration,
Config customConfig
) throws Exception;
/**
* Gets fork-join executor configuration from Flink configuration.
* @param configuration Flink configuration object
* @return ForkJoinExecutorConfiguration for actor system
*/
public static RpcSystem.ForkJoinExecutorConfiguration getForkJoinExecutorConfiguration(
Configuration configuration
);
/**
* Gets fork-join executor configuration optimized for remote communication.
* @param configuration Flink configuration object
* @return ForkJoinExecutorConfiguration for remote actor system
*/
public static RpcSystem.ForkJoinExecutorConfiguration getRemoteForkJoinExecutorConfiguration(
Configuration configuration
);
}Usage Examples:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.pekko.ActorSystemBootstrapTools;
import org.apache.pekko.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Logger logger = LoggerFactory.getLogger(MyClass.class);
Configuration config = new Configuration();
// Start local actor system for development
ActorSystem localSystem = ActorSystemBootstrapTools.startLocalActorSystem(
config,
"flink-local",
logger,
null, // default executor config
null // default Pekko config
);
// Start remote actor system for cluster deployment
ActorSystem remoteSystem = ActorSystemBootstrapTools.startRemoteActorSystem(
config,
"flink-cluster",
logger
);
// Start remote system with specific configuration
RpcSystem.ForkJoinExecutorConfiguration executorConfig =
ActorSystemBootstrapTools.getForkJoinExecutorConfiguration(config);
ActorSystem configuredSystem = ActorSystemBootstrapTools.startRemoteActorSystem(
config,
"flink-production",
"192.168.1.100",
"6123-6130",
"0.0.0.0",
Optional.empty(),
logger,
executorConfig.toConfig(),
null
);Comprehensive utility class for Pekko configuration, actor system management, and URL handling.
/**
* Utility methods for Pekko actor system configuration and management.
*/
public class PekkoUtils {
/**
* Gets the standard Flink actor system name.
* @return Standard actor system name used by Flink
*/
public static String getFlinkActorSystemName();
/**
* Gets thread pool executor configuration from settings.
* @param configuration Thread pool executor configuration
* @return Pekko Config object for thread pool executor
*/
public static Config getThreadPoolExecutorConfig(
RpcSystem.FixedThreadPoolExecutorConfiguration configuration
);
/**
* Gets fork-join executor configuration from settings.
* @param configuration Fork-join executor configuration
* @return Pekko Config object for fork-join executor
*/
public static Config getForkJoinExecutorConfig(
RpcSystem.ForkJoinExecutorConfiguration configuration
);
/**
* Creates local actor system with Flink configuration.
* @param configuration Flink configuration object
* @return Local ActorSystem instance
*/
public static ActorSystem createLocalActorSystem(Configuration configuration);
/**
* Creates actor system with custom name and configuration.
* @param actorSystemName Name for the actor system
* @param config Pekko configuration object
* @return ActorSystem instance
*/
public static ActorSystem createActorSystem(String actorSystemName, Config config);
/**
* Creates default actor system with standard settings.
* @return Default ActorSystem instance
*/
public static ActorSystem createDefaultActorSystem();
/**
* Gets Pekko configuration for external address binding.
* @param configuration Flink configuration object
* @param externalAddress External address to bind to
* @return Pekko Config object
*/
public static Config getConfig(Configuration configuration, HostAndPort externalAddress);
/**
* Gets Pekko configuration with separate bind address.
* @param configuration Flink configuration object
* @param externalAddress External address for external communication
* @param bindAddress Internal bind address
* @param executorConfig Executor configuration
* @return Pekko Config object
*/
public static Config getConfig(
Configuration configuration,
HostAndPort externalAddress,
HostAndPort bindAddress,
Config executorConfig
);
/**
* Gets address from running actor system.
* @param system ActorSystem instance
* @return Address object representing the system's address
*/
public static Address getAddress(ActorSystem system);
/**
* Gets RPC URL for a specific actor.
* @param system ActorSystem containing the actor
* @param actor ActorRef to generate URL for
* @return RPC URL string for the actor
*/
public static String getRpcURL(ActorSystem system, ActorRef actor);
/**
* Extracts address from RPC URL string.
* @param rpcURL RPC URL to parse
* @return Address object extracted from URL
* @throws MalformedURLException if URL is malformed
*/
public static Address getAddressFromRpcURL(String rpcURL) throws MalformedURLException;
/**
* Extracts InetSocketAddress from RPC URL string.
* @param rpcURL RPC URL to parse
* @return InetSocketAddress extracted from URL
* @throws Exception if URL cannot be parsed
*/
public static InetSocketAddress getInetSocketAddressFromRpcURL(String rpcURL) throws Exception;
/**
* Terminates actor system gracefully.
* @param actorSystem ActorSystem to terminate
* @return CompletableFuture indicating termination completion
*/
public static CompletableFuture<Void> terminateActorSystem(ActorSystem actorSystem);
}Advanced Configuration Examples:
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.runtime.rpc.pekko.HostAndPort;
import org.apache.pekko.actor.Address;
import com.typesafe.config.Config;
// Create actor system with custom executor configuration
RpcSystem.ForkJoinExecutorConfiguration executorConfig =
new RpcSystem.ForkJoinExecutorConfiguration(8, 64, 2.0);
Config pekkoExecutorConfig = PekkoUtils.getForkJoinExecutorConfig(executorConfig);
ActorSystem customSystem = PekkoUtils.createActorSystem(
"flink-custom",
pekkoExecutorConfig
);
// Configure for external access with separate bind address
HostAndPort externalAddress = new HostAndPort("public.example.com", 6123);
HostAndPort bindAddress = new HostAndPort("0.0.0.0", 6123);
Config clusterConfig = PekkoUtils.getConfig(
flinkConfig,
externalAddress,
bindAddress,
pekkoExecutorConfig
);
// Extract network information from running systems
Address systemAddress = PekkoUtils.getAddress(customSystem);
String actorUrl = PekkoUtils.getRpcURL(customSystem, someActor);
// Parse URLs for connection information
InetSocketAddress socketAddress = PekkoUtils.getInetSocketAddressFromRpcURL(
"pekko://flink@cluster-node:6123/user/jobmanager"
);
// Graceful shutdown
CompletableFuture<Void> termination = PekkoUtils.terminateActorSystem(customSystem);
termination.thenRun(() -> logger.info("Actor system terminated"));Enhanced ActorSystem implementation with configurable exception handling for production environments.
/**
* ActorSystem with configurable UncaughtExceptionHandler for robust error handling.
*/
public abstract class RobustActorSystem extends ActorSystemImpl {
/**
* Constructor for RobustActorSystem.
* @param name Name of the actor system
* @param applicationConfig Application configuration
* @param classLoader ClassLoader for the system
* @param defaultExecutionContext Default execution context
* @param setup ActorSystemSetup configuration
*/
public RobustActorSystem(
String name,
Config applicationConfig,
ClassLoader classLoader,
Option<ExecutionContext> defaultExecutionContext,
ActorSystemSetup setup
);
/**
* Factory method to create RobustActorSystem.
* @param name Name of the actor system
* @param applicationConfig Application configuration
* @return RobustActorSystem instance
*/
public static RobustActorSystem create(String name, Config applicationConfig);
}Additional classes that provide specialized functionality for actor system management.
/**
* Actor for handling dead letters in the actor system.
*/
public class DeadLettersActor extends AbstractActor {
/**
* Gets Props for creating DeadLettersActor instances.
* @return Props configuration for the actor
*/
public static Props getProps();
}
/**
* Supervisor actor for managing child actors with escalation strategy.
*/
public class SupervisorActor extends AbstractActor {
// Supervisor implementation for actor lifecycle management
}
/**
* Supervisor strategy that escalates all exceptions to parent actors.
*/
public class EscalatingSupervisorStrategy implements SupervisorStrategyConfigurator {
// Strategy implementation for exception handling
}
/**
* Custom SSL engine provider for secure Pekko communication.
*/
public class CustomSSLEngineProvider extends ConfigSSLEngineProvider {
// SSL engine configuration for secure RPC communication
}
/**
* Thread factory that sets thread priority for actor system threads.
*/
public class PrioritySettingThreadFactory implements ThreadFactory {
// Thread factory for priority-based thread management
}
/**
* Dispatcher configurator for priority-based thread scheduling.
*/
public class PriorityThreadsDispatcher extends DispatcherConfigurator {
// Dispatcher configuration for priority threads
}
/**
* Pekko extension for remote address handling and resolution.
*/
public class RemoteAddressExtension extends AbstractExtension {
// Extension for remote address management
}Production Configuration Example:
import org.apache.flink.runtime.rpc.pekko.RobustActorSystem;
import org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.Config;
// Create production-ready actor system with SSL and robust error handling
Config productionConfig = ConfigFactory.parseString("""
pekko {
remote.artery {
transport = tls-tcp
canonical.hostname = "production-node.example.com"
canonical.port = 6123
}
actor {
provider = remote
serialization-bindings {
"java.io.Serializable" = java
}
}
ssl-config {
trustManager = "org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider"
}
}
""");
RobustActorSystem productionSystem = RobustActorSystem.create(
"flink-production",
productionConfig
);Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-rpc-akka