CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common-2-10

Core networking infrastructure for Apache Spark cluster computing with transport layer, RPC, chunk fetching, and SASL authentication

Pending
Overview
Eval results
Files

configuration-utilities.mddocs/

Configuration and Utilities

Configuration system and utility classes for transport settings, Netty integration, and Java operations. The configuration framework provides centralized management of transport behavior with multiple provider implementations.

Capabilities

TransportConf

Central configuration class managing all transport layer settings and behavior.

/**
 * TransportConf manages configuration settings for the Spark transport layer.
 * It provides typed access to all networking configuration options with defaults.
 */
public class TransportConf {
  /**
   * Creates a transport configuration.
   * 
   * @param module The configuration module name (used as key prefix)
   * @param conf The configuration provider for loading settings
   */
  public TransportConf(String module, ConfigProvider conf);
  
  /**
   * Gets the I/O mode for network operations.
   * 
   * @return "NIO" or "EPOLL" (Linux only)
   */
  public String ioMode();
  
  /**
   * Whether to prefer direct ByteBuffers for better performance.
   * 
   * @return true to prefer direct buffers
   */
  public boolean preferDirectBufs();
  
  /**
   * Connection timeout in milliseconds.
   * 
   * @return Timeout for establishing connections
   */
  public int connectionTimeoutMs();
  
  /**
   * Maximum number of connections per remote peer.
   * 
   * @return Number of connections to maintain per peer
   */
  public int numConnectionsPerPeer();
  
  /**
   * Server socket backlog size.
   * 
   * @return Number of pending connections to queue
   */
  public int backLog();
  
  /**
   * Number of server worker threads.
   * 
   * @return Thread count for server-side operations
   */
  public int serverThreads();
  
  /**
   * Number of client worker threads.
   * 
   * @return Thread count for client-side operations
   */
  public int clientThreads();
  
  /**
   * Socket receive buffer size in bytes.
   * 
   * @return Size of socket receive buffer
   */
  public int receiveBuf();
  
  /**
   * Socket send buffer size in bytes.
   * 
   * @return Size of socket send buffer
   */
  public int sendBuf();
  
  /**
   * SASL roundtrip timeout in milliseconds.
   * 
   * @return Timeout for SASL authentication
   */
  public int saslRTTimeoutMs();
  
  /**
   * Maximum number of I/O retry attempts.
   * 
   * @return Number of times to retry failed I/O operations
   */
  public int maxIORetries();
  
  /**
   * Wait time between I/O retry attempts in milliseconds.
   * 
   * @return Delay between retry attempts
   */
  public int ioRetryWaitTimeMs();
  
  /**
   * Threshold for memory mapping files instead of reading into heap.
   * 
   * @return Size threshold in bytes for memory mapping
   */
  public int memoryMapBytes();
  
  /**
   * Whether to use lazy file descriptor allocation.
   * 
   * @return true to defer file opening until needed
   */
  public boolean lazyFileDescriptor();
  
  /**
   * Maximum number of port binding retry attempts.
   * 
   * @return Number of times to retry port binding
   */
  public int portMaxRetries();
  
  /**
   * Maximum size of encrypted blocks when using SASL encryption.
   * 
   * @return Maximum encrypted block size in bytes
   */
  public int maxSaslEncryptedBlockSize();
  
  /**
   * Whether the server should always encrypt data (when SASL is enabled).
   * 
   * @return true to require encryption on server side
   */
  public boolean saslServerAlwaysEncrypt();
  
  /**
   * Gets the configuration module name.
   * 
   * @return The module name used for configuration keys
   */
  public String getModule();
  
  /**
   * Gets a raw configuration value.
   * 
   * @param name The configuration key name
   * @return The configuration value or null if not set
   */
  public String get(String name);
  
  /**
   * Gets a raw configuration value with default.
   * 
   * @param name The configuration key name
   * @param defaultValue Default value if key is not set
   * @return The configuration value or default
   */
  public String get(String name, String defaultValue);
}

Configuration Provider Framework

ConfigProvider

/**
 * Abstract base class for configuration providers.
 * Implementations load configuration from different sources (properties, maps, etc.).
 */
public abstract class ConfigProvider {
  /**
   * Gets a configuration value by name.
   * 
   * @param name The configuration key name
   * @return The configuration value or null if not found
   */
  public abstract String get(String name);
  
