Resource Management

Apache Spark Core provides resource management capabilities including resource profiles, executor and task resource requests, and fine-grained resource allocation for heterogeneous workloads.

ResourceProfile

Defines resource requirements for executors and tasks, enabling heterogeneous resource allocation within a single Spark application.

class ResourceProfile(
  val executorResources: Map[String, ExecutorResourceRequest],
  val taskResources: Map[String, TaskResourceRequest]
) {
  def id: Int
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}

object ResourceProfile {
  val DEFAULT_RESOURCE_PROFILE_ID = 0
  def getOrCreateDefaultProfile(conf: SparkConf): ResourceProfile
}

ResourceProfileBuilder

Builder pattern for constructing resource profiles with a fluent API.

class ResourceProfileBuilder {
  def require(resourceRequest: ExecutorResourceRequest): ResourceProfileBuilder
  def require(resourceRequest: TaskResourceRequest): ResourceProfileBuilder
  def build(): ResourceProfile
}
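
In upstream Apache Spark (3.1+), the builder's require overloads are documented as taking the ExecutorResourceRequests and TaskResourceRequests container types rather than individual request objects. A minimal sketch assuming that upstream API:

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Collect executor- and task-level requests in their container types,
// then hand both containers to the builder
val execReqs = new ExecutorResourceRequests().cores(4).memory("8g")
val taskReqs = new TaskResourceRequests().cpus(1)

val profile = new ResourceProfileBuilder()
  .require(execReqs)
  .require(taskReqs)
  .build()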

Resource Request Types

ExecutorResourceRequest

class ExecutorResourceRequest(
  val resourceName: String,
  val amount: Long,
  val discoveryScript: String = "",
  val vendor: String = ""
) {
  def resourceName: String
  def amount: Long
  def discoveryScript: String
  def vendor: String
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}

object ExecutorResourceRequest {
  val CORES = "cores"
  val MEMORY = "memory"
  val OVERHEAD_MEM = "memoryOverhead"
  val PYSPARK_MEM = "pysparkMemory"
  val OFFHEAP_MEM = "offHeapMemory"
  val FPGA = "fpga"
  val GPU = "gpu"
}

TaskResourceRequest

class TaskResourceRequest(
  val resourceName: String,
  val amount: Double
) {
  def resourceName: String
  def amount: Double
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}

object TaskResourceRequest {
  val CPUS = "cpus"
  val FPGA = "fpga"
  val GPU = "gpu"
}

Resource Information

Runtime resource information available to tasks and executors.

class ResourceInformation(
  val name: String,
  val addresses: Array[String]
) {
  def name: String
  def addresses: Array[String]
  def equals(other: Any): Boolean
  def hashCode(): Int
  def toString: String
}
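
Inside a running task, these objects can be obtained from TaskContext.resources(). A minimal sketch (anyRDD stands for whatever RDD is being processed; the "gpu" entry only appears when the active resource profile requests GPUs):

import org.apache.spark.TaskContext

anyRDD.foreachPartition { _ =>
  // Map from resource name (e.g. "gpu") to the addresses assigned to this task
  val resources = TaskContext.get().resources()
  resources.get("gpu").foreach { info =>
    println(s"Resource ${info.name} assigned at addresses ${info.addresses.mkString(", ")}")
  }
}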

RDD Resource Profiles

Methods for associating RDDs with specific resource profiles.

// From RDD[T]
def withResources(profile: ResourceProfile): RDD[T]
def getResourceProfile(): ResourceProfile

SparkContext Resource Methods

SparkContext methods for resource profile and executor management.

// From SparkContext
def addResourceProfile(rp: ResourceProfile): Unit
def getResourceProfile(id: Int): Option[ResourceProfile]
def getResourceProfiles(): Map[Int, ResourceProfile]
def requestTotalExecutors(numExecutors: Int, localityAwareTasks: Int, hostToLocalTaskCount: Map[String, Int], nodeBlacklist: Set[String]): Boolean
def requestExecutors(numAdditionalExecutors: Int): Boolean
def killExecutors(executorIds: Seq[String]): Seq[String]
def killExecutor(executorId: String): Boolean
def getExecutorMemoryStatus: Map[String, (Long, Long)]

Configuration Properties

Key resource management configuration options.

Dynamic Allocation

  • spark.dynamicAllocation.enabled - Enable dynamic executor allocation
  • spark.dynamicAllocation.minExecutors - Minimum number of executors
  • spark.dynamicAllocation.maxExecutors - Maximum number of executors
  • spark.dynamicAllocation.initialExecutors - Initial number of executors
  • spark.dynamicAllocation.executorIdleTimeout - How long an executor may remain idle before it is released
  • spark.dynamicAllocation.schedulerBacklogTimeout - How long tasks must be backlogged before additional executors are requested

Resource Discovery

  • spark.executor.resource.{resourceName}.discoveryScript - Script to discover resource
  • spark.executor.resource.{resourceName}.vendor - Resource vendor
  • spark.task.resource.{resourceName}.amount - Amount of resource per task

GPU Configuration

  • spark.executor.resource.gpu.amount - Number of GPUs per executor
  • spark.executor.resource.gpu.discoveryScript - GPU discovery script
  • spark.task.resource.gpu.amount - GPU fraction per task
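
These properties can be set on SparkConf (or passed via --conf at submit time). A minimal sketch for a GPU cluster, reusing the discovery script path shown in the examples below:

import org.apache.spark.{SparkConf, SparkContext}

val gpuConf = new SparkConf()
  .setAppName("GPU Configuration Example")
  .set("spark.executor.resource.gpu.amount", "2")                                    // 2 GPUs per executor
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/gpu-discovery.sh") // Script that reports available GPUs
  .set("spark.task.resource.gpu.amount", "0.5")                                      // Half a GPU per task

val sc = new SparkContext(gpuConf)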

Usage Examples

Basic Resource Profile

import org.apache.spark.resource._

// Create resource profile with specific requirements
val resourceProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 8, "", ""))  // 8GB memory  
  .require(new ExecutorResourceRequest("cores", 4, "", ""))   // 4 cores
  .require(new TaskResourceRequest("cpus", 1.0))              // 1 CPU per task
  .build()

// Register profile with SparkContext
sc.addResourceProfile(resourceProfile)

// Use profile with RDD
val data = sc.parallelize(1 to 1000)
val resourcedRDD = data.withResources(resourceProfile)

val result = resourcedRDD.map(heavyComputation).collect()

GPU Resource Profile

// GPU-enabled resource profile
val gpuProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 16, "", ""))
  .require(new ExecutorResourceRequest("cores", 8, "", ""))
  .require(new ExecutorResourceRequest("gpu", 2, "/opt/spark/gpu-discovery.sh", "nvidia"))
  .require(new TaskResourceRequest("cpus", 2.0))
  .require(new TaskResourceRequest("gpu", 0.5))  // Half GPU per task
  .build()

sc.addResourceProfile(gpuProfile)

// Use for GPU-intensive workloads
val gpuData = sc.parallelize(largeDataset)
val gpuProcessedData = gpuData
  .withResources(gpuProfile)
  .map(gpuAcceleratedProcessing)
  .collect()

Mixed Workload Resource Management

// Light processing profile
val lightProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 2, "", ""))  // 2GB
  .require(new ExecutorResourceRequest("cores", 2, "", ""))   // 2 cores
  .require(new TaskResourceRequest("cpus", 0.5))              // Half CPU per task
  .build()

// Heavy processing profile  
val heavyProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 16, "", "")) // 16GB
  .require(new ExecutorResourceRequest("cores", 8, "", ""))   // 8 cores
  .require(new TaskResourceRequest("cpus", 2.0))              // 2 CPUs per task
  .build()

sc.addResourceProfile(lightProfile)
sc.addResourceProfile(heavyProfile)

// Apply different profiles to different stages
val rawData = sc.textFile("input.txt")

// Light processing for data cleaning
val cleanedData = rawData
  .withResources(lightProfile)
  .map(simpleCleanup)
  .filter(isValid)

// Heavy processing for complex analytics
val analyticsResults = cleanedData
  .withResources(heavyProfile)
  .map(complexAnalytics)
  .reduce(combineResults)

Dynamic Resource Allocation

// Configure dynamic allocation through SparkConf
val conf = new SparkConf()
  .setAppName("Dynamic Resource Example")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.initialExecutors", "5")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "10s")

val sc = new SparkContext(conf)

// Request additional executors programmatically
val additionalExecutors = 5
val requested = sc.requestExecutors(additionalExecutors)
println(s"Requested $additionalExecutors additional executors: $requested")

// Monitor executor status
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (executorId, (maxMem, remainingMem)) =>
  val usedMem = maxMem - remainingMem
  println(s"Executor $executorId: ${usedMem / (1024 * 1024)}MB used of ${maxMem / (1024 * 1024)}MB")
}

// Kill specific executors if needed
val executorsToKill = Seq("executor-1", "executor-2")
val killed = sc.killExecutors(executorsToKill)
println(s"Killed executors: ${killed.mkString(", ")}")

Resource-Aware Task Scheduling

import org.apache.spark.TaskContext

val data = sc.parallelize(1 to 10000, 100)

val processedData = data.mapPartitions { iter =>
  val context = TaskContext.get()
  
  // Get available resources for this task
  val resources = context.resources()
  val cpus = context.cpus()
  
  // Adapt processing based on available resources
  val processingStrategy = resources.get("gpu") match {
    case Some(gpuInfo) =>
      println(s"Using GPU acceleration: ${gpuInfo.addresses.mkString(", ")}")
      "gpu-accelerated"
    case None =>
      if (cpus >= 4) {
        println(s"Using multi-threaded CPU processing with $cpus cores")
        "multi-threaded"
      } else {
        println(s"Using single-threaded processing with $cpus cores")
        "single-threaded"
      }
  }
  
  // Process data according to available resources
  iter.map { value =>
    processingStrategy match {
      case "gpu-accelerated" => gpuProcess(value)
      case "multi-threaded" => parallelProcess(value, cpus)
      case "single-threaded" => serialProcess(value)
    }
  }
}

processedData.collect()

FPGA Resource Management

// FPGA resource profile for specialized workloads
val fpgaProfile = new ResourceProfileBuilder()
  .require(new ExecutorResourceRequest("memory", 32, "", ""))
  .require(new ExecutorResourceRequest("cores", 16, "", ""))
  .require(new ExecutorResourceRequest("fpga", 1, "/opt/spark/fpga-discovery.sh", "intel"))
  .require(new TaskResourceRequest("cpus", 4.0))
  .require(new TaskResourceRequest("fpga", 1.0))  // One FPGA per task
  .build()

sc.addResourceProfile(fpgaProfile)

// Use FPGA for specialized computation
val fpgaWorkload = sc.parallelize(specializedDataset)
val fpgaResults = fpgaWorkload
  .withResources(fpgaProfile)
  .mapPartitions { iter =>
    val context = TaskContext.get()
    val fpgaResources = context.resources().get("fpga")
    
    fpgaResources match {
      case Some(fpga) =>
        // Initialize FPGA with the assigned device
        val fpgaDevice = initializeFPGA(fpga.addresses.head)
        // iter.map is lazy, so closing the device in a try/finally here would
        // release it before the iterator is consumed; close on task completion instead
        context.addTaskCompletionListener[Unit](_ => fpgaDevice.close())
        iter.map(processOnFPGA(_, fpgaDevice))
      case None =>
        // Fall back to CPU processing when no FPGA is assigned
        iter.map(processByCPU)
    }
  }
  .collect()

Resource Profile Monitoring

// Get all registered resource profiles
val profiles = sc.getResourceProfiles()
profiles.foreach { case (id, profile) =>
  println(s"Profile ID: $id")
  println(s"Executor Resources: ${profile.executorResources}")
  println(s"Task Resources: ${profile.taskResources}")
  println("---")
}

// Monitor RDD resource associations
val data = sc.parallelize(1 to 100)
val profiledRDD = data.withResources(heavyProfile)

println(s"RDD resource profile ID: ${profiledRDD.getResourceProfile().id}")
println(s"Default profile ID: ${ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID}")

Best Practices

Resource Profile Design

  1. Match workload characteristics: Choose resources based on computation type
  2. Consider task parallelism: Balance task resources with executor resources
  3. Plan for heterogeneity: Use different profiles for different processing stages
  4. Monitor utilization: Track resource usage to optimize allocations
  5. Test configurations: Benchmark different resource combinations

Dynamic Allocation

  1. Set appropriate bounds: Configure min/max executors based on workload
  2. Tune timeouts: Adjust idle and backlog timeouts for responsiveness
  3. Monitor scaling: Watch for oscillation in executor counts
  4. Consider costs: Balance performance with resource costs
  5. Plan for failures: Ensure minimum executors for fault tolerance

GPU/FPGA Usage

  1. Efficient utilization: Ensure full utilization of accelerator resources
  2. Memory management: Consider GPU/FPGA memory limitations
  3. Fallback strategies: Provide CPU fallbacks for resource unavailability
  4. Driver compatibility: Ensure proper driver installation and discovery
  5. Task granularity: Size tasks appropriately for accelerator efficiency