YARN integration support for Apache Spark cluster computing, enabling Spark applications to run on Hadoop YARN clusters
—
Utility classes and configuration management for YARN-specific operations, distributed cache management, and executor container handling. These components provide essential support functionality for YARN integration.
Hadoop utilities that extend Spark's core `SparkHadoopUtil` with YARN-specific operations and configuration handling.
/**
* YARN-specific Hadoop utilities
* Extends SparkHadoopUtil with YARN-specific operations and configurations
*/
class YarnSparkHadoopUtil extends SparkHadoopUtil {
// YARN-specific security token handling
// Hadoop configuration management for YARN
// YARN service discovery and integration
// Kerberos authentication support for YARN clusters
}
/**
* Companion object with YARN-specific constants and utility methods
*/
object YarnSparkHadoopUtil {
/** Memory overhead factor for containers (7% of container memory) */
val MEMORY_OVERHEAD_FACTOR = 0.07
/** Minimum memory overhead for containers in MB */
val MEMORY_OVERHEAD_MIN = 384
/** Wildcard host for resource requests */
val ANY_HOST = "*"
/** Default number of executors when not specified */
val DEFAULT_NUMBER_EXECUTORS = 2
/** Resource manager request priority */
val RM_REQUEST_PRIORITY = 1
/** Get YarnSparkHadoopUtil singleton instance */
def get: YarnSparkHadoopUtil
}

Usage Examples:
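For instance, the memory constants above are combined when sizing an executor container: the requested heap plus whichever is larger, the 7% overhead or the 384 MB floor. A self-contained sketch of that arithmetic (`containerMemoryMb` is an illustrative helper, not the actual API):

```scala
object MemoryOverheadExample {
  val MEMORY_OVERHEAD_FACTOR = 0.07
  val MEMORY_OVERHEAD_MIN = 384

  /** Total container memory = requested heap + max(7% of heap, 384 MB floor). */
  def containerMemoryMb(executorMemoryMb: Int): Int =
    executorMemoryMb +
      math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt, MEMORY_OVERHEAD_MIN)

  def main(args: Array[String]): Unit = {
    println(containerMemoryMb(2048)) // small heap: the 384 MB floor applies -> 2432
    println(containerMemoryMb(8192)) // large heap: the 7% factor applies -> 8765
  }
}
```

Requesting a container smaller than heap + overhead is a common cause of executors being killed by the NodeManager for exceeding memory limits, which is why the floor exists.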
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
// Access YARN-specific Hadoop utilities
val yarnUtil = YarnSparkHadoopUtil.get
// Use for YARN-specific operations like:
// - Security token management
// - YARN service discovery
// - Hadoop configuration handling specific to YARN
// - Kerberos integration for secure clusters

Manages distributed cache functionality for YARN applications, handling file and archive distribution to executor nodes.
/**
* Manages distributed cache for YARN applications
* Handles distribution of files, JARs, and archives to executor containers
*/
private[spark] class ClientDistributedCacheManager {
// Distributed cache file management
// Archive and JAR distribution coordination
// Local resource preparation for container launch
// Cache cleanup and resource lifecycle management
}

Usage Examples:
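To illustrate the distinction the manager draws between plain files and archives (which YARN auto-extracts in the container), here is a hypothetical classifier; the extension-based rules are simplified assumptions, not the actual implementation:

```scala
object ResourceTypeExample {
  sealed trait ResourceType
  case object File extends ResourceType    // localized as-is in the container
  case object Archive extends ResourceType // auto-extracted by YARN on localization

  private val archiveSuffixes = Seq(".zip", ".tar.gz", ".tgz", ".tar")

  /** Decide whether a path should be localized as a plain file or an archive. */
  def classify(path: String): ResourceType =
    if (archiveSuffixes.exists(s => path.endsWith(s))) Archive else File

  def main(args: Array[String]): Unit = {
    println(classify("hdfs:///deps/app.jar"))       // File
    println(classify("hdfs:///deps/python.tar.gz")) // Archive
  }
}
```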
import org.apache.spark.deploy.yarn.ClientDistributedCacheManager
// Created internally by Client for managing distributed resources
val cacheManager = new ClientDistributedCacheManager()
// Handles:
// - Files specified via --files argument
// - Archives specified via --archives argument
// - Additional JARs from --addJars argument
// - Proper cleanup when application completes

Utility trait providing common functionality for executor container management across different YARN API versions.
/**
* Utility trait for executor container management
* Provides common executor launch and management functionality
*/
trait ExecutorRunnableUtil {
// Common executor container launch logic
// Environment variable setup for executors
// Classpath configuration and JAR distribution
// Resource allocation and container configuration
}

Usage Examples:
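A minimal sketch of the classpath assembly this trait performs; the entry names are illustrative (the real code resolves them from configuration), but the ordering, de-duplication, and separator handling are the essential steps:

```scala
import java.io.File

object ClasspathExample {
  /** Join classpath entries in precedence order, dropping blanks and duplicates. */
  def buildClasspath(entries: Seq[String]): String =
    entries.filter(_.nonEmpty).distinct.mkString(File.pathSeparator)

  def main(args: Array[String]): Unit = {
    val cp = buildClasspath(Seq(
      "$PWD",               // container working directory first
      "$PWD/__spark__.jar", // the distributed Spark assembly
      "$HADOOP_CONF_DIR",   // cluster configuration
      ""                    // blank entries are dropped
    ))
    println(cp)
  }
}
```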
import org.apache.spark.deploy.yarn.ExecutorRunnableUtil
// Mixed into ExecutorRunnable implementations
class ExecutorRunnable extends ExecutorRunnableUtil {
// Inherits common executor management functionality
// Used for launching executor containers on YARN nodes
// Handles environment setup and resource configuration
}

Version-specific implementation classes for managing executor containers on YARN.
/**
* Version-specific executor container implementation
* Available in both alpha (deprecated) and stable modules
* Manages executor containers on YARN NodeManager
*/
class ExecutorRunnable extends ExecutorRunnableUtil {
// Executor container launch and lifecycle management
// Integration with specific YARN API versions
// Resource allocation and environment setup
// Container monitoring and cleanup
}

Usage Examples:
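The command line an executor container runs can be sketched as below, modeled loosely on the `CoarseGrainedExecutorBackend` launch line; the exact flags and argument order here are illustrative assumptions, not the actual implementation:

```scala
object ExecutorCommandExample {
  /** Build an executor launch command of the general shape YARN containers run. */
  def buildCommand(memoryMb: Int, driverUrl: String, executorId: String,
                   hostname: String, cores: Int): Seq[String] =
    Seq(
      "{{JAVA_HOME}}/bin/java",      // expanded by YARN at container launch
      "-server",
      s"-Xmx${memoryMb}m",           // heap sized from the executor memory setting
      "org.apache.spark.executor.CoarseGrainedExecutorBackend",
      driverUrl, executorId, hostname, cores.toString
    )

  def main(args: Array[String]): Unit =
    println(buildCommand(2048, "spark://driver:7077", "1", "node-1", 4).mkString(" "))
}
```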
import org.apache.spark.deploy.yarn.ExecutorRunnable
// Created by YarnAllocator for each executor container
val executorRunnable = new ExecutorRunnable(
container = yarnContainer,
conf = sparkConf,
sparkJar = distributedSparkJar
)
// Handles complete executor container lifecycle:
// - Container launch on NodeManager
// - Executor JVM startup and configuration
// - Resource monitoring and cleanup

// Distributed cache handles various file types
class ClientDistributedCacheManager {
// Regular files (--files argument)
private def distributeFiles(files: String): Map[String, LocalResource] = {
// Upload files to HDFS staging directory
// Create LocalResource entries for YARN
// Configure file permissions and visibility
// Return resource map for container launch context
}
// Archive files (--archives argument)
private def distributeArchives(archives: String): Map[String, LocalResource] = {
// Handle tar.gz, zip, and other archive formats
// Configure automatic extraction in containers
// Set up proper working directory structure
}
// JAR files (--addJars argument)
private def distributeJars(jars: String): Map[String, LocalResource] = {
// Add JARs to executor classpath
// Handle both local and HDFS JAR locations
// Optimize JAR distribution across cluster nodes
}
}

// Complete resource lifecycle management
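Concretely, staging revolves around a per-application directory, conventionally `.sparkStaging/<appId>` under the submitter's HDFS home directory. This sketch models the setup and cleanup with plain strings and an immutable set standing in for HDFS (the real code uses Hadoop's `FileSystem` API):

```scala
object StagingDirExample {
  val STAGING_DIR = ".sparkStaging"

  /** Per-application staging path under the user's home directory on HDFS. */
  def stagingPath(homeDir: String, appId: String): String =
    s"$homeDir/$STAGING_DIR/$appId"

  /** Cleanup is a recursive delete of that one directory (simulated on a path set). */
  def cleanup(paths: Set[String], staging: String): Set[String] =
    paths.filterNot(_.startsWith(staging))

  def main(args: Array[String]): Unit = {
    val staging = stagingPath("/user/alice", "application_1_0001")
    val fs = Set(s"$staging/app.jar", s"$staging/conf.zip", "/user/alice/data")
    println(cleanup(fs, staging)) // only /user/alice/data remains
  }
}
```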
class ClientDistributedCacheManager {
def setupDistributedCache(): Map[String, LocalResource] = {
// 1. Analyze resource requirements from arguments
// 2. Upload local resources to HDFS staging area
// 3. Create YARN LocalResource descriptors
// 4. Return resource map for container launch
}
def cleanupStagingDir(): Unit = {
// Clean up HDFS staging directory after application completion
// Remove temporary files and directories
// Handle cleanup failures gracefully
}
}

// ExecutorRunnable creates complete launch context
class ExecutorRunnable {
private def createContainerLaunchContext(): ContainerLaunchContext = {
// 1. Set up executor command line
val executorCommand = buildExecutorCommand()
// 2. Configure environment variables
val environment = setupExecutorEnvironment()
// 3. Set up local resources (JARs, files, archives)
val localResources = setupLocalResources()
// 4. Configure security tokens
val tokens = setupSecurityTokens()
// 5. Build complete launch context
ContainerLaunchContext.newInstance(
localResources, environment, executorCommand, tokens
)
}
}

// Environment configuration for executor containers
trait ExecutorRunnableUtil {
protected def setupExecutorEnvironment(): Map[String, String] = {
Map(
"SPARK_HOME" -> sparkHome,
"CLASSPATH" -> buildExecutorClasspath(),
"JAVA_HOME" -> javaHome,
"PYTHONPATH" -> pythonPath,
"HADOOP_CONF_DIR" -> hadoopConfDir,
// YARN-specific environment variables
"CONTAINER_ID" -> containerId,
"APPLICATION_WEB_PROXY_BASE" -> webProxyBase
)
}
protected def buildExecutorClasspath(): String = {
// Spark JARs and dependencies
// User application JARs (--addJars)
// Hadoop and YARN libraries
// Custom classpaths from configuration
}
}

// YarnSparkHadoopUtil handles configuration integration
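One concrete piece of this integration is Spark's established `spark.hadoop.*` convention: any Spark key with that prefix is copied into the Hadoop configuration with the prefix stripped. Sketched here with plain maps instead of a real Hadoop `Configuration` to stay self-contained:

```scala
object ConfMappingExample {
  /** Copy spark.hadoop.* entries into Hadoop-style keys, prefix stripped. */
  def toHadoopConf(sparkConf: Map[String, String]): Map[String, String] =
    sparkConf.collect {
      case (k, v) if k.startsWith("spark.hadoop.") =>
        k.stripPrefix("spark.hadoop.") -> v
    }

  def main(args: Array[String]): Unit = {
    val hadoop = toHadoopConf(Map(
      "spark.hadoop.fs.defaultFS" -> "hdfs://nn:8020",
      "spark.executor.memory"     -> "2g" // not a Hadoop key; ignored
    ))
    println(hadoop) // Map(fs.defaultFS -> hdfs://nn:8020)
  }
}
```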
class YarnSparkHadoopUtil {
def getYarnConfiguration(sparkConf: SparkConf): Configuration = {
// Load base Hadoop configuration
val hadoopConf = new Configuration()
// Apply Spark-specific YARN overrides
applySparkYarnConfiguration(hadoopConf, sparkConf)
// Handle security configuration for Kerberos clusters
setupSecurityConfiguration(hadoopConf)
hadoopConf
}
private def applySparkYarnConfiguration(
hadoopConf: Configuration,
sparkConf: SparkConf
): Unit = {
// Map Spark configuration to Hadoop configuration
// Set YARN-specific properties
// Configure resource manager addresses
// Set up queue and priority configurations
}
}

// Security integration for Kerberos-enabled clusters
object YarnSparkHadoopUtil {
def obtainTokensForNamenodes(
paths: Set[Path],
conf: Configuration
): Map[String, Token[_]] = {
// Obtain delegation tokens for HDFS access
// Handle multiple namenode configurations
// Support for secure cluster authentication
}
def obtainTokensForHBase(conf: Configuration): Map[String, Token[_]] = {
// Obtain tokens for HBase integration
// Support for secure HBase clusters
}
def obtainTokensForHive(conf: Configuration): Map[String, Token[_]] = {
// Obtain tokens for Hive metastore access
// Support for secure Hive integration
}
}

// Efficient resource distribution and caching
class ClientDistributedCacheManager {
private def optimizeResourceDistribution(): Unit = {
// Cache frequently used files in HDFS
// Use symbolic links for shared resources
// Minimize network transfer overhead
// Leverage YARN's distributed cache capabilities
}
}

// Optimized executor container launch
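A minimal sketch of launching containers in parallel with standard Scala futures, which is the general shape of submitting `ExecutorRunnable`s to a thread pool; `launch` is a caller-supplied stand-in for the real NodeManager call:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncLaunchExample {
  /** Launch each container on the shared thread pool and wait for all of them. */
  def launchAll(containerIds: Seq[String], launch: String => Boolean): Seq[Boolean] = {
    val futures = containerIds.map(id => Future(launch(id)))
    Await.result(Future.sequence(futures), 30.seconds)
  }

  def main(args: Array[String]): Unit = {
    val results = launchAll(Seq("c1", "c2", "c3"), id => { Thread.sleep(10); true })
    println(results) // List(true, true, true)
  }
}
```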
class ExecutorRunnable {
private def launchContainerAsync(): Future[Unit] = {
// Parallel container launch to reduce startup time
// Pre-warm executor JVMs when possible
// Optimize classpath and resource loading
// Monitor launch success and retry on failures
}
}

// Robust error handling for container operations
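The classify-then-retry pattern for launch failures can be sketched as follows; treating `IOException` as the sole transient class and using a fixed backoff are simplifying assumptions for illustration:

```scala
import java.io.IOException

object RetryExample {
  /** Transient failures (e.g. network hiccups) are worth retrying; others are not. */
  def isTransient(e: Throwable): Boolean = e.isInstanceOf[IOException]

  /** Retry `op` up to `maxAttempts` times, only for transient failures. */
  def withRetries[T](maxAttempts: Int)(op: () => T): T =
    try op() catch {
      case e: Throwable if isTransient(e) && maxAttempts > 1 =>
        Thread.sleep(100) // simple fixed backoff for the sketch
        withRetries(maxAttempts - 1)(op)
    }

  def main(args: Array[String]): Unit = {
    var attempts = 0
    val result = withRetries(3) { () =>
      attempts += 1
      if (attempts < 3) throw new IOException("connection reset")
      "launched"
    }
    println(s"$result after $attempts attempts") // launched after 3 attempts
  }
}
```

Fatal failures (bad configuration, missing resources) fall through immediately and should be reported to the ApplicationMaster rather than retried.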
class ExecutorRunnable {
private def handleLaunchFailure(exception: Exception): Unit = {
// Log detailed failure information
// Classify failure types (resource, network, configuration)
// Implement retry logic for transient failures
// Report failures to ApplicationMaster for resource reallocation
}
}

// Error handling for resource distribution
class ClientDistributedCacheManager {
private def handleDistributionFailure(resource: String, error: Throwable): Unit = {
// Log resource distribution failures
// Attempt alternative distribution strategies
// Fail fast for critical resources
// Provide detailed error messages for troubleshooting
}
}

These utilities integrate closely with Spark core components, with YARN cluster services, and with the broader Hadoop ecosystem (including HDFS, Hive, and HBase security tokens).
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--yarn-parent-2-10