CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-rpc-akka

Pekko-based RPC implementation for Apache Flink's distributed computing framework

Pending
Overview
Eval results
Files

docs/rpc-system.md

RPC System Management

Core RPC system functionality for creating and configuring RPC services in both local and distributed environments. The RPC system provides the foundation for all distributed communication in Flink clusters.

Capabilities

PekkoRpcSystem

Main RPC system implementation that serves as the primary entry point for creating RPC services.

/**
 * RpcSystem implementation based on Pekko actor system.
 * Provides methods to create local and remote RPC services.
 *
 * <p>This listing is an API summary: only the public signatures are shown and
 * method bodies are omitted. The usage example on this page creates an instance
 * with the no-argument constructor ({@code new PekkoRpcSystem()}).
 */
public class PekkoRpcSystem implements RpcSystem {
    
    /**
     * Creates a builder for local RPC service configuration.
     *
     * @param configuration Flink configuration object
     * @return RpcServiceBuilder for local service setup; the usage example on this
     *     page turns it into a running service via {@code createAndStart()}
     */
    public RpcServiceBuilder localServiceBuilder(Configuration configuration);
    
    /**
     * Creates a builder for remote RPC service configuration.
     *
     * @param configuration Flink configuration object
     * @param externalAddress External address for the RPC service (nullable),
     *     e.g. {@code "192.168.1.100"}
     * @param externalPortRange External port range specification,
     *     e.g. {@code "6123-6130"}
     * @return RpcServiceBuilder for remote service setup
     */
    public RpcServiceBuilder remoteServiceBuilder(
        Configuration configuration, 
        String externalAddress, 
        String externalPortRange
    );
    
    /**
     * Extracts socket address from RPC URL.
     *
     * <p>The method declares the broad checked type {@code Exception}, so callers
     * have to catch or propagate it.
     *
     * @param url RPC URL string,
     *     e.g. {@code "pekko://flink@192.168.1.100:6123/user/jobmanager"}
     * @return InetSocketAddress extracted from the URL
     * @throws Exception if URL is malformed or cannot be parsed
     */
    public InetSocketAddress getInetSocketAddressFromRpcUrl(String url) throws Exception;
    
    /**
     * Constructs RPC URL from components.
     *
     * <p>NOTE(review): the parameter list matches the static
     * {@code PekkoRpcServiceUtils.getRpcUrl(..., Configuration)} overload listed on
     * this page; presumably this method delegates to it - confirm.
     *
     * @param hostname Target hostname
     * @param port Target port number
     * @param endpointName Name of the RPC endpoint
     * @param addressResolution Address resolution strategy
     * @param config Flink configuration
     * @return Constructed RPC URL string
     * @throws UnknownHostException if hostname cannot be resolved
     */
    public String getRpcUrl(
        String hostname, 
        int port, 
        String endpointName, 
        AddressResolution addressResolution, 
        Configuration config
    ) throws UnknownHostException;
    
    /**
     * Gets maximum message size from configuration.
     *
     * @param config Flink configuration object
     * @return Maximum message size in bytes
     */
    public long getMaximumMessageSizeInBytes(Configuration config);
}

Usage Examples:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcSystem;
import org.apache.flink.runtime.rpc.RpcService;

import java.net.InetSocketAddress;

// Create RPC system
PekkoRpcSystem rpcSystem = new PekkoRpcSystem();

// Create local RPC service for single-node scenarios
Configuration localConfig = new Configuration();
RpcService localService = rpcSystem.localServiceBuilder(localConfig).createAndStart();

// Create remote RPC service for distributed clusters
Configuration remoteConfig = new Configuration();
String externalAddress = "192.168.1.100";
String portRange = "6123-6130";
RpcService remoteService = rpcSystem.remoteServiceBuilder(
    remoteConfig, externalAddress, portRange
).createAndStart();

// Parse RPC URL to get socket address.
// getInetSocketAddressFromRpcUrl is declared `throws Exception`, so this call has to
// live in a method that catches or propagates the checked exception.
String rpcUrl = "pekko://flink@192.168.1.100:6123/user/jobmanager";
InetSocketAddress address = rpcSystem.getInetSocketAddressFromRpcUrl(rpcUrl);

PekkoRpcService

