or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcast-accumulators.md · context-configuration.md · index.md · java-api.md · key-value-operations.md · rdd-operations.md · status-monitoring.md · storage-persistence.md · task-context.md

docs/status-monitoring.md

0

# Status Tracking and Monitoring

1

2

Spark provides comprehensive APIs for monitoring job and stage progress, executor status, and application metrics. This enables real-time tracking of application performance and resource utilization.

3

4

## SparkStatusTracker

5

6

The main interface for programmatic access to job and stage status information.

7

8

```scala { .api }

9

class SparkStatusTracker {

10

// Job Monitoring

11

def getJobIdsForGroup(jobGroup: String): Array[Int]

12

def getActiveStageIds(): Array[Int]

13

def getActiveJobIds(): Array[Int]

14

def getJobInfo(jobId: Int): Option[SparkJobInfo]

15

def getStageInfo(stageId: Int): Option[SparkStageInfo]

16

def getStageInfos(): Array[SparkStageInfo]

17

18

// Executor Monitoring

19

def getExecutorInfos(): Array[SparkExecutorInfo]

20

}

21

```

22

23

## Status Information Classes

24

25

### SparkJobInfo

26

27

```java { .api }

28

public class SparkJobInfo {

29

public int jobId();

30

public int[] stageIds();

31

public JobExecutionStatus status();

32

public int numTasks();

33

public int numActiveTasks();

34

public int numCompletedTasks();

35

public int numSkippedTasks();

36

public int numFailedTasks();

37

public int numKilledTasks();

38

public int numActiveStages();

39

public int numCompletedStages();

40

public int numSkippedStages();

41

public int numFailedStages();

42

}

43

44

public enum JobExecutionStatus {

45

RUNNING, SUCCEEDED, FAILED, UNKNOWN

46

}

47

```

48

49

### SparkStageInfo

50

51

```java { .api }

52

public class SparkStageInfo {

53

public int stageId();

54

public int currentAttemptId();

55

public long submissionTime();

56

public String name();

57

public int numTasks();

58

public int numActiveTasks();

59

public int numCompleteTasks();

60

public int numFailedTasks();

61

public int numKilledTasks();

62

public long executorRunTime();

63

public long executorCpuTime();

64

65

public long firstTaskLaunchedTime();

66

public long completionTime();

67

public String failureReason();

68

}

69

```

70

71

### SparkExecutorInfo

72

73

```java { .api }

74

public class SparkExecutorInfo {

75

public String executorId();

76

public String host();

77

public int port();

78

public long cacheSize();

79

public int numRunningTasks();

80

public int numCompletedTasks();

81

public int numFailedTasks();

82

public long usedOnHeapStorageMemory();

83

public long usedOffHeapStorageMemory();

84

public long totalOnHeapStorageMemory();

85

public long totalOffHeapStorageMemory();

86

public long maxMemory();

87

public long maxOnHeapStorageMemory();

88

public long maxOffHeapStorageMemory();

89

}

90

```

91

92

## Basic Status Monitoring

93

94

### Job and Stage Tracking

95

96

