CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-rpc-akka

Pekko-based RPC implementation for Apache Flink's distributed computing framework (the Maven artifact keeps its historical `flink-rpc-akka` name).

Pending
Overview
Eval results
Files

docs/actor-system.md

Actor System Management

Tools and utilities for starting, configuring, and managing Pekko actor systems with proper thread pool configuration, SSL support, and Flink-specific optimizations.

Capabilities

ActorSystemBootstrapTools

Provides factory methods for creating and configuring Pekko actor systems in various deployment scenarios.

/**
 * Tools for starting and configuring Pekko actor systems with Flink-specific settings.
 *
 * <p>This is an API listing: every member is a static method and method bodies are omitted,
 * so only the signatures and their documented contracts are shown.
 *
 * <p>Two configuration types appear in these signatures: {@code Configuration} is the Flink
 * configuration object, while {@code Config} is the Pekko configuration object.
 */
public class ActorSystemBootstrapTools {
    
    /**
     * Starts remote actor system with external address binding.
     *
     * <p>This is the short overload: it takes no actor system name, bind address, bind port,
     * executor configuration or custom Pekko configuration.
     *
     * @param configuration Flink configuration object
     * @param externalAddress External address to bind to
     * @param externalPortRange Port range specification (e.g., "6123-6130")
     * @param logger Logger instance for bootstrap messages
     * @return Configured ActorSystem instance
     * @throws Exception if actor system creation fails
     */
    public static ActorSystem startRemoteActorSystem(
        Configuration configuration, 
        String externalAddress, 
        String externalPortRange, 
        Logger logger
    ) throws Exception;
    
    /**
     * Starts remote actor system with full configuration options.
     *
     * @param configuration Flink configuration object
     * @param actorSystemName Name for the actor system
     * @param externalAddress External address to bind to
     * @param externalPortRange Port range specification (same format as the short overload,
     *     e.g., "6123-6130")
     * @param bindAddress Internal bind address (can be different from external)
     * @param bindPort Optional specific bind port; pass {@code Optional.empty()} when no
     *     specific port is requested (NOTE(review): the fallback port selection is not shown
     *     in this listing -- confirm against the implementation)
     * @param logger Logger instance
     * @param actorSystemExecutorConfiguration Executor configuration, as a Pekko {@code Config}
     * @param customConfig Additional Pekko configuration
     * @return Configured ActorSystem instance
     * @throws Exception if actor system creation fails
     */
    public static ActorSystem startRemoteActorSystem(
        Configuration configuration, 
        String actorSystemName, 
        String externalAddress, 
        String externalPortRange, 
        String bindAddress, 
        Optional<Integer> bindPort, 
        Logger logger, 
        Config actorSystemExecutorConfiguration, 
        Config customConfig
    ) throws Exception;
    
    /**
     * Starts local actor system for single-node scenarios.
     *
     * <p>Unlike the remote variants, this method takes no address or port arguments.
     *
     * @param configuration Flink configuration object
     * @param actorSystemName Name for the actor system
     * @param logger Logger instance
     * @param actorSystemExecutorConfiguration Executor configuration, as a Pekko {@code Config}
     * @param customConfig Additional Pekko configuration
     * @return Configured ActorSystem instance
     * @throws Exception if actor system creation fails
     */
    public static ActorSystem startLocalActorSystem(
        Configuration configuration, 
        String actorSystemName, 
        Logger logger, 
        Config actorSystemExecutorConfiguration, 
        Config customConfig
    ) throws Exception;
    
    /**
     * Gets fork-join executor configuration from Flink configuration.
     *
     * <p>The result is an {@code RpcSystem.ForkJoinExecutorConfiguration}, not a Pekko
     * {@code Config}; the {@code start*} methods above take a {@code Config}.
     *
     * @param configuration Flink configuration object
     * @return ForkJoinExecutorConfiguration for actor system
     */
    public static RpcSystem.ForkJoinExecutorConfiguration getForkJoinExecutorConfiguration(
        Configuration configuration
    );
    
    /**
     * Gets fork-join executor configuration optimized for remote communication.
     *
     * <p>Same parameter and return types as {@code getForkJoinExecutorConfiguration}.
     *
     * @param configuration Flink configuration object
     * @return ForkJoinExecutorConfiguration for remote actor system
     */
    public static RpcSystem.ForkJoinExecutorConfiguration getRemoteForkJoinExecutorConfiguration(
        Configuration configuration
    );
}