  /**
   * Gets a configuration value with a default.
   * 
   * @param name The configuration key name
   * @param defaultValue Default value if key is not found
   * @return The configuration value or default
   */
  public String get(String name, String defaultValue) {
    String value = get(name);
    return value != null ? value : defaultValue;
  }
  
  /**
   * Gets an integer configuration value with default.
   * 
   * @param name The configuration key name
   * @param defaultValue Default value if key is not found or invalid
   * @return The integer value or default
   */
  public int getInt(String name, int defaultValue) {
    String value = get(name);
    if (value != null) {
      try {
        return Integer.parseInt(value);
      } catch (NumberFormatException e) {
        // Fall through to default
      }
    }
    return defaultValue;
  }
  
  /**
   * Gets a long configuration value with default.
   * 
   * @param name The configuration key name
   * @param defaultValue Default value if key is not found or invalid
   * @return The long value or default
   */
  public long getLong(String name, long defaultValue) {
    String value = get(name);
    if (value != null) {
      try {
        return Long.parseLong(value);
      } catch (NumberFormatException e) {
        // Fall through to default
      }
    }
    return defaultValue;
  }
  
  /**
   * Gets a double configuration value with default.
   * 
   * @param name The configuration key name
   * @param defaultValue Default value if key is not found or invalid
   * @return The double value or default
   */
  public double getDouble(String name, double defaultValue) {
    String value = get(name);
    if (value != null) {
      try {
        return Double.parseDouble(value);
      } catch (NumberFormatException e) {
        // Fall through to default
      }
    }
    return defaultValue;
  }
  
  /**
   * Gets a boolean configuration value with default.
   * 
   * @param name The configuration key name
   * @param defaultValue Default value if key is not found or invalid
   * @return The boolean value or default
   */
  public boolean getBoolean(String name, boolean defaultValue) {
    String value = get(name);
    if (value != null) {
      return Boolean.parseBoolean(value);
    }
    return defaultValue;
  }
}

MapConfigProvider

/**
 * Configuration provider backed by a Map.
 * Useful for programmatic configuration or testing.
 */
public class MapConfigProvider extends ConfigProvider {
  /**
   * Creates a map-based configuration provider.
   * 
   * @param config Map containing configuration key-value pairs
   */
  public MapConfigProvider(Map<String, String> config);
  
  @Override
  public String get(String name) {
    return config.get(name);
  }
}

SystemPropertyConfigProvider

/**
 * Configuration provider that loads values from Java system properties.
 * Useful for configuration via command-line properties.
 */
public class SystemPropertyConfigProvider extends ConfigProvider {
  /**
   * Creates a system property configuration provider.
   */
  public SystemPropertyConfigProvider();
  
  @Override
  public String get(String name) {
    return System.getProperty(name);
  }
}

Utility Enumerations

ByteUnit

/**
 * Enumeration for byte size units with conversion utilities.
 * Provides convenient methods for converting between different byte units.
 */
public enum ByteUnit {
  BYTE(1),
  KiB(1024L),
  MiB(1024L * 1024L),
  GiB(1024L * 1024L * 1024L),
  TiB(1024L * 1024L * 1024L * 1024L),
  PiB(1024L * 1024L * 1024L * 1024L * 1024L);
  
  private final long multiplier;
  
  ByteUnit(long multiplier) {
    this.multiplier = multiplier;
  }
  
  /**
   * Converts a value from another unit to this unit.
   * 
   * @param d The value to convert
   * @param u The source unit
   * @return The converted value in this unit
   */
  public long convertFrom(long d, ByteUnit u) {
    return (d * u.multiplier) / this.multiplier;
  }
  
  /**
   * Converts a value from this unit to another unit.
   * 
   * @param d The value to convert
   * @param u The target unit
   * @return The converted value in the target unit
   */
  public long convertTo(long d, ByteUnit u) {
    return (d * this.multiplier) / u.multiplier;
  }
  
  /**
   * Converts a value in this unit to bytes.
   * 
   * @param d The value in this unit
   * @return The value in bytes as a double
   */
  public double toBytes(long d) {
    return d * multiplier;
  }
  
