# Status Tracking and Monitoring

Spark provides comprehensive APIs for monitoring job and stage progress, executor status, and application metrics. This enables real-time tracking of application performance and resource utilization.

## SparkStatusTracker

The main interface for programmatic access to job and stage status information.
```scala { .api }
// Low-overhead, best-effort view of job / stage / executor state.
// All methods return point-in-time snapshots; information about completed
// work may be dropped by the driver at any time, so lookups can return
// None/empty even for jobs that ran recently.
class SparkStatusTracker {
  // Job monitoring
  def getJobIdsForGroup(jobGroup: String): Array[Int]
  def getActiveStageIds(): Array[Int]
  def getActiveJobIds(): Array[Int]
  def getJobInfo(jobId: Int): Option[SparkJobInfo]
  def getStageInfo(stageId: Int): Option[SparkStageInfo]
  def getStageInfos(): Array[SparkStageInfo]

  // Executor monitoring
  def getExecutorInfos(): Array[SparkExecutorInfo]
}
```
22
23
## Status Information Classes
24
25
### SparkJobInfo
26
27
```java { .api }
28
public class SparkJobInfo {
29
public int jobId();
30
public int[] stageIds();
31
public JobExecutionStatus status();
32
public int numTasks();
33
public int numActiveTasks();
34
public int numCompletedTasks();
35
public int numSkippedTasks();
36
public int numFailedTasks();
37
public int numKilledTasks();
38
public int numActiveStages();
39
public int numCompletedStages();
40
public int numSkippedStages();
41
public int numFailedStages();
42
}
43
44
public enum JobExecutionStatus {
45
RUNNING, SUCCEEDED, FAILED, UNKNOWN
46
}
47
```
### SparkStageInfo
```java { .api }
52
public class SparkStageInfo {
53
public int stageId();
54
public int currentAttemptId();
55
public long submissionTime();
56
public String name();
57
public int numTasks();
58
public int numActiveTasks();
59
public int numCompleteTasks();
60
public int numFailedTasks();
61
public int numKilledTasks();
62
public long executorRunTime();
63
public long executorCpuTime();
64
public long submissionTime();
65
public long firstTaskLaunchedTime();
66
public long completionTime();
67
public long failureReason();
68
}
69
```
### SparkExecutorInfo
```java { .api }
74
public class SparkExecutorInfo {
75
public String executorId();
76
public String host();
77
public int port();
78
public long cacheSize();
79
public int numRunningTasks();
80
public int numCompletedTasks();
81
public int numFailedTasks();
82
public long usedOnHeapStorageMemory();
83
public long usedOffHeapStorageMemory();
84
public long totalOnHeapStorageMemory();
85
public long totalOffHeapStorageMemory();
86
public long maxMemory();
87
public long maxOnHeapStorageMemory();
88
public long maxOffHeapStorageMemory();
89
}
90
```
## Basic Status Monitoring