```scala

97

import org.apache.spark.{SparkContext, SparkConf}

98

99

val sc = new SparkContext(new SparkConf().setAppName("Status Monitoring").setMaster("local[*]"))

100

val statusTracker = sc.statusTracker

101

102

// Set job group for tracking

103

sc.setJobGroup("data-processing", "Main data processing pipeline")

104

105

// Start some work

106

val data = sc.parallelize(1 to 1000000, numSlices = 10)

107

val processed = data.map(_ * 2).filter(_ > 100).cache()

108

109

// Monitor job progress in a separate thread

110

val monitoringThread = new Thread(new Runnable {

111

def run(): Unit = {

112

while (!Thread.currentThread().isInterrupted) {

113

try {

114

// Get active jobs

115

val activeJobs = statusTracker.getActiveJobIds()

116

println(s"Active jobs: ${activeJobs.mkString(", ")}")

117

118

// Get jobs for our group

119

val groupJobs = statusTracker.getJobIdsForGroup("data-processing")

120

println(s"Jobs in data-processing group: ${groupJobs.mkString(", ")}")

121

122

// Monitor each active job

123

activeJobs.foreach { jobId =>

124

statusTracker.getJobInfo(jobId) match {

125

case Some(jobInfo) =>

126

println(s"Job $jobId:")

127

println(s" Status: ${jobInfo.status()}")

128

println(s" Tasks: ${jobInfo.numCompletedTasks()}/${jobInfo.numTasks()} completed")

129

println(s" Stages: ${jobInfo.numCompletedStages()}/${jobInfo.stageIds().length} completed")

130

131

case None =>

132

println(s"Job $jobId: No info available")

133

}

134

}

135

136

// Monitor active stages

137

val activeStages = statusTracker.getActiveStageIds()

138

activeStages.foreach { stageId =>

139

statusTracker.getStageInfo(stageId) match {

140

case Some(stageInfo) =>

141

val progress = if (stageInfo.numTasks() > 0) {

142

(stageInfo.numCompleteTasks().toDouble / stageInfo.numTasks()) * 100

143

} else 0.0

144

145

println(s"Stage $stageId (${stageInfo.name()}):")

146

println(f" Progress: $progress%.1f%% (${stageInfo.numCompleteTasks()}/${stageInfo.numTasks()} tasks)")

147

println(s" Active tasks: ${stageInfo.numActiveTasks()}")

148

println(s" Failed tasks: ${stageInfo.numFailedTasks()}")

149

150

case None =>

151

println(s"Stage $stageId: No info available")

152

}

153

}

154

155

Thread.sleep(2000) // Update every 2 seconds

156

} catch {

157

case _: InterruptedException => return

158

case e: Exception => println(s"Monitoring error: ${e.getMessage}")

159

}

160

}

161

}

162

})

163

164

monitoringThread.start()

165

166

// Trigger computation

167

val result = processed.count()

168

println(s"Final result: $result")

169

170

// Stop monitoring

171

monitoringThread.interrupt()

172

monitoringThread.join()

173

```

174

175

### Executor Monitoring

176

177

```scala

178

def printExecutorStatus(statusTracker: SparkStatusTracker): Unit = {

179

val executors = statusTracker.getExecutorInfos()

180

181

println(s"Total executors: ${executors.length}")

182

println("-" * 80)

183

184

executors.foreach { executor =>

185

val memoryUsagePercent = if (executor.maxMemory() > 0) {

186

((executor.usedOnHeapStorageMemory() + executor.usedOffHeapStorageMemory()).toDouble / executor.maxMemory()) * 100

187

} else 0.0

188

189

println(s"Executor ${executor.executorId()} (${executor.host()}:${executor.port()}):")

190

println(s" Running tasks: ${executor.numRunningTasks()}")

191

println(s" Completed tasks: ${executor.numCompletedTasks()}")

192

println(s" Failed tasks: ${executor.numFailedTasks()}")

193

println(f" Memory usage: $memoryUsagePercent%.1f%% (${formatBytes(executor.usedOnHeapStorageMemory() + executor.usedOffHeapStorageMemory())}/${formatBytes(executor.maxMemory())})")

194

println(s" Cache size: ${formatBytes(executor.cacheSize())}")

195

println()

196

}

197

}

198

199

def formatBytes(bytes: Long): String = {

200

val units = Array("B", "KB", "MB", "GB", "TB")

201

var size = bytes.toDouble

202

var unitIndex = 0

203

204

while (size >= 1024 && unitIndex < units.length - 1) {

205

size /= 1024

206

unitIndex += 1

207

}

208

209

f"$size%.1f ${units(unitIndex)}"

210

}

211

212

// Monitor executor status

213

printExecutorStatus(statusTracker)

214

```