Core Pekko-based RPC service implementation that manages connections, endpoints, and the underlying actor system.

/**
 * Pekko-based RPC service implementation.
 * Manages RPC endpoints, connections, and the underlying actor system.
 *
 * <p>This listing is an API summary: only the public signatures are shown and
 * method bodies are omitted.
 */
public class PekkoRpcService implements RpcService {
    
    /**
     * Constructor for PekkoRpcService.
     *
     * @param actorSystem Underlying Pekko actor system
     * @param configuration Service configuration parameters
     */
    public PekkoRpcService(ActorSystem actorSystem, PekkoRpcServiceConfiguration configuration);
    
    /**
     * Returns the underlying Pekko actor system.
     *
     * @return ActorSystem instance used by this service
     */
    public ActorSystem getActorSystem();
    
    /**
     * Gets the address of this RPC service.
     *
     * @return Service address as string
     */
    public String getAddress();
    
    /**
     * Gets the port number of this RPC service.
     *
     * @return Port number
     */
    public int getPort();
    
    /**
     * Creates a self gateway for the given RPC server.
     *
     * @param <C> Gateway type; must extend RpcGateway
     * @param selfGatewayType Type of the gateway interface
     * @param rpcServer RPC server instance
     * @return Gateway instance for self-communication
     */
    public <C extends RpcGateway> C getSelfGateway(Class<C> selfGatewayType, RpcServer rpcServer);
    
    /**
     * Connects to a remote RPC endpoint.
     *
     * @param <C> Gateway type; must extend RpcGateway
     * @param address Remote endpoint address
     * @param clazz Gateway interface class
     * @return CompletableFuture containing the connected gateway
     */
    public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);
    
    /**
     * Connects to a fenced remote RPC endpoint.
     *
     * @param <F> Fencing token type; must be Serializable
     * @param <C> Gateway type; must extend FencedRpcGateway parameterized with F
     * @param address Remote endpoint address
     * @param fencingToken Token for fenced communication
     * @param clazz Gateway interface class
     * @return CompletableFuture containing the connected fenced gateway
     */
    public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
        String address, F fencingToken, Class<C> clazz
    );
    
    /**
     * Starts an RPC server for the given endpoint.
     *
     * @param <C> Endpoint type; must be both an RpcEndpoint and an RpcGateway
     * @param rpcEndpoint RPC endpoint implementation
     * @param loggingContext Logging context for the server, as string key/value pairs
     * @return RpcServer instance managing the endpoint
     */
    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
        C rpcEndpoint, Map<String, String> loggingContext
    );
    
    /**
     * Stops the specified RPC server.
     *
     * @param selfGateway RPC server to stop (the parameter is typed RpcServer
     *     despite its gateway-style name)
     */
    public void stopServer(RpcServer selfGateway);
    
    /**
     * Closes the RPC service asynchronously.
     *
     * @return CompletableFuture indicating completion
     */
    public CompletableFuture<Void> closeAsync();
    
    /**
     * Gets the scheduled executor for this service.
     *
     * @return ScheduledExecutor instance
     */
    public ScheduledExecutor getScheduledExecutor();
}

PekkoRpcServiceUtils

Utility class providing helper methods for RPC URL construction and service configuration.

/**
 * Utility methods for RPC service operations and URL handling.
 *
 * <p>This listing is an API summary: only the public signatures are shown and
 * method bodies are omitted. Every method listed here is static.
 */
public class PekkoRpcServiceUtils {
    
    /**
     * Constructs RPC URL from components.
     *
     * <p>NOTE(review): this overload takes no explicit protocol; presumably the
     * protocol is derived from {@code config} - confirm.
     *
     * @param hostname Target hostname
     * @param port Target port number
     * @param endpointName Name of the RPC endpoint
     * @param addressResolution Address resolution strategy
     * @param config Flink configuration
     * @return Constructed RPC URL string
     * @throws UnknownHostException if hostname cannot be resolved
     */
    public static String getRpcUrl(
        String hostname, 
        int port, 
        String endpointName, 
        AddressResolution addressResolution, 
        Configuration config
    ) throws UnknownHostException;
    