### Job and Stage Tracking
```scala
import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(new SparkConf().setAppName("Status Monitoring").setMaster("local[*]"))
val statusTracker = sc.statusTracker

// Tag subsequent jobs so they can be looked up via getJobIdsForGroup.
sc.setJobGroup("data-processing", "Main data processing pipeline")

// Start some work
val data = sc.parallelize(1 to 1000000, numSlices = 10)
val processed = data.map(_ * 2).filter(_ > 100).cache()

// Poll job/stage status on a separate thread so monitoring never blocks
// the main computation.
val monitoringThread = new Thread(new Runnable {
  def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      try {
        // Get active jobs
        val activeJobs = statusTracker.getActiveJobIds()
        println(s"Active jobs: ${activeJobs.mkString(", ")}")

        // Get jobs for our group
        val groupJobs = statusTracker.getJobIdsForGroup("data-processing")
        println(s"Jobs in data-processing group: ${groupJobs.mkString(", ")}")

        // Per-job progress. getJobInfo may return None if the job finished
        // between the two status calls — handle both cases.
        activeJobs.foreach { jobId =>
          statusTracker.getJobInfo(jobId) match {
            case Some(jobInfo) =>
              println(s"Job $jobId:")
              println(s"  Status: ${jobInfo.status()}")
              println(s"  Tasks: ${jobInfo.numCompletedTasks()}/${jobInfo.numTasks()} completed")
              println(s"  Stages: ${jobInfo.numCompletedStages()}/${jobInfo.stageIds().length} completed")

            case None =>
              println(s"Job $jobId: No info available")
          }
        }

        // Per-stage progress for currently-active stages.
        val activeStages = statusTracker.getActiveStageIds()
        activeStages.foreach { stageId =>
          statusTracker.getStageInfo(stageId) match {
            case Some(stageInfo) =>
              // Guard against division by zero for stages with no tasks.
              val progress = if (stageInfo.numTasks() > 0) {
                (stageInfo.numCompleteTasks().toDouble / stageInfo.numTasks()) * 100
              } else 0.0

              println(s"Stage $stageId (${stageInfo.name()}):")
              println(f"  Progress: $progress%.1f%% (${stageInfo.numCompleteTasks()}/${stageInfo.numTasks()} tasks)")
              println(s"  Active tasks: ${stageInfo.numActiveTasks()}")
              println(s"  Failed tasks: ${stageInfo.numFailedTasks()}")

            case None =>
              println(s"Stage $stageId: No info available")
          }
        }

        Thread.sleep(2000) // Update every 2 seconds
      } catch {
        case _: InterruptedException => return // interrupt() ends the loop
        case e: Exception => println(s"Monitoring error: ${e.getMessage}")
      }
    }
  }
})

monitoringThread.start()

// Trigger computation
val result = processed.count()
println(s"Final result: $result")

// Stop monitoring
monitoringThread.interrupt()
monitoringThread.join()
```
### Executor Monitoring
```scala
/** Print a one-screen summary (tasks + storage memory) of every executor. */
def printExecutorStatus(statusTracker: SparkStatusTracker): Unit = {
  val executors = statusTracker.getExecutorInfos()

  println(s"Total executors: ${executors.length}")
  println("-" * 80)

  executors.foreach { executor =>
    // Storage-memory utilisation; guard against maxMemory == 0.
    val memoryUsagePercent = if (executor.maxMemory() > 0) {
      ((executor.usedOnHeapStorageMemory() + executor.usedOffHeapStorageMemory()).toDouble / executor.maxMemory()) * 100
    } else 0.0

    println(s"Executor ${executor.executorId()} (${executor.host()}:${executor.port()}):")
    println(s"  Running tasks: ${executor.numRunningTasks()}")
    println(s"  Completed tasks: ${executor.numCompletedTasks()}")
    println(s"  Failed tasks: ${executor.numFailedTasks()}")
    println(f"  Memory usage: $memoryUsagePercent%.1f%% (${formatBytes(executor.usedOnHeapStorageMemory() + executor.usedOffHeapStorageMemory())}/${formatBytes(executor.maxMemory())})")
    println(s"  Cache size: ${formatBytes(executor.cacheSize())}")
    println()
  }
}
/**
 * Render a byte count as a human-readable string with one decimal place,
 * e.g. 1536 -> "1.5 KB". Uses binary (1024-based) units up to TB; values
 * of 1024 TB or more stay expressed in TB.
 */
def formatBytes(bytes: Long): String = {
  val units = Array("B", "KB", "MB", "GB", "TB")
  var size = bytes.toDouble
  var unitIndex = 0

  while (size >= 1024 && unitIndex < units.length - 1) {
    size /= 1024
    unitIndex += 1
  }

  f"$size%.1f ${units(unitIndex)}"
}
// Monitor executor status
printExecutorStatus(statusTracker)
```
## Advanced Monitoring Patterns

