Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.
—
The configuration management API provides a flexible and extensible system for managing transport layer settings and network parameters in Apache Spark. It uses a provider pattern to abstract configuration sources and includes comprehensive settings for performance tuning, security, and network behavior.
Central configuration management class for the transport layer, providing access to all network-related settings.
/**
* Create a transport configuration with module name and config provider
* @param module - String identifier for the configuration module (e.g., "spark", "shuffle")
* @param conf - ConfigProvider instance supplying configuration values
*/
public TransportConf(String module, ConfigProvider conf);
/**
* Get the module name for this configuration
* @return String representing the module identifier
*/
public String getModule();
/**
* Get the underlying configuration provider
* @return ConfigProvider instance used by this configuration
*/
public ConfigProvider getConfigProvider();

Settings for controlling low-level I/O behavior and performance characteristics.
/**
* Get the I/O mode for network operations
* @return String representing the I/O mode ("NIO" or "EPOLL")
*/
public String ioMode();
/**
* Check if direct ByteBuffers should be preferred for network operations
* @return boolean indicating preference for direct buffers
*/
public boolean preferDirectBufs();
/**
* Get the size of receive buffers for network operations
* @return int representing buffer size in bytes
*/
public int receiveBuf();
/**
* Get the size of send buffers for network operations
* @return int representing buffer size in bytes
*/
public int sendBuf();
/**
 * Check if TCP keep-alive should be enabled on connections
 * @return boolean indicating if SO_KEEPALIVE probes should be sent on idle connections
 */
public boolean enableTcpKeepAlive();
/**
* Check if SO_REUSEADDR should be enabled
* @return boolean indicating if address reuse should be enabled
*/
public boolean enableTcpReuseAddr();

Configuration for connection timeouts, pooling, and lifecycle management.
/**
* Get the connection timeout in milliseconds
* @return int representing timeout for establishing connections
*/
public int connectionTimeoutMs();
/**
* Get the connection creation timeout in milliseconds
* @return int representing timeout for creating new connections
*/
public int connectionCreationTimeoutMs();
/**
* Get the number of connections per peer
* @return int representing maximum connections to maintain per remote peer
*/
public int numConnectionsPerPeer();
/**
* Get the maximum number of retries for connection attempts
* @return int representing retry count for failed connections
*/
public int maxRetries();
/**
* Get the retry wait time in milliseconds
* @return int representing wait time between connection retry attempts
*/
public int retryWaitMs();
/**
* Get the idle timeout for connections in milliseconds
* @return int representing timeout for idle connection cleanup
*/
public int connectionIdleTimeoutMs();

Settings for controlling thread pool sizes and concurrency behavior.
/**
* Get the number of server threads for handling connections
* @return int representing server thread pool size
*/
public int serverThreads();
/**
* Get the number of client threads for handling connections
* @return int representing client thread pool size
*/
public int clientThreads();
/**
* Get the number of threads for the shared client factory
* @return int representing shared thread pool size
*/
public int sharedClientFactoryThreads();
/**
* Check if client factory should be shared across contexts
* @return boolean indicating if client factory sharing is enabled
*/
public boolean sharedClientFactory();

Settings for encryption, authentication, and security-related features.
/**
* Check if transport layer encryption is enabled
* @return boolean indicating if encryption should be used
*/
public boolean encryptionEnabled();
/**
* Check if SASL encryption is enabled
* @return boolean indicating if SASL encryption should be used
*/
public boolean saslEncryption();
/**
* Get the encryption key length in bits
* @return int representing key length (128, 192, or 256)
*/
public int encryptionKeyLength();
/**
* Get the cipher transformation for encryption
* @return String representing the cipher transformation (e.g., "AES/CTR/NoPadding")
*/
public String cipherTransformation();
/**
* Check if authentication is required
* @return boolean indicating if client authentication is mandatory
*/
public boolean authenticationEnabled();
/**
* Get the SASL authentication timeout in milliseconds
* @return int representing timeout for SASL authentication
*/
public int saslTimeoutMs();