215

216

## Advanced Monitoring Patterns

217

218

### Progress Tracking with Callbacks

219

220

```scala

221

import scala.collection.mutable

222

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

223

224

class JobProgressMonitor(sc: SparkContext) {

225

private val statusTracker = sc.statusTracker

226

private val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)

227

private val progressCallbacks = mutable.Map[Int, Double => Unit]()

228

229

def monitorJob(jobGroup: String, callback: Double => Unit): Unit = {

230

scheduler.scheduleAtFixedRate(new Runnable {

231

def run(): Unit = {

232

try {

233

val jobs = statusTracker.getJobIdsForGroup(jobGroup)

234

if (jobs.nonEmpty) {

235

val totalProgress = jobs.map { jobId =>

236

statusTracker.getJobInfo(jobId) match {

237

case Some(jobInfo) =>

238

if (jobInfo.numTasks() > 0) {

239

jobInfo.numCompletedTasks().toDouble / jobInfo.numTasks()

240

} else 0.0

241

case None => 0.0

242

}

243

}.sum / jobs.length

244

245

callback(totalProgress * 100)

246

}

247

} catch {

248

case e: Exception =>

249

println(s"Monitoring error: ${e.getMessage}")

250

}

251

}

252

}, 0, 1, TimeUnit.SECONDS)

253

}

254

255

def shutdown(): Unit = {

256

scheduler.shutdown()

257

}

258

}

259

260

// Usage

261

val monitor = new JobProgressMonitor(sc)

262

263

monitor.monitorJob("data-processing", { progress =>

264

println(f"Overall progress: $progress%.1f%%")

265

266

// Send progress to external monitoring system

267

if (progress.toInt % 10 == 0) { // Coarse check: log at roughly every 10% (polling may skip exact multiples)

268

logProgressToMonitoringSystem("data-processing", progress)

269

}

270

})

271

272

// Start processing

273

sc.setJobGroup("data-processing", "Processing large dataset")

274

val results = largeDataset.map(complexProcessing).collect()

275

276

// Cleanup

277

monitor.shutdown()

278

```

279

280

### Resource Utilization Tracking

281

282

```scala

283

class ResourceMonitor(sc: SparkContext) {

284

private val statusTracker = sc.statusTracker

285

286

case class ResourceSnapshot(

287

timestamp: Long,

288

totalExecutors: Int,

289

activeExecutors: Int,

290

totalMemory: Long,

291

usedMemory: Long,

292

totalTasks: Int,

293

runningTasks: Int,

294

completedTasks: Int,

295

failedTasks: Int

296

)

297

298

def takeSnapshot(): ResourceSnapshot = {

299

val executors = statusTracker.getExecutorInfos()

300

val activeJobs = statusTracker.getActiveJobIds()

301

302

val totalMemory = executors.map(_.maxMemory()).sum

303

val usedMemory = executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum

304

val totalTasks = activeJobs.flatMap(statusTracker.getJobInfo).map(_.numTasks()).sum

305

val runningTasks = executors.map(_.numRunningTasks()).sum

306

val completedTasks = executors.map(_.numCompletedTasks()).sum

307

val failedTasks = executors.map(_.numFailedTasks()).sum

308

309

ResourceSnapshot(

310

timestamp = System.currentTimeMillis(),

311

totalExecutors = executors.length,

312

activeExecutors = executors.count(_.numRunningTasks() > 0),

313

totalMemory = totalMemory,

314

usedMemory = usedMemory,

315

totalTasks = totalTasks,

316

runningTasks = runningTasks,

317

completedTasks = completedTasks,

318

failedTasks = failedTasks

319

)

320

}

321

322

def monitorResources(intervalSeconds: Int = 10): Unit = {

323

val scheduler = Executors.newScheduledThreadPool(1)

324

325

scheduler.scheduleAtFixedRate(new Runnable {

326

override def run(): Unit = {

327

val snapshot = takeSnapshot()

328

329

val memoryUtilization = if (snapshot.totalMemory > 0) {

330

(snapshot.usedMemory.toDouble / snapshot.totalMemory) * 100

331

} else 0.0

332

333

val executorUtilization = if (snapshot.totalExecutors > 0) {

334

(snapshot.activeExecutors.toDouble / snapshot.totalExecutors) * 100

335

} else 0.0

336

337

println(f"Resource Snapshot (${new java.util.Date(snapshot.timestamp)}):")

338

println(f" Executors: ${snapshot.activeExecutors}/${snapshot.totalExecutors} active ($executorUtilization%.1f%%)")

339

println(f" Memory: ${formatBytes(snapshot.usedMemory)}/${formatBytes(snapshot.totalMemory)} used ($memoryUtilization%.1f%%)")

340

println(f" Tasks: ${snapshot.runningTasks} running, ${snapshot.completedTasks} completed, ${snapshot.failedTasks} failed")

341

println()

342

343

// Send to monitoring system

344

sendResourceMetrics(snapshot)

345

}

346

}, 0, intervalSeconds, TimeUnit.SECONDS)

347

}

348

349

private def sendResourceMetrics(snapshot: ResourceSnapshot): Unit = {

350

// Implementation to send metrics to external monitoring system

351

// e.g., Prometheus, Grafana, CloudWatch, etc.

352

}

353

}

354

355

// Usage

356

val resourceMonitor = new ResourceMonitor(sc)

357

resourceMonitor.monitorResources(intervalSeconds = 5)

358

```

