Pekko-based RPC implementation for Apache Flink's distributed computing framework
—
Configuration management for Pekko RPC services, providing comprehensive settings for timeouts, message sizes, serialization options, and debugging features.
Main configuration class that encapsulates all RPC service settings and provides factory methods for creating configurations from Flink configuration objects.
/**
* Configuration for PekkoRpcService with settings for timeouts, message sizes, and debugging options.
*/
public class PekkoRpcServiceConfiguration {
/**
* Creates configuration from a Flink Configuration object.
* Extracts RPC-specific settings including timeouts, frame sizes, and debugging options.
*
* @param configuration Flink configuration object containing RPC settings
* @return PekkoRpcServiceConfiguration with settings extracted from configuration
*/
public static PekkoRpcServiceConfiguration fromConfiguration(Configuration configuration);
/**
* Creates default configuration with standard RPC settings.
* Uses default timeout values, frame sizes, and debugging options.
*
* @return PekkoRpcServiceConfiguration with default settings
*/
public static PekkoRpcServiceConfiguration defaultConfiguration();
/**
* Gets the underlying Flink configuration object.
*
* @return Configuration object containing all Flink settings
*/
public Configuration getConfiguration();
/**
* Gets the RPC ask timeout duration.
* This timeout applies to RPC calls that expect a response.
*
* @return Duration representing the ask timeout
*/
public Duration getTimeout();
/**
* Gets the maximum frame size for RPC messages.
* Messages larger than this size will be rejected.
*
* @return Maximum frame size in bytes
*/
public long getMaximumFramesize();
/**
* Checks if ask call stack capture is enabled.
* When enabled, provides better debugging information for RPC failures.
*
* @return true if ask call stack capture is enabled
*/
public boolean captureAskCallStack();
/**
* Checks if RPC invocation serialization is forced.
* When enabled, all RPC invocations are serialized even for local calls.
*
* @return true if RPC invocation serialization is forced
*/
public boolean isForceRpcInvocationSerialization();
}Usage Examples:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceConfiguration;
import java.time.Duration;
// Create configuration from Flink Configuration
Configuration flinkConfig = new Configuration();
flinkConfig.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(30));
flinkConfig.set(RpcOptions.FRAMESIZE, "10MB");
flinkConfig.set(RpcOptions.CAPTURE_ASK_CALLSTACK, true);
PekkoRpcServiceConfiguration rpcConfig =
PekkoRpcServiceConfiguration.fromConfiguration(flinkConfig);
// Use default configuration for simple scenarios
PekkoRpcServiceConfiguration defaultConfig =
PekkoRpcServiceConfiguration.defaultConfiguration();
// Access configuration properties
Duration askTimeout = rpcConfig.getTimeout();
long maxFrameSize = rpcConfig.getMaximumFramesize();
boolean debugMode = rpcConfig.captureAskCallStack();
boolean forceSerialization = rpcConfig.isForceRpcInvocationSerialization();
// Log configuration details
System.out.println("RPC ask timeout: " + askTimeout);
System.out.println("Max frame size: " + maxFrameSize + " bytes");
System.out.println("Debug mode enabled: " + debugMode);
System.out.println("Force serialization: " + forceSerialization);
// Pass configuration to RPC service creation
// (This would typically be done internally by PekkoRpcServiceUtils)The configuration system supports various RPC-related options from RpcOptions:
// Timeout settings
RpcOptions.ASK_TIMEOUT_DURATION // Duration for RPC ask operations
RpcOptions.LOOKUP_TIMEOUT_DURATION // Duration for RPC endpoint lookups
// Message size limits
RpcOptions.FRAMESIZE // Maximum message frame size
// Debugging and development
RpcOptions.CAPTURE_ASK_CALLSTACK // Enable call stack capture for debugging
// Serialization behavior
RpcOptions.FORCE_RPC_INVOCATION_SERIALIZATION // Force serialization of all RPC callsConfiguration Examples:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RpcOptions;
// Configure for production environment
Configuration prodConfig = new Configuration();
prodConfig.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(5));
prodConfig.set(RpcOptions.FRAMESIZE, "50MB");
prodConfig.set(RpcOptions.CAPTURE_ASK_CALLSTACK, false); // Disable for performance
// Configure for development environment
Configuration devConfig = new Configuration();
devConfig.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(10));
devConfig.set(RpcOptions.FRAMESIZE, "10MB");
devConfig.set(RpcOptions.CAPTURE_ASK_CALLSTACK, true); // Enable for debugging
devConfig.set(RpcOptions.FORCE_RPC_INVOCATION_SERIALIZATION, true); // Test serialization
// Configure for testing environment
Configuration testConfig = new Configuration();
testConfig.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(1));
testConfig.set(RpcOptions.FRAMESIZE, "1MB");
testConfig.set(RpcOptions.CAPTURE_ASK_CALLSTACK, true);Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-rpc-akka