Apache Spark YARN resource manager integration component that enables Spark applications to run on Hadoop YARN clusters
Install:

```bash
npx @tessl/cli install tessl/maven-org-apache-spark--spark-yarn_2-11@2.4.0
```

This module provides cluster managers, schedulers, and backends designed specifically for YARN environments, integrating Spark's distributed computing capabilities with YARN's resource management.
Or with Maven:

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_2.11</artifactId>
    <version>2.4.8</version>
</dependency>
```

Core Scala imports:

```scala
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAllocator}
import org.apache.spark.scheduler.cluster.{YarnClusterManager, SchedulerExtensionService, SchedulerExtensionServiceBinding}
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationAttemptId, Container}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.util.Clock
```

For Java shuffle service integration:

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
```

Example: running in YARN client mode:

```scala
import org.apache.spark.{SparkConf, SparkContext}
// Configure Spark for YARN client mode
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn")
  .set("spark.submit.deployMode", "client")
  .set("spark.yarn.queue", "default")

val sc = new SparkContext(conf)
// Use SparkContext normally
sc.stop()
```

Example: running in YARN cluster mode:

```scala
import org.apache.spark.{SparkConf, SparkContext}
// Configure Spark for YARN cluster mode. Note: cluster-mode applications are
// normally launched via spark-submit (see the sketch below); this code then
// runs inside the YARN ApplicationMaster.
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.queue", "production")

val sc = new SparkContext(conf)
// Use SparkContext normally
sc.stop()
```
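In practice, cluster-mode applications are launched through `spark-submit`, which drives this module's `Client` under the hood; the driver code above then runs inside the YARN ApplicationMaster. A minimal launch sketch (class and jar names are placeholders):

```bash
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.MyMainClass \
  --queue production \
  /path/to/my-app.jar
```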
Example: submitting an application programmatically with the `Client` API:

```scala
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.SparkConf
val sparkConf = new SparkConf()
  .setAppName("MyYarnApp")
  .set("spark.yarn.queue", "default")

val args = Array(
  "--jar", "/path/to/my-app.jar",
  "--class", "com.example.MyMainClass"
)

val clientArgs = new ClientArguments(args)
val client = new Client(clientArgs, sparkConf)
val applicationId = client.submitApplication()
println(s"Application submitted with ID: $applicationId")The YARN integration follows Spark's pluggable cluster manager architecture:
- `YarnClusterManager` is registered as a service provider and discovered through the external cluster manager SPI (see the registration sketch below)
- Separate scheduler backends handle client (`YarnClientSchedulerBackend`) and cluster (`YarnClusterSchedulerBackend`) modes
- `YarnAllocator` manages container allocation and executor lifecycle
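For reference, the yarn module registers this manager through Java's `ServiceLoader` provider-configuration mechanism; a sketch of the resource entry (the resource path is shown as a comment):

```
# src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
org.apache.spark.scheduler.cluster.YarnClusterManager
```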
Core cluster manager integration that enables Spark to run on YARN clusters through the external cluster manager SPI:

```scala
class YarnClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```
Client API for submitting and managing YARN applications programmatically, supporting both client and cluster deployment modes:

```scala
class Client(args: ClientArguments, sparkConf: SparkConf) {
  def submitApplication(): ApplicationId
  def run(): Unit
  def stop(): Unit
  def monitorApplication(appId: ApplicationId, returnOnRunning: Boolean, logApplicationReport: Boolean): YarnAppReport
  def getApplicationReport(appId: ApplicationId): ApplicationReport
}
```
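A short monitoring sketch combining the signatures above; it assumes the same `--jar`/`--class` placeholders as the submission example, and treats the returned `YarnAppReport` as opaque:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.{Client, ClientArguments}

val sparkConf = new SparkConf().setAppName("MonitoredApp")
val client = new Client(new ClientArguments(Array(
  "--jar", "/path/to/my-app.jar",
  "--class", "com.example.MyMainClass")), sparkConf)

val appId = client.submitApplication()

// Block until the application leaves the RUNNING state, logging reports while polling.
val finalReport = client.monitorApplication(appId, returnOnRunning = false, logApplicationReport = true)
println(s"Application $appId finished: $finalReport")
```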
Container allocation and executor lifecycle management within YARN resource constraints and scheduling policies:

```scala
class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock()) {

  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
  def numContainersPendingAllocate: Int
  def allocateResources(): Unit
  def killExecutor(executorId: String): Unit
  def stop(): Unit
}
```
External shuffle service that runs on YARN NodeManagers to provide shuffle data management for Spark applications, improving executor stability and enabling dynamic allocation:

```java
public class YarnShuffleService extends AuxiliaryService {
  protected void serviceInit(Configuration conf) throws Exception;
  protected void serviceStart() throws Exception;
  protected void serviceStop() throws Exception;
  public void initializeApplication(ApplicationInitializationContext context) throws Exception;
  public void stopApplication(ApplicationTerminationContext context) throws Exception;
}
```
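The service is deployed as a YARN auxiliary service on each NodeManager. A `yarn-site.xml` sketch following the setup described in the Spark on YARN documentation (the shuffle service jar must also be on the NodeManager classpath):

```xml
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```

Applications opt in with `spark.shuffle.service.enabled=true`, which is also a prerequisite for dynamic allocation.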
Security credential management and delegation token handling for secure YARN clusters with Kerberos authentication:

```scala
trait ServiceCredentialProvider {
  def serviceName: String
  def credentialsRequired(hadoopConf: Configuration): Boolean
  def obtainCredentials(hadoopConf: Configuration, sparkConf: SparkConf, creds: Credentials): Option[Long]
}
```
Pluggable extension system for custom scheduler services and functionality in YARN deployments:

```scala
trait SchedulerExtensionService {
  def start(binding: SchedulerExtensionServiceBinding): Unit
  def stop(): Unit
}
```
YARN-specific configuration options for controlling resource allocation, security, and deployment behavior:

```scala
// Key configuration entries
val APPLICATION_TAGS: ConfigEntry[Seq[String]]
val MAX_APP_ATTEMPTS: ConfigEntry[Int]
val QUEUE_NAME: ConfigEntry[String]
val SPARK_ARCHIVE: OptionalConfigEntry[String]
val USER_CLASS_PATH_FIRST: ConfigEntry[Boolean]
```
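These entries back user-facing `spark.yarn.*` keys, so they are normally set through `SparkConf` or `--conf`. A sketch mapping each entry to its key (key names per the Spark 2.4 configuration reference; values are placeholders):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.tags", "nightly,etl")                      // APPLICATION_TAGS
  .set("spark.yarn.maxAppAttempts", "2")                      // MAX_APP_ATTEMPTS
  .set("spark.yarn.queue", "production")                      // QUEUE_NAME
  .set("spark.yarn.archive", "hdfs:///spark/spark-libs.jar")  // SPARK_ARCHIVE
  .set("spark.yarn.user.classpath.first", "true")             // USER_CLASS_PATH_FIRST
```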
YARN-specific utilities for building container launch commands and managing the Spark distribution:

```scala
object YarnSparkHadoopUtil {
  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit
  val MEMORY_OVERHEAD_FACTOR: Double
  val MEMORY_OVERHEAD_MIN: Long
  val RM_REQUEST_PRIORITY: Priority
}

object YarnCommandBuilderUtils {
  def quoteForBatchScript(arg: String): String
  def findJarsDir(sparkHome: String): String
}
```
Example: implementing a custom credential provider:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

class MyCredentialProvider extends ServiceCredentialProvider {
  override def serviceName: String = "my-service"

  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
    // Check if credentials are needed
    hadoopConf.get("my.service.enabled", "false").toBoolean
  }

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Obtain and add credentials to `creds`
    // Return renewal time in milliseconds, or None if no renewal needed
    None
  }
}
```
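Providers are discovered with `java.util.ServiceLoader`, so an implementation is registered by listing its fully qualified class name in a provider-configuration resource (path shown as a comment; `MyCredentialProvider` is the example class above):

```
# META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
MyCredentialProvider
```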
Example: implementing a scheduler extension service:

```scala
import org.apache.spark.scheduler.cluster.{SchedulerExtensionService, SchedulerExtensionServiceBinding}

class MySchedulerExtension extends SchedulerExtensionService {
  override def start(binding: SchedulerExtensionServiceBinding): Unit = {
    // Initialize extension with access to scheduler components
  }

  override def stop(): Unit = {
    // Cleanup extension resources
  }
}
```
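Extension services are instantiated by class name when the YARN scheduler backend starts; in Spark 2.x the comma-separated class names are read from the `spark.yarn.services` key (a sketch, assuming `MySchedulerExtension` above is on the application classpath):

```scala
import org.apache.spark.SparkConf

// Each listed class is instantiated, and start()/stop() are called
// around the application lifecycle.
val conf = new SparkConf()
  .set("spark.yarn.services", "MySchedulerExtension")
```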
Common exceptions thrown by the YARN integration:

- `SparkException`: thrown for unsupported deploy modes or configuration errors
- `IOException`: file system operations during staging and cleanup
- `YarnException`: YARN-specific errors during application submission or management
- `SecurityException`: credential or authentication failures in secure clusters
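A hedged error-handling sketch around programmatic submission, reusing `client` from the submission example above (exception classes per the list):

```scala
import java.io.IOException
import org.apache.hadoop.yarn.exceptions.YarnException
import org.apache.spark.SparkException

try {
  val appId = client.submitApplication()
  println(s"Submitted: $appId")
} catch {
  case e: SparkException =>
    // Unsupported deploy mode or bad configuration
    System.err.println(s"Configuration error: ${e.getMessage}")
  case e: YarnException =>
    // YARN rejected or failed the submission
    System.err.println(s"YARN error: ${e.getMessage}")
  case e: IOException =>
    // File system failure while staging application resources
    System.err.println(s"I/O error during staging: ${e.getMessage}")
}
```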