Usage Examples:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.pekko.ActorSystemBootstrapTools;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.pekko.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import java.util.Optional;

Logger logger = LoggerFactory.getLogger(MyClass.class);
Configuration config = new Configuration();

// The start* methods take the executor settings as a Pekko Config. Read the fork-join
// settings from the Flink configuration and convert them with PekkoUtils, instead of
// passing null and relying on undocumented null handling.
RpcSystem.ForkJoinExecutorConfiguration executorSettings = 
    ActorSystemBootstrapTools.getForkJoinExecutorConfiguration(config);
Config executorConfig = PekkoUtils.getForkJoinExecutorConfig(executorSettings);

// An empty Config means "no additional Pekko overrides".
Config noOverrides = ConfigFactory.empty();

// Start local actor system for development
ActorSystem localSystem = ActorSystemBootstrapTools.startLocalActorSystem(
    config, 
    "flink-local", 
    logger, 
    executorConfig, 
    noOverrides
);

// Start remote actor system for cluster deployment.
// The short overload takes (configuration, externalAddress, externalPortRange, logger);
// it has no actor-system-name parameter.
ActorSystem remoteSystem = ActorSystemBootstrapTools.startRemoteActorSystem(
    config, 
    "flink-cluster",  // external address (host name)
    "6123-6130",      // external port range
    logger
);

// Start remote system with specific configuration
ActorSystem configuredSystem = ActorSystemBootstrapTools.startRemoteActorSystem(
    config, 
    "flink-production", 
    "192.168.1.100", 
    "6123-6130", 
    "0.0.0.0", 
    Optional.empty(), 
    logger, 
    executorConfig, 
    noOverrides
);

PekkoUtils

Comprehensive utility class for Pekko configuration, actor system management, and URL handling.

/**
 * Utility methods for Pekko actor system configuration and management.
 *
 * <p>This is an API listing: every member is a static method and method bodies are omitted.
 *
 * <p>Two configuration types appear in these signatures: {@code Configuration} is the Flink
 * configuration object, while {@code Config} is the Pekko configuration object.
 */
public class PekkoUtils {
    
    /**
     * Gets the standard Flink actor system name.
     *
     * @return Standard actor system name used by Flink
     */
    public static String getFlinkActorSystemName();
    
    /**
     * Gets thread pool executor configuration from settings.
     *
     * @param configuration Thread pool executor configuration
     *     ({@code RpcSystem.FixedThreadPoolExecutorConfiguration})
     * @return Pekko Config object for thread pool executor
     */
    public static Config getThreadPoolExecutorConfig(
        RpcSystem.FixedThreadPoolExecutorConfiguration configuration
    );
    
    /**
     * Gets fork-join executor configuration from settings.
     *
     * <p>Converts an {@code RpcSystem.ForkJoinExecutorConfiguration} into the Pekko
     * {@code Config} form.
     *
     * @param configuration Fork-join executor configuration
     * @return Pekko Config object for fork-join executor
     */
    public static Config getForkJoinExecutorConfig(
        RpcSystem.ForkJoinExecutorConfiguration configuration
    );
    
    /**
     * Creates local actor system with Flink configuration.
     *
     * @param configuration Flink configuration object
     * @return Local ActorSystem instance
     */
    public static ActorSystem createLocalActorSystem(Configuration configuration);
    
    /**
     * Creates actor system with custom name and configuration.
     *
     * @param actorSystemName Name for the actor system
     * @param config Pekko configuration object
     * @return ActorSystem instance
     */
    public static ActorSystem createActorSystem(String actorSystemName, Config config);
    
    /**
     * Creates default actor system with standard settings.
     *
     * <p>Takes no arguments; NOTE(review): which name and settings count as "standard" is
     * not shown in this listing -- confirm against the implementation.
     *
     * @return Default ActorSystem instance
     */
    public static ActorSystem createDefaultActorSystem();
    
    /**
     * Gets Pekko configuration for external address binding.
     *
     * @param configuration Flink configuration object
     * @param externalAddress External address (host and port) to bind to
     * @return Pekko Config object
     */
    public static Config getConfig(Configuration configuration, HostAndPort externalAddress);
    