### Progress Tracking with Callbacks
```scala
import scala.collection.mutable
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Polls the status tracker once per second and reports aggregate progress
// (0-100) for all jobs in a job group via a caller-supplied callback.
// (The original declared an unused `progressCallbacks` map; it has been removed.)
class JobProgressMonitor(sc: SparkContext) {
  private val statusTracker = sc.statusTracker
  private val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)

  /** Start monitoring `jobGroup`; invokes `callback` with percent complete. */
  def monitorJob(jobGroup: String, callback: Double => Unit): Unit = {
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        try {
          val jobs = statusTracker.getJobIdsForGroup(jobGroup)
          if (jobs.nonEmpty) {
            // Average the per-job completion ratio across the group; a job
            // whose info has vanished (or has no tasks) counts as 0 progress.
            val totalProgress = jobs.map { jobId =>
              statusTracker.getJobInfo(jobId) match {
                case Some(jobInfo) if jobInfo.numTasks() > 0 =>
                  jobInfo.numCompletedTasks().toDouble / jobInfo.numTasks()
                case _ => 0.0
              }
            }.sum / jobs.length

            callback(totalProgress * 100)
          }
        } catch {
          // Swallow and log: an escaping exception would cancel the
          // scheduled task permanently.
          case e: Exception =>
            println(s"Monitoring error: ${e.getMessage}")
        }
      }
    }, 0, 1, TimeUnit.SECONDS)
  }

  /** Stop the polling thread. */
  def shutdown(): Unit = {
    scheduler.shutdown()
  }
}
// Usage
val monitor = new JobProgressMonitor(sc)

monitor.monitorJob("data-processing", { progress =>
  println(f"Overall progress: $progress%.1f%%")

  // Forward round-number progress to an external monitoring system.
  // NOTE: the original compared the Double directly (`progress % 10 == 0`),
  // which almost never matches; test the truncated integer percent instead.
  if (progress.toInt % 10 == 0) { // Log every 10%
    logProgressToMonitoringSystem("data-processing", progress)
  }
})

// Start processing
sc.setJobGroup("data-processing", "Processing large dataset")
val results = largeDataset.map(complexProcessing).collect()

// Cleanup
monitor.shutdown()
```
### Resource Utilization Tracking
```scala
// Periodically samples cluster-wide resource usage from the status tracker.
class ResourceMonitor(sc: SparkContext) {
  private val statusTracker = sc.statusTracker
  // Keep a handle on the scheduler so it can be shut down — the original
  // created a local scheduler inside monitorResources that leaked its thread.
  private var scheduler: Option[java.util.concurrent.ScheduledExecutorService] = None

  // One immutable sample of cluster state (all memory values in bytes).
  case class ResourceSnapshot(
    timestamp: Long,
    totalExecutors: Int,
    activeExecutors: Int,
    totalMemory: Long,
    usedMemory: Long,
    totalTasks: Int,
    runningTasks: Int,
    completedTasks: Int,
    failedTasks: Int
  )

  /** Collect a snapshot of executor and active-job state right now. */
  def takeSnapshot(): ResourceSnapshot = {
    val executors = statusTracker.getExecutorInfos()
    val activeJobs = statusTracker.getActiveJobIds()

    val totalMemory = executors.map(_.maxMemory()).sum
    val usedMemory = executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum
    // Jobs can finish between the two tracker calls; flatMap drops them.
    val totalTasks = activeJobs.flatMap(statusTracker.getJobInfo).map(_.numTasks()).sum
    val runningTasks = executors.map(_.numRunningTasks()).sum
    val completedTasks = executors.map(_.numCompletedTasks()).sum
    val failedTasks = executors.map(_.numFailedTasks()).sum

    ResourceSnapshot(
      timestamp = System.currentTimeMillis(),
      totalExecutors = executors.length,
      activeExecutors = executors.count(_.numRunningTasks() > 0),
      totalMemory = totalMemory,
      usedMemory = usedMemory,
      totalTasks = totalTasks,
      runningTasks = runningTasks,
      completedTasks = completedTasks,
      failedTasks = failedTasks
    )
  }

  /** Print (and export) a snapshot every `intervalSeconds` seconds. */
  def monitorResources(intervalSeconds: Int = 10): Unit = {
    val exec = Executors.newScheduledThreadPool(1)
    scheduler = Some(exec)

    exec.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        val snapshot = takeSnapshot()

        // Guard both percentages against division by zero.
        val memoryUtilization = if (snapshot.totalMemory > 0) {
          (snapshot.usedMemory.toDouble / snapshot.totalMemory) * 100
        } else 0.0

        val executorUtilization = if (snapshot.totalExecutors > 0) {
          (snapshot.activeExecutors.toDouble / snapshot.totalExecutors) * 100
        } else 0.0

        println(f"Resource Snapshot (${new java.util.Date(snapshot.timestamp)}):")
        println(f"  Executors: ${snapshot.activeExecutors}/${snapshot.totalExecutors} active ($executorUtilization%.1f%%)")
        println(f"  Memory: ${formatBytes(snapshot.usedMemory)}/${formatBytes(snapshot.totalMemory)} used ($memoryUtilization%.1f%%)")
        println(f"  Tasks: ${snapshot.runningTasks} running, ${snapshot.completedTasks} completed, ${snapshot.failedTasks} failed")
        println()

        // Send to monitoring system
        sendResourceMetrics(snapshot)
      }
    }, 0, intervalSeconds, TimeUnit.SECONDS)
  }

  /** Stop the sampling thread (backward-compatible addition). */
  def stopMonitoring(): Unit = {
    scheduler.foreach(_.shutdown())
    scheduler = None
  }

  private def sendResourceMetrics(snapshot: ResourceSnapshot): Unit = {
    // Implementation to send metrics to external monitoring system
    // e.g., Prometheus, Grafana, CloudWatch, etc.
  }
}

// Usage
val resourceMonitor = new ResourceMonitor(sc)
resourceMonitor.monitorResources(intervalSeconds = 5)
```
### Performance Bottleneck Detection
```scala
// Post-hoc analysis of stage-level metrics to surface slow or failing stages.
class PerformanceAnalyzer(sc: SparkContext) {
  private val statusTracker = sc.statusTracker

  // Derived per-stage metrics (durations/times in milliseconds).
  case class PerformanceMetrics(
    stageId: Int,
    stageName: String,
    duration: Long,
    taskCount: Int,
    failureRate: Double,
    avgTaskTime: Double,
    slowTasks: Int
  )

  /** Compute metrics for every stage the tracker still knows about. */
  def analyzeStagePerformance(): Array[PerformanceMetrics] = {
    val stages = statusTracker.getStageInfos()

    stages.map { stage =>
      // Completed stages: completion - submission. Running stages: elapsed so far.
      val duration = if (stage.completionTime() > 0) {
        stage.completionTime() - stage.submissionTime()
      } else {
        System.currentTimeMillis() - stage.submissionTime()
      }

      val failureRate = if (stage.numTasks() > 0) {
        stage.numFailedTasks().toDouble / stage.numTasks()
      } else 0.0

      val avgTaskTime = if (stage.numCompleteTasks() > 0) {
        stage.executorRunTime().toDouble / stage.numCompleteTasks()
      } else 0.0

      // Detect slow tasks (tasks taking > 2x average time)
      val slowTasks = 0 // Would need more detailed task-level metrics

      PerformanceMetrics(
        stageId = stage.stageId(),
        stageName = stage.name(),
        duration = duration,
        taskCount = stage.numTasks(),
        failureRate = failureRate,
        avgTaskTime = avgTaskTime,
        slowTasks = slowTasks
      )
    }
  }

  /** Print the slowest stages, high-failure-rate stages, and tuning hints. */
  def detectBottlenecks(): Unit = {
    val metrics = analyzeStagePerformance()

    println("Performance Analysis:")
    println("=" * 80)

    // Sort by duration to find longest-running stages
    val slowestStages = metrics.sortBy(-_.duration).take(5)

    println("Slowest Stages:")
    slowestStages.foreach { metric =>
      println(f"  Stage ${metric.stageId} (${metric.stageName}): ${metric.duration}ms")
      println(f"    Tasks: ${metric.taskCount}, Avg time: ${metric.avgTaskTime}%.1fms")
      println(f"    Failure rate: ${metric.failureRate * 100}%.1f%%")
    }

    // Find stages with high failure rates (> 10%)
    val failingStages = metrics.filter(_.failureRate > 0.1).sortBy(-_.failureRate)

    if (failingStages.nonEmpty) {
      println("\nStages with High Failure Rates:")
      failingStages.foreach { metric =>
        println(f"  Stage ${metric.stageId} (${metric.stageName}): ${metric.failureRate * 100}%.1f%% failure rate")
      }
    }

    // Recommendations
    println("\nRecommendations:")
    metrics.foreach { metric =>
      if (metric.failureRate > 0.2) {
        println(s"  Stage ${metric.stageId}: High failure rate - check for data skew or resource constraints")
      }
      if (metric.avgTaskTime > 30000) { // > 30 seconds
        println(s"  Stage ${metric.stageId}: Long average task time - consider repartitioning or optimizing computation")
      }
    }
  }
}

// Usage
val analyzer = new PerformanceAnalyzer(sc)

// Run analysis after job completion
analyzer.detectBottlenecks()
```
### Custom Metrics Dashboard
```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import scala.collection.JavaConverters._

// Aggregates tracker state on a schedule and prints a console dashboard.
class SparkDashboard(sc: SparkContext) {
  private val statusTracker = sc.statusTracker
  // The latest DashboardMetrics lives under the "dashboard" key.
  private val metrics = new ConcurrentHashMap[String, Any]()
  private val scheduler = Executors.newScheduledThreadPool(2)

  // One aggregated sample (memory in GB, times in epoch millis).
  case class DashboardMetrics(
    timestamp: Long,
    activeJobs: Int,
    completedJobs: Int,
    failedJobs: Int,
    activeStages: Int,
    totalExecutors: Int,
    activeExecutors: Int,
    totalMemoryGB: Double,
    usedMemoryGB: Double,
    totalTasks: Int,
    runningTasks: Int,
    completedTasks: Int,
    failedTasks: Int,
    avgTaskDuration: Double
  )

  /** Start the refresh (every 5s) and display (every 30s) schedules. */
  def startMonitoring(): Unit = {
    // Update metrics every 5 seconds
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = updateMetrics()
    }, 0, 5, TimeUnit.SECONDS)

    // Print dashboard every 30 seconds
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = printDashboard()
    }, 10, 30, TimeUnit.SECONDS)
  }

  private def updateMetrics(): Unit = {
    try {
      val executors = statusTracker.getExecutorInfos()
      val activeJobs = statusTracker.getActiveJobIds()
      val activeStages = statusTracker.getActiveStageIds()

      val jobInfos = activeJobs.flatMap(statusTracker.getJobInfo)
      // NOTE(review): these are counted over the ACTIVE job ids, so they will
      // normally be 0 — the tracker exposes no finished-job listing. Confirm
      // whether cumulative counts were intended here.
      val completedJobs = jobInfos.count(_.status().toString == "SUCCEEDED")
      val failedJobs = jobInfos.count(_.status().toString == "FAILED")

      val totalMemory = executors.map(_.maxMemory()).sum
      val usedMemory = executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum

      val totalTasks = jobInfos.map(_.numTasks()).sum
      val runningTasks = jobInfos.map(_.numActiveTasks()).sum
      val completedTasks = executors.map(_.numCompletedTasks()).sum
      val failedTasks = executors.map(_.numFailedTasks()).sum

      val dashboardMetrics = DashboardMetrics(
        timestamp = System.currentTimeMillis(),
        activeJobs = activeJobs.length,
        completedJobs = completedJobs,
        failedJobs = failedJobs,
        activeStages = activeStages.length,
        totalExecutors = executors.length,
        activeExecutors = executors.count(_.numRunningTasks() > 0),
        totalMemoryGB = totalMemory / (1024.0 * 1024 * 1024),
        usedMemoryGB = usedMemory / (1024.0 * 1024 * 1024),
        totalTasks = totalTasks,
        runningTasks = runningTasks,
        completedTasks = completedTasks,
        failedTasks = failedTasks,
        avgTaskDuration = 0.0 // Would need task-level timing data
      )

      metrics.put("dashboard", dashboardMetrics)
    } catch {
      case e: Exception =>
        println(s"Error updating metrics: ${e.getMessage}")
    }
  }

  private def printDashboard(): Unit = {
    metrics.get("dashboard") match {
      // A null (no update yet) fails the type pattern and falls through.
      case dashboard: DashboardMetrics =>
        val memoryUtilization = if (dashboard.totalMemoryGB > 0) {
          (dashboard.usedMemoryGB / dashboard.totalMemoryGB) * 100
        } else 0.0

        val executorUtilization = if (dashboard.totalExecutors > 0) {
          (dashboard.activeExecutors.toDouble / dashboard.totalExecutors) * 100
        } else 0.0

        println("\n" + "=" * 60)
        println(f"SPARK APPLICATION DASHBOARD - ${new java.util.Date(dashboard.timestamp)}")
        println("=" * 60)
        println(f"Jobs:      Active: ${dashboard.activeJobs}%3d | Completed: ${dashboard.completedJobs}%3d | Failed: ${dashboard.failedJobs}%3d")
        println(f"Stages:    Active: ${dashboard.activeStages}%3d")
        println(f"Executors: Active: ${dashboard.activeExecutors}%3d/${dashboard.totalExecutors}%3d ($executorUtilization%.1f%%)")
        println(f"Memory:    Used: ${dashboard.usedMemoryGB}%.1f/${dashboard.totalMemoryGB}%.1f GB ($memoryUtilization%.1f%%)")
        println(f"Tasks:     Running: ${dashboard.runningTasks}%4d | Completed: ${dashboard.completedTasks}%6d | Failed: ${dashboard.failedTasks}%4d")

        if (dashboard.failedTasks > 0) {
          val failureRate = (dashboard.failedTasks.toDouble / (dashboard.completedTasks + dashboard.failedTasks)) * 100
          println(f"  Failure Rate: $failureRate%.2f%%")
        }

        println("=" * 60)

      case _ =>
        println("Dashboard metrics not available")
    }
  }

  /** Latest metrics. NOTE(review): returns null until the first update. */
  def getMetrics(): DashboardMetrics = {
    metrics.get("dashboard").asInstanceOf[DashboardMetrics]
  }

  /** Stop both schedules, forcing termination after a 5s grace period. */
  def shutdown(): Unit = {
    scheduler.shutdown()
    try {
      if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
        scheduler.shutdownNow()
      }
    } catch {
      case _: InterruptedException =>
        scheduler.shutdownNow()
    }
  }
}

// Usage
val dashboard = new SparkDashboard(sc)
dashboard.startMonitoring()

// Run your Spark jobs
val results = processLargeDataset()

// Shutdown monitoring when done
dashboard.shutdown()
```
## Integration with External Monitoring