Configuration for memory usage, buffer management, and garbage collection optimization.
/**
* Get the maximum size for in-memory shuffle blocks
* @return long representing maximum block size in bytes
*/
public long maxInMemoryShuffleBlockSize();
/**
* Get the memory fraction for off-heap storage
* @return double representing fraction of available memory for off-heap use
*/
public double memoryFraction();
/**
* Check if off-heap memory should be used
* @return boolean indicating if off-heap storage is enabled
*/
public boolean offHeapEnabled();
/**
* Get the memory map threshold for file operations
* @return long representing minimum file size for memory mapping
*/
public long memoryMapThreshold();

Configuration for data transfer behavior, streaming operations, and chunk management.
/**
* Get the maximum number of chunks per TransferTo operation
* @return long representing maximum chunks in a single transfer
*/
public long maxChunksBeingTransferred();
/**
* Get the maximum size of messages that can be sent
* @return long representing maximum message size in bytes
*/
public long maxMessageSize();
/**
* Get the chunk fetch buffer size
* @return int representing buffer size for chunk fetching operations
*/
public int chunkFetchBufferSize();
/**
* Get the timeout for individual chunk fetch operations
* @return int representing timeout in milliseconds
*/
public int chunkFetchTimeoutMs();
/**
* Check if zero-copy streaming is enabled
* @return boolean indicating if zero-copy operations should be used
*/
public boolean zeroCopyStreaming();

Abstract base class for providing configuration values from various sources.
/**
* Get a configuration value by name
* @param name - String key for the configuration property
* @return String value of the property, or null if not found
*/
public abstract String get(String name);
/**
* Get a configuration value with a default fallback
* @param name - String key for the configuration property
* @param defaultValue - String default value if property is not found
* @return String value of the property, or defaultValue if not found
*/
public String get(String name, String defaultValue);
/**
* Get a boolean configuration value with default fallback
* @param name - String key for the configuration property
* @param defaultValue - boolean default value if property is not found
* @return boolean value of the property, or defaultValue if not found or invalid
*/
public boolean getBoolean(String name, boolean defaultValue);
/**
* Get an integer configuration value with default fallback
* @param name - String key for the configuration property
* @param defaultValue - int default value if property is not found
* @return int value of the property, or defaultValue if not found or invalid
*/
public int getInt(String name, int defaultValue);
/**
* Get a long configuration value with default fallback
* @param name - String key for the configuration property
* @param defaultValue - long default value if property is not found
* @return long value of the property, or defaultValue if not found or invalid
*/
public long getLong(String name, long defaultValue);
/**
* Get a double configuration value with default fallback
* @param name - String key for the configuration property
* @param defaultValue - double default value if property is not found
* @return double value of the property, or defaultValue if not found or invalid
*/
public double getDouble(String name, double defaultValue);
/**
* Get all configuration properties as a map
* @return Map<String, String> containing all configuration key-value pairs
*/
public Map<String, String> getAll();

Map-based implementation of ConfigProvider for simple configuration scenarios.
/**
* Create a configuration provider backed by a Map
* @param properties - Map<String, String> containing configuration properties
*/
public MapConfigProvider(Map<String, String> properties);
@Override
public String get(String name);
@Override
public Map<String, String> getAll();
/**
* Add or update a configuration property
* @param key - String property name
* @param value - String property value
*/
public void set(String key, String value);
/**
* Remove a configuration property
* @param key - String property name to remove
*/
public void remove(String key);
/**
* Clear all configuration properties
*/
public void clear();

Enumeration for selecting the underlying I/O implementation.
public enum IOMode {
/**
* Standard NIO implementation (default, works on all platforms)
*/
NIO,
/**
* Linux epoll implementation (higher performance on Linux)
*/
EPOLL;
/**
* Parse IOMode from string representation
* @param value - String value ("NIO" or "EPOLL")
* @return IOMode corresponding to the string
* @throws IllegalArgumentException if value is not recognized
*/
public static IOMode parse(String value);
}

