Core networking library for Apache Spark providing transport layer abstractions and utilities
—
Comprehensive configuration system with performance tuning options for connection management, I/O settings, security parameters, and utility classes for networking operations.
Central configuration management for all transport settings with performance tuning options and security controls.
/**
* Central location for all transport configuration settings
* Provides typed access to configuration values with defaults
*/
public class TransportConf {
/**
* Create a transport configuration
* @param module Module name for configuration key prefixes
* @param conf Configuration provider for retrieving values
*/
public TransportConf(String module, ConfigProvider conf);
/**
* Get integer configuration value with default
* @param name Configuration key name
* @param defaultValue Default value if key not found
* @return Integer configuration value
*/
public int getInt(String name, int defaultValue);
/**
* Get string configuration value with default
* @param name Configuration key name
* @param defaultValue Default value if key not found
* @return String configuration value
*/
public String get(String name, String defaultValue);
/**
* Get the module name for this configuration
* @return Module name string
*/
public String getModuleName();
/**
* Get I/O mode setting (nio or epoll)
* @return I/O mode string
*/
public String ioMode();
/**
* Whether to prefer direct (off-heap) byte buffers
* @return true to use direct buffers, false for heap buffers
*/
public boolean preferDirectBufs();
/**
* Connection timeout in milliseconds
* @return Timeout for establishing connections
*/
public int connectionTimeoutMs();
/**
* Number of concurrent connections per peer
* @return Maximum concurrent connections to same host
*/
public int numConnectionsPerPeer();
/**
* Server socket backlog size
* @return Max length of pending connection queue
*/
public int backLog();
/**
* Number of server threads for handling connections
* @return Server thread pool size
*/
public int serverThreads();
/**
* Number of client threads for handling connections
* @return Client thread pool size
*/
public int clientThreads();
/**
* Socket receive buffer size (SO_RCVBUF)
* @return Receive buffer size in bytes
*/
public int receiveBuf();
/**
* Socket send buffer size (SO_SNDBUF)
* @return Send buffer size in bytes
*/
public int sendBuf();
/**
* Authentication round trip timeout
* @return Auth timeout in milliseconds
*/
public int authRTTimeoutMs();
/**
* Maximum I/O retry attempts
* @return Max retry count for failed I/O operations
*/
public int maxIORetries();
/**
* Wait time between I/O retry attempts
* @return Retry wait time in milliseconds
*/
public int ioRetryWaitTimeMs();
/**
* Minimum size threshold for memory mapping files
* @return Minimum bytes to use memory mapping
*/
public int memoryMapBytes();
/**
* Whether to initialize file descriptors lazily
* @return true for lazy initialization, false for eager
*/
public boolean lazyFileDescriptor();
/**
* Whether to track detailed Netty metrics
* @return true to enable verbose metrics, false for basic
*/
public boolean verboseMetrics();
/**
* Maximum port binding retry attempts
* @return Max retries when binding to ports
*/
public int portMaxRetries();
/**
* Whether transport encryption is enabled
* @return true if encryption should be used
*/
public boolean encryptionEnabled();
/**
* Cipher transformation for encryption
* @return Cipher transformation string (e.g., "AES/CTR/NoPadding")
*/
public String cipherTransformation();
/**
* Maximum concurrent chunks being transferred on shuffle service
* @return Max concurrent chunk transfers
*/
public long maxChunksBeingTransferred();
}
Usage Examples:
// Create configuration with custom settings
Map<String, String> configMap = new HashMap<>();
configMap.put("spark.network.timeout", "120s");
configMap.put("spark.network.io.mode", "NIO");
configMap.put("spark.network.io.numConnectionsPerPeer", "3");
configMap.put("spark.network.io.serverThreads", "8");
configMap.put("spark.network.io.clientThreads", "8");
configMap.put("spark.authenticate", "true");
configMap.put("spark.network.crypto.enabled", "true");
MapConfigProvider configProvider = new MapConfigProvider(configMap);
TransportConf conf = new TransportConf("myapp", configProvider);
// Use configuration values
System.out.println("Connection timeout: " + conf.connectionTimeoutMs() + "ms");
System.out.println("Connections per peer: " + conf.numConnectionsPerPeer());
System.out.println("Encryption enabled: " + conf.encryptionEnabled());
System.out.println("I/O mode: " + conf.ioMode());
// Custom configuration values
int customValue = conf.getInt("myapp.custom.setting", 100);
String customString = conf.get("myapp.custom.name", "default");
Abstract configuration provider system with concrete implementations for different configuration sources.
/**
* Abstract provider for configuration values
* Enables pluggable configuration sources
*/
public abstract class ConfigProvider {
/**
* Get configuration value by key
* @param name Configuration key
* @return Configuration value or null if not found
*/
public abstract String get(String name);
}
/**
* ConfigProvider backed by a Map for in-memory configuration
*/
public class MapConfigProvider extends ConfigProvider {
/**
* Create a map-based configuration provider
* @param props Map containing configuration key-value pairs
*/
public MapConfigProvider(Map<String, String> props);
/**
* Get configuration value from the map
* @param name Configuration key
* @return Configuration value or null if not found
*/
public String get(String name);
}
Usage Examples:
// Map-based configuration
Map<String, String> config = new HashMap<>();
config.put("spark.network.timeout", "60s");
config.put("spark.network.io.mode", "EPOLL");
config.put("spark.authenticate", "true");
ConfigProvider provider = new MapConfigProvider(config);
TransportConf conf = new TransportConf("spark", provider);
// Properties-based configuration
/**
 * ConfigProvider backed by a {@link java.util.Properties} object,
 * e.g. one loaded from a .properties file.
 */