### Prometheus Metrics Export
```scala
import java.io.StringWriter
import java.net.{HttpURLConnection, URL}

// Pushes a small set of gauges to a Prometheus Pushgateway.
class PrometheusExporter(sc: SparkContext, prometheusGatewayUrl: String) {
  private val statusTracker = sc.statusTracker

  /** Collect, format, and push current metrics under `jobName`. */
  def exportMetrics(jobName: String): Unit = {
    val metrics = collectMetrics()
    val prometheusFormat = formatForPrometheus(metrics, jobName)
    pushToPrometheus(prometheusFormat, jobName)
  }

  // Gauge name -> current value, sampled from the status tracker.
  private def collectMetrics(): Map[String, Double] = {
    val executors = statusTracker.getExecutorInfos()
    val activeJobs = statusTracker.getActiveJobIds()
    val activeStages = statusTracker.getActiveStageIds()

    Map(
      "spark_active_jobs" -> activeJobs.length.toDouble,
      "spark_active_stages" -> activeStages.length.toDouble,
      "spark_total_executors" -> executors.length.toDouble,
      "spark_active_executors" -> executors.count(_.numRunningTasks() > 0).toDouble,
      "spark_total_memory_bytes" -> executors.map(_.maxMemory()).sum.toDouble,
      "spark_used_memory_bytes" -> executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum.toDouble,
      "spark_running_tasks" -> executors.map(_.numRunningTasks()).sum.toDouble,
      "spark_completed_tasks" -> executors.map(_.numCompletedTasks()).sum.toDouble,
      "spark_failed_tasks" -> executors.map(_.numFailedTasks()).sum.toDouble
    )
  }

  // Prometheus text exposition format: name{job="..."} value
  private def formatForPrometheus(metrics: Map[String, Double], jobName: String): String = {
    val writer = new StringWriter()

    metrics.foreach { case (metricName, value) =>
      writer.write(s"$metricName{job=\"$jobName\"} $value\n")
    }

    writer.toString
  }

  private def pushToPrometheus(data: String, jobName: String): Unit = {
    try {
      val url = new URL(s"$prometheusGatewayUrl/metrics/job/$jobName")
      val connection = url.openConnection().asInstanceOf[HttpURLConnection]
      try {
        connection.setRequestMethod("POST")
        connection.setDoOutput(true)
        connection.setRequestProperty("Content-Type", "text/plain")

        val outputStream = connection.getOutputStream
        try outputStream.write(data.getBytes("UTF-8"))
        finally outputStream.close()

        // Pushgateway answers 200 or 202 on success; accept any 2xx.
        // (The original accepted only exactly 200.)
        val responseCode = connection.getResponseCode
        if (responseCode / 100 != 2) {
          println(s"Failed to push metrics: HTTP $responseCode")
        }
      } finally {
        // The original leaked the connection; always release it.
        connection.disconnect()
      }
    } catch {
      case e: Exception =>
        println(s"Error pushing metrics to Prometheus: ${e.getMessage}")
    }
  }
}

// Usage
val exporter = new PrometheusExporter(sc, "http://prometheus-gateway:9091")

// Export metrics periodically during job execution
val scheduler = Executors.newScheduledThreadPool(1)
scheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    exporter.exportMetrics("spark-data-processing")
  }
}, 0, 30, TimeUnit.SECONDS)
```
## Best Practices