import org.apache.spark.network.util.*;
import java.util.HashMap;
import java.util.Map;
// Create configuration properties
Map<String, String> properties = new HashMap<>();
properties.put("spark.network.timeout", "120s");
properties.put("spark.network.io.mode", "NIO");
properties.put("spark.network.io.preferDirectBufs", "true");
properties.put("spark.network.io.connectionTimeout", "30s");
properties.put("spark.network.io.numConnectionsPerPeer", "1");
properties.put("spark.network.io.serverThreads", "0"); // 0 means use default
properties.put("spark.network.io.clientThreads", "0");
// Create config provider
ConfigProvider configProvider = new MapConfigProvider(properties);
// Create transport configuration
TransportConf conf = new TransportConf("spark", configProvider);
// Access configuration values
System.out.println("I/O Mode: " + conf.ioMode());
System.out.println("Prefer Direct Buffers: " + conf.preferDirectBufs());
System.out.println("Connection Timeout: " + conf.connectionTimeoutMs() + "ms");
System.out.println("Connections Per Peer: " + conf.numConnectionsPerPeer());
System.out.println("Server Threads: " + conf.serverThreads());
System.out.println("Client Threads: " + conf.clientThreads());

// Configure transport security
Map<String, String> securityProperties = new HashMap<>();
securityProperties.put("spark.network.crypto.enabled", "true");
securityProperties.put("spark.network.crypto.keyLength", "256");
securityProperties.put("spark.network.crypto.cipherTransformation", "AES/GCM/NoPadding");
securityProperties.put("spark.network.sasl.encryption", "true");
securityProperties.put("spark.network.sasl.timeout", "30s");
securityProperties.put("spark.authenticate", "true");
ConfigProvider securityProvider = new MapConfigProvider(securityProperties);
TransportConf securityConf = new TransportConf("secure-spark", securityProvider);
// Check security settings
System.out.println("Encryption Enabled: " + securityConf.encryptionEnabled());
System.out.println("SASL Encryption: " + securityConf.saslEncryption());
System.out.println("Key Length: " + securityConf.encryptionKeyLength() + " bits");
System.out.println("Cipher: " + securityConf.cipherTransformation());
System.out.println("Authentication Required: " + securityConf.authenticationEnabled());
System.out.println("SASL Timeout: " + securityConf.saslTimeoutMs() + "ms");

// Configure for high-performance scenarios
Map<String, String> performanceProperties = new HashMap<>();
performanceProperties.put("spark.network.io.mode", "EPOLL"); // Linux only
performanceProperties.put("spark.network.io.preferDirectBufs", "true");
performanceProperties.put("spark.network.io.serverThreads", "8");
performanceProperties.put("spark.network.io.clientThreads", "8");
performanceProperties.put("spark.network.io.numConnectionsPerPeer", "2");
performanceProperties.put("spark.network.io.receiveBuf", "65536"); // 64KB
performanceProperties.put("spark.network.io.sendBuf", "65536"); // 64KB
performanceProperties.put("spark.network.maxMessageSize", "128MB");
performanceProperties.put("spark.network.zeroCopy", "true");
ConfigProvider perfProvider = new MapConfigProvider(performanceProperties);
TransportConf perfConf = new TransportConf("high-perf", perfProvider);
// Validate performance settings
System.out.println("Performance Configuration:");
System.out.println(" I/O Mode: " + perfConf.ioMode());
System.out.println(" Direct Buffers: " + perfConf.preferDirectBufs());
System.out.println(" Server Threads: " + perfConf.serverThreads());
System.out.println(" Client Threads: " + perfConf.clientThreads());
System.out.println(" Connections Per Peer: " + perfConf.numConnectionsPerPeer());
System.out.println(" Receive Buffer: " + perfConf.receiveBuf() + " bytes");
System.out.println(" Send Buffer: " + perfConf.sendBuf() + " bytes");
System.out.println(" Max Message Size: " + perfConf.maxMessageSize() + " bytes");
System.out.println(" Zero Copy: " + perfConf.zeroCopyStreaming());