359

360

### Performance Bottleneck Detection

361

362

```scala

363

class PerformanceAnalyzer(sc: SparkContext) {

364

private val statusTracker = sc.statusTracker

365

366

case class PerformanceMetrics(

367

stageId: Int,

368

stageName: String,

369

duration: Long,

370

taskCount: Int,

371

failureRate: Double,

372

avgTaskTime: Double,

373

slowTasks: Int

374

)

375

376

def analyzeStagePerformance(): Array[PerformanceMetrics] = {

377

val stages = statusTracker.getStageInfos()

378

379

stages.map { stage =>

380

val duration = if (stage.completionTime() > 0) {

381

stage.completionTime() - stage.submissionTime()

382

} else {

383

System.currentTimeMillis() - stage.submissionTime()

384

}

385

386

val failureRate = if (stage.numTasks() > 0) {

387

stage.numFailedTasks().toDouble / stage.numTasks()

388

} else 0.0

389

390

val avgTaskTime = if (stage.numCompleteTasks() > 0) {

391

stage.executorRunTime().toDouble / stage.numCompleteTasks()

392

} else 0.0

393

394

// Detect slow tasks (tasks taking > 2x average time)

395

val slowTasks = 0 // Would need more detailed task-level metrics

396

397

PerformanceMetrics(

398

stageId = stage.stageId(),

399

stageName = stage.name(),

400

duration = duration,

401

taskCount = stage.numTasks(),

402

failureRate = failureRate,

403

avgTaskTime = avgTaskTime,

404

slowTasks = slowTasks

405

)

406

}

407

}

408

409

def detectBottlenecks(): Unit = {

410

val metrics = analyzeStagePerformance()

411

412

println("Performance Analysis:")

413

println("=" * 80)

414

415

// Sort by duration to find longest-running stages

416

val slowestStages = metrics.sortBy(-_.duration).take(5)

417

418

println("Slowest Stages:")

419

slowestStages.foreach { metric =>

420

println(f" Stage ${metric.stageId} (${metric.stageName}): ${metric.duration}ms")

421

println(f" Tasks: ${metric.taskCount}, Avg time: ${metric.avgTaskTime}%.1fms")

422

println(f" Failure rate: ${metric.failureRate * 100}%.1f%%")

423

}

424

425

// Find stages with high failure rates

426

val failingStages = metrics.filter(_.failureRate > 0.1).sortBy(-_.failureRate)

427

428

if (failingStages.nonEmpty) {

429

println("\nStages with High Failure Rates:")

430

failingStages.foreach { metric =>

431

println(f" Stage ${metric.stageId} (${metric.stageName}): ${metric.failureRate * 100}%.1f%% failure rate")

432

}

433

}

434

435

// Recommendations

436

println("\nRecommendations:")

437

metrics.foreach { metric =>

438

if (metric.failureRate > 0.2) {

439

println(s" Stage ${metric.stageId}: High failure rate - check for data skew or resource constraints")

440

}

441

if (metric.avgTaskTime > 30000) { // > 30 seconds

442

println(s" Stage ${metric.stageId}: Long average task time - consider repartitioning or optimizing computation")

443

}

444

}

445

}

446

}

447

448

// Usage

449

val analyzer = new PerformanceAnalyzer(sc)

450

451

// Run analysis after job completion

452

analyzer.detectBottlenecks()

453

```

