Apache Spark Core provides distributed computing capabilities including RDD abstractions, task scheduling, memory management, and fault recovery. Its resource management layer adds resource profiles, executor and task resource requests, and fine-grained resource allocation for heterogeneous workloads.
A ResourceProfile defines resource requirements for executors and tasks, enabling heterogeneous resource allocation within a single Spark application.
class ResourceProfile(
val executorResources: Map[String, ExecutorResourceRequest],
val taskResources: Map[String, TaskResourceRequest]
) {
def id: Int
def equals(other: Any): Boolean
def hashCode(): Int
def toString: String
}
object ResourceProfile {
val DEFAULT_RESOURCE_PROFILE_ID = 0
def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile
}
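For illustration, a minimal sketch of the companion object; note that in current Spark releases getOrCreateDefaultProfile is internal API, so treat this as conceptual rather than a supported entry point.

import org.apache.spark.SparkConf
import org.apache.spark.resource.ResourceProfile

val conf = new SparkConf().setAppName("defaults-demo")
val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(conf)
// The default profile always carries the reserved ID 0
assert(defaultProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)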
Builder pattern for constructing resource profiles with a fluent API. The require overloads accept the container classes ExecutorResourceRequests and TaskResourceRequests, which bundle the individual requests described below.

class ResourceProfileBuilder {
def require(requests: ExecutorResourceRequests): ResourceProfileBuilder
def require(requests: TaskResourceRequests): ResourceProfileBuilder
def build(): ResourceProfile
}
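A minimal builder sketch, assuming the fluent container classes from org.apache.spark.resource; memory amounts use Spark's size-string syntax:

import org.apache.spark.resource._

val profile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().memory("4g").cores(2)) // per-executor resources
  .require(new TaskResourceRequests().cpus(1))                   // per-task resources
  .build()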
class ExecutorResourceRequest(
val resourceName: String,
val amount: Long,
val discoveryScript: String = "",
val vendor: String = ""
) {
def resourceName: String
def amount: Long
def discoveryScript: String
def vendor: String
def equals(other: Any): Boolean
def hashCode(): Int
def toString: String
}
object ExecutorResourceRequest {
val CORES = "cores"
val MEMORY = "memory"
val OVERHEAD_MEM = "memoryOverhead"
val PYSPARK_MEM = "pysparkMemory"
val OFFHEAP_MEM = "offHeapMemory"
val FPGA = "fpga"
val GPU = "gpu"
}

class TaskResourceRequest(
val resourceName: String,
val amount: Double
) {
def resourceName: String
def amount: Double
def equals(other: Any): Boolean
def hashCode(): Int
def toString: String
}
object TaskResourceRequest {
val CPUS = "cpus"
val FPGA = "fpga"
val GPU = "gpu"
}
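As a sketch, the name constants above can stand in for raw strings when building requests; the discovery-script path is a placeholder:

import org.apache.spark.resource._

val execReqs = new ExecutorResourceRequests()
  .memory("4g")
  .cores(2)
  .resource(ExecutorResourceRequest.GPU, 2, "/opt/spark/gpu-discovery.sh", "nvidia")

// 0.5 GPU per task lets two tasks share each executor GPU
val taskReqs = new TaskResourceRequests()
  .cpus(1)
  .resource(TaskResourceRequest.GPU, 0.5)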
Runtime resource information available to tasks and executors.

class ResourceInformation(
val name: String,
val addresses: Array[String]
) {
def name: String
def addresses: Array[String]
def equals(other: Any): Boolean
def hashCode(): Int
def toString: String
}
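A sketch of reading ResourceInformation from inside a task on a cluster where a gpu resource was configured at submit time; sc is assumed to be an active SparkContext:

import org.apache.spark.TaskContext

sc.parallelize(1 to 10, 2).foreachPartition { _ =>
  val ctx = TaskContext.get()
  // resources() maps each resource name to its ResourceInformation(name, addresses)
  ctx.resources().get("gpu").foreach { info =>
    println(s"Partition ${ctx.partitionId()}: ${info.name} -> ${info.addresses.mkString(", ")}")
  }
}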
Methods for associating RDDs with specific resource profiles.

// From RDD[T]
def withResources(profile: ResourceProfile): RDD[T]
def getResourceProfile(): ResourceProfile
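For example, attaching the profile from the builder sketch above (illustrative only):

val profiled = sc.parallelize(1 to 100).withResources(profile)
// The attached profile governs any stage that computes this RDD
println(profiled.getResourceProfile().id)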
SparkContext methods for resource profile management.

// From SparkContext
def addResourceProfile(rp: ResourceProfile): Unit
def getResourceProfile(id: Int): Option[ResourceProfile]
def getResourceProfiles(): Map[Int, ResourceProfile]
def requestTotalExecutors(numExecutors: Int, localityAwareTasks: Int, hostToLocalTaskCount: Map[String, Int], nodeBlacklist: Set[String]): Boolean
def requestExecutors(numAdditionalExecutors: Int): Boolean
def killExecutors(executorIds: Seq[String]): Seq[String]
def killExecutor(executorId: String): Boolean
def getExecutorMemoryStatus: Map[String, (Long, Long)]
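Unlike requestExecutors, which asks for a delta, requestTotalExecutors sets an absolute target; a sketch against the signature above, with hypothetical counts:

// Target exactly 10 executors in total, with no locality preferences
val accepted = sc.requestTotalExecutors(
  numExecutors = 10,
  localityAwareTasks = 0,
  hostToLocalTaskCount = Map.empty,
  nodeBlacklist = Set.empty)
println(s"Target accepted: $accepted")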
Key resource management configuration options.

spark.dynamicAllocation.enabled - Enable dynamic executor allocation
spark.dynamicAllocation.minExecutors - Minimum number of executors
spark.dynamicAllocation.maxExecutors - Maximum number of executors
spark.dynamicAllocation.initialExecutors - Initial number of executors
spark.dynamicAllocation.executorIdleTimeout - Timeout before removing idle executors
spark.dynamicAllocation.schedulerBacklogTimeout - Timeout before scaling up for backlogged pending tasks
spark.executor.resource.{resourceName}.amount - Amount of a resource per executor
spark.executor.resource.{resourceName}.discoveryScript - Script that discovers the resource on each executor
spark.executor.resource.{resourceName}.vendor - Resource vendor
spark.task.resource.{resourceName}.amount - Amount of a resource per task
spark.executor.resource.gpu.amount - Number of GPUs per executor
spark.executor.resource.gpu.discoveryScript - GPU discovery script
spark.task.resource.gpu.amount - GPU fraction per task
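These options are normally fixed at submit time. A hypothetical GPU setup expressed through SparkConf; the script path is an assumption, and the script must print the discovered addresses in Spark's JSON ResourceInformation format:

import org.apache.spark.SparkConf

val gpuConf = new SparkConf()
  .set("spark.executor.resource.gpu.amount", "2")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/gpu-discovery.sh")
  .set("spark.task.resource.gpu.amount", "0.5") // two tasks share each GPU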
import org.apache.spark.resource._

// Create resource profile with specific requirements
val resourceProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().memory("8g").cores(4)) // 8 GiB memory, 4 cores per executor
  .require(new TaskResourceRequests().cpus(1))                   // 1 CPU per task
  .build()
// Register profile with SparkContext
sc.addResourceProfile(resourceProfile)
// Use profile with RDD
val data = sc.parallelize(1 to 1000)
val resourcedRDD = data.withResources(resourceProfile)
val result = resourcedRDD.map(heavyComputation).collect()

// GPU-enabled resource profile
val gpuProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests()
    .memory("16g")
    .cores(8)
    .resource("gpu", 2, "/opt/spark/gpu-discovery.sh", "nvidia"))
  .require(new TaskResourceRequests().cpus(2).resource("gpu", 0.5)) // half GPU per task
  .build()
sc.addResourceProfile(gpuProfile)
// Use for GPU-intensive workloads
val gpuData = sc.parallelize(largeDataset)
val gpuProcessedData = gpuData
.withResources(gpuProfile)
.map(gpuAcceleratedProcessing)
.collect()
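With the profile above, concurrency per executor is bounded by both CPU and GPU slots: min(8 cores / 2 CPUs per task, 2 GPUs / 0.5 GPU per task) = 4 concurrent tasks. A quick check of that arithmetic:

// Concurrent task slots per executor = min(cores / taskCpus, gpus / taskGpus)
val slots = math.min(8 / 2, (2 / 0.5).toInt)
println(s"Concurrent GPU tasks per executor: $slots") // prints 4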
// Light processing profile
val lightProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().memory("2g").cores(2)) // 2 GiB, 2 cores
  .require(new TaskResourceRequests().cpus(1))                   // task CPU amounts must be whole numbers
  .build()
// Heavy processing profile
val heavyProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests().memory("16g").cores(8)) // 16 GiB, 8 cores
  .require(new TaskResourceRequests().cpus(2))                    // 2 CPUs per task
  .build()
sc.addResourceProfile(lightProfile)
sc.addResourceProfile(heavyProfile)
// Apply different profiles to different stages
val rawData = sc.textFile("input.txt")
// Light processing for data cleaning
val cleanedData = rawData
.withResources(lightProfile)
.map(simpleCleanup)
.filter(isValid)
// Heavy processing for complex analytics
val analyticsResults = cleanedData
.withResources(heavyProfile)
.map(complexAnalytics)
.reduce(combineResults)
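One caveat: map and filter introduce no shuffle boundary, so operations carrying different profiles can land in the same stage, which Spark rejects by default. Setting spark.scheduler.resource.profileMergeConflicts to true instead merges conflicting profiles, taking the maximum of each resource; a minimal sketch:

import org.apache.spark.SparkConf

// Merge conflicting stage-level profiles (max of each resource) instead of failing
val mergingConf = new SparkConf()
  .set("spark.scheduler.resource.profileMergeConflicts", "true")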
// Configure dynamic allocation through SparkConf
val conf = new SparkConf()
.setAppName("Dynamic Resource Example")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", "2")
.set("spark.dynamicAllocation.maxExecutors", "20")
.set("spark.dynamicAllocation.initialExecutors", "5")
.set("spark.dynamicAllocation.executorIdleTimeout", "60s")
.set("spark.dynamicAllocation.schedulerBacklogTimeout", "10s")
val sc = new SparkContext(conf)
// Request additional executors programmatically
val additionalExecutors = 5
val requested = sc.requestExecutors(additionalExecutors)
println(s"Requested $additionalExecutors additional executors: $requested")
// Monitor executor status
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (executorId, (maxMem, remainingMem)) =>
val usedMem = maxMem - remainingMem
println(s"Executor $executorId: ${usedMem / (1024 * 1024)}MB used of ${maxMem / (1024 * 1024)}MB")
}
// Kill specific executors if needed
val executorsToKill = Seq("executor-1", "executor-2")
val killed = sc.killExecutors(executorsToKill)
println(s"Killed executors: ${killed.mkString(", ")}")val data = sc.parallelize(1 to 10000, 100)
val processedData = data.mapPartitions { iter =>
val context = TaskContext.get()
// Get available resources for this task
val resources = context.resources()
val cpus = context.cpus()
// Adapt processing based on available resources
val processingStrategy = resources.get("gpu") match {
case Some(gpuInfo) =>
println(s"Using GPU acceleration: ${gpuInfo.addresses.mkString(", ")}")
"gpu-accelerated"
case None =>
if (cpus >= 4) {
println(s"Using multi-threaded CPU processing with $cpus cores")
"multi-threaded"
} else {
println(s"Using single-threaded processing with $cpus cores")
"single-threaded"
}
}
// Process data according to available resources
iter.map { value =>
processingStrategy match {
case "gpu-accelerated" => gpuProcess(value)
case "multi-threaded" => parallelProcess(value, cpus)
case "single-threaded" => serialProcess(value)
}
}
}
processedData.collect()
// FPGA resource profile for specialized workloads
val fpgaProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequests()
    .memory("32g")
    .cores(16)
    .resource("fpga", 1, "/opt/spark/fpga-discovery.sh", "intel"))
  .require(new TaskResourceRequests().cpus(4).resource("fpga", 1.0)) // one FPGA per task
  .build()
sc.addResourceProfile(fpgaProfile)
// Use FPGA for specialized computation
val fpgaWorkload = sc.parallelize(specializedDataset)
val fpgaResults = fpgaWorkload
.withResources(fpgaProfile)
.mapPartitions { iter =>
val context = TaskContext.get()
val fpgaResources = context.resources().get("fpga")
fpgaResources match {
case Some(fpga) =>
// Initialize the FPGA with the first assigned device address
val fpgaDevice = initializeFPGA(fpga.addresses.head)
try {
// Materialize results before closing the device: Iterator.map is lazy,
// so returning the unconsumed iterator would hit a closed device
iter.map(processOnFPGA(_, fpgaDevice)).toVector.iterator
} finally {
fpgaDevice.close()
}
case None =>
// Fallback to CPU processing
iter.map(processByCPU)
}
}
.collect()

// Get all registered resource profiles
val profiles = sc.getResourceProfiles()
profiles.foreach { case (id, profile) =>
println(s"Profile ID: $id")
println(s"Executor Resources: ${profile.executorResources}")
println(s"Task Resources: ${profile.taskResources}")
println("---")
}
// Monitor RDD resource associations
val data = sc.parallelize(1 to 100)
val profiledRDD = data.withResources(heavyProfile)
println(s"RDD resource profile ID: ${profiledRDD.getResourceProfile().id}")
println(s"Default profile ID: ${ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID}")Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-core-2-12