public class PropertiesConfigProvider extends ConfigProvider {
    private final Properties properties;

    /**
     * Create a properties-backed configuration provider.
     * Fails fast instead of deferring a NullPointerException to the
     * first call to {@link #get(String)}.
     * @param properties Properties holding configuration values; must not be null
     * @throws NullPointerException if properties is null
     */
    public PropertiesConfigProvider(Properties properties) {
        if (properties == null) {
            throw new NullPointerException("properties must not be null");
        }
        this.properties = properties;
    }

    /**
     * Get a configuration value from the underlying properties.
     * @param name Configuration key
     * @return Property value, or null if the key is not set
     */
    @Override
    public String get(String name) {
        return properties.getProperty(name);
    }
}
// File-based configuration
Properties props = new Properties();
try (InputStream is = new FileInputStream("transport.properties")) {
props.load(is);
}
ConfigProvider fileProvider = new PropertiesConfigProvider(props);
TransportConf fileConf = new TransportConf("file-app", fileProvider);General Java utility functions for networking operations including resource management and parsing functions.
/**
* Java utility functions for networking operations
*/
public class JavaUtils {
/**
* Close a Closeable resource without throwing exceptions
* Logs any exceptions that occur during closing
* @param closeable Resource to close (may be null)
*/
public static void closeQuietly(Closeable closeable);
/**
* Parse a time string (e.g., "30s", "5m", "2h") as seconds
* @param str Time string with unit suffix
* @return Time value in seconds
* @throws NumberFormatException if string format is invalid
*/
public static long timeStringAsSec(String str);
/**
* Parse a byte string (e.g., "1k", "512m", "2g") as bytes
* @param str Byte string with unit suffix
* @return Byte value as long
* @throws NumberFormatException if string format is invalid
*/
public static long byteStringAsBytes(String str);
}
Usage Examples:
// Safe resource cleanup
FileInputStream fis = null;
try {
fis = new FileInputStream("data.bin");
// Use stream
} catch (IOException e) {
System.err.println("Error: " + e.getMessage());
} finally {
JavaUtils.closeQuietly(fis); // Won't throw exception
}
// Parse time strings
long timeout = JavaUtils.timeStringAsSec("30s"); // 30
long maxWait = JavaUtils.timeStringAsSec("5m"); // 300
long deadline = JavaUtils.timeStringAsSec("2h"); // 7200
// Parse byte strings
long bufferSize = JavaUtils.byteStringAsBytes("64k"); // 65536
long maxMemory = JavaUtils.byteStringAsBytes("512m"); // 536870912
long diskSpace = JavaUtils.byteStringAsBytes("2g"); // 2147483648
// Use in configuration
Map<String, String> config = new HashMap<>();
config.put("network.timeout", "120s");
config.put("buffer.size", "1m");
TransportConf conf = new TransportConf("app", new MapConfigProvider(config));
long timeoutMs = JavaUtils.timeStringAsSec(conf.get("network.timeout", "60s")) * 1000;
long bufferBytes = JavaUtils.byteStringAsBytes(conf.get("buffer.size", "64k"));
Netty-specific utility functions for channel operations and frame decoding with transport protocol support.
/**
* Netty-specific utility functions for channel operations
*/
public class NettyUtils {
/**
* Get remote address of a channel as string
* @param channel Netty channel
* @return Remote address string (host:port format)
*/
public static String getRemoteAddress(Channel channel);
/**
* Create a frame decoder for the transport protocol
* @return TransportFrameDecoder instance
*/
public static TransportFrameDecoder createFrameDecoder();
}
Usage Examples:
// Get remote address for logging
Channel channel = // ... obtained from somewhere
String remoteAddr = NettyUtils.getRemoteAddress(channel);
System.out.println("Connection from: " + remoteAddr);
// Add frame decoder to pipeline
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("frameDecoder", NettyUtils.createFrameDecoder());
Cryptography utility functions for secure networking operations including key generation and cipher management.
/**
* Cryptography utility functions for secure networking
*/
public class CryptoUtils {
/**
* Generate a secure random byte array
* @param length Number of bytes to generate
* @return Random byte array
*/
public static byte[] randomBytes(int length);
/**
* Create cipher instance with the specified transformation
* @param transformation Cipher transformation string
* @return Cipher instance
* @throws GeneralSecurityException if cipher creation fails
*/
public static Cipher createCipher(String transformation) throws GeneralSecurityException;
/**
* Derive key from password using PBKDF2
* @param password Password string
* @param salt Salt bytes
* @param iterations Number of iterations
* @param keyLength Key length in bits
* @return Derived key bytes
* @throws GeneralSecurityException if key derivation fails
*/
public static byte[] deriveKey(String password, byte[] salt, int iterations, int keyLength)
throws GeneralSecurityException;
}
Usage Examples:
// Generate random data for keys and IVs
byte[] key = CryptoUtils.randomBytes(32); // 256-bit key
byte[] iv = CryptoUtils.randomBytes(16); // 128-bit IV
byte[] salt = CryptoUtils.randomBytes(16); // Salt for key derivation
// Create cipher for encryption
try {
Cipher cipher = CryptoUtils.createCipher("AES/CTR/NoPadding");
cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
// Use cipher for encryption
} catch (GeneralSecurityException e) {
System.err.println("Cipher creation failed: " + e.getMessage());
}
// Derive key from password
try {
String password = "user-password";
byte[] derivedKey = CryptoUtils.deriveKey(password, salt, 10000, 256);
// Use derived key for encryption
} catch (GeneralSecurityException e) {
System.err.println("Key derivation failed: " + e.getMessage());
}
Type-safe enumerations for configuration options including I/O modes and byte units.
/**
* Enumeration of I/O modes for transport operations
*/
public enum IOMode {
/** Java NIO-based I/O */
NIO,
/** Linux epoll-based I/O (higher performance on Linux) */
EPOLL;
}
/**
* Enumeration of byte units for configuration values
*/
public enum ByteUnit {
/** Single byte */
BYTE(1L),
/** Kibibyte (1024 bytes) */
KiB(1024L),
/** Mebibyte (1024^2 bytes) */
MiB(1024L * 1024L),
/** Gibibyte (1024^3 bytes) */
GiB(1024L * 1024L * 1024L),
/** Tebibyte (1024^4 bytes) */
TiB(1024L * 1024L * 1024L * 1024L),
/** Pebibyte (1024^5 bytes) */
PiB(1024L * 1024L * 1024L * 1024L * 1024L);
private final long bytes;
ByteUnit(long bytes) {
this.bytes = bytes;
}
/**
* Convert value in this unit to bytes
* @param value Value in this unit
* @return Value in bytes
*/
public long toBytes(long value);
/**
* Convert bytes to value in this unit
* @param bytes Value in bytes
* @return Value in this unit
*/
public long fromBytes(long bytes);
}
Usage Examples:
// Using byte units
long bufferSize = ByteUnit.MiB.toBytes(64); // 64 MiB in bytes
long diskSpace = ByteUnit.GiB.toBytes(10); // 10 GiB in bytes
// Configuration with byte units
long configuredSize = conf.getInt("buffer.size.mb", 32);
long actualBytes = ByteUnit.MiB.toBytes(configuredSize);
// I/O mode configuration
String ioModeStr = conf.get("io.mode", "NIO");
IOMode ioMode = IOMode.valueOf(ioModeStr.toUpperCase());
switch (ioMode) {
case NIO:
System.out.println("Using Java NIO");
break;
case EPOLL:
System.out.println("Using Linux epoll (high performance)");
break;
}
Transport frame decoder for handling message framing with optional stream interception capabilities.
/**
* Netty frame decoder for transport protocol messages
* Handles message framing and optional stream interception
*/
public class TransportFrameDecoder extends LengthFieldBasedFrameDecoder {
/**
* Create a frame decoder with default settings
*/
public TransportFrameDecoder();
/**
* Create a frame decoder with stream interceptor
* @param interceptor Optional interceptor for stream frames
*/
public TransportFrameDecoder(Interceptor interceptor);
/**
* Interface for intercepting decoded frames before processing
*/
public interface Interceptor {
/**
* Intercept a decoded frame
* @param ctx Channel handler context
* @param msgHeader Message header bytes
* @param msgBody Message body as ManagedBuffer
* @return true to continue processing, false to consume the frame
* @throws Exception if interception fails
*/
boolean handle(ChannelHandlerContext ctx, ByteBuf msgHeader, ManagedBuffer msgBody)
throws Exception;
}
}
Usage Examples:
// Basic frame decoder
TransportFrameDecoder decoder = new TransportFrameDecoder();
pipeline.addLast("frameDecoder", decoder);
// Frame decoder with interceptor
TransportFrameDecoder.Interceptor streamInterceptor =
new TransportFrameDecoder.Interceptor() {
@Override
public boolean handle(ChannelHandlerContext ctx, ByteBuf msgHeader, ManagedBuffer msgBody)
throws Exception {
// Check if this is a stream frame we want to intercept
if (isStreamFrame(msgHeader)) {
handleStreamFrame(msgBody);
return false; // Consume the frame
}
return true; // Continue normal processing
}
};
TransportFrameDecoder interceptingDecoder = new TransportFrameDecoder(streamInterceptor);
pipeline.addLast("frameDecoder", interceptingDecoder);
// High-performance production configuration
Map<String, String> prodConfig = new HashMap<>();
// Connection settings
prodConfig.put("spark.network.timeout", "300s");
prodConfig.put("spark.network.io.mode", "EPOLL");
prodConfig.put("spark.network.io.numConnectionsPerPeer", "5");
prodConfig.put("spark.network.io.serverThreads", "16");
prodConfig.put("spark.network.io.clientThreads", "16");
// Buffer settings
prodConfig.put("spark.network.io.preferDirectBufs", "true");
prodConfig.put("spark.network.io.receiveBuf", "1m");
prodConfig.put("spark.network.io.sendBuf", "1m");
// Security settings
prodConfig.put("spark.authenticate", "true");
prodConfig.put("spark.network.crypto.enabled", "true");
prodConfig.put("spark.network.crypto.keyLength", "256");
// Performance tuning
prodConfig.put("spark.network.io.memoryMapBytes", "2m");
prodConfig.put("spark.network.io.lazyFD", "true");
prodConfig.put("spark.network.maxChunksBeingTransferred", "1000");
TransportConf prodConf = new TransportConf("spark", new MapConfigProvider(prodConfig));
// Development/testing configuration
Map<String, String> devConfig = new HashMap<>();
// Relaxed timeouts for debugging
devConfig.put("spark.network.timeout", "600s");
devConfig.put("spark.network.io.mode", "NIO");
devConfig.put("spark.network.io.numConnectionsPerPeer", "1");
// Smaller thread pools
devConfig.put("spark.network.io.serverThreads", "2");
devConfig.put("spark.network.io.clientThreads", "2");
// Verbose metrics for monitoring
devConfig.put("spark.network.verbose.metrics", "true");
// Disabled security for easier testing
devConfig.put("spark.authenticate", "false");
devConfig.put("spark.network.crypto.enabled", "false");
TransportConf devConf = new TransportConf("spark-dev", new MapConfigProvider(devConfig));
// Configuration that can be updated at runtime
/**
 * ConfigProvider whose entries can be added, replaced, or removed at runtime.
 * Backed by a ConcurrentHashMap, so concurrent reads and writes are safe.
 */
public class DynamicConfigProvider extends ConfigProvider {
    // final rather than volatile: the map reference is assigned once and
    // never replaced, so volatile adds nothing; ConcurrentHashMap already
    // provides the per-entry thread-safety this class needs.
    private final Map<String, String> config = new ConcurrentHashMap<>();

    /**
     * Add or replace a configuration entry.
     * @param key Configuration key
     * @param value New value for the key (must not be null — ConcurrentHashMap
     *              rejects null values)
     */
    public void updateConfig(String key, String value) {
        config.put(key, value);
    }

    /**
     * Remove a configuration entry, if present.
     * @param key Configuration key to remove
     */
    public void removeConfig(String key) {
        config.remove(key);
    }

    /**
     * Get the current value for a key.
     * @param name Configuration key
     * @return Current value, or null if the key is absent
     */
    @Override
    public String get(String name) {
        return config.get(name);
    }
}
DynamicConfigProvider dynamicProvider = new DynamicConfigProvider();
TransportConf dynamicConf = new TransportConf("dynamic", dynamicProvider);
// Update configuration at runtime
dynamicProvider.updateConfig("spark.network.timeout", "120s");
dynamicProvider.updateConfig("spark.network.io.numConnectionsPerPeer", "3");
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common