  /**
   * Converts a value in this unit to KiB.
   * 
   * @param d The value in this unit
   * @return The value in KiB
   */
  public long toKiB(long d) {
    return convertTo(d, KiB);
  }
  
  /**
   * Converts a value in this unit to MiB.
   * 
   * @param d The value in this unit
   * @return The value in MiB
   */
  public long toMiB(long d) {
    return convertTo(d, MiB);
  }
  
  /**
   * Converts a value in this unit to GiB.
   * 
   * @param d The value in this unit
   * @return The value in GiB
   */
  public long toGiB(long d) {
    return convertTo(d, GiB);
  }
  
  /**
   * Converts a value in this unit to TiB.
   * 
   * @param d The value in this unit
   * @return The value in TiB
   */
  public long toTiB(long d) {
    return convertTo(d, TiB);
  }
  
  /**
   * Converts a value in this unit to PiB.
   * 
   * @param d The value in this unit
   * @return The value in PiB
   */
  public long toPiB(long d) {
    return convertTo(d, PiB);
  }
}

IOMode

/**
 * Enumeration for I/O modes supported by the transport layer.
 * Different modes provide different performance characteristics.
 */
public enum IOMode {
  /** 
   * Standard Java NIO - works on all platforms but may have lower performance
   */
  NIO,
  
  /** 
   * Linux EPOLL - higher performance on Linux systems, requires native library
   */
  EPOLL
}

Java Utility Classes

JavaUtils

/**
 * General utility methods for Java operations used throughout the transport layer.
 */
public class JavaUtils {
  /**
   * Closes a Closeable resource quietly, ignoring any exceptions.
   * 
   * @param closeable The resource to close (can be null)
   */
  public static void closeQuietly(Closeable closeable) {
    if (closeable != null) {
      try {
        closeable.close();
      } catch (IOException e) {
        // Ignore exception
      }
    }
  }
  
  /**
   * Parses a time string (e.g., "30s", "5m", "2h") to seconds.
   * 
   * @param str The time string to parse
   * @return Time in seconds
   * @throws NumberFormatException if string format is invalid
   */
  public static long timeStringAsSec(String str) {
    return parseTimeString(str, TimeUnit.SECONDS);
  }
  
  /**
   * Parses a time string to the specified time unit.
   * 
   * @param str The time string to parse
   * @param unit The target time unit
   * @return Time in the specified unit
   */
  public static long timeStringAs(String str, TimeUnit unit) {
    return parseTimeString(str, unit);
  }
  
  /**
   * Parses a byte string (e.g., "100k", "64m", "1g") to bytes.
   * 
   * @param str The byte string to parse
   * @return Size in bytes
   * @throws NumberFormatException if string format is invalid
   */
  public static long byteStringAsBytes(String str) {
    return parseByteString(str);
  }
  
  /**
   * Formats bytes as a human-readable string.
   * 
   * @param size Size in bytes
   * @return Formatted string (e.g., "1.5 GB", "256 MB")
   */
  public static String bytesToString(long size) {
    return formatBytes(size);
  }
  
  /**
   * Gets the system property or environment variable with the given name.
   * Checks system properties first, then environment variables.
   * 
   * @param name The property/variable name
   * @param defaultValue Default value if not found
   * @return The property value or default
   */
  public static String getSystemProperty(String name, String defaultValue) {
    String value = System.getProperty(name);
    if (value == null) {
      value = System.getenv(name);
    }
    return value != null ? value : defaultValue;
  }
}

NettyUtils

/**
 * Utility methods for Netty integration and configuration.
 * Provides helpers for creating Netty components with proper settings.
 */
public class NettyUtils {
  /**
   * Gets the remote address from a Netty channel as a string.
   * 
   * @param channel The Netty channel
   * @return String representation of remote address
   */
  public static String getRemoteAddress(Channel channel) {
    if (channel != null && channel.remoteAddress() != null) {
      return channel.remoteAddress().toString();
    }
    return "unknown";
  }
  
  /**
   * Creates an EventLoopGroup for the specified I/O mode.
   * 
   * @param mode The I/O mode (NIO or EPOLL)
   * @param numThreads Number of threads in the event loop
   * @param threadPrefix Prefix for thread names
   * @return Configured EventLoopGroup
   */
  public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
    ThreadFactory threadFactory = createThreadFactory(threadPrefix);
    
