Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.
Spark provides comprehensive APIs for monitoring job and stage progress, executor status, and application metrics. This enables real-time tracking of application performance and resource utilization.
SparkStatusTracker is the main interface for programmatic access to job and stage status information.
class SparkStatusTracker {
// Job Monitoring
def getJobIdsForGroup(jobGroup: String): Array[Int]
def getActiveStageIds(): Array[Int]
def getActiveJobIds(): Array[Int]
def getJobInfo(jobId: Int): Option[SparkJobInfo]
def getStageInfo(stageId: Int): Option[SparkStageInfo]
def getStageInfos(): Array[SparkStageInfo]
// Executor Monitoring
def getExecutorInfos(): Array[SparkExecutorInfo]
}

public class SparkJobInfo {
public int jobId();
public int[] stageIds();
public JobExecutionStatus status();
public int numTasks();
public int numActiveTasks();
public int numCompletedTasks();
public int numSkippedTasks();
public int numFailedTasks();
public int numKilledTasks();
public int numActiveStages();
public int numCompletedStages();
public int numSkippedStages();
public int numFailedStages();
}
public enum JobExecutionStatus {
RUNNING, SUCCEEDED, FAILED, UNKNOWN
}
public class SparkStageInfo {
public int stageId();
public int currentAttemptId();
public long submissionTime();
public String name();
public int numTasks();
public int numActiveTasks();
public int numCompleteTasks();
public int numFailedTasks();
public int numKilledTasks();
public long executorRunTime();
public long executorCpuTime();
public long firstTaskLaunchedTime();
public long completionTime();
public String failureReason();
}

public class SparkExecutorInfo {
public String executorId();
public String host();
public int port();
public long cacheSize();
public int numRunningTasks();
public int numCompletedTasks();
public int numFailedTasks();
public long usedOnHeapStorageMemory();
public long usedOffHeapStorageMemory();
public long totalOnHeapStorageMemory();
public long totalOffHeapStorageMemory();
public long maxMemory();
public long maxOnHeapStorageMemory();
public long maxOffHeapStorageMemory();
}

Example: basic job and stage monitoring

import org.apache.spark.{SparkConf, SparkContext, SparkStatusTracker}
val sc = new SparkContext(new SparkConf().setAppName("Status Monitoring").setMaster("local[*]"))
val statusTracker = sc.statusTracker
// Set job group for tracking
sc.setJobGroup("data-processing", "Main data processing pipeline")
// Start some work
val data = sc.parallelize(1 to 1000000, numSlices = 10)
val processed = data.map(_ * 2).filter(_ > 100).cache()
// Monitor job progress in a separate thread
val monitoringThread = new Thread(new Runnable {
def run(): Unit = {
while (!Thread.currentThread().isInterrupted) {
try {
// Get active jobs
val activeJobs = statusTracker.getActiveJobIds()
println(s"Active jobs: ${activeJobs.mkString(", ")}")
// Get jobs for our group
val groupJobs = statusTracker.getJobIdsForGroup("data-processing")
println(s"Jobs in data-processing group: ${groupJobs.mkString(", ")}")
// Monitor each active job
activeJobs.foreach { jobId =>
statusTracker.getJobInfo(jobId) match {
case Some(jobInfo) =>
println(s"Job $jobId:")
println(s" Status: ${jobInfo.status()}")
println(s" Tasks: ${jobInfo.numCompletedTasks()}/${jobInfo.numTasks()} completed")
println(s" Stages: ${jobInfo.numCompletedStages()}/${jobInfo.stageIds().length} completed")
case None =>
println(s"Job $jobId: No info available")
}
}
// Monitor active stages
val activeStages = statusTracker.getActiveStageIds()
activeStages.foreach { stageId =>
statusTracker.getStageInfo(stageId) match {
case Some(stageInfo) =>
val progress = if (stageInfo.numTasks() > 0) {
(stageInfo.numCompleteTasks().toDouble / stageInfo.numTasks()) * 100
} else 0.0
println(s"Stage $stageId (${stageInfo.name()}):")
println(f" Progress: $progress%.1f%% (${stageInfo.numCompleteTasks()}/${stageInfo.numTasks()} tasks)")
println(s" Active tasks: ${stageInfo.numActiveTasks()}")
println(s" Failed tasks: ${stageInfo.numFailedTasks()}")
case None =>
println(s"Stage $stageId: No info available")
}
}
Thread.sleep(2000) // Update every 2 seconds
} catch {
case _: InterruptedException => return
case e: Exception => println(s"Monitoring error: ${e.getMessage}")
}
}
}
})
monitoringThread.start()
// Trigger computation
val result = processed.count()
println(s"Final result: $result")
// Stop monitoring
monitoringThread.interrupt()
monitoringThread.join()

