Apache Spark YARN resource manager integration component that enables Spark applications to run on Hadoop YARN clusters
Security credential management and delegation token handling for secure YARN clusters with Kerberos authentication. This module provides extensible security integration patterns and automatic credential renewal for long-running applications.
Main extension point for implementing custom secure service credential providers. Enables integration with various secure services beyond the built-in Hadoop ecosystem.
trait ServiceCredentialProvider {
def serviceName: String
def credentialsRequired(hadoopConf: Configuration): Boolean
def obtainCredentials(hadoopConf: Configuration, sparkConf: SparkConf, creds: Credentials): Option[Long]
}

Core Methods:
serviceName: String
credentialsRequired(hadoopConf: Configuration): Boolean
obtainCredentials(hadoopConf, sparkConf, creds): Option[Long]
Implementation Example:
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
class MyServiceCredentialProvider extends ServiceCredentialProvider {
override def serviceName: String = "myservice"
override def credentialsRequired(hadoopConf: Configuration): Boolean = {
// Check if service is enabled and security is required
hadoopConf.getBoolean("myservice.security.enabled", false) &&
hadoopConf.get("hadoop.security.authentication", "simple") == "kerberos"
}
override def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = {
if (credentialsRequired(hadoopConf)) {
// Connect to service and obtain delegation token
val serviceClient = new MyServiceClient(hadoopConf)
val token = serviceClient.getDelegationToken("spark-user")
// Add token to credentials
creds.addToken(token.getService, token)
// Return renewal time (e.g., 24 hours from now)
Some(System.currentTimeMillis() + 24 * 60 * 60 * 1000)
} else {
None
}
}
}

Service Registration:
// Register provider through ServiceLoader mechanism
// META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
com.example.MyServiceCredentialProvider

Manages automatic credential renewal for long-running applications in secure clusters.
class AMCredentialRenewer(
sparkConf: SparkConf,
hadoopConf: Configuration,
amClient: AMRMClient[_]
) {
def start(): Unit
def stop(): Unit
def isRunning: Boolean
}

Credential Renewal Process:
Usage Example:
// AMCredentialRenewer is automatically managed by ApplicationMaster
// Configuration controls renewal behavior
val sparkConf = new SparkConf()
.set("spark.yarn.credentials.file", "/path/to/credentials")
.set("spark.yarn.credentials.renewalTime", "24h")
.set("spark.yarn.credentials.updateTime", "1h")
// Renewal happens automatically in secure clusters

Configuration Options:
spark.yarn.credentials.file: Path to delegation token file
spark.yarn.credentials.renewalTime: How often to renew tokens
spark.yarn.credentials.updateTime: How often to update executor credentials

Manages Hadoop delegation tokens specifically for YARN applications, coordinating with registered ServiceCredentialProvider implementations.
class YARNHadoopDelegationTokenManager(
sparkConf: SparkConf,
hadoopConf: Configuration,
scheduler: TaskScheduler
) {
def obtainTokensForNamenodes(paths: Set[Path]): Unit
def renewTokens(): Unit
def stop(): Unit
}

Token Management:
// HDFS tokens are obtained automatically for:
val sparkConf = new SparkConf()
.set("spark.yarn.access.hadoopFileSystems", "hdfs://namenode1:8020,hdfs://namenode2:8020")
.set("spark.hadoop.fs.defaultFS", "hdfs://namenode1:8020")
// Automatic token acquisition for configured filesystems

// Hive metastore tokens when using Spark SQL
val sparkConf = new SparkConf()
.set("spark.sql.hive.metastore.version", "2.3.0")
.set("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore:9083")
.set("spark.hadoop.hive.metastore.sasl.enabled", "true")
// Tokens obtained automatically when Hive is configured

// HBase delegation tokens for secure HBase clusters
val sparkConf = new SparkConf()
.set("spark.hadoop.hbase.security.authentication", "kerberos")
.set("spark.hadoop.hbase.master.kerberos.principal", "hbase/_HOST@REALM")
.set("spark.hadoop.hbase.regionserver.kerberos.principal", "hbase/_HOST@REALM")
// Custom HBase credential provider can be implemented

val sparkConf = new SparkConf()
.set("spark.yarn.principal", "spark/hostname@REALM")
.set("spark.yarn.keytab", "/path/to/spark.keytab")
.set("spark.yarn.access.hadoopFileSystems", "hdfs://namenode:8020")

Authentication Flow:
// Running as proxy user in secure cluster
val sparkConf = new SparkConf()
.set("spark.yarn.principal", "spark/hostname@REALM")
.set("spark.yarn.keytab", "/path/to/spark.keytab")
.set("spark.sql.hive.hiveserver2.jdbc.url.principal", "hive/_HOST@REALM")
// Spark service principal can proxy for end users