454

455

### Custom Metrics Dashboard

456

457

```scala

458

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

459

import scala.collection.JavaConverters._

460

461

class SparkDashboard(sc: SparkContext) {

462

private val statusTracker = sc.statusTracker

463

private val metrics = new ConcurrentHashMap[String, Any]()

464

private val scheduler = Executors.newScheduledThreadPool(2)

465

466

case class DashboardMetrics(

467

timestamp: Long,

468

activeJobs: Int,

469

completedJobs: Int,

470

failedJobs: Int,

471

activeStages: Int,

472

totalExecutors: Int,

473

activeExecutors: Int,

474

totalMemoryGB: Double,

475

usedMemoryGB: Double,

476

totalTasks: Int,

477

runningTasks: Int,

478

completedTasks: Int,

479

failedTasks: Int,

480

avgTaskDuration: Double

481

)

482

483

def startMonitoring(): Unit = {

484

// Update metrics every 5 seconds

485

scheduler.scheduleAtFixedRate(new Runnable {

486

override def run(): Unit = updateMetrics()

487

}, 0, 5, TimeUnit.SECONDS)

488

489

// Print dashboard every 30 seconds

490

scheduler.scheduleAtFixedRate(new Runnable {

491

override def run(): Unit = printDashboard()

492

}, 10, 30, TimeUnit.SECONDS)

493

}

494

495

private def updateMetrics(): Unit = {

496

try {

497

val executors = statusTracker.getExecutorInfos()

498

val activeJobs = statusTracker.getActiveJobIds()

499

val activeStages = statusTracker.getActiveStageIds()

500

501

val jobInfos = activeJobs.flatMap(statusTracker.getJobInfo)

502

val completedJobs = jobInfos.count(_.status().toString == "SUCCEEDED")

503

val failedJobs = jobInfos.count(_.status().toString == "FAILED")

504

505

val totalMemory = executors.map(_.maxMemory()).sum

506

val usedMemory = executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum

507

508

val totalTasks = jobInfos.map(_.numTasks()).sum

509

val runningTasks = jobInfos.map(_.numActiveTasks()).sum

510

val completedTasks = executors.map(_.numCompletedTasks()).sum

511

val failedTasks = executors.map(_.numFailedTasks()).sum

512

513

val dashboardMetrics = DashboardMetrics(

514

timestamp = System.currentTimeMillis(),

515

activeJobs = activeJobs.length,

516

completedJobs = completedJobs,

517

failedJobs = failedJobs,

518

activeStages = activeStages.length,

519

totalExecutors = executors.length,

520

activeExecutors = executors.count(_.numRunningTasks() > 0),

521

totalMemoryGB = totalMemory / (1024.0 * 1024 * 1024),

522

usedMemoryGB = usedMemory / (1024.0 * 1024 * 1024),

523

totalTasks = totalTasks,

524

runningTasks = runningTasks,

525

completedTasks = completedTasks,

526

failedTasks = failedTasks,

527

avgTaskDuration = 0.0 // Would need task-level timing data

528

)

529

530

metrics.put("dashboard", dashboardMetrics)

531

} catch {

532

case e: Exception =>

533

println(s"Error updating metrics: ${e.getMessage}")

534

}

535

}

536

537

private def printDashboard(): Unit = {

538

metrics.get("dashboard") match {

539

case dashboard: DashboardMetrics =>

540

val memoryUtilization = if (dashboard.totalMemoryGB > 0) {

541

(dashboard.usedMemoryGB / dashboard.totalMemoryGB) * 100

542

} else 0.0

543

544

val executorUtilization = if (dashboard.totalExecutors > 0) {

545

(dashboard.activeExecutors.toDouble / dashboard.totalExecutors) * 100

546

} else 0.0

547

548

println("\n" + "=" * 60)

549

println(f"SPARK APPLICATION DASHBOARD - ${new java.util.Date(dashboard.timestamp)}")

550

println("=" * 60)

551

println(f"Jobs: Active: ${dashboard.activeJobs}%3d | Completed: ${dashboard.completedJobs}%3d | Failed: ${dashboard.failedJobs}%3d")

552

println(f"Stages: Active: ${dashboard.activeStages}%3d")

553

println(f"Executors: Active: ${dashboard.activeExecutors}%3d/${dashboard.totalExecutors}%3d ($executorUtilization%.1f%%)")

554

println(f"Memory: Used: ${dashboard.usedMemoryGB}%.1f/${dashboard.totalMemoryGB}%.1f GB ($memoryUtilization%.1f%%)")

555

println(f"Tasks: Running: ${dashboard.runningTasks}%4d | Completed: ${dashboard.completedTasks}%6d | Failed: ${dashboard.failedTasks}%4d")

556

557

if (dashboard.failedTasks > 0) {

558

val failureRate = (dashboard.failedTasks.toDouble / (dashboard.completedTasks + dashboard.failedTasks)) * 100

559

println(f" Failure Rate: $failureRate%.2f%%")

560

}

561

562

println("=" * 60)

563

564

case _ =>

565

println("Dashboard metrics not available")

566

}

567

}

568

569

def getMetrics(): DashboardMetrics = {

570

metrics.get("dashboard").asInstanceOf[DashboardMetrics]

571

}

572

573

def shutdown(): Unit = {

574

scheduler.shutdown()

575

try {

576

if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {

577

scheduler.shutdownNow()

578

}

579

} catch {

580

case _: InterruptedException =>

581

scheduler.shutdownNow()

582

}

583

}

584

}

585

586

// Usage

587

val dashboard = new SparkDashboard(sc)

588

dashboard.startMonitoring()

589

590

// Run your Spark jobs

591

val results = processLargeDataset()

592

593

// Shutdown monitoring when done

594

dashboard.shutdown()

595

```