Example: executor status reporting

def printExecutorStatus(statusTracker: SparkStatusTracker): Unit = {
val executors = statusTracker.getExecutorInfos()
println(s"Total executors: ${executors.length}")
println("-" * 80)
executors.foreach { executor =>
val memoryUsagePercent = if (executor.maxMemory() > 0) {
((executor.usedOnHeapStorageMemory() + executor.usedOffHeapStorageMemory()).toDouble / executor.maxMemory()) * 100
} else 0.0
println(s"Executor ${executor.executorId()} (${executor.host()}:${executor.port()}):")
println(s" Running tasks: ${executor.numRunningTasks()}")
println(s" Completed tasks: ${executor.numCompletedTasks()}")
println(s" Failed tasks: ${executor.numFailedTasks()}")
println(f" Memory usage: $memoryUsagePercent%.1f%% (${formatBytes(executor.usedOnHeapStorageMemory() + executor.usedOffHeapStorageMemory())}/${formatBytes(executor.maxMemory())})")
println(s" Cache size: ${formatBytes(executor.cacheSize())}")
println()
}
}
def formatBytes(bytes: Long): String = {
val units = Array("B", "KB", "MB", "GB", "TB")
var size = bytes.toDouble
var unitIndex = 0
while (size >= 1024 && unitIndex < units.length - 1) {
size /= 1024
unitIndex += 1
}
f"$size%.1f ${units(unitIndex)}"
}
// Monitor executor status
printExecutorStatus(statusTracker)

Example: job progress monitoring with callbacks

import scala.collection.mutable
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
class JobProgressMonitor(sc: SparkContext) {
private val statusTracker = sc.statusTracker
private val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
def monitorJob(jobGroup: String, callback: Double => Unit): Unit = {
scheduler.scheduleAtFixedRate(new Runnable {
def run(): Unit = {
try {
val jobs = statusTracker.getJobIdsForGroup(jobGroup)
if (jobs.nonEmpty) {
val totalProgress = jobs.map { jobId =>
statusTracker.getJobInfo(jobId) match {
case Some(jobInfo) =>
if (jobInfo.numTasks() > 0) {
jobInfo.numCompletedTasks().toDouble / jobInfo.numTasks()
} else 0.0
case None => 0.0
}
}.sum / jobs.length
callback(totalProgress * 100)
}
} catch {
case e: Exception =>
println(s"Monitoring error: ${e.getMessage}")
}
}
}, 0, 1, TimeUnit.SECONDS)
}
def shutdown(): Unit = {
scheduler.shutdown()
}
}
// Usage
val monitor = new JobProgressMonitor(sc)
monitor.monitorJob("data-processing", { progress =>
println(f"Overall progress: $progress%.1f%%")
// Send progress to external monitoring system
if (progress.toInt % 10 == 0) { // Log roughly every 10% (progress is a Double; truncate before taking the modulo)
logProgressToMonitoringSystem("data-processing", progress)
}
})
// Start processing
sc.setJobGroup("data-processing", "Processing large dataset")
val results = largeDataset.map(complexProcessing).collect() // largeDataset and complexProcessing are placeholders for your own RDD and logic
// Cleanup
monitor.shutdown()

Example: cluster resource snapshots

