tessl/maven-org-apache-spark--spark-network-common-2-12

Core networking functionality for Apache Spark, including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support, built on Netty for high-performance network I/O.

Configuration Management

The configuration management API provides a flexible and extensible system for managing transport layer settings and network parameters in Apache Spark. It uses a provider pattern to abstract configuration sources and includes comprehensive settings for performance tuning, security, and network behavior.

Capabilities

TransportConf

Central configuration management class for the transport layer, providing access to all network-related settings.

/**
 * Create a transport configuration with module name and config provider
 * @param module - String identifier for the configuration module (e.g., "spark", "shuffle")
 * @param conf - ConfigProvider instance supplying configuration values
 */
public TransportConf(String module, ConfigProvider conf);

/**
 * Get the module name for this configuration
 * @return String representing the module identifier
 */
public String getModule();

/**
 * Get the underlying configuration provider
 * @return ConfigProvider instance used by this configuration
 */
public ConfigProvider getConfigProvider();

I/O Configuration

Settings for controlling low-level I/O behavior and performance characteristics.

/**
 * Get the I/O mode for network operations
 * @return String representing the I/O mode ("NIO" or "EPOLL")
 */
public String ioMode();

/**
 * Check if direct ByteBuffers should be preferred for network operations
 * @return boolean indicating preference for direct buffers
 */
public boolean preferDirectBufs();

/**
 * Get the size of receive buffers for network operations
 * @return int representing buffer size in bytes
 */
public int receiveBuf();

/**
 * Get the size of send buffers for network operations
 * @return int representing buffer size in bytes
 */
public int sendBuf();

/**
 * Check if TCP keep-alive should be enabled
 * @return boolean indicating if SO_KEEPALIVE is set on connection sockets
 */
public boolean enableTcpKeepAlive();

/**
 * Check if SO_REUSEADDR should be enabled
 * @return boolean indicating if address reuse should be enabled
 */
public boolean enableTcpReuseAddr();

Connection Management

Configuration for connection timeouts, pooling, and lifecycle management.

/**
 * Get the connection timeout in milliseconds
 * @return int representing timeout for establishing connections
 */
public int connectionTimeoutMs();

/**
 * Get the connection creation timeout in milliseconds
 * @return int representing timeout for creating new connections
 */
public int connectionCreationTimeoutMs();

/**
 * Get the number of connections per peer
 * @return int representing maximum connections to maintain per remote peer
 */
public int numConnectionsPerPeer();

/**
 * Get the maximum number of retries for connection attempts
 * @return int representing retry count for failed connections
 */
public int maxRetries();

/**
 * Get the retry wait time in milliseconds
 * @return int representing wait time between connection retry attempts
 */
public int retryWaitMs();

/**
 * Get the idle timeout for connections in milliseconds
 * @return int representing timeout for idle connection cleanup
 */
public int connectionIdleTimeoutMs();
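The retry settings above are typically consumed by a client factory that sleeps `retryWaitMs()` between failed attempts, up to `maxRetries()` retries. A minimal, self-contained sketch of that loop (plain Java; `Connector` and `connectWithRetry` are hypothetical names, not the Spark implementation):

```java
import java.util.concurrent.TimeUnit;

public class RetryingConnector {
    /** Functional interface standing in for a real connection attempt. */
    public interface Connector<T> {
        T connect() throws Exception;
    }

    /**
     * Try to connect up to (1 + maxRetries) times, sleeping retryWaitMs
     * between attempts, mirroring how maxRetries()/retryWaitMs() are used.
     */
    public static <T> T connectWithRetry(Connector<T> connector,
                                         int maxRetries,
                                         long retryWaitMs) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return connector.connect();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    TimeUnit.MILLISECONDS.sleep(retryWaitMs);
                }
            }
        }
        throw last;
    }
}
```

Note that the total number of attempts is one initial attempt plus `maxRetries()` retries, so a value of 3 allows four attempts in all.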

Threading Configuration

Settings for controlling thread pool sizes and concurrency behavior.

/**
 * Get the number of server threads for handling connections
 * @return int representing server thread pool size
 */
public int serverThreads();

/**
 * Get the number of client threads for handling connections
 * @return int representing client thread pool size
 */
public int clientThreads();

/**
 * Get the number of threads for the shared client factory
 * @return int representing shared thread pool size
 */
public int sharedClientFactoryThreads();

/**
 * Check if client factory should be shared across contexts
 * @return boolean indicating if client factory sharing is enabled
 */
public boolean sharedClientFactory();

Security Configuration

Settings for encryption, authentication, and security-related features.

/**
 * Check if transport layer encryption is enabled
 * @return boolean indicating if encryption should be used
 */
public boolean encryptionEnabled();

/**
 * Check if SASL encryption is enabled
 * @return boolean indicating if SASL encryption should be used
 */
public boolean saslEncryption();

/**
 * Get the encryption key length in bits
 * @return int representing key length (128, 192, or 256)
 */
public int encryptionKeyLength();

/**
 * Get the cipher transformation for encryption
 * @return String representing the cipher transformation (e.g., "AES/CTR/NoPadding")
 */
public String cipherTransformation();

/**
 * Check if authentication is required
 * @return boolean indicating if client authentication is mandatory
 */
public boolean authenticationEnabled();

/**
 * Get the SASL authentication timeout in milliseconds
 * @return int representing timeout for SASL authentication
 */
public int saslTimeoutMs();

Memory Management

Configuration for memory usage, buffer management, and garbage collection optimization.

/**
 * Get the maximum size for in-memory shuffle blocks
 * @return long representing maximum block size in bytes
 */
public long maxInMemoryShuffleBlockSize();

/**
 * Get the memory fraction for off-heap storage
 * @return double representing fraction of available memory for off-heap use
 */
public double memoryFraction();

/**
 * Check if off-heap memory should be used
 * @return boolean indicating if off-heap storage is enabled
 */
public boolean offHeapEnabled();

/**
 * Get the memory map threshold for file operations
 * @return long representing minimum file size for memory mapping
 */
public long memoryMapThreshold();

Transfer and Streaming Settings

Configuration for data transfer behavior, streaming operations, and chunk management.

/**
 * Get the maximum number of chunks per TransferTo operation
 * @return long representing maximum chunks in a single transfer
 */
public long maxChunksBeingTransferred();

/**
 * Get the maximum size of messages that can be sent
 * @return long representing maximum message size in bytes
 */
public long maxMessageSize();

/**
 * Get the chunk fetch buffer size
 * @return int representing buffer size for chunk fetching operations
 */
public int chunkFetchBufferSize();

/**
 * Get the timeout for individual chunk fetch operations
 * @return int representing timeout in milliseconds
 */
public int chunkFetchTimeoutMs();

/**
 * Check if zero-copy streaming is enabled
 * @return boolean indicating if zero-copy operations should be used
 */
public boolean zeroCopyStreaming();

Configuration Provider

ConfigProvider (Abstract Class)

Abstract base class for providing configuration values from various sources.

/**
 * Get a configuration value by name
 * @param name - String key for the configuration property
 * @return String value of the property, or null if not found
 */
public abstract String get(String name);

/**
 * Get a configuration value with a default fallback
 * @param name - String key for the configuration property
 * @param defaultValue - String default value if property is not found
 * @return String value of the property, or defaultValue if not found
 */
public String get(String name, String defaultValue);

/**
 * Get a boolean configuration value with default fallback
 * @param name - String key for the configuration property
 * @param defaultValue - boolean default value if property is not found
 * @return boolean value of the property, or defaultValue if not found or invalid
 */
public boolean getBoolean(String name, boolean defaultValue);

/**
 * Get an integer configuration value with default fallback
 * @param name - String key for the configuration property
 * @param defaultValue - int default value if property is not found
 * @return int value of the property, or defaultValue if not found or invalid
 */
public int getInt(String name, int defaultValue);

/**
 * Get a long configuration value with default fallback
 * @param name - String key for the configuration property
 * @param defaultValue - long default value if property is not found
 * @return long value of the property, or defaultValue if not found or invalid
 */
public long getLong(String name, long defaultValue);

/**
 * Get a double configuration value with default fallback
 * @param name - String key for the configuration property
 * @param defaultValue - double default value if property is not found
 * @return double value of the property, or defaultValue if not found or invalid
 */
public double getDouble(String name, double defaultValue);

/**
 * Get all configuration properties as a map
 * @return Map<String, String> containing all configuration key-value pairs
 */
public Map<String, String> getAll();
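All of the typed getters layer parse-with-default behavior on top of the single abstract `get(String)`, so a subclass only has to supply raw string lookup. A self-contained sketch of this pattern (illustrative classes, not Spark's own):

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the provider pattern: one abstract get(), typed getters on top. */
abstract class SimpleConfigProvider {
    public abstract String get(String name);

    public String get(String name, String defaultValue) {
        String v = get(name);
        return v != null ? v : defaultValue;
    }

    public int getInt(String name, int defaultValue) {
        String v = get(name);
        try {
            return v != null ? Integer.parseInt(v.trim()) : defaultValue;
        } catch (NumberFormatException e) {
            return defaultValue; // invalid values fall back to the default
        }
    }

    public boolean getBoolean(String name, boolean defaultValue) {
        String v = get(name);
        return v != null ? Boolean.parseBoolean(v.trim()) : defaultValue;
    }
}

/** Map-backed implementation, analogous in spirit to MapConfigProvider. */
class MapBackedProvider extends SimpleConfigProvider {
    private final Map<String, String> props;

    MapBackedProvider(Map<String, String> props) {
        this.props = new HashMap<>(props); // defensive copy
    }

    @Override
    public String get(String name) {
        return props.get(name);
    }
}
```

The key design point is that missing and unparseable values both resolve to the caller-supplied default, so configuration lookups never throw for bad input.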

MapConfigProvider

Map-based implementation of ConfigProvider for simple configuration scenarios.

/**
 * Create a configuration provider backed by a Map
 * @param properties - Map<String, String> containing configuration properties
 */
public MapConfigProvider(Map<String, String> properties);

@Override
public String get(String name);

@Override
public Map<String, String> getAll();

/**
 * Add or update a configuration property
 * @param key - String property name
 * @param value - String property value
 */
public void set(String key, String value);

/**
 * Remove a configuration property
 * @param key - String property name to remove
 */
public void remove(String key);

/**
 * Clear all configuration properties
 */
public void clear();

I/O Mode Enumeration

IOMode

Enumeration for selecting the underlying I/O implementation.

public enum IOMode {
    /**
     * Standard NIO implementation (default, works on all platforms)
     */
    NIO,
    
    /**
     * Linux epoll implementation (higher performance on Linux)
     */
    EPOLL;
    
    /**
     * Parse IOMode from string representation
     * @param value - String value ("NIO" or "EPOLL")
     * @return IOMode corresponding to the string
     * @throws IllegalArgumentException if value is not recognized
     */
    public static IOMode parse(String value);
}
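A `parse` method like the one above can be implemented with a case-insensitive `valueOf`, so `"nio"` and `"NIO"` both resolve while unknown values throw `IllegalArgumentException`. A self-contained sketch (mirrors the enum shown here; not Spark's source):

```java
import java.util.Locale;

public enum IoModeSketch {
    NIO,    // portable java.nio selector-based transport
    EPOLL;  // Linux-only native epoll transport

    /** Case-insensitive parse; throws IllegalArgumentException on unknown values. */
    public static IoModeSketch parse(String value) {
        return valueOf(value.toUpperCase(Locale.ROOT));
    }
}
```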

Usage Examples

Basic Configuration Setup

import org.apache.spark.network.util.*;
import java.util.HashMap;
import java.util.Map;

// Create configuration properties
Map<String, String> properties = new HashMap<>();
properties.put("spark.network.timeout", "120s");
properties.put("spark.network.io.mode", "NIO");
properties.put("spark.network.io.preferDirectBufs", "true");
properties.put("spark.network.io.connectionTimeout", "30s");
properties.put("spark.network.io.numConnectionsPerPeer", "1");
properties.put("spark.network.io.serverThreads", "0"); // 0 means use default
properties.put("spark.network.io.clientThreads", "0");

// Create config provider
ConfigProvider configProvider = new MapConfigProvider(properties);

// Create transport configuration
TransportConf conf = new TransportConf("spark", configProvider);

// Access configuration values
System.out.println("I/O Mode: " + conf.ioMode());
System.out.println("Prefer Direct Buffers: " + conf.preferDirectBufs());
System.out.println("Connection Timeout: " + conf.connectionTimeoutMs() + "ms");
System.out.println("Connections Per Peer: " + conf.numConnectionsPerPeer());
System.out.println("Server Threads: " + conf.serverThreads());
System.out.println("Client Threads: " + conf.clientThreads());

Security Configuration

// Configure transport security
Map<String, String> securityProperties = new HashMap<>();
securityProperties.put("spark.network.crypto.enabled", "true");
securityProperties.put("spark.network.crypto.keyLength", "256");
securityProperties.put("spark.network.crypto.cipherTransformation", "AES/GCM/NoPadding");
securityProperties.put("spark.network.sasl.encryption", "true");
securityProperties.put("spark.network.sasl.timeout", "30s");
securityProperties.put("spark.authenticate", "true");

ConfigProvider securityProvider = new MapConfigProvider(securityProperties);
TransportConf securityConf = new TransportConf("secure-spark", securityProvider);

// Check security settings
System.out.println("Encryption Enabled: " + securityConf.encryptionEnabled());
System.out.println("SASL Encryption: " + securityConf.saslEncryption());
System.out.println("Key Length: " + securityConf.encryptionKeyLength() + " bits");
System.out.println("Cipher: " + securityConf.cipherTransformation());
System.out.println("Authentication Required: " + securityConf.authenticationEnabled());
System.out.println("SASL Timeout: " + securityConf.saslTimeoutMs() + "ms");

Performance Tuning Configuration

// Configure for high-performance scenarios
Map<String, String> performanceProperties = new HashMap<>();
performanceProperties.put("spark.network.io.mode", "EPOLL"); // Linux only
performanceProperties.put("spark.network.io.preferDirectBufs", "true");
performanceProperties.put("spark.network.io.serverThreads", "8");
performanceProperties.put("spark.network.io.clientThreads", "8");
performanceProperties.put("spark.network.io.numConnectionsPerPeer", "2");
performanceProperties.put("spark.network.io.receiveBuf", "65536"); // 64KB
performanceProperties.put("spark.network.io.sendBuf", "65536"); // 64KB
performanceProperties.put("spark.network.maxMessageSize", "128MB");
performanceProperties.put("spark.network.zeroCopy", "true");

ConfigProvider perfProvider = new MapConfigProvider(performanceProperties);
TransportConf perfConf = new TransportConf("high-perf", perfProvider);

// Validate performance settings
System.out.println("Performance Configuration:");
System.out.println("  I/O Mode: " + perfConf.ioMode());
System.out.println("  Direct Buffers: " + perfConf.preferDirectBufs());
System.out.println("  Server Threads: " + perfConf.serverThreads());
System.out.println("  Client Threads: " + perfConf.clientThreads());
System.out.println("  Connections Per Peer: " + perfConf.numConnectionsPerPeer());
System.out.println("  Receive Buffer: " + perfConf.receiveBuf() + " bytes");
System.out.println("  Send Buffer: " + perfConf.sendBuf() + " bytes");
System.out.println("  Max Message Size: " + perfConf.maxMessageSize() + " bytes");
System.out.println("  Zero Copy: " + perfConf.zeroCopyStreaming());

Memory Management Configuration

// Configure memory settings for large-scale operations
Map<String, String> memoryProperties = new HashMap<>();
memoryProperties.put("spark.network.memory.fraction", "0.8");
memoryProperties.put("spark.network.memory.offHeap.enabled", "true");
memoryProperties.put("spark.network.memory.memoryMapThreshold", "2MB");
memoryProperties.put("spark.shuffle.file.buffer", "1MB");
memoryProperties.put("spark.network.maxInMemoryShuffleBlockSize", "64MB");

ConfigProvider memoryProvider = new MapConfigProvider(memoryProperties);
TransportConf memoryConf = new TransportConf("memory-optimized", memoryProvider);

System.out.println("Memory Configuration:");
System.out.println("  Memory Fraction: " + memoryConf.memoryFraction());
System.out.println("  Off-Heap Enabled: " + memoryConf.offHeapEnabled());
System.out.println("  Memory Map Threshold: " + memoryConf.memoryMapThreshold() + " bytes");
System.out.println("  Max In-Memory Block Size: " + memoryConf.maxInMemoryShuffleBlockSize() + " bytes");

Dynamic Configuration Updates

// Create mutable configuration that can be updated at runtime
MapConfigProvider dynamicProvider = new MapConfigProvider(new HashMap<>());

// Initial configuration
dynamicProvider.set("spark.network.timeout", "60s");
dynamicProvider.set("spark.network.io.connectionTimeout", "15s");

TransportConf dynamicConf = new TransportConf("dynamic", dynamicProvider);
System.out.println("Initial timeout: " + dynamicConf.connectionTimeoutMs() + "ms");

// Update configuration at runtime
dynamicProvider.set("spark.network.io.connectionTimeout", "30s");
System.out.println("Updated timeout: " + dynamicConf.connectionTimeoutMs() + "ms");

// Add new configuration
dynamicProvider.set("spark.network.io.numConnectionsPerPeer", "3");
System.out.println("Connections per peer: " + dynamicConf.numConnectionsPerPeer());

// Remove configuration (falls back to default)
dynamicProvider.remove("spark.network.io.numConnectionsPerPeer");
System.out.println("Default connections per peer: " + dynamicConf.numConnectionsPerPeer());

Custom Configuration Provider

import java.util.*;

// Custom configuration provider that checks multiple sources in order
public class HierarchicalConfigProvider extends ConfigProvider {
    private final List<ConfigProvider> providers;
    
    public HierarchicalConfigProvider(ConfigProvider... providers) {
        this.providers = Arrays.asList(providers);
    }
    
    @Override
    public String get(String name) {
        // Check providers in reverse order, so later providers override earlier ones
        // (consistent with getAll() below and the usage example)
        for (int i = providers.size() - 1; i >= 0; i--) {
            String value = providers.get(i).get(name);
            if (value != null) {
                return value;
            }
        }
        return null;
    }
    
    @Override
    public Map<String, String> getAll() {
        Map<String, String> result = new HashMap<>();
        // Merge all providers, with later providers overriding earlier ones
        for (ConfigProvider provider : providers) {
            result.putAll(provider.getAll());
        }
        return result;
    }
}

// Usage: system properties override environment, which overrides defaults
Map<String, String> defaults = new HashMap<>();
defaults.put("spark.network.timeout", "120s");
defaults.put("spark.network.io.mode", "NIO");

Map<String, String> environment = new HashMap<>();
environment.put("spark.network.timeout", "180s"); // Override default

Map<String, String> systemProps = new HashMap<>();
systemProps.put("spark.network.io.mode", "EPOLL"); // Override environment

HierarchicalConfigProvider hierarchical = new HierarchicalConfigProvider(
    new MapConfigProvider(defaults),
    new MapConfigProvider(environment),
    new MapConfigProvider(systemProps)
);

TransportConf hierarchicalConf = new TransportConf("hierarchical", hierarchical);
System.out.println("Final timeout: " + hierarchicalConf.connectionTimeoutMs() + "ms"); // 180000ms (180s, from environment)
System.out.println("Final I/O mode: " + hierarchicalConf.ioMode()); // EPOLL from system props

Configuration Validation and Debugging

// Utility methods for configuration validation and debugging
public class ConfigValidator {
    
    public static void validateConfiguration(TransportConf conf) {
        System.out.println("=== Transport Configuration Validation ===");
        
        // Validate I/O settings
        if ("EPOLL".equalsIgnoreCase(conf.ioMode()) && !isLinux()) {
            System.out.println("WARNING: EPOLL mode is only supported on Linux");
        }
        
        // Validate thread counts
        if (conf.serverThreads() < 0) {
            System.out.println("WARNING: Invalid server thread count: " + conf.serverThreads());
        }
        
        // Validate timeouts
        if (conf.connectionTimeoutMs() <= 0) {
            System.out.println("WARNING: Invalid connection timeout: " + conf.connectionTimeoutMs());
        }
        
        // Validate security settings
        if (conf.encryptionEnabled() && conf.encryptionKeyLength() < 128) {
            System.out.println("WARNING: Weak encryption key length: " + conf.encryptionKeyLength());
        }
        
        System.out.println("Configuration validation completed");
    }
    
    public static void printConfiguration(TransportConf conf) {
        System.out.println("=== Transport Configuration Summary ===");
        System.out.println("Module: " + conf.getModule());
        
        System.out.println("\nI/O Settings:");
        System.out.println("  Mode: " + conf.ioMode());
        System.out.println("  Prefer Direct Buffers: " + conf.preferDirectBufs());
        System.out.println("  Receive Buffer: " + conf.receiveBuf() + " bytes");
        System.out.println("  Send Buffer: " + conf.sendBuf() + " bytes");
        
        System.out.println("\nConnection Settings:");
        System.out.println("  Timeout: " + conf.connectionTimeoutMs() + "ms");
        System.out.println("  Creation Timeout: " + conf.connectionCreationTimeoutMs() + "ms");
        System.out.println("  Connections Per Peer: " + conf.numConnectionsPerPeer());
        System.out.println("  Max Retries: " + conf.maxRetries());
        
        System.out.println("\nThread Settings:");
        System.out.println("  Server Threads: " + conf.serverThreads());
        System.out.println("  Client Threads: " + conf.clientThreads());
        
        System.out.println("\nSecurity Settings:");
        System.out.println("  Encryption Enabled: " + conf.encryptionEnabled());
        System.out.println("  SASL Encryption: " + conf.saslEncryption());
        System.out.println("  Authentication Required: " + conf.authenticationEnabled());
        
        if (conf.encryptionEnabled()) {
            System.out.println("  Key Length: " + conf.encryptionKeyLength() + " bits");
            System.out.println("  Cipher: " + conf.cipherTransformation());
        }
        
        System.out.println("\nMemory Settings:");
        System.out.println("  Off-Heap Enabled: " + conf.offHeapEnabled());
        System.out.println("  Memory Fraction: " + conf.memoryFraction());
        System.out.println("  Memory Map Threshold: " + conf.memoryMapThreshold() + " bytes");
        
        System.out.println("=== End Configuration Summary ===");
    }
    
    private static boolean isLinux() {
        return System.getProperty("os.name").toLowerCase().contains("linux");
    }
}

// Usage
TransportConf conf = new TransportConf("validation-test", configProvider);
ConfigValidator.validateConfiguration(conf);
ConfigValidator.printConfiguration(conf);

Configuration Profiles

// Predefined configuration profiles for common scenarios
public class ConfigProfiles {
    
    public static TransportConf createDevelopmentConfig() {
        Map<String, String> devProps = new HashMap<>();
        devProps.put("spark.network.timeout", "30s");
        devProps.put("spark.network.io.connectionTimeout", "10s");
        devProps.put("spark.network.io.numConnectionsPerPeer", "1");
        devProps.put("spark.network.io.serverThreads", "2");
        devProps.put("spark.network.io.clientThreads", "2");
        devProps.put("spark.authenticate", "false");
        devProps.put("spark.network.crypto.enabled", "false");
        
        return new TransportConf("development", new MapConfigProvider(devProps));
    }
    
    public static TransportConf createProductionConfig() {
        Map<String, String> prodProps = new HashMap<>();
        prodProps.put("spark.network.timeout", "300s");
        prodProps.put("spark.network.io.connectionTimeout", "60s");
        prodProps.put("spark.network.io.numConnectionsPerPeer", "2");
        prodProps.put("spark.network.io.serverThreads", "0"); // Use defaults
        prodProps.put("spark.network.io.clientThreads", "0");
        prodProps.put("spark.authenticate", "true");
        prodProps.put("spark.network.crypto.enabled", "true");
        prodProps.put("spark.network.crypto.keyLength", "256");
        prodProps.put("spark.network.sasl.encryption", "true");
        
        return new TransportConf("production", new MapConfigProvider(prodProps));
    }
    
    public static TransportConf createHighThroughputConfig() {
        Map<String, String> htProps = new HashMap<>();
        htProps.put("spark.network.io.mode", "EPOLL");
        htProps.put("spark.network.io.preferDirectBufs", "true");
        htProps.put("spark.network.io.numConnectionsPerPeer", "4");
        htProps.put("spark.network.io.receiveBuf", "131072"); // 128KB
        htProps.put("spark.network.io.sendBuf", "131072"); // 128KB
        htProps.put("spark.network.maxMessageSize", "268435456"); // 256MB
        htProps.put("spark.network.zeroCopy", "true");
        htProps.put("spark.network.memory.offHeap.enabled", "true");
        
        return new TransportConf("high-throughput", new MapConfigProvider(htProps));
    }
}

// Usage
TransportConf devConf = ConfigProfiles.createDevelopmentConfig();
TransportConf prodConf = ConfigProfiles.createProductionConfig();
TransportConf htConf = ConfigProfiles.createHighThroughputConfig();

System.out.println("Development config - encryption: " + devConf.encryptionEnabled());
System.out.println("Production config - encryption: " + prodConf.encryptionEnabled());
System.out.println("High throughput config - I/O mode: " + htConf.ioMode());

Best Practices

Configuration Organization

  1. Module Naming: Use descriptive module names to distinguish different transport contexts
  2. Property Grouping: Group related properties using consistent naming conventions
  3. Environment Separation: Use different configurations for development, testing, and production
  4. Default Values: Provide sensible defaults for all configuration properties

Performance Considerations

  1. I/O Mode Selection: Use EPOLL on Linux for better performance
  2. Thread Pool Sizing: Set thread counts based on workload characteristics
  3. Buffer Sizing: Tune buffer sizes based on typical message sizes
  4. Connection Pooling: Configure appropriate connection counts per peer
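The thread pool sizing advice above is often implemented by treating a configured value of `0` as "derive a default from the machine", as the earlier examples hint with `"spark.network.io.serverThreads", "0"`. A small sketch of that convention (the 8-thread cap is a common event-loop heuristic and an assumption here, not necessarily Spark's exact formula):

```java
public class ThreadPoolSizing {
    /**
     * Resolve a configured thread count, treating zero or negative values as
     * "use a default derived from available processors", capped at 8 threads
     * as is common for network event loops.
     */
    public static int resolveThreads(int configured) {
        if (configured > 0) {
            return configured;
        }
        return Math.min(Runtime.getRuntime().availableProcessors(), 8);
    }
}
```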

Security Best Practices

  1. Enable Authentication: Always enable authentication in production environments
  2. Use Strong Encryption: Use AES-256 with GCM mode for authenticated encryption
  3. Timeout Configuration: Set appropriate timeouts to prevent resource exhaustion
  4. Key Management: Use secure key distribution mechanisms

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-12
