CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common

Core networking library for Apache Spark providing transport layer abstractions and utilities

Pending
Overview
Eval results
Files

docs/configuration.md

Configuration and Utilities

A configuration system with performance-tuning options for connection management, I/O, and security, plus utility classes for networking operations.

Capabilities

Transport Configuration

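Central configuration management for all transport settings with performance tuning options and security controls. Most I/O settings are read from keys of the form `spark.<module>.io.<setting>`, where `<module>` is the name passed to the `TransportConf` constructor. For example, module `"network"` reads `spark.network.io.mode`, so the module name must match the key prefix you configure.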

/**
 * Central location for all transport configuration settings.
 * Provides typed access to configuration values with defaults.
 *
 * <p>Module-scoped settings are looked up under keys of the form
 * {@code spark.<module>.io.<setting>}, where {@code <module>} is the name passed to the
 * constructor. For example, module {@code "network"} reads {@code spark.network.io.mode} and
 * module {@code "shuffle"} reads {@code spark.shuffle.io.mode}. A few settings use fixed,
 * module-independent keys; the key is noted on each accessor below.
 */
public class TransportConf {
    /**
     * Create a transport configuration.
     * @param module Module name used to build the {@code spark.<module>.*} key prefix
     * @param conf Configuration provider for retrieving values
     */
    public TransportConf(String module, ConfigProvider conf);

    /**
     * Get an integer configuration value with a default.
     * The key is used verbatim; no module prefix is added.
     * @param name Full configuration key name
     * @param defaultValue Default value if the key is not found
     * @return Integer configuration value
     * @throws NumberFormatException if the stored value is not a plain integer
     */
    public int getInt(String name, int defaultValue);

    /**
     * Get a string configuration value with a default.
     * The key is used verbatim; no module prefix is added.
     * @param name Full configuration key name
     * @param defaultValue Default value if the key is not found
     * @return String configuration value
     */
    public String get(String name, String defaultValue);

    /**
     * Get the module name for this configuration.
     * @return Module name string (the {@code <module>} in {@code spark.<module>.io.*})
     */
    public String getModuleName();

    /**
     * Get the I/O mode setting (key {@code spark.<module>.io.mode}, default {@code "NIO"}).
     * The value is returned upper-cased, so it matches {@link IOMode} constant names such as
     * {@code NIO} or {@code EPOLL}.
     * @return I/O mode string
     */
    public String ioMode();

    /**
     * Whether to prefer direct (off-heap) byte buffers (key {@code spark.<module>.io.preferDirectBufs}).
     * @return true to use direct buffers, false for heap buffers
     */
    public boolean preferDirectBufs();

    /**
     * Connection timeout in milliseconds (key {@code spark.<module>.io.connectionTimeout},
     * falling back to {@code spark.network.timeout}).
     * @return Timeout for establishing connections
     */
    public int connectionTimeoutMs();

    /**
     * Number of concurrent connections per peer (key {@code spark.<module>.io.numConnectionsPerPeer}).
     * @return Maximum concurrent connections to same host
     */
    public int numConnectionsPerPeer();

    /**
     * Server socket backlog size (key {@code spark.<module>.io.backLog}).
     * @return Max length of pending connection queue
     */
    public int backLog();

    /**
     * Number of server threads for handling connections (key {@code spark.<module>.io.serverThreads}).
     * @return Server thread pool size
     */
    public int serverThreads();

    /**
     * Number of client threads for handling connections (key {@code spark.<module>.io.clientThreads}).
     * @return Client thread pool size
     */
    public int clientThreads();

    /**
     * Socket receive buffer size, SO_RCVBUF (key {@code spark.<module>.io.receiveBuffer}).
     * The value is parsed as a plain integer byte count; unit suffixes such as "1m" are rejected.
     * @return Receive buffer size in bytes
     */
    public int receiveBuf();

    /**
     * Socket send buffer size, SO_SNDBUF (key {@code spark.<module>.io.sendBuffer}).
     * The value is parsed as a plain integer byte count; unit suffixes such as "1m" are rejected.
     * @return Send buffer size in bytes
     */
    public int sendBuf();

    /**
     * Authentication round trip timeout.
     * @return Auth timeout in milliseconds
     */
    public int authRTTimeoutMs();

    /**
     * Maximum I/O retry attempts (key {@code spark.<module>.io.maxRetries}).
     * @return Max retry count for failed I/O operations
     */
    public int maxIORetries();

    /**
     * Wait time between I/O retry attempts (key {@code spark.<module>.io.retryWait}, a time string).
     * @return Retry wait time in milliseconds
     */
    public int ioRetryWaitTimeMs();

    /**
     * Minimum size threshold for memory mapping files
     * (fixed key {@code spark.storage.memoryMapThreshold}, a byte string such as "2m").
     * @return Minimum bytes to use memory mapping
     */
    public int memoryMapBytes();

    /**
     * Whether to initialize file descriptors lazily (key {@code spark.<module>.io.lazyFD}).
     * @return true for lazy initialization, false for eager
     */
    public boolean lazyFileDescriptor();

    /**
     * Whether to track detailed Netty metrics (key {@code spark.<module>.io.enableVerboseMetrics}).
     * @return true to enable verbose metrics, false for basic
     */
    public boolean verboseMetrics();

    /**
     * Maximum port binding retry attempts (fixed key {@code spark.port.maxRetries}).
     * @return Max retries when binding to ports
     */
    public int portMaxRetries();

    /**
     * Whether transport encryption is enabled (fixed key {@code spark.network.crypto.enabled}).
     * @return true if encryption should be used
     */
    public boolean encryptionEnabled();

    /**
     * Cipher transformation for encryption (fixed key {@code spark.network.crypto.cipher}).
     * @return Cipher transformation string (e.g., "AES/CTR/NoPadding")
     */
    public String cipherTransformation();

    /**
     * Maximum concurrent chunks being transferred on shuffle service
     * (fixed key {@code spark.shuffle.maxChunksBeingTransferred}).
     * @return Max concurrent chunk transfers
     */
    public long maxChunksBeingTransferred();
}

Usage Examples:

// Create configuration with custom settings
Map<String, String> configMap = new HashMap<>();
configMap.put("spark.network.timeout", "120s");
configMap.put("spark.network.io.mode", "NIO");
configMap.put("spark.network.io.numConnectionsPerPeer", "3");
configMap.put("spark.network.io.serverThreads", "8");
configMap.put("spark.network.io.clientThreads", "8");
configMap.put("spark.authenticate", "true");
configMap.put("spark.network.crypto.enabled", "true");

MapConfigProvider configProvider = new MapConfigProvider(configMap);
// TransportConf reads module-scoped settings from spark.<module>.io.*, so the module must be
// "network" for the spark.network.io.* keys above to take effect.
TransportConf conf = new TransportConf("network", configProvider);

// Use configuration values
System.out.println("Connection timeout: " + conf.connectionTimeoutMs() + "ms");
System.out.println("Connections per peer: " + conf.numConnectionsPerPeer());
System.out.println("Encryption enabled: " + conf.encryptionEnabled());
System.out.println("I/O mode: " + conf.ioMode());

// Custom configuration values (getInt/get use the key verbatim, with no module prefix)
int customValue = conf.getInt("myapp.custom.setting", 100);
String customString = conf.get("myapp.custom.name", "default");

Configuration Providers

Abstract configuration provider system with concrete implementations for different configuration sources.

/**
 * Abstract provider for configuration values.
 * Enables pluggable configuration sources: subclasses implement only {@link #get(String)},
 * and the defaulted and typed lookups below are built on top of it.
 */
public abstract class ConfigProvider {
    /**
     * Get configuration value by key.
     * @param name Configuration key
     * @return Configuration value
     * @throws NoSuchElementException if the key is not present. Implementations must throw
     *     rather than return null, because the defaulted lookups below fall back to their
     *     default only when this exception is raised.
     */
    public abstract String get(String name);

    /**
     * Get a configuration value, or {@code defaultValue} if the key is not present.
     * @param name Configuration key
     * @param defaultValue Value returned when {@link #get(String)} reports a missing key
     * @return Configured value or {@code defaultValue}
     */
    public String get(String name, String defaultValue) {
        try {
            return get(name);
        } catch (NoSuchElementException e) {
            return defaultValue;
        }
    }

    /**
     * Get an integer value, or {@code defaultValue} if the key is not present.
     * @throws NumberFormatException if the stored value is not a plain integer
     */
    public int getInt(String name, int defaultValue) {
        return Integer.parseInt(get(name, Integer.toString(defaultValue)));
    }

    /**
     * Get a long value, or {@code defaultValue} if the key is not present.
     * @throws NumberFormatException if the stored value is not a plain integer
     */
    public long getLong(String name, long defaultValue) {
        return Long.parseLong(get(name, Long.toString(defaultValue)));
    }

    /**
     * Get a double value, or {@code defaultValue} if the key is not present.
     * @throws NumberFormatException if the stored value is not a number
     */
    public double getDouble(String name, double defaultValue) {
        return Double.parseDouble(get(name, Double.toString(defaultValue)));
    }

    /**
     * Get a boolean value, or {@code defaultValue} if the key is not present.
     * Any stored value other than "true" (case-insensitive) yields false.
     */
    public boolean getBoolean(String name, boolean defaultValue) {
        return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
    }
}

/**
 * ConfigProvider backed by a Map for in-memory configuration
 */
public class MapConfigProvider extends ConfigProvider {
    /**
     * Create a map-based configuration provider
     * @param props Map containing configuration key-value pairs
     */
    public MapConfigProvider(Map<String, String> props);
    
    /**
     * Get configuration value from the map
     * @param name Configuration key
     * @return Configuration value or null if not found
     */
    public String get(String name);
}

Usage Examples:

// Map-based configuration
Map<String, String> config = new HashMap<>();
config.put("spark.network.timeout", "60s");
config.put("spark.network.io.mode", "EPOLL");
config.put("spark.authenticate", "true");

ConfigProvider provider = new MapConfigProvider(config);
// Module "network" => module-scoped keys are read from spark.network.io.*, matching the
// keys above (a module named "spark" would look for spark.spark.io.* instead).
TransportConf conf = new TransportConf("network", provider);

// Properties-based configuration
public class PropertiesConfigProvider extends ConfigProvider {
    private final Properties properties;

    public PropertiesConfigProvider(Properties properties) {
        this.properties = properties;
    }

    /**
     * Look up a property by key.
     * Missing keys throw NoSuchElementException instead of returning null: the defaulted
     * lookups that TransportConf uses (get(name, default), getInt, ...) fall back to their
     * default only when get(name) throws, so a null would leak through to those callers.
     */
    @Override
    public String get(String name) {
        String value = properties.getProperty(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }

    /**
     * Expose every property as a key/value entry.
     * Upstream Spark's ConfigProvider declares getAll() as abstract, so a subclass must
     * implement it to compile against the real library.
     */
    public Iterable<Map.Entry<String, String>> getAll() {
        Map<String, String> all = new HashMap<>();
        for (String key : properties.stringPropertyNames()) {
            all.put(key, properties.getProperty(key));
        }
        return all.entrySet();
    }
}

// File-based configuration: load a .properties file from disk and wrap it in a provider
Properties fileProps = new Properties();
try (InputStream in = new FileInputStream("transport.properties")) {
    fileProps.load(in);
}
TransportConf fileConf = new TransportConf("file-app", new PropertiesConfigProvider(fileProps));

Java Utilities

General Java utility functions for networking operations including resource management and parsing functions.

/**
 * Java utility functions for networking operations.
 */
public class JavaUtils {
    /**
     * Close a Closeable resource without throwing exceptions.
     * Any exception raised while closing is logged rather than propagated.
     * @param closeable Resource to close (may be null, in which case nothing happens)
     */
    public static void closeQuietly(Closeable closeable);

    /**
     * Parse a time string (e.g., "30s", "5m", "2h") as seconds.
     * A bare number with no suffix is interpreted as seconds. Finer units are truncated,
     * so "1500ms" parses as 1; use {@link #timeStringAsMs(String)} when sub-second
     * precision matters.
     * @param str Time string with unit suffix
     * @return Time value in seconds
     * @throws NumberFormatException if string format is invalid
     */
    public static long timeStringAsSec(String str);

    /**
     * Parse a time string (e.g., "500ms", "30s", "5m") as milliseconds.
     * A bare number with no suffix is interpreted as milliseconds.
     * @param str Time string with unit suffix
     * @return Time value in milliseconds
     * @throws NumberFormatException if string format is invalid
     */
    public static long timeStringAsMs(String str);

    /**
     * Parse a byte string (e.g., "1k", "512m", "2g") as bytes.
     * Suffixes are binary multiples ("1k" is 1024 bytes); a bare number is a byte count.
     * @param str Byte string with unit suffix
     * @return Byte value as long
     * @throws NumberFormatException if string format is invalid
     */
    public static long byteStringAsBytes(String str);
}

Usage Examples:

// Safe resource cleanup: closeQuietly in finally never throws
FileInputStream in = null;
try {
    in = new FileInputStream("data.bin");
    // ... read from the stream ...
} catch (IOException ioe) {
    System.err.println("Error: " + ioe.getMessage());
} finally {
    // Accepts null (the open may have failed); close errors are logged, not thrown
    JavaUtils.closeQuietly(in);
}

// Parse time strings (results are in seconds)
long timeout = JavaUtils.timeStringAsSec("30s");     // -> 30
long maxWait = JavaUtils.timeStringAsSec("5m");      // -> 300  (5 * 60)
long deadline = JavaUtils.timeStringAsSec("2h");     // -> 7200 (2 * 3600)

// Parse byte strings (binary multiples: 1k = 1024 bytes)
long bufferSize = JavaUtils.byteStringAsBytes("64k");    // -> 65536      (64 * 1024)
long maxMemory = JavaUtils.byteStringAsBytes("512m");    // -> 536870912  (512 * 1024^2)
long diskSpace = JavaUtils.byteStringAsBytes("2g");      // -> 2147483648 (2 * 1024^3)

// Use in configuration
Map<String, String> config = new HashMap<>();
config.put("network.timeout", "120s");
config.put("buffer.size", "1m");

TransportConf conf = new TransportConf("app", new MapConfigProvider(config));
// Parse straight to milliseconds: timeStringAsSec(...) * 1000 would first truncate
// sub-second values such as "1500ms" to whole seconds (1500ms -> 1s -> 1000ms).
long timeoutMs = JavaUtils.timeStringAsMs(conf.get("network.timeout", "60s"));
long bufferBytes = JavaUtils.byteStringAsBytes(conf.get("buffer.size", "64k"));

Netty Utilities

Netty-specific utility functions for channel operations and frame decoding with transport protocol support.

/**
 * Netty-specific utility functions for channel operations.
 * Every member listed here is a static helper.
 */
public class NettyUtils {
    /**
     * Get the remote address of a channel as a string, e.g. for log messages.
     * @param channel Netty channel whose peer address is wanted
     * @return Remote address string (host:port format)
     */
    public static String getRemoteAddress(Channel channel);
    
    /**
     * Create a frame decoder for the transport protocol, suitable for adding to a
     * channel pipeline.
     * NOTE(review): presumably returns a fresh instance per call (one per pipeline) — TODO confirm.
     * @return TransportFrameDecoder instance
     */
    public static TransportFrameDecoder createFrameDecoder();
}

Usage Examples:

// Get remote address for logging.
// Here ctx is the ChannelHandlerContext available inside a Netty ChannelHandler; any other
// source of an active Channel works too.
Channel channel = ctx.channel();
String remoteAddr = NettyUtils.getRemoteAddress(channel);
System.out.println("Connection from: " + remoteAddr);

// Add frame decoder to pipeline
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("frameDecoder", NettyUtils.createFrameDecoder());

Crypto Utilities

Cryptography utility functions for secure networking operations including key generation and cipher management.

/**
 * Cryptography utility functions for secure networking.
 * NOTE(review): upstream Spark's org.apache.spark.network.util.CryptoUtils appears to expose
 * only crypto-config helpers (e.g. toCryptoConf); verify that the methods listed here exist in
 * the targeted version before relying on them — TODO confirm.
 */
public class CryptoUtils {
    /**
     * Generate a random byte array, e.g. for keys, IVs, or salts.
     * NOTE(review): presumably backed by SecureRandom — confirm before using it for key material.
     * @param length Number of bytes to generate
     * @return Random byte array
     */
    public static byte[] randomBytes(int length);
    
    /**
     * Create a cipher instance with the specified transformation.
     * Callers initialize it (Cipher.init with key and IV) before use, as in the usage example.
     * @param transformation Cipher transformation string (e.g. "AES/CTR/NoPadding")
     * @return Cipher instance
     * @throws GeneralSecurityException if cipher creation fails
     */
    public static Cipher createCipher(String transformation) throws GeneralSecurityException;
    
    /**
     * Derive key from password using PBKDF2.
     * NOTE(review): the PRF (e.g. HmacSHA256) is not stated here — confirm. Pick the iteration
     * count from current password-storage guidance rather than a legacy small value.
     * @param password Password string
     * @param salt Salt bytes (must be kept to re-derive the same key later)
     * @param iterations Number of iterations
     * @param keyLength Key length in bits
     * @return Derived key bytes
     * @throws GeneralSecurityException if key derivation fails
     */
    public static byte[] deriveKey(String password, byte[] salt, int iterations, int keyLength) 
        throws GeneralSecurityException;
}

Usage Examples:

// Generate random data for keys and IVs
byte[] key = CryptoUtils.randomBytes(32);  // 256-bit key
byte[] iv = CryptoUtils.randomBytes(16);   // 128-bit IV; in CTR mode never reuse an IV with the same key
byte[] salt = CryptoUtils.randomBytes(16); // Salt for key derivation; persist it to re-derive the key later

// Create cipher for encryption
try {
    Cipher cipher = CryptoUtils.createCipher("AES/CTR/NoPadding");
    cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
    // Use cipher for encryption
} catch (GeneralSecurityException e) {
    System.err.println("Cipher creation failed: " + e.getMessage());
}

// Derive key from password
try {
    String password = "user-password"; // illustration only; never hard-code real passwords
    // OWASP currently recommends 600,000 iterations for PBKDF2-HMAC-SHA256 (assuming that PRF);
    // 10,000 iterations does far too little to slow down offline guessing.
    byte[] derivedKey = CryptoUtils.deriveKey(password, salt, 600_000, 256);
    // Use derived key for encryption
} catch (GeneralSecurityException e) {
    System.err.println("Key derivation failed: " + e.getMessage());
}

Enumerations

Type-safe enumerations for configuration options including I/O modes and byte units.

/**
 * I/O backends the transport layer can run on.
 * Constant names are what an upper-cased I/O mode string resolves to via valueOf.
 */
public enum IOMode {
    /** Portable Java NIO-based I/O. */
    NIO,
    /** Linux epoll-based I/O; the higher-performance choice on Linux. */
    EPOLL
}

/**
 * Enumeration of byte units for configuration values.
 * All units are binary multiples (powers of 1024).
 */
public enum ByteUnit {
    /** Single byte */
    BYTE(1L),

    /** Kibibyte (1024 bytes) */
    KiB(1024L),

    /** Mebibyte (1024^2 bytes) */
    MiB(1024L * 1024L),

    /** Gibibyte (1024^3 bytes) */
    GiB(1024L * 1024L * 1024L),

    /** Tebibyte (1024^4 bytes) */
    TiB(1024L * 1024L * 1024L * 1024L),

    /** Pebibyte (1024^5 bytes) */
    PiB(1024L * 1024L * 1024L * 1024L * 1024L);

    /** Number of bytes in one unit of this kind. */
    private final long bytes;

    ByteUnit(long bytes) {
        this.bytes = bytes;
    }

    /**
     * Convert value in this unit to bytes.
     * @param value Value in this unit
     * @return Value in bytes
     * @throws ArithmeticException if the result does not fit in a {@code long}
     */
    public long toBytes(long value) {
        // multiplyExact surfaces overflow (e.g. huge PiB counts) instead of silently wrapping.
        return Math.multiplyExact(value, bytes);
    }

    /**
     * Convert bytes to value in this unit, truncating toward zero
     * (e.g. {@code KiB.fromBytes(1536) == 1}).
     * @param bytes Value in bytes
     * @return Value in this unit
     */
    public long fromBytes(long bytes) {
        // The parameter shadows the field, so the unit size must be read as this.bytes.
        return bytes / this.bytes;
    }
}

Usage Examples:

// Using byte units
long bufferSize = ByteUnit.MiB.toBytes(64); // 64 MiB in bytes
long diskSpace = ByteUnit.GiB.toBytes(10);  // 10 GiB in bytes

// Configuration with byte units
long configuredSize = conf.getInt("buffer.size.mb", 32);
long actualBytes = ByteUnit.MiB.toBytes(configuredSize);

// I/O mode configuration: ioMode() reads this module's own I/O-mode setting. A raw key
// such as "io.mode" is not what TransportConf uses.
// Locale.ROOT keeps the upper-casing locale-independent. Under a Turkish default locale,
// "nio".toUpperCase() yields "NİO", which IOMode.valueOf would reject.
String ioModeStr = conf.ioMode();
IOMode ioMode = IOMode.valueOf(ioModeStr.toUpperCase(Locale.ROOT));

switch (ioMode) {
    case NIO:
        System.out.println("Using Java NIO");
        break;
    case EPOLL:
        System.out.println("Using Linux epoll (high performance)");
        break;
}

Frame Decoder

Transport frame decoder for handling message framing with optional stream interception capabilities.

/**
 * Netty frame decoder for transport protocol messages.
 * Handles message framing and optional stream interception.
 * NOTE(review): upstream Spark's TransportFrameDecoder appears to extend
 * ChannelInboundHandlerAdapter rather than LengthFieldBasedFrameDecoder, and its Interceptor
 * appears to take a single ByteBuf — verify this listing against the targeted Spark version.
 */
public class TransportFrameDecoder extends LengthFieldBasedFrameDecoder {
    /**
     * Create a frame decoder with default settings.
     */
    public TransportFrameDecoder();
    
    /**
     * Create a frame decoder with a stream interceptor.
     * @param interceptor Optional interceptor for stream frames
     */
    public TransportFrameDecoder(Interceptor interceptor);
    
    /**
     * Callback for intercepting decoded frames before normal processing.
     */
    public interface Interceptor {
        /**
         * Intercept a decoded frame.
         * @param ctx Channel handler context
         * @param msgHeader Message header bytes
         * @param msgBody Message body as ManagedBuffer
         * @return true to let the frame continue through normal processing,
         *         false to consume it here
         * @throws Exception if interception fails
         */
        boolean handle(ChannelHandlerContext ctx, ByteBuf msgHeader, ManagedBuffer msgBody) 
            throws Exception;
    }
}

Usage Examples:

// Basic frame decoder
TransportFrameDecoder decoder = new TransportFrameDecoder();
pipeline.addLast("frameDecoder", decoder);

// Frame decoder with interceptor.
// isStreamFrame and handleStreamFrame are application-defined placeholders.
TransportFrameDecoder.Interceptor streamInterceptor =
    new TransportFrameDecoder.Interceptor() {
        @Override
        public boolean handle(ChannelHandlerContext ctx, ByteBuf msgHeader, ManagedBuffer msgBody)
            throws Exception {
            // Check if this is a stream frame we want to intercept
            if (isStreamFrame(msgHeader)) {
                handleStreamFrame(msgBody);
                return false; // Consume the frame
            }
            return true; // Continue normal processing
        }
    };

TransportFrameDecoder interceptingDecoder = new TransportFrameDecoder(streamInterceptor);
// A pipeline allows only one handler per name, and "frameDecoder" is already registered
// above. Calling addLast with the same name again fails with a duplicate-name error, so
// swap the existing decoder for the intercepting one instead.
pipeline.replace("frameDecoder", "frameDecoder", interceptingDecoder);

Configuration Usage Patterns

Production Configuration

// High-performance production configuration
Map<String, String> prodConfig = new HashMap<>();

// Connection settings
prodConfig.put("spark.network.timeout", "300s");
prodConfig.put("spark.network.io.mode", "EPOLL");
prodConfig.put("spark.network.io.numConnectionsPerPeer", "5");
prodConfig.put("spark.network.io.serverThreads", "16");
prodConfig.put("spark.network.io.clientThreads", "16");

// Buffer settings. Socket buffer sizes are plain integer byte counts: a value such as
// "1m" fails integer parsing for these keys.
prodConfig.put("spark.network.io.preferDirectBufs", "true");
prodConfig.put("spark.network.io.receiveBuffer", "1048576"); // 1 MiB
prodConfig.put("spark.network.io.sendBuffer", "1048576");    // 1 MiB

// Security settings
prodConfig.put("spark.authenticate", "true");
prodConfig.put("spark.network.crypto.enabled", "true");
prodConfig.put("spark.network.crypto.keyLength", "256");

// Performance tuning (the memory-map threshold and chunk limit use fixed, non-module keys)
prodConfig.put("spark.storage.memoryMapThreshold", "2m");
prodConfig.put("spark.network.io.lazyFD", "true");
prodConfig.put("spark.shuffle.maxChunksBeingTransferred", "1000");

// Module "network" => module-scoped keys are read from spark.network.io.* (see above)
TransportConf prodConf = new TransportConf("network", new MapConfigProvider(prodConfig));

Development Configuration

// Development/testing configuration
Map<String, String> devConfig = new HashMap<>();

// Relaxed timeouts for debugging
devConfig.put("spark.network.timeout", "600s");
devConfig.put("spark.network.io.mode", "NIO");
devConfig.put("spark.network.io.numConnectionsPerPeer", "1");

// Smaller thread pools
devConfig.put("spark.network.io.serverThreads", "2");
devConfig.put("spark.network.io.clientThreads", "2");

// Verbose Netty metrics for monitoring
devConfig.put("spark.network.io.enableVerboseMetrics", "true");

// Disabled security for easier testing
devConfig.put("spark.authenticate", "false");
devConfig.put("spark.network.crypto.enabled", "false");

// Module "network" => module-scoped keys are read from spark.network.io*, matching the keys above
TransportConf devConf = new TransportConf("network", new MapConfigProvider(devConfig));

Dynamic Configuration

// Configuration that can be updated at runtime
public class DynamicConfigProvider extends ConfigProvider {
    // The map is never reassigned, so the field is final. ConcurrentHashMap itself makes
    // concurrent updates and reads safe, so volatile added nothing here.
    private final Map<String, String> config = new ConcurrentHashMap<>();

    /** Set or overwrite a key; value must be non-null (ConcurrentHashMap rejects nulls). */
    public void updateConfig(String key, String value) {
        config.put(key, value);
    }

    public void removeConfig(String key) {
        config.remove(key);
    }

    /**
     * Look up a key.
     * Missing keys throw NoSuchElementException rather than returning null, so the defaulted
     * lookups that TransportConf uses fall back to their defaults.
     */
    @Override
    public String get(String name) {
        String value = config.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }

    /**
     * Expose all current entries.
     * Upstream Spark's ConfigProvider declares getAll() as abstract, so a subclass must
     * implement it to compile against the real library.
     */
    public Iterable<Map.Entry<String, String>> getAll() {
        return config.entrySet();
    }
}

DynamicConfigProvider dynamicProvider = new DynamicConfigProvider();
// Module "network" => module-scoped keys are read from spark.network.io*, matching the
// numConnectionsPerPeer key updated below.
TransportConf dynamicConf = new TransportConf("network", dynamicProvider);

// Update configuration at runtime
dynamicProvider.updateConfig("spark.network.timeout", "120s");
dynamicProvider.updateConfig("spark.network.io.numConnectionsPerPeer", "3");

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common

docs

authentication.md

buffers.md

configuration.md

index.md

protocol.md

streaming.md

transport.md

tile.json