    switch (mode) {
      case NIO:
        return new NioEventLoopGroup(numThreads, threadFactory);
      case EPOLL:
        return new EpollEventLoopGroup(numThreads, threadFactory);
      default:
        throw new IllegalArgumentException("Unknown I/O mode: " + mode);
    }
  }
  
  /**
   * Gets the server channel class for the specified I/O mode.
   * 
   * @param mode The I/O mode
   * @return ServerChannel class for the mode
   */
  public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
    switch (mode) {
      case NIO:
        return NioServerSocketChannel.class;
      case EPOLL:
        return EpollServerSocketChannel.class;
      default:
        throw new IllegalArgumentException("Unknown I/O mode: " + mode);
    }
  }
  
  /**
   * Gets the client channel class for the specified I/O mode.
   * 
   * @param mode The I/O mode
   * @return Channel class for the mode
   */
  public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
    switch (mode) {
      case NIO:
        return NioSocketChannel.class;
      case EPOLL:
        return EpollSocketChannel.class;
      default:
        throw new IllegalArgumentException("Unknown I/O mode: " + mode);
    }
  }
  
  /**
   * Creates a pooled ByteBuf allocator with specified settings.
   * 
   * @param allowDirectBufs Whether to allow direct buffer allocation
   * @param allowCache Whether to enable buffer caching
   * @param numCores Number of CPU cores (affects pool sizing)
   * @return Configured PooledByteBufAllocator
   */
  public static PooledByteBufAllocator createPooledByteBufAllocator(
      boolean allowDirectBufs, boolean allowCache, int numCores) {
    
    int numDirectArenas = allowDirectBufs ? numCores : 0;
    int numHeapArenas = numCores;
    
    return new PooledByteBufAllocator(
      allowDirectBufs,        // preferDirect
      numHeapArenas,          // nHeapArena
      numDirectArenas,        // nDirectArena  
      8192,                   // pageSize
      11,                     // maxOrder
      64,                     // tinyCacheSize
      32,                     // smallCacheSize
      8,                      // normalCacheSize
      allowCache              // useCacheForAllThreads
    );
  }
  
  /**
   * Creates a frame decoder for the transport protocol.
   * 
   * @return Configured TransportFrameDecoder
   */
  public static TransportFrameDecoder createFrameDecoder() {
    return new TransportFrameDecoder();
  }
  
  /**
   * Configures common channel options for Spark transport.
   * 
   * @param bootstrap The bootstrap to configure
   * @param conf Transport configuration
   */
  public static void configureChannelOptions(Bootstrap bootstrap, TransportConf conf) {
    bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator(
      conf.preferDirectBufs(), true, Runtime.getRuntime().availableProcessors()));
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.option(ChannelOption.SO_REUSEADDR, true);
    bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
    bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());
  }
  
  /**
   * Configures common channel options for server bootstrap.
   * 
   * @param bootstrap The server bootstrap to configure
   * @param conf Transport configuration
   */
  public static void configureServerChannelOptions(ServerBootstrap bootstrap, TransportConf conf) {
    bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator(
      conf.preferDirectBufs(), true, Runtime.getRuntime().availableProcessors()));
    bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
    bootstrap.option(ChannelOption.SO_REUSEADDR, true);
    
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
    bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
  }
}

Specialized Utility Classes

TransportFrameDecoder

/**
 * Netty decoder for transport protocol frames.
 * Handles frame parsing and message boundary detection.
 */
public class TransportFrameDecoder extends ByteToMessageDecoder {
  /** Maximum frame size to prevent memory exhaustion */
  public static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
  
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    // Decodes length-prefixed frames from the transport protocol
    // Implementation handles partial frames and validates frame sizes
  }
  
  /**
   * Gets the maximum allowed frame size.
   * 
   * @return Maximum frame size in bytes
   */
  public int getMaxFrameSize() {
    return MAX_FRAME_SIZE;
  }
}

ByteArrayWritableChannel

/**
 * WritableByteChannel implementation backed by a byte array.
 * Useful for collecting data written to a channel into memory.
 */
public class ByteArrayWritableChannel implements WritableByteChannel {
  /**
   * Creates a byte array writable channel.
   * 
   * @param initialCapacity Initial capacity of the internal buffer
   */
  public ByteArrayWritableChannel(int initialCapacity);
  