// Configure memory settings for large-scale operations
Map<String, String> memoryProperties = new HashMap<>();
memoryProperties.put("spark.network.memory.fraction", "0.8");
memoryProperties.put("spark.network.memory.offHeap.enabled", "true");
memoryProperties.put("spark.network.memory.memoryMapThreshold", "2MB");
memoryProperties.put("spark.shuffle.file.buffer", "1MB");
memoryProperties.put("spark.network.maxInMemoryShuffleBlockSize", "64MB");
ConfigProvider memoryProvider = new MapConfigProvider(memoryProperties);
TransportConf memoryConf = new TransportConf("memory-optimized", memoryProvider);
System.out.println("Memory Configuration:");
System.out.println(" Memory Fraction: " + memoryConf.memoryFraction());
System.out.println(" Off-Heap Enabled: " + memoryConf.offHeapEnabled());
System.out.println(" Memory Map Threshold: " + memoryConf.memoryMapThreshold() + " bytes");
System.out.println(" Max In-Memory Block Size: " + memoryConf.maxInMemoryShuffleBlockSize() + " bytes");

// Create mutable configuration that can be updated at runtime
MapConfigProvider dynamicProvider = new MapConfigProvider(new HashMap<>());
// Initial configuration
dynamicProvider.set("spark.network.timeout", "60s");
dynamicProvider.set("spark.network.io.connectionTimeout", "15s");
TransportConf dynamicConf = new TransportConf("dynamic", dynamicProvider);
System.out.println("Initial timeout: " + dynamicConf.connectionTimeoutMs() + "ms");
// Update configuration at runtime
dynamicProvider.set("spark.network.io.connectionTimeout", "30s");
System.out.println("Updated timeout: " + dynamicConf.connectionTimeoutMs() + "ms");
// Add new configuration
dynamicProvider.set("spark.network.io.numConnectionsPerPeer", "3");
System.out.println("Connections per peer: " + dynamicConf.numConnectionsPerPeer());
// Remove configuration (falls back to default)
dynamicProvider.remove("spark.network.io.numConnectionsPerPeer");
System.out.println("Default connections per peer: " + dynamicConf.numConnectionsPerPeer());

// Custom configuration provider that loads from multiple sources
public class HierarchicalConfigProvider extends ConfigProvider {
  private final List<ConfigProvider> providers;

  /**
   * Create a provider that layers multiple configuration sources.
   * Later providers take precedence over earlier ones, both for single
   * lookups via {@link #get(String)} and for the merged view returned
   * by {@link #getAll()}.
   * @param providers - configuration sources, lowest precedence first
   */
  public HierarchicalConfigProvider(ConfigProvider... providers) {
    this.providers = Arrays.asList(providers);
  }

  /**
   * Get a configuration value, honoring provider precedence.
   * BUG FIX: the original scanned providers front-to-back and returned the
   * first non-null value, so earlier (lower-precedence) providers shadowed
   * later ones. That contradicted both getAll() (where later providers
   * override earlier ones via putAll) and the documented usage ("system
   * properties override environment, which overrides defaults"). We now
   * scan in reverse so the highest-precedence provider wins.
   * @param name - String key for the configuration property
   * @return String value from the highest-precedence provider that defines
   *         it, or null if no provider has the key
   */
  @Override
  public String get(String name) {
    for (int i = providers.size() - 1; i >= 0; i--) {
      String value = providers.get(i).get(name);
      if (value != null) {
        return value;
      }
    }
    return null;
  }

  /**
   * Merge all providers into a single map. Later providers override
   * earlier ones, matching the precedence used by get().
   * @return Map<String, String> containing the merged configuration
   */
  @Override
  public Map<String, String> getAll() {
    Map<String, String> result = new HashMap<>();
    for (ConfigProvider provider : providers) {
      result.putAll(provider.getAll());
    }
    return result;
  }
}
// Usage: system properties override environment, which overrides defaults
Map<String, String> defaults = new HashMap<>();
defaults.put("spark.network.timeout", "120s");
defaults.put("spark.network.io.mode", "NIO");
Map<String, String> environment = new HashMap<>();
environment.put("spark.network.timeout", "180s"); // Override default
Map<String, String> systemProps = new HashMap<>();
systemProps.put("spark.network.io.mode", "EPOLL"); // Override environment
HierarchicalConfigProvider hierarchical = new HierarchicalConfigProvider(
new MapConfigProvider(defaults),
new MapConfigProvider(environment),
new MapConfigProvider(systemProps)
);
TransportConf hierarchicalConf = new TransportConf("hierarchical", hierarchical);
System.out.println("Final timeout: " + hierarchicalConf.connectionTimeoutMs() + "ms"); // 180s from environment
System.out.println("Final I/O mode: " + hierarchicalConf.ioMode()); // EPOLL from system props

