Apache Spark Core provides modern resource management capabilities including resource profiles, executor and task resource requests, and fine-grained resource allocation for heterogeneous workloads.
Defines resource requirements for executors and tasks, enabling heterogeneous resource allocation within a single Spark application.
class ResourceProfile(
  val executorResources: Map[String, ExecutorResourceRequest],
  val taskResources: Map[String, TaskResourceRequest]
) {
  def id: Int
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}

object ResourceProfile {
  val DEFAULT_RESOURCE_PROFILE_ID = 0
  def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile
}

Builder pattern for constructing resource profiles with a fluent API.
class ResourceProfileBuilder {
  def require(requests: ExecutorResourceRequests): this.type
  def require(requests: TaskResourceRequests): this.type
  def build(): ResourceProfile
}

Note that require accepts the fluent request builders ExecutorResourceRequests and TaskResourceRequests (with setters such as memory, cores, cpus, and resource), which produce the individual request objects described next.

Describes a single executor resource requirement: resource name, amount, and an optional discovery script and vendor.
class ExecutorResourceRequest(
  val resourceName: String,
  val amount: Long,
  val discoveryScript: String = "",
  val vendor: String = ""
) {
  def resourceName: String
  def amount: Long
  def discoveryScript: String
  def vendor: String
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}
object ExecutorResourceRequest {
  val CORES = "cores"
  val MEMORY = "memory"
  val OVERHEAD_MEM = "memoryOverhead"
  val PYSPARK_MEM = "pysparkMemory"
  val OFFHEAP_MEM = "offHeapMemory"
  val FPGA = "fpga"
  val GPU = "gpu"
}
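For illustration, a minimal sketch of how the ExecutorResourceRequest entries recorded in a built profile can be read back; the discovery script path is a placeholder, and the ExecutorResourceRequests builder is covered in the usage examples later in this section.

import org.apache.spark.resource._

// Build a profile, then inspect the ExecutorResourceRequest entries it recorded
val rp = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests()
    .cores(4)
    .resource("gpu", 1, "/opt/spark/gpu-discovery.sh")) // placeholder script path
  .build()

rp.executorResources.foreach { case (name, req) =>
  // name is the resource key ("cores", "gpu", ...); req is an ExecutorResourceRequest
  println(s"$name: amount=${req.amount}, vendor='${req.vendor}', script='${req.discoveryScript}'")
}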
Describes a per-task resource requirement: resource name and amount (fractional amounts let several tasks share one resource address).
class TaskResourceRequest(
  val resourceName: String,
  val amount: Double
) {
  def resourceName: String
  def amount: Double
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}

object TaskResourceRequest {
  val CPUS = "cpus"
  val FPGA = "fpga"
  val GPU = "gpu"
}

Runtime resource information available to tasks and executors.
class ResourceInformation(
  val name: String,
  val addresses: Array[String]
) {
  def name: String
  def addresses: Array[String]
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}
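As a brief sketch of reading ResourceInformation outside of a task: SparkContext.resources exposes the resources discovered for the driver (it is empty unless spark.driver.resource.* is configured), while TaskContext.resources(), shown in the resource-aware task example later in this section, is the task-side equivalent.

// Resources allocated to the driver; empty unless spark.driver.resource.* is set
val driverResources = sc.resources
driverResources.foreach { case (name, info) =>
  // info.name repeats the resource name; info.addresses lists the device addresses
  println(s"$name -> addresses: ${info.addresses.mkString(", ")}")
}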
Methods for associating RDDs with specific resource profiles.
// From RDD[T]
def withResources(profile: ResourceProfile): RDD[T]
def getResourceProfile(): ResourceProfile

SparkContext methods for resource profile management.
// From SparkContext
def addResourceProfile(rp: ResourceProfile): Unit
def getResourceProfile(id: Int): Option[ResourceProfile]
def getResourceProfiles(): Map[Int, ResourceProfile]
def requestTotalExecutors(numExecutors: Int, localityAwareTasks: Int, hostToLocalTaskCount: Map[String, Int]): Boolean
def requestExecutors(numAdditionalExecutors: Int): Boolean
def killExecutors(executorIds: Seq[String]): Seq[String]
def killExecutor(executorId: String): Boolean
def getExecutorMemoryStatus: Map[String, (Long, Long)]

Key resource management configuration options (a SparkConf sketch for the resource settings follows the list).
spark.dynamicAllocation.enabled - Enable dynamic executor allocation
spark.dynamicAllocation.minExecutors - Minimum number of executors
spark.dynamicAllocation.maxExecutors - Maximum number of executors
spark.dynamicAllocation.initialExecutors - Initial number of executors
spark.dynamicAllocation.executorIdleTimeout - Timeout for idle executors
spark.dynamicAllocation.schedulerBacklogTimeout - Timeout for pending tasks
spark.executor.resource.{resourceName}.discoveryScript - Script to discover the resource
spark.executor.resource.{resourceName}.vendor - Resource vendor
spark.task.resource.{resourceName}.amount - Amount of the resource per task
spark.executor.resource.gpu.amount - Number of GPUs per executor
spark.executor.resource.gpu.discoveryScript - GPU discovery script
spark.task.resource.gpu.amount - GPU fraction per task
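For illustration, a minimal sketch of supplying the resource settings above through SparkConf (they can equally be passed as --conf options to spark-submit). The discovery script path is a placeholder; such a script must print a single JSON object in the ResourceInformation format, i.e. a resource name plus an array of addresses.

import org.apache.spark.SparkConf

val gpuConf = new SparkConf()
  .setAppName("GPU Resource Configuration")
  // Each executor advertises 2 GPUs, located by the discovery script
  .set("spark.executor.resource.gpu.amount", "2")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/gpu-discovery.sh") // placeholder path
  // Each task claims 1 GPU, so at most 2 GPU tasks run concurrently per executor
  .set("spark.task.resource.gpu.amount", "1")
// Pass gpuConf to new SparkContext(gpuConf) when starting the application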
import org.apache.spark.resource._

// Create resource profile with specific requirements
val resourceProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().memory("8g").cores(4)) // 8 GiB memory, 4 cores per executor
  .require(new TaskResourceRequests().cpus(1))                   // 1 CPU per task
  .build()
// Register profile with SparkContext
sc.addResourceProfile(resourceProfile)
// Use profile with RDD
val data = sc.parallelize(1 to 1000)
val resourcedRDD = data.withResources(resourceProfile)
val result = resourcedRDD.map(heavyComputation).collect()

// GPU-enabled resource profile
val gpuProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests()
    .memory("16g")
    .cores(8)
    .resource("gpu", 2, "/opt/spark/gpu-discovery.sh", "nvidia")) // 2 GPUs per executor
  .require(new TaskResourceRequests()
    .cpus(2)
    .resource("gpu", 0.5)) // half a GPU per task: up to 4 GPU tasks share an executor's 2 GPUs
  .build()
sc.addResourceProfile(gpuProfile)
// Use for GPU-intensive workloads
val gpuData = sc.parallelize(largeDataset)
val gpuProcessedData = gpuData
.withResources(gpuProfile)
.map(gpuAcceleratedProcessing)
.collect()

// Light processing profile
val lightProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().memory("2g").cores(2)) // 2 GiB, 2 cores per executor
  .require(new TaskResourceRequests().cpus(1))                   // 1 CPU per task (task CPUs must be whole numbers)
  .build()
// Heavy processing profile
val heavyProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().memory("16g").cores(8)) // 16 GiB, 8 cores per executor
  .require(new TaskResourceRequests().cpus(2))                    // 2 CPUs per task
  .build()
sc.addResourceProfile(lightProfile)
sc.addResourceProfile(heavyProfile)
// Apply different profiles to different stages
val rawData = sc.textFile("input.txt")
// Light processing for data cleaning
val cleanedData = rawData
.withResources(lightProfile)
.map(simpleCleanup)
.filter(isValid)
// Heavy processing for complex analytics
val analyticsResults = cleanedData
.withResources(heavyProfile)
.map(complexAnalytics)
.reduce(combineResults)

// Configure dynamic allocation through SparkConf
val conf = new SparkConf()
.setAppName("Dynamic Resource Example")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", "2")
.set("spark.dynamicAllocation.maxExecutors", "20")
.set("spark.dynamicAllocation.initialExecutors", "5")
.set("spark.dynamicAllocation.executorIdleTimeout", "60s")
.set("spark.dynamicAllocation.schedulerBacklogTimeout", "10s")
val sc = new SparkContext(conf)
// Request additional executors programmatically
val additionalExecutors = 5
val requested = sc.requestExecutors(additionalExecutors)
println(s"Requested $additionalExecutors additional executors: $requested")
// Monitor executor status
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (executorId, (maxMem, remainingMem)) =>
val usedMem = maxMem - remainingMem
println(s"Executor $executorId: ${usedMem / (1024 * 1024)}MB used of ${maxMem / (1024 * 1024)}MB")
}
// Kill specific executors if needed
val executorsToKill = Seq("executor-1", "executor-2")
val killed = sc.killExecutors(executorsToKill)
println(s"Killed executors: ${killed.mkString(", ")}")val data = sc.parallelize(1 to 10000, 100)
val processedData = data.mapPartitions { iter =>
  val context = TaskContext.get()
  // Get available resources for this task
  val resources = context.resources()
  val cpus = context.cpus()
  // Adapt processing based on available resources
  val processingStrategy = resources.get("gpu") match {
    case Some(gpuInfo) =>
      println(s"Using GPU acceleration: ${gpuInfo.addresses.mkString(", ")}")
      "gpu-accelerated"
    case None =>
      if (cpus >= 4) {
        println(s"Using multi-threaded CPU processing with $cpus cores")
        "multi-threaded"
      } else {
        println(s"Using single-threaded processing with $cpus cores")
        "single-threaded"
      }
  }
  // Process data according to available resources
  iter.map { value =>
    processingStrategy match {
      case "gpu-accelerated" => gpuProcess(value)
      case "multi-threaded" => parallelProcess(value, cpus)
      case "single-threaded" => serialProcess(value)
    }
  }
}
processedData.collect()

// FPGA resource profile for specialized workloads
val fpgaProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests()
    .memory("32g")
    .cores(16)
    .resource("fpga", 1, "/opt/spark/fpga-discovery.sh", "intel")) // 1 FPGA per executor
  .require(new TaskResourceRequests()
    .cpus(4)
    .resource("fpga", 1.0)) // one FPGA per task
  .build()
sc.addResourceProfile(fpgaProfile)
// Use FPGA for specialized computation
val fpgaWorkload = sc.parallelize(specializedDataset)
val fpgaResults = fpgaWorkload
.withResources(fpgaProfile)
.mapPartitions { iter =>
  val context = TaskContext.get()
  val fpgaResources = context.resources().get("fpga")
  fpgaResources match {
    case Some(fpga) =>
      // Initialize FPGA with the first available device address
      val fpgaDevice = initializeFPGA(fpga.addresses.head)
      // The mapped iterator is evaluated lazily, so release the device when the
      // task completes rather than in a finally block around the map call
      context.addTaskCompletionListener[Unit](_ => fpgaDevice.close())
      iter.map(processOnFPGA(_, fpgaDevice))
    case None =>
      // Fall back to CPU processing
      iter.map(processByCPU)
  }
}
.collect()

// Get all registered resource profiles
val profiles = sc.getResourceProfiles()
profiles.foreach { case (id, profile) =>
println(s"Profile ID: $id")
println(s"Executor Resources: ${profile.executorResources}")
println(s"Task Resources: ${profile.taskResources}")
println("---")
}
// Monitor RDD resource associations
val data = sc.parallelize(1 to 100)
val profiledRDD = data.withResources(heavyProfile)
println(s"RDD resource profile ID: ${profiledRDD.getResourceProfile().id}")
println(s"Default profile ID: ${ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID}")