### Monitoring Strategy
- Monitor at multiple levels: application, job, stage, and task
- Set up alerts for high failure rates or resource exhaustion
- Track trends over time, not just current values
- Use sampling for high-frequency metrics to avoid overhead

### Performance Optimization
- Status monitoring has minimal overhead but avoid excessive polling
- Cache status information when making multiple queries
- Use separate threads for monitoring to avoid blocking main computation
- Implement circuit breakers for external monitoring system calls

### Resource Management
```scala
// Efficient monitoring pattern: cache the last snapshot and refresh it only
// when it is older than `maxAgeMs`, so frequent callers do not hammer the
// status tracker.
class EfficientMonitor(sc: SparkContext) {
  private val statusTracker = sc.statusTracker
  private var lastUpdate = 0L
  private var cachedSnapshot: Option[ResourceSnapshot] = None

  /** Return a snapshot no older than `maxAgeMs` milliseconds. */
  def getResourceStatus(maxAgeMs: Long = 5000): ResourceSnapshot = {
    val now = System.currentTimeMillis()

    if (cachedSnapshot.isEmpty || (now - lastUpdate) > maxAgeMs) {
      cachedSnapshot = Some(takeSnapshot())
      lastUpdate = now
    }

    cachedSnapshot.get
  }

  private def takeSnapshot(): ResourceSnapshot = {
    // Implementation to collect metrics (see ResourceMonitor for a full one).
    // The original sketch had an empty body, which does not compile; ???
    // keeps the example well-typed until a real implementation is supplied.
    ???
  }
}
```
### Error Handling and Reliability
- Handle cases where jobs/stages complete between status checks
- Implement retry logic for transient monitoring failures
- Gracefully handle missing or null status information
- Log monitoring errors separately from application errors