    /**
     * Gets Pekko configuration with separate bind address.
     *
     * @param configuration Flink configuration object
     * @param externalAddress External address (host and port) for external communication
     * @param bindAddress Internal bind address (host and port)
     * @param executorConfig Executor configuration, as a Pekko {@code Config}
     * @return Pekko Config object
     */
    public static Config getConfig(
        Configuration configuration, 
        HostAndPort externalAddress, 
        HostAndPort bindAddress, 
        Config executorConfig
    );
    
    /**
     * Gets address from running actor system.
     *
     * @param system ActorSystem instance
     * @return Address object representing the system's address
     */
    public static Address getAddress(ActorSystem system);
    
    /**
     * Gets RPC URL for a specific actor.
     *
     * @param system ActorSystem containing the actor
     * @param actor ActorRef to generate URL for
     * @return RPC URL string for the actor
     */
    public static String getRpcURL(ActorSystem system, ActorRef actor);
    
    /**
     * Extracts address from RPC URL string.
     *
     * @param rpcURL RPC URL to parse
     * @return Address object extracted from URL
     * @throws MalformedURLException if URL is malformed
     */
    public static Address getAddressFromRpcURL(String rpcURL) throws MalformedURLException;
    
    /**
     * Extracts InetSocketAddress from RPC URL string.
     *
     * <p>Note that this method declares the broad {@code Exception}, whereas
     * {@code getAddressFromRpcURL} declares {@code MalformedURLException}.
     *
     * @param rpcURL RPC URL to parse
     * @return InetSocketAddress extracted from URL
     * @throws Exception if URL cannot be parsed
     */
    public static InetSocketAddress getInetSocketAddressFromRpcURL(String rpcURL) throws Exception;
    
    /**
     * Terminates actor system gracefully.
     *
     * @param actorSystem ActorSystem to terminate
     * @return CompletableFuture indicating termination completion
     */
    public static CompletableFuture<Void> terminateActorSystem(ActorSystem actorSystem);
}

Advanced Configuration Examples:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.runtime.rpc.pekko.HostAndPort;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Address;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

Logger logger = LoggerFactory.getLogger(MyClass.class);
Configuration flinkConfig = new Configuration();

// Create actor system with custom executor configuration.
// Constructor arguments are (parallelismFactor, minParallelism, maxParallelism):
// the factor is a double, the two bounds are ints.
RpcSystem.ForkJoinExecutorConfiguration executorConfig = 
    new RpcSystem.ForkJoinExecutorConfiguration(2.0, 8, 64);
Config pekkoExecutorConfig = PekkoUtils.getForkJoinExecutorConfig(executorConfig);

ActorSystem customSystem = PekkoUtils.createActorSystem(
    "flink-custom", 
    pekkoExecutorConfig
);

// Configure for external access with separate bind address
HostAndPort externalAddress = new HostAndPort("public.example.com", 6123);
HostAndPort bindAddress = new HostAndPort("0.0.0.0", 6123);

Config clusterConfig = PekkoUtils.getConfig(
    flinkConfig, 
    externalAddress, 
    bindAddress, 
    pekkoExecutorConfig
);

// Extract network information from running systems.
// someActor stands for an ActorRef obtained elsewhere (e.g. from customSystem.actorOf(...)).
Address systemAddress = PekkoUtils.getAddress(customSystem);
String actorUrl = PekkoUtils.getRpcURL(customSystem, someActor);

// Parse URLs for connection information
InetSocketAddress socketAddress = PekkoUtils.getInetSocketAddressFromRpcURL(
    "pekko://flink@cluster-node:6123/user/jobmanager"
);

// Graceful shutdown
CompletableFuture<Void> termination = PekkoUtils.terminateActorSystem(customSystem);
termination.thenRun(() -> logger.info("Actor system terminated"));

RobustActorSystem

Enhanced ActorSystem implementation with configurable exception handling for production environments.

/**
 * ActorSystem with configurable UncaughtExceptionHandler for robust error handling.
 *
 * <p>The class is declared {@code abstract} and extends {@code ActorSystemImpl}; callers
 * obtain instances through the static {@link #create(String, Config)} factory. This is an
 * API listing, so constructor and method bodies are omitted.
 */