// Credential file lifecycle
val credentialFile = "/tmp/spark-credentials-" + UUID.randomUUID()
// Tokens written to file for executor distribution
val creds = new Credentials()
// ... populate credentials
creds.writeTokenStorageFile(credentialFile, hadoopConf)
// File distributed to executors via YARN LocalResource
val localResource = LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(new Path(credentialFile)),
LocalResourceType.FILE,
LocalResourceVisibility.PRIVATE,
fileStatus.getLen,
fileStatus.getModificationTime
)

// Executors receive updated tokens through RPC
case class UpdateDelegationTokens(tokens: Array[Byte])
// ApplicationMaster broadcasts token updates
def updateExecutorCredentials(newTokens: Credentials): Unit = {
val tokenBytes = SparkHadoopUtil.get.serialize(newTokens)
val message = UpdateDelegationTokens(tokenBytes)
// Send to all executors
scheduler.executorIds.foreach { executorId =>
scheduler.executorEndpointRef(executorId).send(message)
}
}

// Enable security in YARN mode
val sparkConf = new SparkConf()
.set("spark.authenticate", "true")
.set("spark.authenticate.secret", "shared-secret")
.set("spark.network.crypto.enabled", "true")
.set("spark.io.encryption.enabled", "true")

// YARN security configuration
val sparkConf = new SparkConf()
.set("spark.yarn.security.credentials.hadoopfs.enabled", "true")
.set("spark.yarn.security.credentials.hive.enabled", "true")
.set("spark.yarn.security.credentials.hbase.enabled", "true")
.set("spark.yarn.maxAppAttempts", "1") // Reduce attempts in secure mode

// SSL configuration for secure communication
val sparkConf = new SparkConf()
.set("spark.ssl.enabled", "true")
.set("spark.ssl.keyStore", "/path/to/keystore.jks")
.set("spark.ssl.keyStorePassword", "keystore-password")
.set("spark.ssl.trustStore", "/path/to/truststore.jks")
.set("spark.ssl.trustStorePassword", "truststore-password")

// Common authentication errors
throw new IOException("Failed to authenticate with Kerberos KDC")
throw new AccessControlException("User not authorized for queue: production")
throw new TokenException("Delegation token has expired")

// Token renewal error handling
try {
credentialRenewer.renewTokens()
} catch {
case e: IOException =>
logError("Failed to renew delegation tokens", e)
// Attempt re-authentication with keytab
authenticateWithKeytab()
case e: InterruptedException =>
logWarning("Token renewal interrupted")
Thread.currentThread().interrupt()
}

// Security policy enforcement
def validateSecureAccess(user: String, resource: String): Unit = {
if (!securityManager.checkAccess(user, resource)) {
throw new AccessControlException(s"User $user denied access to $resource")
}
}

class CustomAuthenticationProvider extends ServiceCredentialProvider {
override def serviceName: String = "custom-auth"
override def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = {
// Custom authentication logic
val authToken = performCustomAuth(hadoopConf, sparkConf)
creds.addToken(new Text("custom-service"), authToken)
// Return renewal time
Some(System.currentTimeMillis() + renewalIntervalMs)
}
private def performCustomAuth(hadoopConf: Configuration, sparkConf: SparkConf): Token[_] = {
// Implement custom authentication protocol
// Return delegation token for the service
???
}
}

// Security configuration for multi-cluster access
val sparkConf = new SparkConf()
.set("spark.yarn.access.hadoopFileSystems",
"hdfs://cluster1:8020,hdfs://cluster2:8020,hdfs://cluster3:8020")
.set("spark.hadoop.fs.hdfs.impl.disable.cache", "true") // Avoid connection caching
// Tokens obtained for all configured clusters

// Security event monitoring
class SecurityEventListener extends SparkListener {
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
logInfo("Application started in secure mode")
auditSecurityConfiguration()
}
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
logInfo(s"Executor ${executorAdded.executorId} added with security context")
validateExecutorSecurity(executorAdded.executorId)
}
}

Token Expiration:
// Symptoms: Applications fail after running for extended periods
// Solutions:
// 1. Configure automatic renewal
val conf = new SparkConf()
.set("spark.yarn.credentials.renewalTime", "12h")
// 2. Use long-lived keytabs instead of tickets
.set("spark.yarn.principal", "spark/_HOST@REALM")
.set("spark.yarn.keytab", "/etc/security/keytabs/spark.headless.keytab")

Cross-Realm Authentication:
// Configure cross-realm trust
val sparkConf = new SparkConf()
.set("spark.hadoop.hadoop.security.auth_to_local", "RULE:[2:$1@$0](.*@REALM2)s/@REALM2/@REALM1/")
.set("spark.yarn.principal", "spark/_HOST@REALM1")

Service Discovery Issues:
// Ensure service credential providers are on classpath
// Check ServiceLoader registration
val providers = ServiceLoader.load(classOf[ServiceCredentialProvider])
providers.forEach(p => logInfo(s"Found provider: ${p.serviceName}"))

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-yarn-2-11