596

597

## Integration with External Monitoring

598

599

### Prometheus Metrics Export

600

601

```scala

602

import java.io.StringWriter

603

import java.net.{HttpURLConnection, URL}

604

605

class PrometheusExporter(sc: SparkContext, prometheusGatewayUrl: String) {

606

private val statusTracker = sc.statusTracker

607

608

def exportMetrics(jobName: String): Unit = {

609

val metrics = collectMetrics()

610

val prometheusFormat = formatForPrometheus(metrics, jobName)

611

pushToPrometheus(prometheusFormat, jobName)

612

}

613

614

private def collectMetrics(): Map[String, Double] = {

615

val executors = statusTracker.getExecutorInfos()

616

val activeJobs = statusTracker.getActiveJobIds()

617

val activeStages = statusTracker.getActiveStageIds()

618

619

Map(

620

"spark_active_jobs" -> activeJobs.length.toDouble,

621

"spark_active_stages" -> activeStages.length.toDouble,

622

"spark_total_executors" -> executors.length.toDouble,

623

"spark_active_executors" -> executors.count(_.numRunningTasks() > 0).toDouble,

624

"spark_total_memory_bytes" -> executors.map(_.maxMemory()).sum.toDouble,

625

"spark_used_memory_bytes" -> executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum.toDouble,

626

"spark_running_tasks" -> executors.map(_.numRunningTasks()).sum.toDouble,

627

"spark_completed_tasks" -> executors.map(_.numCompletedTasks()).sum.toDouble,

628

"spark_failed_tasks" -> executors.map(_.numFailedTasks()).sum.toDouble

629

)

630

}

631

632

private def formatForPrometheus(metrics: Map[String, Double], jobName: String): String = {

633

val writer = new StringWriter()

634

635

metrics.foreach { case (metricName, value) =>

636

writer.write(s"$metricName{job=\"$jobName\"} $value\n")

637

}

638

639

writer.toString

640

}

641

642

private def pushToPrometheus(data: String, jobName: String): Unit = {

643

try {

644

val url = new URL(s"$prometheusGatewayUrl/metrics/job/$jobName")

645

val connection = url.openConnection().asInstanceOf[HttpURLConnection]

646

647

connection.setRequestMethod("POST")

648

connection.setDoOutput(true)

649

connection.setRequestProperty("Content-Type", "text/plain")

650

651

val outputStream = connection.getOutputStream

652

outputStream.write(data.getBytes("UTF-8"))

653

outputStream.close()

654

655

val responseCode = connection.getResponseCode

656

if (responseCode != 200) {

657

println(s"Failed to push metrics: HTTP $responseCode")

658

}

659

660

} catch {

661

case e: Exception =>

662

println(s"Error pushing metrics to Prometheus: ${e.getMessage}")

663

}

664

}

665

}

666

667

// Usage

668

val exporter = new PrometheusExporter(sc, "http://prometheus-gateway:9091")

669

670

// Export metrics periodically during job execution

671

val scheduler = Executors.newScheduledThreadPool(1)

672

scheduler.scheduleAtFixedRate(new Runnable {

673

override def run(): Unit = {

674

exporter.exportMetrics("spark-data-processing")

675

}

676

}, 0, 30, TimeUnit.SECONDS)

677

```

