tessl install tessl/maven-org-apache-spark--spark-yarn_2-11@1.6.0

Apache Spark YARN integration module that enables Spark applications to run on YARN clusters, providing cluster manager functionality for distributed Spark computing workloads.
Apache Spark YARN integration module that enables Spark applications to run on YARN (Yet Another Resource Negotiator) clusters, providing cluster manager functionality for distributed Spark computing workloads. The module includes scheduler backends, resource allocation and management components, client interfaces for submitting applications to YARN, container placement strategies, and delegation token management for secure authentication in enterprise environments.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.11</artifactId>
<version>1.6.3</version>
</dependency>

import org.apache.spark.deploy.yarn.Client
import org.apache.spark.deploy.yarn.ApplicationMaster
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend
import org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.hadoop.conf.Configuration

// NOTE: Client is declared private[spark], so this snippet only compiles from code that lives
// in an org.apache.spark package. Regular applications should submit through
// `spark-submit --master yarn` rather than constructing a Client directly.

// Run in YARN mode so SparkHadoopUtil.get resolves to YarnSparkHadoopUtil, which the client
// relies on. Client.main sets this same system property before building a Client.
System.setProperty("SPARK_YARN_MODE", "true")

// Configure SparkConf for YARN deployment.
// spark.yarn.am.memory / spark.yarn.am.cores size the ApplicationMaster in client mode only;
// in cluster mode the AM also hosts the driver, so spark.driver.memory / spark.driver.cores apply.
val sparkConf = new SparkConf()
  .setAppName("MySparkApp")
  .set("spark.yarn.am.memory", "512m")
  .set("spark.yarn.am.cores", "1")
  .set("spark.executor.memory", "1g")
  .set("spark.executor.cores", "2")

// Set up client arguments for YARN submission.
val args = Array(
  "--jar", "/path/to/my-app.jar",
  "--class", "com.example.MyMainClass",
  "--num-executors", "2",
  "--executor-memory", "1g",
  "--executor-cores", "2"
)

// Create the YARN client.
val clientArgs = new ClientArguments(args, sparkConf)
val hadoopConf = new Configuration()
val client = new Client(clientArgs, hadoopConf, sparkConf)

// Submit the application to YARN and block until it reaches a final state.
// Client.run() calls submitApplication() itself, so do NOT also call submitApplication()
// beforehand: that would submit a second, independent YARN application.
// For a non-blocking submission instead, call `val appId = client.submitApplication()` once
// and then poll with `client.monitorApplication(appId)`.
client.run()

Apache Spark's YARN module is built around several key components:
- Client: handles application submission, monitoring, and communication with the YARN ResourceManager.
- ApplicationMaster: manages the Spark application lifecycle within YARN containers.
- Scheduler backends (YarnClientSchedulerBackend, YarnClusterSchedulerBackend): integrate Spark's task scheduling with YARN for the client and cluster deployment modes.
- YarnAllocator: handles dynamic container allocation and executor management.

Core functionality for submitting Spark applications to YARN clusters, including client-side submission, application monitoring, and resource management.
/**
 * Submits a Spark application to a YARN cluster, monitors it, and talks to the YARN
 * ResourceManager on its behalf.
 *
 * This is a signature listing: method bodies are omitted, so it does not compile as written.
 * Because the class is `private[spark]`, it is only visible to code inside the
 * `org.apache.spark` package and its subpackages.
 *
 * @param args       parsed submission arguments (application jar, main class, executor sizing, ...)
 * @param hadoopConf Hadoop configuration the client uses to reach the cluster
 *                   (NOTE(review): presumably where the ResourceManager address comes from — confirm)
 * @param sparkConf  Spark configuration of the application being submitted
 */
private[spark] class Client(
args: ClientArguments,
hadoopConf: Configuration,
sparkConf: SparkConf
) extends Logging {
// Auxiliary constructor that takes no Hadoop Configuration; how it derives one is not shown
// here — TODO confirm against the Spark source.
def this(clientArgs: ClientArguments, spConf: SparkConf)
// Submits the application to YARN and returns the ApplicationId YARN assigned to it.
def submitApplication(): ApplicationId
// Top-level entry point. Returns Unit, so success/failure is presumably signalled through
// exceptions or logging — TODO confirm.
def run(): Unit
/**
 * Monitors an application and returns its (YarnApplicationState, FinalApplicationStatus).
 *
 * @param appId                application to monitor
 * @param returnOnRunning      presumably: return as soon as the application is RUNNING instead
 *                             of waiting for a final state — confirm (default: false)
 * @param logApplicationReport presumably: whether to log each report fetched while polling
 *                             — confirm (default: true)
 */
def monitorApplication(
appId: ApplicationId,
returnOnRunning: Boolean = false,
logApplicationReport: Boolean = true
): (YarnApplicationState, FinalApplicationStatus)
// Stops the client. NOTE(review): which resources it releases is not shown here.
def stop(): Unit
// Fetches YARN's ApplicationReport for the given application.
def getApplicationReport(appId: ApplicationId): ApplicationReport
}