    /**
     * Constructs RPC URL with explicit protocol specification.
     *
     * <p>Differs from the other overload only in its last parameter: the caller
     * passes the protocol directly instead of a Flink configuration.
     *
     * @param hostname Target hostname
     * @param port Target port number
     * @param endpointName Name of the RPC endpoint
     * @param addressResolution Address resolution strategy
     * @param protocol Communication protocol (TCP, SSL_TCP); the usage example on
     *     this page spells it {@code PekkoRpcServiceUtils.Protocol.SSL_TCP}
     * @return Constructed RPC URL string
     * @throws UnknownHostException if hostname cannot be resolved
     */
    public static String getRpcUrl(
        String hostname, 
        int port, 
        String endpointName, 
        AddressResolution addressResolution, 
        Protocol protocol
    ) throws UnknownHostException;
    
    /**
     * Gets local RPC URL for the specified endpoint.
     *
     * @param endpointName Name of the endpoint, e.g. {@code "taskmanager"}
     * @return Local RPC URL string
     */
    public static String getLocalRpcUrl(String endpointName);
    
    /**
     * Checks if exception indicates recipient termination.
     *
     * @param exception Exception to check
     * @return true if exception is recipient terminated
     */
    public static boolean isRecipientTerminatedException(Throwable exception);
    
    /**
     * Extracts maximum frame size from configuration.
     *
     * <p>NOTE(review): {@code PekkoRpcSystem.getMaximumMessageSizeInBytes} on this
     * page has the same parameter and return type; whether the two are related is
     * not shown here - confirm.
     *
     * @param configuration Flink configuration object
     * @return Maximum frame size in bytes
     */
    public static long extractMaximumFramesize(Configuration configuration);
}

/**
 * Communication protocol enumeration for RPC connections.
 *
 * <p>Accepted by the {@code getRpcUrl} overload that takes an explicit protocol.
 *
 * <p>NOTE(review): shown here as a top-level type, but the usage example on this
 * page refers to it as {@code PekkoRpcServiceUtils.Protocol}, which suggests it is
 * nested inside PekkoRpcServiceUtils - confirm.
 */
public enum Protocol {
    TCP,     // Standard TCP communication
    SSL_TCP  // SSL-encrypted TCP communication
}

Advanced Usage Examples:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils;
import org.apache.flink.runtime.rpc.AddressResolution;

import java.time.Duration;

// Flink configuration shared by the calls below
Configuration flinkConfig = new Configuration();

// Create RPC URLs for different scenarios
String localUrl = PekkoRpcServiceUtils.getLocalRpcUrl("taskmanager");

// Both getRpcUrl overloads are declared `throws UnknownHostException`, so these calls
// have to live in a method that catches or propagates it.
String remoteUrl = PekkoRpcServiceUtils.getRpcUrl(
    "cluster-node-1", 
    6123, 
    "jobmanager", 
    AddressResolution.TRY_ADDRESS_RESOLUTION, 
    flinkConfig
);

String secureUrl = PekkoRpcServiceUtils.getRpcUrl(
    "secure-cluster-node", 
    6124, 
    "jobmanager", 
    AddressResolution.TRY_ADDRESS_RESOLUTION, 
    PekkoRpcServiceUtils.Protocol.SSL_TCP
);

// Check for connection issues
try {
    // ... RPC call
} catch (Exception e) {
    if (PekkoRpcServiceUtils.isRecipientTerminatedException(e)) {
        // Handle recipient termination
        // NOTE(review): `logger` is not declared in this snippet; it is assumed to be
        // provided by the enclosing class.
        logger.warn("RPC recipient has terminated");
    }
}

// Configure service with custom settings
// NOTE(review): PekkoRpcServiceConfiguration needs its own import, and
// fromConfiguration / withTimeout / withMaximumFramesize are not listed anywhere on
// this page - confirm them against rpc-configuration.md before relying on this chain.
PekkoRpcServiceConfiguration rpcServiceConfig = PekkoRpcServiceConfiguration
    .fromConfiguration(flinkConfig)
    .withTimeout(Duration.ofSeconds(30))
    .withMaximumFramesize(16 * 1024 * 1024); // 16MB

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-rpc-akka

docs

actor-system.md

concurrent-utilities.md

exceptions.md

index.md

rpc-configuration.md

rpc-system.md

tile.json