678

679

## Best Practices

680

681

### Monitoring Strategy

682

- Monitor at multiple levels: application, job, stage, and task

683

- Set up alerts for high failure rates or resource exhaustion

684

- Track trends over time, not just current values

685

- Use sampling for high-frequency metrics to avoid overhead

686

687

### Performance Optimization

688

- Status monitoring has minimal overhead but avoid excessive polling

689

- Cache status information when making multiple queries

690

- Use separate threads for monitoring to avoid blocking main computation

691

- Implement circuit breakers for external monitoring system calls

692

693

### Resource Management

694

```scala

695

// Efficient monitoring pattern

696

class EfficientMonitor(sc: SparkContext) {

697

private val statusTracker = sc.statusTracker

698

private var lastUpdate = 0L

699

private var cachedSnapshot: Option[ResourceSnapshot] = None

700

701

def getResourceStatus(maxAgeMs: Long = 5000): ResourceSnapshot = {

702

val now = System.currentTimeMillis()

703

704

if (cachedSnapshot.isEmpty || (now - lastUpdate) > maxAgeMs) {

705

cachedSnapshot = Some(takeSnapshot())

706

lastUpdate = now

707

}

708

709

cachedSnapshot.get

710

}

711

712

private def takeSnapshot(): ResourceSnapshot = {

713

// Implementation to collect metrics

714

// ...

715

}

716

}

717

```

718

719

### Error Handling and Reliability

720

- Handle cases where jobs/stages complete between status checks

721

- Implement retry logic for transient monitoring failures

722

- Gracefully handle missing or null status information

723

- Log monitoring errors separately from application errors