tessl/maven-org-apache-spark--yarn-parent-2-10

YARN integration support for Apache Spark cluster computing, enabling Spark applications to run on Hadoop YARN clusters

Utilities and Configuration

Utility classes and configuration management for YARN-specific operations, distributed cache management, and executor container handling. These components provide essential support functionality for YARN integration.

Capabilities

YarnSparkHadoopUtil

Extends Spark's core Hadoop integration (`SparkHadoopUtil`) with YARN-specific functionality such as security token handling, configuration management, and service discovery.

/**
 * YARN-specific Hadoop utilities
 * Extends SparkHadoopUtil with YARN-specific operations and configurations
 */
class YarnSparkHadoopUtil extends SparkHadoopUtil {
  // YARN-specific security token handling
  // Hadoop configuration management for YARN
  // YARN service discovery and integration
  // Kerberos authentication support for YARN clusters
}

/**
 * Companion object with YARN-specific constants and utility methods
 */
object YarnSparkHadoopUtil {
  /** Memory overhead factor for containers (7% of container memory) */
  val MEMORY_OVERHEAD_FACTOR = 0.07
  
  /** Minimum memory overhead for containers in MB */
  val MEMORY_OVERHEAD_MIN = 384
  
  /** Wildcard host for resource requests */
  val ANY_HOST = "*"
  
  /** Default number of executors when not specified */
  val DEFAULT_NUMBER_EXECUTORS = 2
  
  /** Resource manager request priority */
  val RM_REQUEST_PRIORITY = 1
  
  /** Get YarnSparkHadoopUtil singleton instance */
  def get: YarnSparkHadoopUtil
}

Usage Examples:

import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Access YARN-specific Hadoop utilities
val yarnUtil = YarnSparkHadoopUtil.get

// Use for YARN-specific operations like:
// - Security token management
// - YARN service discovery  
// - Hadoop configuration handling specific to YARN
// - Kerberos integration for secure clusters
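The overhead constants above combine as `max(MEMORY_OVERHEAD_MIN, MEMORY_OVERHEAD_FACTOR * executorMemory)` when sizing executor containers. A minimal self-contained sketch of that calculation (the truncation-based rounding and the exact call site are assumptions, not the literal Spark implementation):

```scala
object MemoryOverheadExample {
  val MEMORY_OVERHEAD_FACTOR = 0.07
  val MEMORY_OVERHEAD_MIN = 384

  /** Total YARN container memory = requested executor memory + overhead (MB). */
  def containerMemory(executorMemoryMb: Int): Int = {
    val overhead = math.max(
      (MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt,
      MEMORY_OVERHEAD_MIN)
    executorMemoryMb + overhead
  }

  def main(args: Array[String]): Unit = {
    println(containerMemory(1024)) // small executor: the 384 MB floor applies -> 1408
    println(containerMemory(8192)) // large executor: 7% overhead (573 MB) -> 8765
  }
}
```

The floor matters in practice: small executors would otherwise get an overhead too small to cover JVM and YARN bookkeeping memory.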

ClientDistributedCacheManager

Manages distributed cache functionality for YARN applications, handling file and archive distribution to executor nodes.

/**
 * Manages distributed cache for YARN applications
 * Handles distribution of files, JARs, and archives to executor containers
 */
private[spark] class ClientDistributedCacheManager {
  // Distributed cache file management
  // Archive and JAR distribution coordination
  // Local resource preparation for container launch
  // Cache cleanup and resource lifecycle management
}

Usage Examples:

import org.apache.spark.deploy.yarn.ClientDistributedCacheManager

// Created internally by Client for managing distributed resources
val cacheManager = new ClientDistributedCacheManager()

// Handles:
// - Files specified via --files argument
// - Archives specified via --archives argument  
// - Additional JARs from --addJars argument
// - Proper cleanup when application completes
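Applications normally feed resources to the cache manager indirectly, through spark-submit arguments or their `SparkConf` equivalents. A hedged sketch (the `spark.yarn.dist.*` property names reflect the 1.x YARN client; the paths are placeholders):

```scala
import org.apache.spark.SparkConf

// Equivalent to passing --files / --archives on the command line;
// the Client picks these up and hands them to ClientDistributedCacheManager.
val conf = new SparkConf()
  .set("spark.yarn.dist.files", "hdfs:///conf/app.properties")
  .set("spark.yarn.dist.archives", "hdfs:///deps/python-env.zip")
```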

ExecutorRunnableUtil

Utility trait providing common functionality for executor container management across different YARN API versions.

/**
 * Utility trait for executor container management
 * Provides common executor launch and management functionality
 */
trait ExecutorRunnableUtil {
  // Common executor container launch logic
  // Environment variable setup for executors
  // Classpath configuration and JAR distribution
  // Resource allocation and container configuration
}

Usage Examples:

import org.apache.spark.deploy.yarn.ExecutorRunnableUtil

// Mixed into ExecutorRunnable implementations
class ExecutorRunnable extends ExecutorRunnableUtil {
  // Inherits common executor management functionality
  // Used for launching executor containers on YARN nodes
  // Handles environment setup and resource configuration
}

ExecutorRunnable

Version-specific implementation class for launching and managing executor containers on YARN, provided by both the alpha (deprecated) and stable modules.

/**
 * Version-specific executor container implementation
 * Available in both alpha (deprecated) and stable modules
 * Manages executor containers on YARN NodeManager
 */
class ExecutorRunnable extends ExecutorRunnableUtil {
  // Executor container launch and lifecycle management
  // Integration with specific YARN API versions
  // Resource allocation and environment setup
  // Container monitoring and cleanup
}

Usage Examples:

import org.apache.spark.deploy.yarn.ExecutorRunnable

// Created by YarnAllocator for each executor container
val executorRunnable = new ExecutorRunnable(
  container = yarnContainer,
  conf = sparkConf,
  sparkJar = distributedSparkJar
)

// Handles complete executor container lifecycle:
// - Container launch on NodeManager
// - Executor JVM startup and configuration
// - Resource monitoring and cleanup

Distributed Cache Management

File Distribution

// Distributed cache handles various file types
class ClientDistributedCacheManager {
  // Regular files (--files argument)
  private def distributeFiles(files: String): Map[String, LocalResource] = {
    // Upload files to HDFS staging directory
    // Create LocalResource entries for YARN
    // Configure file permissions and visibility
    // Return resource map for container launch context
  }
  
  // Archive files (--archives argument)  
  private def distributeArchives(archives: String): Map[String, LocalResource] = {
    // Handle tar.gz, zip, and other archive formats
    // Configure automatic extraction in containers
    // Set up proper working directory structure
  }
  
  // JAR files (--addJars argument)
  private def distributeJars(jars: String): Map[String, LocalResource] = {
    // Add JARs to executor classpath
    // Handle both local and HDFS JAR locations
    // Optimize JAR distribution across cluster nodes
  }
}
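The `LocalResource` entries these methods return are built from YARN API records. A hedged sketch of how a single uploaded file becomes one (the record-builder calls follow the hadoop-yarn-api for this era; the surrounding wiring is illustrative):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

def toLocalResource(fs: FileSystem, path: Path): LocalResource = {
  val status = fs.getFileStatus(path)
  val resource = Records.newRecord(classOf[LocalResource])
  resource.setResource(ConverterUtils.getYarnUrlFromPath(path))
  resource.setSize(status.getLen)
  resource.setTimestamp(status.getModificationTime) // NodeManager verifies this
  resource.setType(LocalResourceType.FILE)          // ARCHIVE for --archives entries
  resource.setVisibility(LocalResourceVisibility.APPLICATION)
  resource
}
```

Size and timestamp must match the file in HDFS exactly; the NodeManager rejects the resource otherwise.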

Resource Lifecycle

// Complete resource lifecycle management
class ClientDistributedCacheManager {
  def setupDistributedCache(): Map[String, LocalResource] = {
    // 1. Analyze resource requirements from arguments
    // 2. Upload local resources to HDFS staging area
    // 3. Create YARN LocalResource descriptors
    // 4. Return resource map for container launch
  }
  
  def cleanupStagingDir(): Unit = {
    // Clean up HDFS staging directory after application completion
    // Remove temporary files and directories
    // Handle cleanup failures gracefully
  }
}
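The cleanup step reduces to a recursive delete of the application's staging directory, tolerating failures. An illustrative sketch (the staging path layout and logging are assumptions):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

def cleanupStagingDir(fs: FileSystem, appStagingDir: Path): Unit = {
  try {
    if (fs.exists(appStagingDir)) {
      fs.delete(appStagingDir, true) // recursively remove uploaded resources
    }
  } catch {
    case e: java.io.IOException =>
      // Cleanup failures must not fail the application itself
      System.err.println(s"Failed to clean up staging dir $appStagingDir: $e")
  }
}
```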

Executor Container Management

Container Launch Context

// ExecutorRunnable creates complete launch context
class ExecutorRunnable {
  private def createContainerLaunchContext(): ContainerLaunchContext = {
    // 1. Set up executor command line
    val executorCommand = buildExecutorCommand()
    
    // 2. Configure environment variables
    val environment = setupExecutorEnvironment()
    
    // 3. Set up local resources (JARs, files, archives)
    val localResources = setupLocalResources()
    
    // 4. Configure security tokens
    val tokens = setupSecurityTokens()
    
    // 5. Build complete launch context (auxiliary service data and ACLs omitted)
    ContainerLaunchContext.newInstance(
      localResources, environment, executorCommand,
      null /* serviceData */, tokens, null /* application ACLs */
    )
  }
}

Executor Environment Setup

// Environment configuration for executor containers
trait ExecutorRunnableUtil {
  protected def setupExecutorEnvironment(): Map[String, String] = {
    Map(
      "SPARK_HOME" -> sparkHome,
      "CLASSPATH" -> buildExecutorClasspath(),
      "JAVA_HOME" -> javaHome,
      "PYTHONPATH" -> pythonPath,
      "HADOOP_CONF_DIR" -> hadoopConfDir,
      // YARN-specific environment variables
      "CONTAINER_ID" -> containerId,
      "APPLICATION_WEB_PROXY_BASE" -> webProxyBase
    )
  }
  
  protected def buildExecutorClasspath(): String = {
    // Spark JARs and dependencies
    // User application JARs (--addJars)
    // Hadoop and YARN libraries
    // Custom classpaths from configuration
  }
}
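Classpath assembly boils down to joining path entries with the platform separator, using `$PWD`-relative entries that YARN expands inside the container. A sketch under those assumptions (the entry names and ordering are illustrative, not the exact Spark list):

```scala
// Join non-empty classpath entries; YARN expands $PWD et al. at container launch.
def buildExecutorClasspath(entries: Seq[String]): String =
  entries.filter(_.nonEmpty).mkString(java.io.File.pathSeparator)

val classpath = buildExecutorClasspath(Seq(
  "$PWD",                // container working directory (localized resources land here)
  "$PWD/__spark__.jar",  // the distributed Spark assembly
  "$HADOOP_CONF_DIR",
  "$HADOOP_COMMON_HOME/share/hadoop/common/*"
))
```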

Configuration Utilities

YARN Configuration Integration

// YarnSparkHadoopUtil handles configuration integration
class YarnSparkHadoopUtil {
  def getYarnConfiguration(sparkConf: SparkConf): Configuration = {
    // Load base Hadoop configuration
    val hadoopConf = new Configuration()
    
    // Apply Spark-specific YARN overrides
    applySparkYarnConfiguration(hadoopConf, sparkConf)
    
    // Handle security configuration for Kerberos clusters
    setupSecurityConfiguration(hadoopConf)
    
    hadoopConf
  }
  
  private def applySparkYarnConfiguration(
    hadoopConf: Configuration, 
    sparkConf: SparkConf
  ): Unit = {
    // Map Spark configuration to Hadoop configuration
    // Set YARN-specific properties
    // Configure resource manager addresses
    // Set up queue and priority configurations
  }
}
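One concrete piece of this mapping is the `spark.hadoop.` prefix convention: any Spark property of the form `spark.hadoop.foo=bar` is copied into the Hadoop `Configuration` as `foo=bar`. A minimal sketch (the helper name is ours; the prefix convention itself is real Spark behavior):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

def applySparkHadoopProperties(hadoopConf: Configuration, sparkConf: SparkConf): Unit = {
  val prefix = "spark.hadoop."
  for ((key, value) <- sparkConf.getAll if key.startsWith(prefix)) {
    hadoopConf.set(key.substring(prefix.length), value)
  }
}
```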

Security Token Management

// Security integration for Kerberos-enabled clusters
object YarnSparkHadoopUtil {
  def obtainTokensForNamenodes(
    paths: Set[Path], 
    conf: Configuration
  ): Map[String, Token[_]] = {
    // Obtain delegation tokens for HDFS access
    // Handle multiple namenode configurations
    // Support for secure cluster authentication
  }
  
  def obtainTokensForHBase(conf: Configuration): Map[String, Token[_]] = {
    // Obtain tokens for HBase integration
    // Support for secure HBase clusters
  }
  
  def obtainTokensForHive(conf: Configuration): Map[String, Token[_]] = {
    // Obtain tokens for Hive metastore access
    // Support for secure Hive integration
  }
}
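For the HDFS case, token acquisition rests on Hadoop's `FileSystem.addDelegationTokens`, which populates a `Credentials` bag (and is a no-op on insecure clusters). A hedged sketch; the loop structure and renewer handling here are assumptions, not the exact Spark code:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.Credentials

def obtainHdfsTokens(paths: Set[Path], conf: Configuration, renewer: String): Credentials = {
  val creds = new Credentials()
  for (path <- paths) {
    val fs = path.getFileSystem(conf)     // resolves the namenode for this path
    fs.addDelegationTokens(renewer, creds) // no-op when security is disabled
  }
  creds
}
```

The resulting credentials are serialized into the container launch context so executors can read HDFS without a Kerberos ticket of their own.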

Performance Optimizations

Resource Caching

// Efficient resource distribution and caching
class ClientDistributedCacheManager {
  private def optimizeResourceDistribution(): Unit = {
    // Cache frequently used files in HDFS
    // Use symbolic links for shared resources
    // Minimize network transfer overhead
    // Leverage YARN's distributed cache capabilities
  }
}

Container Launch Optimization

// Optimized executor container launch
class ExecutorRunnable {
  private def launchContainerAsync(): Future[Unit] = {
    // Parallel container launch to reduce startup time
    // Pre-warm executor JVMs when possible
    // Optimize classpath and resource loading
    // Monitor launch success and retry on failures
  }
}

Error Handling and Diagnostics

Container Launch Failures

// Robust error handling for container operations
class ExecutorRunnable {
  private def handleLaunchFailure(exception: Exception): Unit = {
    // Log detailed failure information
    // Classify failure types (resource, network, configuration)
    // Implement retry logic for transient failures
    // Report failures to ApplicationMaster for resource reallocation
  }
}

Distributed Cache Failures

// Error handling for resource distribution
class ClientDistributedCacheManager {
  private def handleDistributionFailure(resource: String, error: Throwable): Unit = {
    // Log resource distribution failures
    // Attempt alternative distribution strategies
    // Fail fast for critical resources
    // Provide detailed error messages for troubleshooting
  }
}
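The "retry transient failures, fail fast otherwise" pattern both handlers describe can be sketched self-contained; the failure classification below is purely illustrative:

```scala
import scala.annotation.tailrec

sealed trait Failure
case object Transient extends Failure // e.g. network hiccup, NodeManager restart
case object Fatal extends Failure     // e.g. bad configuration, missing resource

/** Retry transient failures up to retriesLeft times; fail fast on fatal ones. */
@tailrec
def launchWithRetry(attempt: () => Either[Failure, Unit], retriesLeft: Int): Boolean =
  attempt() match {
    case Right(_)                           => true
    case Left(Transient) if retriesLeft > 0 => launchWithRetry(attempt, retriesLeft - 1)
    case Left(_)                            => false
  }
```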

Integration Points

Spark Core Integration

The utilities integrate deeply with Spark core components:

  • SparkContext: Configuration and environment setup
  • SparkConf: YARN-specific configuration properties
  • SecurityManager: Token management and authentication
  • SparkHadoopUtil: Base Hadoop utility functionality

YARN Service Integration

Integration with YARN cluster services:

  • ResourceManager: Application submission and monitoring
  • NodeManager: Container launch and management
  • Timeline Service: Application history and metrics
  • Web Application Proxy: Secure application web UI access

Hadoop Ecosystem Integration

Support for broader Hadoop ecosystem:

  • HDFS: Distributed file system access and security
  • HBase: NoSQL database integration with token support
  • Hive: Data warehouse integration and metastore access
  • Security: Kerberos authentication and delegation tokens

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--yarn-parent-2-10