ApplicationMaster implementation that manages Spark application execution within YARN, handling resource allocation, executor management, and communication with ResourceManager.
/**
 * Manages a Spark application's execution inside YARN: resource allocation, executor
 * management, and communication with the ResourceManager.
 *
 * Signature listing only; method bodies are omitted.
 *
 * @param args   parsed ApplicationMaster arguments
 * @param client ResourceManager client for this AM — presumably used to register and
 *               unregister the AM; confirm
 */
class ApplicationMaster(
args: ApplicationMasterArguments,
client: YarnRMClient
) {
// Runs the ApplicationMaster. NOTE(review): the Int result is presumably a process exit code
// — confirm.
def run(): Int
// Marks the application as finished with the given final status, exit code and message.
def finish(status: FinalApplicationStatus, code: Int, msg: String): Unit
// Unregisters the AM (presumably from the ResourceManager — confirm), reporting the final
// status and diagnostics text.
def unregister(status: FinalApplicationStatus, diagnostics: String): Unit
}
/** Companion object: the ApplicationMaster process entry point plus SparkContext lifecycle hooks. */
object ApplicationMaster {
// JVM entry point for the ApplicationMaster process.
def main(args: Array[String]): Unit
// Hook presumably invoked once the application's SparkContext has been created — confirm caller.
def sparkContextInitialized(sc: SparkContext): Unit
// Hook presumably invoked when the SparkContext stops. NOTE(review): the meaning of the
// Boolean result is not shown here.
def sparkContextStopped(sc: SparkContext): Boolean
}

YARN-specific scheduler backends that integrate Spark's task scheduling with YARN's resource management for both client and cluster deployment modes.
/**
 * Scheduler backend for client deploy mode: connects Spark's task scheduling to YARN's
 * resource management while the driver runs in the submitting process.
 *
 * Signature listing only; method bodies are omitted.
 *
 * The parent is `YarnSchedulerBackend(scheduler, sc)`, the same as for
 * `YarnClusterSchedulerBackend`. `YarnSchedulerBackend` itself extends
 * `CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)`, so this class is still a
 * CoarseGrainedSchedulerBackend.
 *
 * @param scheduler the task scheduler this backend serves
 * @param sc        the application's SparkContext
 */
class YarnClientSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends YarnSchedulerBackend(scheduler, sc) {

  /** Starts the backend; in client mode this is presumably where the application is submitted to YARN. */
  def start(): Unit

  /** Stops the backend. */
  def stop(): Unit

  /** YARN application ID of the running application, as a string. */
  def applicationId(): String
}
/**
 * Scheduler backend for cluster deploy mode, connecting Spark's task scheduling to YARN's
 * resource management.
 *
 * Signature listing only; method bodies are omitted.
 *
 * @param scheduler the task scheduler this backend serves
 * @param sc        the application's SparkContext
 */
class YarnClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext
) extends YarnSchedulerBackend(scheduler, sc) {
// Starts the backend.
def start(): Unit
// YARN application ID of the running application, as a string.
def applicationId(): String
// ID of the current application attempt, if one is available.
def applicationAttemptId(): Option[String]
// Driver log URLs, presumably keyed by log name (e.g. stdout/stderr) — confirm; None when
// they cannot be determined.
def getDriverLogUrls: Option[Map[String, String]]
}

Dynamic resource allocation system that manages executor containers, handles container placement strategies, and provides optimal resource utilization on YARN clusters.
/**
 * Requests executor containers from YARN and tracks the executors running in them.
 *
 * Signature listing only; method bodies are omitted. Because the class is `private[yarn]`,
 * it is visible only inside the enclosing `yarn` package.
 *
 * @param driverUrl    URL of the driver — presumably handed to launched executors so they can
 *                     connect back; confirm
 * @param driverRef    RPC endpoint reference to the driver
 * @param conf         Hadoop configuration
 * @param sparkConf    Spark configuration of the application
 * @param amClient     ApplicationMaster-to-ResourceManager client used to issue container requests
 * @param appAttemptId the current application attempt
 * @param args         ApplicationMaster arguments
 * @param securityMgr  Spark SecurityManager
 */