// Utility methods for configuration validation and debugging
public class ConfigValidator {

  // Utility class: prevent instantiation.
  private ConfigValidator() {}

  /**
   * Sanity-check a transport configuration, printing a WARNING line for
   * each setting that is invalid or likely misconfigured. Does not throw;
   * intended for diagnostics only.
   * @param conf - TransportConf to validate
   */
  public static void validateConfiguration(TransportConf conf) {
    System.out.println("=== Transport Configuration Validation ===");
    // EPOLL is Netty's Linux-only native transport; warn when selected elsewhere.
    if (conf.ioMode().equals("EPOLL") && !isLinux()) {
      System.out.println("WARNING: EPOLL mode is only supported on Linux");
    }
    // Thread counts must be non-negative (0 means "use the default").
    if (conf.serverThreads() < 0) {
      System.out.println("WARNING: Invalid server thread count: " + conf.serverThreads());
    }
    // A non-positive timeout would make connection attempts fail immediately.
    if (conf.connectionTimeoutMs() <= 0) {
      System.out.println("WARNING: Invalid connection timeout: " + conf.connectionTimeoutMs());
    }
    // Keys shorter than 128 bits are considered cryptographically weak.
    if (conf.encryptionEnabled() && conf.encryptionKeyLength() < 128) {
      System.out.println("WARNING: Weak encryption key length: " + conf.encryptionKeyLength());
    }
    System.out.println("Configuration validation completed");
  }

  /**
   * Print a human-readable summary of the major configuration groups
   * (I/O, connection, threading, security, memory) to standard output.
   * @param conf - TransportConf to describe
   */
  public static void printConfiguration(TransportConf conf) {
    System.out.println("=== Transport Configuration Summary ===");
    System.out.println("Module: " + conf.getModule());
    System.out.println("\nI/O Settings:");
    System.out.println("  Mode: " + conf.ioMode());
    System.out.println("  Prefer Direct Buffers: " + conf.preferDirectBufs());
    System.out.println("  Receive Buffer: " + conf.receiveBuf() + " bytes");
    System.out.println("  Send Buffer: " + conf.sendBuf() + " bytes");
    System.out.println("\nConnection Settings:");
    System.out.println("  Timeout: " + conf.connectionTimeoutMs() + "ms");
    System.out.println("  Creation Timeout: " + conf.connectionCreationTimeoutMs() + "ms");
    System.out.println("  Connections Per Peer: " + conf.numConnectionsPerPeer());
    System.out.println("  Max Retries: " + conf.maxRetries());
    System.out.println("\nThread Settings:");
    System.out.println("  Server Threads: " + conf.serverThreads());
    System.out.println("  Client Threads: " + conf.clientThreads());
    System.out.println("\nSecurity Settings:");
    System.out.println("  Encryption Enabled: " + conf.encryptionEnabled());
    System.out.println("  SASL Encryption: " + conf.saslEncryption());
    System.out.println("  Authentication Required: " + conf.authenticationEnabled());
    // Key/cipher details are only meaningful when encryption is on.
    if (conf.encryptionEnabled()) {
      System.out.println("  Key Length: " + conf.encryptionKeyLength() + " bits");
      System.out.println("  Cipher: " + conf.cipherTransformation());
    }
    System.out.println("\nMemory Settings:");
    System.out.println("  Off-Heap Enabled: " + conf.offHeapEnabled());
    System.out.println("  Memory Fraction: " + conf.memoryFraction());
    System.out.println("  Memory Map Threshold: " + conf.memoryMapThreshold() + " bytes");
    System.out.println("=== End Configuration Summary ===");
  }