  @Override
  public int write(ByteBuffer src) throws IOException {
    // Writes data from ByteBuffer to internal byte array
    int remaining = src.remaining();
    // Implementation copies data and grows array as needed
    return remaining;
  }
  
  @Override
  public boolean isOpen() {
    return open;
  }
  
  @Override
  public void close() throws IOException {
    open = false;
  }
  
  /**
   * Gets the current data as a byte array.
   * 
   * @return Copy of the accumulated data
   */
  public byte[] getData() {
    return Arrays.copyOf(buffer, position);
  }
  
  /**
   * Gets the number of bytes written.
   * 
   * @return Number of bytes in the buffer
   */
  public int size() {
    return position;
  }
  
  /**
   * Resets the channel to empty state.
   */
  public void reset() {
    position = 0;
  }
}

LimitedInputStream

/**
 * FilterInputStream that limits the number of bytes that can be read.
 * Useful for reading only a portion of a larger stream.
 */
public class LimitedInputStream extends FilterInputStream {
  /**
   * Creates a limited input stream.
   * 
   * @param in The underlying input stream
   * @param limit Maximum number of bytes to read
   */
  public LimitedInputStream(InputStream in, long limit);
  
  @Override
  public int read() throws IOException {
    if (remaining <= 0) {
      return -1; // EOF
    }
    
    int result = super.read();
    if (result != -1) {
      remaining--;
    }
    return result;
  }
  
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (remaining <= 0) {
      return -1; // EOF
    }
    
    len = (int) Math.min(len, remaining);
    int result = super.read(b, off, len);
    if (result > 0) {
      remaining -= result;
    }
    return result;
  }
  
  /**
   * Gets the number of bytes remaining to be read.
   * 
   * @return Remaining byte count
   */
  public long getRemaining() {
    return remaining;
  }
}

Usage Examples

Configuration Management

import org.apache.spark.network.util.*;
import java.util.HashMap;
import java.util.Map;

// Create configuration from multiple sources
public class ConfigurationExample {
  public static TransportConf createConfiguration() {
    // Start with system properties
    ConfigProvider systemConfig = new SystemPropertyConfigProvider();
    
    // Override with application-specific settings
    Map<String, String> appConfig = new HashMap<>();
    appConfig.put("spark.network.io.mode", "EPOLL");
    appConfig.put("spark.network.io.numConnectionsPerPeer", "3");
    appConfig.put("spark.network.io.serverThreads", "8");
    appConfig.put("spark.network.io.clientThreads", "8");
    appConfig.put("spark.network.io.preferDirectBufs", "true");
    appConfig.put("spark.network.io.connectionTimeout", "60s");
    
    // Create layered configuration
    ConfigProvider layeredConfig = new LayeredConfigProvider(
      new MapConfigProvider(appConfig),  // Higher priority
      systemConfig                       // Lower priority
    );
    
    return new TransportConf("spark.network", layeredConfig);
  }
  
  // Custom layered config provider
  private static class LayeredConfigProvider extends ConfigProvider {
    private final ConfigProvider[] providers;
    
    public LayeredConfigProvider(ConfigProvider... providers) {
      this.providers = providers;
    }
    
    @Override
    public String get(String name) {
      for (ConfigProvider provider : providers) {
        String value = provider.get(name);
        if (value != null) {
          return value;
        }
      }
      return null;
    }
  }
}

Byte Unit Conversions

import org.apache.spark.network.util.ByteUnit;

public class ByteUnitExample {
  public static void demonstrateConversions() {
    // Convert different units
    long sizeInBytes = ByteUnit.GiB.toBytes(2); // 2 GB in bytes
    long sizeInMB = ByteUnit.BYTE.toMiB(1024 * 1024 * 1024); // 1 GB in MB
    
    // Convert between units
    long kbToMb = ByteUnit.KiB.convertTo(1024, ByteUnit.MiB); // 1024 KB to MB
    long mbToGb = ByteUnit.MiB.convertTo(2048, ByteUnit.GiB); // 2048 MB to GB
    
    System.out.println("2 GiB = " + sizeInBytes + " bytes");
    System.out.println("1 GiB = " + sizeInMB + " MiB");
    System.out.println("1024 KiB = " + kbToMb + " MiB");
    System.out.println("2048 MiB = " + mbToGb + " GiB");
  }
  