class ResourceMonitor(sc: SparkContext) {
private val statusTracker = sc.statusTracker
case class ResourceSnapshot(
timestamp: Long,
totalExecutors: Int,
activeExecutors: Int,
totalMemory: Long,
usedMemory: Long,
totalTasks: Int,
runningTasks: Int,
completedTasks: Int,
failedTasks: Int
)
def takeSnapshot(): ResourceSnapshot = {
val executors = statusTracker.getExecutorInfos()
val activeJobs = statusTracker.getActiveJobIds()
val totalMemory = executors.map(_.maxMemory()).sum
val usedMemory = executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum
val totalTasks = activeJobs.flatMap(statusTracker.getJobInfo).map(_.numTasks()).sum
val runningTasks = executors.map(_.numRunningTasks()).sum
val completedTasks = executors.map(_.numCompletedTasks()).sum
val failedTasks = executors.map(_.numFailedTasks()).sum
ResourceSnapshot(
timestamp = System.currentTimeMillis(),
totalExecutors = executors.length,
activeExecutors = executors.count(_.numRunningTasks() > 0),
totalMemory = totalMemory,
usedMemory = usedMemory,
totalTasks = totalTasks,
runningTasks = runningTasks,
completedTasks = completedTasks,
failedTasks = failedTasks
)
}
def monitorResources(intervalSeconds: Int = 10): Unit = {
val scheduler = Executors.newScheduledThreadPool(1)
scheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
val snapshot = takeSnapshot()
val memoryUtilization = if (snapshot.totalMemory > 0) {
(snapshot.usedMemory.toDouble / snapshot.totalMemory) * 100
} else 0.0
val executorUtilization = if (snapshot.totalExecutors > 0) {
(snapshot.activeExecutors.toDouble / snapshot.totalExecutors) * 100
} else 0.0
println(f"Resource Snapshot (${new java.util.Date(snapshot.timestamp)}):")
println(f" Executors: ${snapshot.activeExecutors}/${snapshot.totalExecutors} active ($executorUtilization%.1f%%)")
println(f" Memory: ${formatBytes(snapshot.usedMemory)}/${formatBytes(snapshot.totalMemory)} used ($memoryUtilization%.1f%%)")
println(f" Tasks: ${snapshot.runningTasks} running, ${snapshot.completedTasks} completed, ${snapshot.failedTasks} failed")
println()
// Send to monitoring system
sendResourceMetrics(snapshot)
}
}, 0, intervalSeconds, TimeUnit.SECONDS)
}
private def sendResourceMetrics(snapshot: ResourceSnapshot): Unit = {
// Implementation to send metrics to an external monitoring system,
// e.g., Prometheus, Grafana, or CloudWatch (see the PrometheusExporter example below)
}
}
// Usage
val resourceMonitor = new ResourceMonitor(sc)
resourceMonitor.monitorResources(intervalSeconds = 5)

Example: stage-level performance analysis

class PerformanceAnalyzer(sc: SparkContext) {
private val statusTracker = sc.statusTracker
case class PerformanceMetrics(
stageId: Int,
stageName: String,
duration: Long,
taskCount: Int,
failureRate: Double,
avgTaskTime: Double,
slowTasks: Int
)
def analyzeStagePerformance(): Array[PerformanceMetrics] = {
val stages = statusTracker.getStageInfos()
stages.map { stage =>
val duration = if (stage.completionTime() > 0) {
stage.completionTime() - stage.submissionTime()
} else {
System.currentTimeMillis() - stage.submissionTime()
}
val failureRate = if (stage.numTasks() > 0) {
stage.numFailedTasks().toDouble / stage.numTasks()
} else 0.0
val avgTaskTime = if (stage.numCompleteTasks() > 0) {
stage.executorRunTime().toDouble / stage.numCompleteTasks()
} else 0.0
// Detect slow tasks (tasks taking > 2x average time)
val slowTasks = 0 // SparkStatusTracker exposes no per-task timings; see the SparkListener sketch after this class
PerformanceMetrics(
stageId = stage.stageId(),
stageName = stage.name(),
duration = duration,
taskCount = stage.numTasks(),
failureRate = failureRate,
avgTaskTime = avgTaskTime,
slowTasks = slowTasks
)
}
}
def detectBottlenecks(): Unit = {
val metrics = analyzeStagePerformance()
println("Performance Analysis:")
println("=" * 80)
// Sort by duration to find longest-running stages
val slowestStages = metrics.sortBy(-_.duration).take(5)
println("Slowest Stages:")
slowestStages.foreach { metric =>
println(f" Stage ${metric.stageId} (${metric.stageName}): ${metric.duration}ms")
println(f" Tasks: ${metric.taskCount}, Avg time: ${metric.avgTaskTime}%.1fms")
println(f" Failure rate: ${metric.failureRate * 100}%.1f%%")
}
// Find stages with high failure rates
val failingStages = metrics.filter(_.failureRate > 0.1).sortBy(-_.failureRate)
if (failingStages.nonEmpty) {
println("\nStages with High Failure Rates:")
failingStages.foreach { metric =>
println(f" Stage ${metric.stageId} (${metric.stageName}): ${metric.failureRate * 100}%.1f%% failure rate")
}
}
// Recommendations
println("\nRecommendations:")
metrics.foreach { metric =>
if (metric.failureRate > 0.2) {
println(s" Stage ${metric.stageId}: High failure rate - check for data skew or resource constraints")
}
if (metric.avgTaskTime > 30000) { // > 30 seconds
println(s" Stage ${metric.stageId}: Long average task time - consider repartitioning or optimizing computation")
}
}
}
}
// Usage
val analyzer = new PerformanceAnalyzer(sc)
// Run analysis after job completion
analyzer.detectBottlenecks()