  /**
   * Detect whether the current OS is Linux.
   * BUG FIX: the original used the default-locale toLowerCase(), which can
   * mis-fold characters under locale-sensitive case mapping (e.g. Turkish
   * dotless i) and would NPE if the "os.name" property were absent. We use
   * Locale.ROOT for a locale-independent fold and default to "" when the
   * property is missing.
   * @return true if running on Linux
   */
  private static boolean isLinux() {
    String osName = System.getProperty("os.name", "");
    return osName.toLowerCase(java.util.Locale.ROOT).contains("linux");
  }
}
// Usage
TransportConf conf = new TransportConf("validation-test", configProvider);
ConfigValidator.validateConfiguration(conf);
ConfigValidator.printConfiguration(conf);

// Predefined configuration profiles for common scenarios
public class ConfigProfiles {

  /**
   * Build a TransportConf for the given module from alternating
   * key/value pairs, avoiding the repeated map-building boilerplate.
   * @param module - module identifier for the TransportConf
   * @param keyValues - alternating property keys and values
   * @return TransportConf backed by a MapConfigProvider over the pairs
   */
  private static TransportConf profile(String module, String... keyValues) {
    Map<String, String> props = new HashMap<>();
    for (int i = 0; i < keyValues.length; i += 2) {
      props.put(keyValues[i], keyValues[i + 1]);
    }
    return new TransportConf(module, new MapConfigProvider(props));
  }

  /**
   * Lightweight profile for local development: short timeouts, small
   * thread pools, and no authentication or encryption.
   * @return TransportConf tuned for development use
   */
  public static TransportConf createDevelopmentConfig() {
    return profile("development",
        "spark.network.timeout", "30s",
        "spark.network.io.connectionTimeout", "10s",
        "spark.network.io.numConnectionsPerPeer", "1",
        "spark.network.io.serverThreads", "2",
        "spark.network.io.clientThreads", "2",
        "spark.authenticate", "false",
        "spark.network.crypto.enabled", "false");
  }

  /**
   * Hardened profile for production: generous timeouts, default thread
   * pool sizing (0 = use defaults), and full authentication plus
   * 256-bit encryption with SASL.
   * @return TransportConf tuned for production use
   */
  public static TransportConf createProductionConfig() {
    return profile("production",
        "spark.network.timeout", "300s",
        "spark.network.io.connectionTimeout", "60s",
        "spark.network.io.numConnectionsPerPeer", "2",
        "spark.network.io.serverThreads", "0",
        "spark.network.io.clientThreads", "0",
        "spark.authenticate", "true",
        "spark.network.crypto.enabled", "true",
        "spark.network.crypto.keyLength", "256",
        "spark.network.sasl.encryption", "true");
  }

  /**
   * Throughput-oriented profile: EPOLL transport (Linux only), direct
   * buffers, extra connections per peer, 128KB socket buffers, 256MB
   * max message size, zero-copy streaming, and off-heap memory.
   * @return TransportConf tuned for high-throughput transfers
   */
  public static TransportConf createHighThroughputConfig() {
    return profile("high-throughput",
        "spark.network.io.mode", "EPOLL",
        "spark.network.io.preferDirectBufs", "true",
        "spark.network.io.numConnectionsPerPeer", "4",
        "spark.network.io.receiveBuf", "131072",
        "spark.network.io.sendBuf", "131072",
        "spark.network.maxMessageSize", "268435456",
        "spark.network.zeroCopy", "true",
        "spark.network.memory.offHeap.enabled", "true");
  }
}
// Usage
TransportConf devConf = ConfigProfiles.createDevelopmentConfig();
TransportConf prodConf = ConfigProfiles.createProductionConfig();
TransportConf htConf = ConfigProfiles.createHighThroughputConfig();
System.out.println("Development config - encryption: " + devConf.encryptionEnabled());
System.out.println("Production config - encryption: " + prodConf.encryptionEnabled());
System.out.println("High throughput config - I/O mode: " + htConf.ioMode());

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-12