  public static long parseConfigSize(String sizeStr) {
    // Parse configuration size strings
    if (sizeStr.endsWith("k") || sizeStr.endsWith("K")) {
      long value = Long.parseLong(sizeStr.substring(0, sizeStr.length() - 1));
      return ByteUnit.KiB.toBytes(value);
    } else if (sizeStr.endsWith("m") || sizeStr.endsWith("M")) {
      long value = Long.parseLong(sizeStr.substring(0, sizeStr.length() - 1));
      return ByteUnit.MiB.toBytes(value);
    } else if (sizeStr.endsWith("g") || sizeStr.endsWith("G")) {
      long value = Long.parseLong(sizeStr.substring(0, sizeStr.length() - 1));
      return ByteUnit.GiB.toBytes(value);
    } else {
      return Long.parseLong(sizeStr); // Assume bytes
    }
  }
}

Netty Integration

import org.apache.spark.network.util.NettyUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;

public class NettySetupExample {
  public static void setupNettyClient(TransportConf conf) {
    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    
    // Create event loop
    EventLoopGroup workerGroup = NettyUtils.createEventLoop(
      ioMode, 
      conf.clientThreads(), 
      "spark-client"
    );
    
    // Create bootstrap
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(workerGroup)
             .channel(NettyUtils.getClientChannelClass(ioMode));
    
    // Configure options
    NettyUtils.configureChannelOptions(bootstrap, conf);
    
    // Set up pipeline
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        
        // Add frame decoder
        pipeline.addLast("frameDecoder", NettyUtils.createFrameDecoder());
        
        // Add other handlers...
      }
    });
  }
  
  public static void setupNettyServer(TransportConf conf) {
    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    
    // Create event loops
    EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1, "spark-server-boss");
    EventLoopGroup workerGroup = NettyUtils.createEventLoop(
      ioMode, 
      conf.serverThreads(), 
      "spark-server-worker"
    );
    
    // Create server bootstrap
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
             .channel(NettyUtils.getServerChannelClass(ioMode));
    
    // Configure options
    NettyUtils.configureServerChannelOptions(bootstrap, conf);
    
    // Set up child handler
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) {
        // Set up server pipeline
      }
    });
  }
}

Utility Operations

import org.apache.spark.network.util.JavaUtils;

public class UtilityExample {
  public void demonstrateUtils() {
    // Parse time strings
    long timeoutSec = JavaUtils.timeStringAsSec("30s");  // 30 seconds
    long timeoutMs = JavaUtils.timeStringAs("5m", TimeUnit.MILLISECONDS); // 5 minutes in ms
    
    // Parse byte strings
    long bufferSize = JavaUtils.byteStringAsBytes("64m"); // 64 MB in bytes
    
    // Format bytes
    String formatted = JavaUtils.bytesToString(1024 * 1024 * 1024); // "1 GB"
    
    // Safe resource cleanup
    FileInputStream fis = null;
    try {
      fis = new FileInputStream("data.txt");
      // Use stream...
    } finally {
      JavaUtils.closeQuietly(fis); // Won't throw exception
    }
    
    // Get configuration from system property or environment
    String dataDir = JavaUtils.getSystemProperty("SPARK_DATA_DIR", "/tmp/spark");
  }
  
  public void configurationValidation(TransportConf conf) {
    // Validate configuration values
    if (conf.connectionTimeoutMs() <= 0) {
      throw new IllegalArgumentException("Connection timeout must be positive");
    }
    
    if (conf.numConnectionsPerPeer() < 1) {
      throw new IllegalArgumentException("Must have at least 1 connection per peer");
    }
    
    // Check I/O mode availability
    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    if (ioMode == IOMode.EPOLL && !isLinux()) {
      System.err.println("EPOLL mode only available on Linux, falling back to NIO");
      // Override configuration...
    }
  }
  
  private boolean isLinux() {
    return System.getProperty("os.name").toLowerCase().contains("linux");
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-10

docs

buffer-management.md

client-operations.md

configuration-utilities.md

index.md

message-protocol.md

sasl-authentication.md

server-operations.md

transport-setup.md

tile.json