Example: a periodically updated console dashboard

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import org.apache.spark.JobExecutionStatus
class SparkDashboard(sc: SparkContext) {
private val statusTracker = sc.statusTracker
private val metrics = new ConcurrentHashMap[String, Any]()
private val scheduler = Executors.newScheduledThreadPool(2)
case class DashboardMetrics(
timestamp: Long,
activeJobs: Int,
completedJobs: Int,
failedJobs: Int,
activeStages: Int,
totalExecutors: Int,
activeExecutors: Int,
totalMemoryGB: Double,
usedMemoryGB: Double,
totalTasks: Int,
runningTasks: Int,
completedTasks: Int,
failedTasks: Int,
avgTaskDuration: Double
)
def startMonitoring(): Unit = {
// Update metrics every 5 seconds
scheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = updateMetrics()
}, 0, 5, TimeUnit.SECONDS)
// Print dashboard every 30 seconds
scheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = printDashboard()
}, 10, 30, TimeUnit.SECONDS)
}
private def updateMetrics(): Unit = {
try {
val executors = statusTracker.getExecutorInfos()
val activeJobs = statusTracker.getActiveJobIds()
val activeStages = statusTracker.getActiveStageIds()
// Note: only jobs the tracker still reports as active are inspected here
val jobInfos = activeJobs.flatMap(statusTracker.getJobInfo)
val completedJobs = jobInfos.count(_.status() == JobExecutionStatus.SUCCEEDED)
val failedJobs = jobInfos.count(_.status() == JobExecutionStatus.FAILED)
val totalMemory = executors.map(_.maxMemory()).sum
val usedMemory = executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum
val totalTasks = jobInfos.map(_.numTasks()).sum
val runningTasks = jobInfos.map(_.numActiveTasks()).sum
val completedTasks = executors.map(_.numCompletedTasks()).sum
val failedTasks = executors.map(_.numFailedTasks()).sum
val dashboardMetrics = DashboardMetrics(
timestamp = System.currentTimeMillis(),
activeJobs = activeJobs.length,
completedJobs = completedJobs,
failedJobs = failedJobs,
activeStages = activeStages.length,
totalExecutors = executors.length,
activeExecutors = executors.count(_.numRunningTasks() > 0),
totalMemoryGB = totalMemory / (1024.0 * 1024 * 1024),
usedMemoryGB = usedMemory / (1024.0 * 1024 * 1024),
totalTasks = totalTasks,
runningTasks = runningTasks,
completedTasks = completedTasks,
failedTasks = failedTasks,
avgTaskDuration = 0.0 // Needs task-level timing data (see the SparkListener sketch above)
)
metrics.put("dashboard", dashboardMetrics)
} catch {
case e: Exception =>
println(s"Error updating metrics: ${e.getMessage}")
}
}
private def printDashboard(): Unit = {
metrics.get("dashboard") match {
case dashboard: DashboardMetrics =>
val memoryUtilization = if (dashboard.totalMemoryGB > 0) {
(dashboard.usedMemoryGB / dashboard.totalMemoryGB) * 100
} else 0.0
val executorUtilization = if (dashboard.totalExecutors > 0) {
(dashboard.activeExecutors.toDouble / dashboard.totalExecutors) * 100
} else 0.0
println("\n" + "=" * 60)
println(f"SPARK APPLICATION DASHBOARD - ${new java.util.Date(dashboard.timestamp)}")
println("=" * 60)
println(f"Jobs: Active: ${dashboard.activeJobs}%3d | Completed: ${dashboard.completedJobs}%3d | Failed: ${dashboard.failedJobs}%3d")
println(f"Stages: Active: ${dashboard.activeStages}%3d")
println(f"Executors: Active: ${dashboard.activeExecutors}%3d/${dashboard.totalExecutors}%3d ($executorUtilization%.1f%%)")
println(f"Memory: Used: ${dashboard.usedMemoryGB}%.1f/${dashboard.totalMemoryGB}%.1f GB ($memoryUtilization%.1f%%)")
println(f"Tasks: Running: ${dashboard.runningTasks}%4d | Completed: ${dashboard.completedTasks}%6d | Failed: ${dashboard.failedTasks}%4d")
if (dashboard.failedTasks > 0) {
val failureRate = (dashboard.failedTasks.toDouble / (dashboard.completedTasks + dashboard.failedTasks)) * 100
println(f" Failure Rate: $failureRate%.2f%%")
}
println("=" * 60)
case _ =>
println("Dashboard metrics not available")
}
}
def getMetrics(): Option[DashboardMetrics] = {
Option(metrics.get("dashboard")).collect { case m: DashboardMetrics => m }
}
def shutdown(): Unit = {
scheduler.shutdown()
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow()
}
} catch {
case _: InterruptedException =>
scheduler.shutdownNow()
}
}
}
// Usage
val dashboard = new SparkDashboard(sc)
dashboard.startMonitoring()
// Run your Spark jobs
val results = processLargeDataset() // processLargeDataset() stands in for your application's workload
// Shutdown monitoring when done
dashboard.shutdown()