public abstract class RobustActorSystem extends ActorSystemImpl {
    
    /**
     * Constructor for RobustActorSystem.
     *
     * @param name Name of the actor system
     * @param applicationConfig Application configuration
     * @param classLoader ClassLoader for the system
     * @param defaultExecutionContext Default execution context, wrapped in an {@code Option}
     *     (NOTE(review): presumably {@code scala.Option} -- confirm against the imports of
     *     the real class)
     * @param setup ActorSystemSetup configuration
     */
    public RobustActorSystem(
        String name, 
        Config applicationConfig, 
        ClassLoader classLoader, 
        Option<ExecutionContext> defaultExecutionContext, 
        ActorSystemSetup setup
    );
    
    /**
     * Factory method to create RobustActorSystem.
     *
     * <p>Takes only a name and a configuration; the class loader, execution context and
     * setup accepted by the constructor are not parameters here.
     *
     * @param name Name of the actor system
     * @param applicationConfig Application configuration
     * @return RobustActorSystem instance
     */
    public static RobustActorSystem create(String name, Config applicationConfig);
}

Support Classes

Additional classes that provide specialized functionality for actor system management.

/**
 * Actor for handling dead letters in the actor system.
 *
 * <p>Extends {@code AbstractActor}. Only the {@code Props} factory is listed here; message
 * handling is not shown.
 */
public class DeadLettersActor extends AbstractActor {
    /**
     * Gets Props for creating DeadLettersActor instances.
     *
     * @return Props configuration for the actor
     */
    public static Props getProps();
}

/**
 * Supervisor actor for managing child actors with escalation strategy.
 *
 * <p>Extends {@code AbstractActor}. No members are listed in this reference.
 */
public class SupervisorActor extends AbstractActor {
    // Supervisor implementation for actor lifecycle management (members omitted here)
}

/**
 * Supervisor strategy that escalates all exceptions to parent actors.
 *
 * <p>Implements {@code SupervisorStrategyConfigurator}. No members are listed in this
 * reference.
 */
public class EscalatingSupervisorStrategy implements SupervisorStrategyConfigurator {
    // Strategy implementation for exception handling (members omitted here)
}

/**
 * Custom SSL engine provider for secure Pekko communication.
 *
 * <p>Extends {@code ConfigSSLEngineProvider}. No members are listed in this reference.
 */
public class CustomSSLEngineProvider extends ConfigSSLEngineProvider {
    // SSL engine configuration for secure RPC communication (members omitted here)
}

/**
 * Thread factory that sets thread priority for actor system threads.
 *
 * <p>Implements {@code ThreadFactory}. No members are listed in this reference, so the
 * constructor arguments and the priority applied are not shown.
 */
public class PrioritySettingThreadFactory implements ThreadFactory {
    // Thread factory for priority-based thread management (members omitted here)
}

/**
 * Dispatcher configurator for priority-based thread scheduling.
 *
 * <p>Extends {@code DispatcherConfigurator}. No members are listed in this reference.
 */
public class PriorityThreadsDispatcher extends DispatcherConfigurator {
    // Dispatcher configuration for priority threads (members omitted here)
}

/**
 * Pekko extension for remote address handling and resolution.
 *
 * <p>Listed as extending {@code AbstractExtension}. No members are listed in this reference.
 * NOTE(review): confirm the supertype against the real class.
 */
public class RemoteAddressExtension extends AbstractExtension {
    // Extension for remote address management (members omitted here)
}

Production Configuration Example:

import org.apache.flink.runtime.rpc.pekko.RobustActorSystem;
import org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.Config;

// Create production-ready actor system with SSL and robust error handling
Config productionConfig = ConfigFactory.parseString("""
    pekko {
        remote.artery {
            transport = tls-tcp
            canonical.hostname = "production-node.example.com"
            canonical.port = 6123
        }
        
        actor {
            provider = remote
            serialization-bindings {
                "java.io.Serializable" = java
            }
        }
        
        ssl-config {
            trustManager = "org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider"
        }
    }
""");

RobustActorSystem productionSystem = RobustActorSystem.create(
    "flink-production", 
    productionConfig
);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-rpc-akka

docs

actor-system.md

concurrent-utilities.md

exceptions.md

index.md

rpc-configuration.md

rpc-system.md

tile.json