Security Integration

This module manages security credentials and delegation tokens for Spark applications running on secure, Kerberos-authenticated YARN clusters. It provides an extension point for custom credential providers and renews credentials automatically for long-running applications.

Capabilities

ServiceCredentialProvider

Main extension point for implementing custom secure service credential providers. Enables integration with various secure services beyond the built-in Hadoop ecosystem.

trait ServiceCredentialProvider {
  def serviceName: String
  def credentialsRequired(hadoopConf: Configuration): Boolean  
  def obtainCredentials(hadoopConf: Configuration, sparkConf: SparkConf, creds: Credentials): Option[Long]
}

Core Methods:

serviceName: String

  • Returns unique identifier for the service
  • Used for service discovery and configuration
  • Should be lowercase and descriptive (e.g., "hdfs", "hive", "hbase")

credentialsRequired(hadoopConf: Configuration): Boolean

  • Determines if credentials are needed for this service
  • Examines Hadoop configuration for service-specific settings
  • Returns true if delegation tokens should be obtained

obtainCredentials(hadoopConf, sparkConf, creds): Option[Long]

  • Obtains delegation tokens for the service
  • Adds tokens to the provided Credentials object
  • Returns token renewal time in milliseconds, or None if no renewal needed

Implementation Example:

import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf

class MyServiceCredentialProvider extends ServiceCredentialProvider {
  override def serviceName: String = "myservice"
  
  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
    // Check if service is enabled and security is required
    hadoopConf.getBoolean("myservice.security.enabled", false) &&
    hadoopConf.get("hadoop.security.authentication", "simple") == "kerberos"
  }
  
  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf, 
      creds: Credentials): Option[Long] = {
    
    if (credentialsRequired(hadoopConf)) {
      // Connect to service and obtain delegation token
      // (MyServiceClient is a placeholder for your service's own client class)
      val serviceClient = new MyServiceClient(hadoopConf)
      val token = serviceClient.getDelegationToken("spark-user")
      
      // Add token to credentials
      creds.addToken(token.getService, token)
      
      // Return renewal time (e.g., 24 hours from now); 24L keeps the arithmetic in Long
      Some(System.currentTimeMillis() + 24L * 60 * 60 * 1000)
    } else {
      None
    }
  }
}

Service Registration:

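// Register the provider through the java.util.ServiceLoader mechanism.
// Create this resource file on the classpath:
//   META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
// and list one fully qualified implementation class name per line:
com.example.MyServiceCredentialProvider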

AMCredentialRenewer

Manages automatic credential renewal for long-running applications in secure clusters.

class AMCredentialRenewer(
  sparkConf: SparkConf,
  hadoopConf: Configuration,
  amClient: AMRMClient[_]
) {
  def start(): Unit
  def stop(): Unit
  def isRunning: Boolean
}

Credential Renewal Process:

  1. Scans for renewable delegation tokens
  2. Schedules renewal based on token expiration times
  3. Obtains fresh tokens before expiration
  4. Updates credentials in running executors
  5. Logs renewal failures and attempts re-authentication with the keytab
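
The scheduling step in the list above can be sketched with plain JDK classes. This is a minimal illustration, not the AMCredentialRenewer implementation: the object name `RenewalSchedule`, both method names, and the 0.75 ratio are assumptions chosen for the example. The idea is to renew after a fixed fraction of the remaining token lifetime so that fresh tokens arrive before the old ones expire.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Hypothetical helper, not part of Spark.
object RenewalSchedule {
  /** Delay before the next renewal: a fraction of the remaining token lifetime, never negative. */
  def renewalDelayMs(nowMs: Long, expiryMs: Long, ratio: Double = 0.75): Long =
    math.max(0L, ((expiryMs - nowMs) * ratio).toLong)

  /** Runs `renew` once on the given executor after the computed delay. */
  def scheduleRenewal(pool: ScheduledExecutorService, expiryMs: Long)(
      renew: () => Unit): ScheduledFuture[_] = {
    val delay = renewalDelayMs(System.currentTimeMillis(), expiryMs)
    pool.schedule(
      new Runnable { override def run(): Unit = renew() },
      delay,
      TimeUnit.MILLISECONDS)
  }
}
```

A token that has already expired yields a delay of zero, so the renewal runs immediately instead of being scheduled in the past.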

Usage Example:

// AMCredentialRenewer is automatically managed by ApplicationMaster
// Configuration controls renewal behavior

val sparkConf = new SparkConf()
  .set("spark.yarn.credentials.file", "/path/to/credentials")
  .set("spark.yarn.credentials.renewalTime", "24h")
  .set("spark.yarn.credentials.updateTime", "1h")

// Renewal happens automatically in secure clusters

Configuration Options:

  • spark.yarn.credentials.file: Path to delegation token file
  • spark.yarn.credentials.renewalTime: How often to renew tokens
  • spark.yarn.credentials.updateTime: How often to update executor credentials

YARNHadoopDelegationTokenManager

Manages Hadoop delegation tokens specifically for YARN applications, coordinating with registered ServiceCredentialProvider implementations.

class YARNHadoopDelegationTokenManager(
  sparkConf: SparkConf,
  hadoopConf: Configuration,
  scheduler: TaskScheduler
) {
  def obtainTokensForNamenodes(paths: Set[Path]): Unit
  def renewTokens(): Unit
  def stop(): Unit
}

Token Management:

  • Obtains tokens for HDFS NameNodes based on input/output paths
  • Coordinates with all registered ServiceCredentialProvider instances
  • Handles token renewal scheduling and execution
  • Distributes updated tokens to running executors
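
To make the coordination step concrete, here is a small sketch under simplifying assumptions. The `Provider` trait is a stand-in for ServiceCredentialProvider with the Hadoop types removed, and `TokenCoordination` and `obtainAll` are hypothetical names. It asks every provider that requires credentials for tokens and keeps the earliest renewal time, since that is when the next renewal has to happen.

```scala
// Hypothetical sketch, not the YARNHadoopDelegationTokenManager implementation.
object TokenCoordination {
  /** Simplified stand-in for ServiceCredentialProvider (no Hadoop types). */
  trait Provider {
    def serviceName: String
    def credentialsRequired: Boolean
    /** Returns the next renewal time in milliseconds, or None if no renewal is needed. */
    def obtainCredentials(): Option[Long]
  }

  /** Collects renewal times from all providers that need credentials and returns the earliest. */
  def obtainAll(providers: Seq[Provider]): Option[Long] = {
    val renewals = providers.filter(_.credentialsRequired).flatMap(_.obtainCredentials())
    renewals.reduceOption(_ min _)
  }
}
```

Providers that report `credentialsRequired == false` are never asked for tokens, and the result is `None` when no provider returns a renewal time.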

Built-in Security Providers

HDFS Integration

// HDFS tokens are obtained automatically for:
val sparkConf = new SparkConf()
  .set("spark.yarn.access.hadoopFileSystems", "hdfs://namenode1:8020,hdfs://namenode2:8020")
  .set("spark.hadoop.fs.defaultFS", "hdfs://namenode1:8020")

// Automatic token acquisition for configured filesystems

Hive Integration

// Hive metastore tokens when using Spark SQL
val sparkConf = new SparkConf()
  .set("spark.sql.hive.metastore.version", "2.3.0")
  .set("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore:9083")
  .set("spark.hadoop.hive.metastore.sasl.enabled", "true")

// Tokens obtained automatically when Hive is configured

HBase Integration

// HBase delegation tokens for secure HBase clusters
val sparkConf = new SparkConf()
  .set("spark.hadoop.hbase.security.authentication", "kerberos")
  .set("spark.hadoop.hbase.master.kerberos.principal", "hbase/_HOST@REALM")
  .set("spark.hadoop.hbase.regionserver.kerberos.principal", "hbase/_HOST@REALM")

// Custom HBase credential provider can be implemented

Kerberos Integration

Principal and Keytab Configuration

val sparkConf = new SparkConf()
  .set("spark.yarn.principal", "spark/hostname@REALM")
  .set("spark.yarn.keytab", "/path/to/spark.keytab")
  .set("spark.yarn.access.hadoopFileSystems", "hdfs://namenode:8020")

Authentication Flow:

  1. ApplicationMaster authenticates using principal/keytab
  2. Obtains delegation tokens for configured services
  3. Distributes tokens to executor containers
  4. Renews tokens before expiration
  5. Updates executors with fresh tokens

Proxy User Support

// Running as proxy user in secure cluster
val sparkConf = new SparkConf()
  .set("spark.yarn.principal", "spark/hostname@REALM")  
  .set("spark.yarn.keytab", "/path/to/spark.keytab")
  .set("spark.sql.hive.hiveserver2.jdbc.url.principal", "hive/_HOST@REALM")

// Spark service principal can proxy for end users

Credential Distribution

Token File Management

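import java.util.UUID
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType, LocalResourceVisibility}
import org.apache.hadoop.yarn.util.ConverterUtils

// Credential file lifecycle (hadoopConf is the application's Hadoop Configuration)
val credentialFile = new Path("/tmp/spark-credentials-" + UUID.randomUUID())

// Tokens written to file for executor distribution
val creds = new Credentials()
// ... populate credentials
creds.writeTokenStorageFile(credentialFile, hadoopConf)  // expects a Path, not a String

// The file status supplies the size and timestamp registered with YARN
val fileStatus = credentialFile.getFileSystem(hadoopConf).getFileStatus(credentialFile)

// File distributed to executors via YARN LocalResource
val localResource = LocalResource.newInstance(
  ConverterUtils.getYarnUrlFromPath(credentialFile),
  LocalResourceType.FILE,
  LocalResourceVisibility.PRIVATE,
  fileStatus.getLen,
  fileStatus.getModificationTime
)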

Dynamic Token Updates

// Executors receive updated tokens through RPC
case class UpdateDelegationTokens(tokens: Array[Byte])

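// ApplicationMaster broadcasts token updates
// (`scheduler` stands for whichever component tracks executor RPC endpoints; its methods here are illustrative)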
def updateExecutorCredentials(newTokens: Credentials): Unit = {
  val tokenBytes = SparkHadoopUtil.get.serialize(newTokens)
  val message = UpdateDelegationTokens(tokenBytes)
  
  // Send to all executors
  scheduler.executorIds.foreach { executorId =>
    scheduler.executorEndpointRef(executorId).send(message)
  }
}

Security Configuration

Core Security Settings

// Enable security in YARN mode
val sparkConf = new SparkConf()
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "shared-secret")
  .set("spark.network.crypto.enabled", "true")
  .set("spark.io.encryption.enabled", "true")

YARN-Specific Security

// YARN security configuration
val sparkConf = new SparkConf()
  .set("spark.yarn.security.credentials.hadoopfs.enabled", "true")
  .set("spark.yarn.security.credentials.hive.enabled", "true")  
  .set("spark.yarn.security.credentials.hbase.enabled", "true")
  .set("spark.yarn.maxAppAttempts", "1")  // Reduce attempts in secure mode
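
The per-service keys above follow the pattern `spark.yarn.security.credentials.{service}.enabled`. The sketch below shows one way such a toggle could be evaluated. It uses a plain `Map` in place of SparkConf, and it assumes that an unset key means the provider is enabled; `ProviderToggle` and its methods are hypothetical names.

```scala
// Hypothetical helper, not part of Spark.
object ProviderToggle {
  /** Builds the configuration key for a given service name. */
  def key(serviceName: String): String =
    s"spark.yarn.security.credentials.${serviceName}.enabled"

  /** A provider is treated as enabled unless its key is explicitly set to "false". */
  def isEnabled(conf: Map[String, String], serviceName: String): Boolean =
    conf.get(key(serviceName)).forall(_.toBoolean)

  /** Filters a list of service names down to the enabled ones. */
  def enabledServices(conf: Map[String, String], serviceNames: Seq[String]): Seq[String] =
    serviceNames.filter(isEnabled(conf, _))
}
```

With this convention, a cluster that has no HBase can skip that provider by setting only `spark.yarn.security.credentials.hbase.enabled` to `false`.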

SSL/TLS Configuration

// SSL configuration for secure communication
val sparkConf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore.jks")
  .set("spark.ssl.keyStorePassword", "keystore-password")
  .set("spark.ssl.trustStore", "/path/to/truststore.jks")
  .set("spark.ssl.trustStorePassword", "truststore-password")

Error Handling

Authentication Failures

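import java.io.IOException
import org.apache.hadoop.security.AccessControlException
import org.apache.hadoop.security.token.SecretManager.InvalidToken

// Common authentication errors (shown as values; only one would be thrown at a time)
val kdcFailure = new IOException("Failed to authenticate with Kerberos KDC")
val queueDenied = new AccessControlException("User not authorized for queue: production")
val tokenExpired = new InvalidToken("Delegation token has expired")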

Token Renewal Failures

// Token renewal error handling
// (credentialRenewer and authenticateWithKeytab() are placeholders for application code)
try {
  credentialRenewer.renewTokens()
} catch {
  case e: IOException =>
    logError("Failed to renew delegation tokens", e)
    // Attempt re-authentication with keytab
    authenticateWithKeytab()
  case e: InterruptedException =>
    logWarning("Token renewal interrupted")
    Thread.currentThread().interrupt()
}
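
A transient failure is often worth retrying before falling back to re-authentication. The following is a minimal sketch of retrying with exponential backoff, using only the Scala standard library. `RenewalRetry` and its parameters are hypothetical, and the `sleep` argument exists only so the delay can be replaced in tests.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of Spark.
object RenewalRetry {
  /** Runs `op` up to `attemptsLeft` times, doubling the delay after each failure. */
  @tailrec
  def retry[T](attemptsLeft: Int, delayMs: Long, sleep: Long => Unit = Thread.sleep(_))(
      op: () => T): Try[T] =
    Try(op()) match {
      case s @ Success(_) => s
      case f @ Failure(_) if attemptsLeft <= 1 => f // out of attempts: report the last error
      case Failure(_) =>
        sleep(delayMs)
        retry(attemptsLeft - 1, delayMs * 2, sleep)(op)
    }
}
```

Because `Try` only captures non-fatal exceptions, an `InterruptedException` propagates out of `retry` rather than being retried, which matches the interrupt handling shown above.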

Security Policy Violations

// Security policy enforcement
def validateSecureAccess(user: String, resource: String): Unit = {
  if (!securityManager.checkAccess(user, resource)) {
    throw new AccessControlException(s"User $user denied access to $resource")
  }
}

Advanced Security Patterns

Custom Authentication

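import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

class CustomAuthenticationProvider extends ServiceCredentialProvider {
  // Illustrative renewal interval; pick a value shorter than the token lifetime
  private val renewalIntervalMs: Long = 12L * 60 * 60 * 1000

  override def serviceName: String = "custom-auth"

  // Always request credentials in this sketch; a real provider should inspect hadoopConf
  override def credentialsRequired(hadoopConf: Configuration): Boolean = true
  
  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    
    // Custom authentication logic
    val authToken = performCustomAuth(hadoopConf, sparkConf)
    creds.addToken(new Text("custom-service"), authToken)
    
    // Return renewal time
    Some(System.currentTimeMillis() + renewalIntervalMs)
  }
  
  private def performCustomAuth(
      hadoopConf: Configuration,
      sparkConf: SparkConf): Token[_ <: TokenIdentifier] = {
    // Implement custom authentication protocol
    // Return delegation token for the service
    ???
  }
}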

Multi-Cluster Security

// Security configuration for multi-cluster access
val sparkConf = new SparkConf()
  .set("spark.yarn.access.hadoopFileSystems", 
       "hdfs://cluster1:8020,hdfs://cluster2:8020,hdfs://cluster3:8020")
  .set("spark.hadoop.fs.hdfs.impl.disable.cache", "true")  // Avoid connection caching

// Tokens obtained for all configured clusters

Security Monitoring

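import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerExecutorAdded}

// Security event monitoring
// (logInfo, auditSecurityConfiguration and validateExecutorSecurity are assumed to be supplied by the application)
class SecurityEventListener extends SparkListener {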
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    logInfo("Application started in secure mode")
    auditSecurityConfiguration()
  }
  
  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    logInfo(s"Executor ${executorAdded.executorId} added with security context")
    validateExecutorSecurity(executorAdded.executorId)
  }
}

Troubleshooting Security Issues

Common Issues

Token Expiration:

// Symptoms: Applications fail after running for extended periods
// Solutions:
val conf = new SparkConf()
  // 1. Configure automatic renewal
  .set("spark.yarn.credentials.renewalTime", "12h")
  // 2. Use long-lived keytabs instead of tickets
  .set("spark.yarn.principal", "spark/_HOST@REALM")
  .set("spark.yarn.keytab", "/etc/security/keytabs/spark.headless.keytab")

Cross-Realm Authentication:

// Configure cross-realm trust
val sparkConf = new SparkConf()
  .set("spark.hadoop.hadoop.security.auth_to_local", "RULE:[2:$1@$0](.*@REALM2)s/@REALM2/@REALM1/")
  .set("spark.yarn.principal", "spark/_HOST@REALM1")

Service Discovery Issues:

import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Ensure service credential providers are on classpath
// Check ServiceLoader registration
val providers = ServiceLoader.load(classOf[ServiceCredentialProvider])
providers.asScala.foreach(p => logInfo(s"Found provider: ${p.serviceName}"))