Example: pushing metrics to a Prometheus Pushgateway

import java.io.StringWriter
import java.net.{HttpURLConnection, URL}
class PrometheusExporter(sc: SparkContext, prometheusGatewayUrl: String) {
private val statusTracker = sc.statusTracker
def exportMetrics(jobName: String): Unit = {
val metrics = collectMetrics()
val prometheusFormat = formatForPrometheus(metrics, jobName)
pushToPrometheus(prometheusFormat, jobName)
}
private def collectMetrics(): Map[String, Double] = {
val executors = statusTracker.getExecutorInfos()
val activeJobs = statusTracker.getActiveJobIds()
val activeStages = statusTracker.getActiveStageIds()
Map(
"spark_active_jobs" -> activeJobs.length.toDouble,
"spark_active_stages" -> activeStages.length.toDouble,
"spark_total_executors" -> executors.length.toDouble,
"spark_active_executors" -> executors.count(_.numRunningTasks() > 0).toDouble,
"spark_total_memory_bytes" -> executors.map(_.maxMemory()).sum.toDouble,
"spark_used_memory_bytes" -> executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum.toDouble,
"spark_running_tasks" -> executors.map(_.numRunningTasks()).sum.toDouble,
"spark_completed_tasks" -> executors.map(_.numCompletedTasks()).sum.toDouble,
"spark_failed_tasks" -> executors.map(_.numFailedTasks()).sum.toDouble
)
}
private def formatForPrometheus(metrics: Map[String, Double], jobName: String): String = {
val writer = new StringWriter()
metrics.foreach { case (metricName, value) =>
writer.write(s"$metricName{job=\"$jobName\"} $value\n")
}
writer.toString
}
private def pushToPrometheus(data: String, jobName: String): Unit = {
try {
val url = new URL(s"$prometheusGatewayUrl/metrics/job/$jobName")
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestMethod("POST")
connection.setDoOutput(true)
connection.setRequestProperty("Content-Type", "text/plain")
val outputStream = connection.getOutputStream
outputStream.write(data.getBytes("UTF-8"))
outputStream.close()
val responseCode = connection.getResponseCode
if (responseCode / 100 != 2) { // The Pushgateway replies with a 2xx status on success
println(s"Failed to push metrics: HTTP $responseCode")
}
} catch {
case e: Exception =>
println(s"Error pushing metrics to Prometheus: ${e.getMessage}")
}
}
}
// Usage
val exporter = new PrometheusExporter(sc, "http://prometheus-gateway:9091")
// Export metrics periodically during job execution
val scheduler = Executors.newScheduledThreadPool(1)
scheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
exporter.exportMetrics("spark-data-processing")
}
}, 0, 30, TimeUnit.SECONDS)
// Shut the scheduler down with scheduler.shutdown() when the application finishes

// Efficient monitoring pattern
class EfficientMonitor(sc: SparkContext) {
private val statusTracker = sc.statusTracker
private var lastUpdate = 0L
private var cachedSnapshot: Option[ResourceSnapshot] = None // Reuses the ResourceSnapshot case class from ResourceMonitor above
def getResourceStatus(maxAgeMs: Long = 5000): ResourceSnapshot = {
val now = System.currentTimeMillis()
if (cachedSnapshot.isEmpty || (now - lastUpdate) > maxAgeMs) {
cachedSnapshot = Some(takeSnapshot())
lastUpdate = now
}
cachedSnapshot.get
}
private def takeSnapshot(): ResourceSnapshot = {
// Collect metrics here, as in ResourceMonitor.takeSnapshot above
???
}
}
Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-core-2-11