private[yarn] class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: Configuration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
securityMgr: SecurityManager
) extends Logging {
// One allocation round — presumably syncs container requests with YARN and handles the
// containers it returns; confirm.
def allocateResources(): Unit
// Kills the executor with the given ID.
def killExecutor(executorId: String): Unit
// Updates the container requests outstanding with YARN — presumably to match the current
// executor target; confirm.
def updateResourceRequests(): Unit
// Handles containers that YARN has granted.
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit
// Handles status reports for containers that have completed.
def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit
// Number of executors currently running.
def getNumExecutorsRunning: Int
// Number of executors that have failed.
def getNumExecutorsFailed: Int
}

Comprehensive security system for YARN deployments including Kerberos authentication, delegation token management, and secure communication with Hadoop services.
/**
 * YARN-specific SparkHadoopUtil covering delegation-token handling and lookup of the current
 * container's ID.
 *
 * Signature listing only; method bodies are omitted.
 */
class YarnSparkHadoopUtil extends SparkHadoopUtil {
// Obtains delegation tokens for the NameNodes serving `paths`. Returns Unit, so the tokens are
// presumably added to `creds` — confirm.
def obtainTokensForNamenodes(
paths: Set[Path],
conf: Configuration,
creds: Credentials
): Unit
// Obtains a Hive metastore token when applicable. NOTE(review): the conditions under which
// None is returned are not shown here.
def obtainTokenForHiveMetastore(conf: Configuration): Option[Credentials]
// Starts / stops renewal of executor delegation tokens.
def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit
def stopExecutorDelegationTokenRenewer(): Unit
// ContainerId of the YARN container this code runs in — presumably read from the container
// environment; confirm.
def getContainerId: ContainerId
}
/**
 * Executor-side helper for refreshing delegation-token credentials.
 *
 * Signature listing only; method bodies are omitted.
 *
 * @param sparkConf  Spark configuration
 * @param hadoopConf Hadoop configuration
 */
class ExecutorDelegationTokenUpdater(
sparkConf: SparkConf,
hadoopConf: Configuration
) {
// Refreshes the executor's credentials when required. NOTE(review): what counts as "required"
// is not shown here — confirm.
def updateCredentialsIfRequired(): Unit
// Stops the updater.
def stop(): Unit
}

YARN-specific configuration properties and utility functions for environment setup, command building, and integration with Hadoop ecosystem components.
/**
 * Constants and static helpers used for YARN environment setup.
 *
 * Signature listing only; constant values and method bodies are omitted.
 */
object YarnSparkHadoopUtil {
// Executor memory-overhead defaults — presumably a fraction of executor memory and a floor
// in MB, used when spark.yarn.executor.memoryOverhead is unset; values not shown, confirm.
val MEMORY_OVERHEAD_FACTOR: Double
val MEMORY_OVERHEAD_MIN: Int
// Host placeholder meaning "any host", presumably for container requests — confirm.
val ANY_HOST: String
// Default executor count — presumably used when none is configured; confirm.
val DEFAULT_NUMBER_EXECUTORS: Int
// Returns the YarnSparkHadoopUtil instance.
def get: YarnSparkHadoopUtil
// Adds `value` to environment variable `key` in `env` (a mutable map). `classPathSeparator`
// presumably joins it with any existing value — confirm.
def addPathToEnvironment(
env: HashMap[String, String],
key: String,
value: String,
classPathSeparator: String
): Unit
// Initial number of executors to request. `numExecutors` is presumably the fallback when
// dynamic allocation does not determine it — confirm.
def getInitialTargetExecutorNumber(
conf: SparkConf,
numExecutors: Int
): Int
}

Key YARN-specific Spark configuration properties:
- spark.yarn.queue: YARN queue name to submit the application to.
- spark.yarn.am.memory: Amount of memory to use for the ApplicationMaster in client mode (in cluster mode, use spark.driver.memory instead).
- spark.yarn.am.cores: Number of cores to use for the ApplicationMaster in client mode (in cluster mode, use spark.driver.cores instead).
- spark.yarn.executor.memoryOverhead: Amount of non-heap (off-heap) memory, in MB, to allocate per executor.
- spark.yarn.maxAppAttempts: Maximum number of application attempts.
- spark.yarn.submit.waitAppCompletion: In cluster mode, whether the client waits for the application to complete before exiting.
- spark.yarn.principal: Kerberos principal used to log in on secure clusters.
- spark.yarn.keytab: Path to the Kerberos keytab file for that principal.
- spark.yarn.tags: Comma-separated list of YARN application tags.

For the complete configuration reference, see Configuration and Utilities.
Main entry points for interacting with YARN functionality: