
Status Tracking and Monitoring

Spark provides comprehensive APIs for monitoring job and stage progress, executor status, and application metrics. This enables real-time tracking of application performance and resource utilization.

SparkStatusTracker

The main interface for programmatic access to job and stage status information.

class SparkStatusTracker {
  // Job Monitoring
  def getJobIdsForGroup(jobGroup: String): Array[Int]
  def getActiveStageIds(): Array[Int] 
  def getActiveJobIds(): Array[Int]
  def getJobInfo(jobId: Int): Option[SparkJobInfo]
  def getStageInfo(stageId: Int): Option[SparkStageInfo]
  def getStageInfos(): Array[SparkStageInfo]
  
  // Executor Monitoring
  def getExecutorInfos(): Array[SparkExecutorInfo]
}

Status Information Classes

SparkJobInfo

public interface SparkJobInfo {
    public int jobId();
    public int[] stageIds();
    public JobExecutionStatus status();
    public int numTasks();
    public int numActiveTasks();
    public int numCompletedTasks();
    public int numSkippedTasks();
    public int numFailedTasks();
    public int numKilledTasks();
    public int numActiveStages();
    public int numCompletedStages();
    public int numSkippedStages();
    public int numFailedStages();
}

public enum JobExecutionStatus {
    RUNNING, SUCCEEDED, FAILED, UNKNOWN
}
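
Because JobExecutionStatus distinguishes the terminal states SUCCEEDED and FAILED from RUNNING, a common pattern is to poll getJobInfo until a job finishes. A minimal sketch, assuming an illustrative helper waitForJob and a 500 ms poll interval (a production version would also add a timeout):

import org.apache.spark.{SparkContext, JobExecutionStatus}

// Poll a job's status until it reaches a terminal state
def waitForJob(sc: SparkContext, jobId: Int, pollMs: Long = 500): JobExecutionStatus = {
  var status = JobExecutionStatus.UNKNOWN
  var done = false
  while (!done) {
    status = sc.statusTracker.getJobInfo(jobId) match {
      case Some(info) => info.status()
      case None       => JobExecutionStatus.UNKNOWN // info may be missing before the job registers
    }
    done = status == JobExecutionStatus.SUCCEEDED || status == JobExecutionStatus.FAILED
    if (!done) Thread.sleep(pollMs)
  }
  status
}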

SparkStageInfo

public interface SparkStageInfo {
    public int stageId();
    public int currentAttemptId();
    public long submissionTime();
    public String name();
    public int numTasks();
    public int numActiveTasks();
    public int numCompleteTasks();
    public int numFailedTasks();
    public int numKilledTasks();
    public long executorRunTime();
    public long executorCpuTime();
    public long firstTaskLaunchedTime();
    public long completionTime();
    public String failureReason();
}

SparkExecutorInfo

public interface SparkExecutorInfo {
    public String executorId();
    public String host();
    public int port();
    public long cacheSize();
    public int numRunningTasks();
    public int numCompletedTasks();
    public int numFailedTasks();
    public long usedOnHeapStorageMemory();
    public long usedOffHeapStorageMemory();
    public long totalOnHeapStorageMemory();
    public long totalOffHeapStorageMemory();
    public long maxMemory();
    public long maxOnHeapStorageMemory();
    public long maxOffHeapStorageMemory();
}

Basic Status Monitoring

Job and Stage Tracking

import org.apache.spark.{SparkContext, SparkConf}

val sc = new SparkContext(new SparkConf().setAppName("Status Monitoring").setMaster("local[*]"))
val statusTracker = sc.statusTracker

// Set job group for tracking
sc.setJobGroup("data-processing", "Main data processing pipeline")

// Start some work
val data = sc.parallelize(1 to 1000000, numSlices = 10)
val processed = data.map(_ * 2).filter(_ > 100).cache()

// Monitor job progress in a separate thread
val monitoringThread = new Thread(new Runnable {
  def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      try {
        // Get active jobs
        val activeJobs = statusTracker.getActiveJobIds()
        println(s"Active jobs: ${activeJobs.mkString(", ")}")
        
        // Get jobs for our group
        val groupJobs = statusTracker.getJobIdsForGroup("data-processing")
        println(s"Jobs in data-processing group: ${groupJobs.mkString(", ")}")
        
        // Monitor each active job
        activeJobs.foreach { jobId =>
          statusTracker.getJobInfo(jobId) match {
            case Some(jobInfo) =>
              println(s"Job $jobId:")
              println(s"  Status: ${jobInfo.status()}")
              println(s"  Tasks: ${jobInfo.numCompletedTasks()}/${jobInfo.numTasks()} completed")
              println(s"  Stages: ${jobInfo.numCompletedStages()}/${jobInfo.stageIds().length} completed")
              
            case None =>
              println(s"Job $jobId: No info available")
          }
        }
        
        // Monitor active stages
        val activeStages = statusTracker.getActiveStageIds()
        activeStages.foreach { stageId =>
          statusTracker.getStageInfo(stageId) match {
            case Some(stageInfo) =>
              val progress = if (stageInfo.numTasks() > 0) {
                (stageInfo.numCompleteTasks().toDouble / stageInfo.numTasks()) * 100
              } else 0.0
              
              println(s"Stage $stageId (${stageInfo.name()}):")
              println(f"  Progress: $progress%.1f%% (${stageInfo.numCompleteTasks()}/${stageInfo.numTasks()} tasks)")
              println(s"  Active tasks: ${stageInfo.numActiveTasks()}")
              println(s"  Failed tasks: ${stageInfo.numFailedTasks()}")
              
            case None =>
              println(s"Stage $stageId: No info available")
          }
        }
        
        Thread.sleep(2000) // Update every 2 seconds
      } catch {
        case _: InterruptedException => return
        case e: Exception => println(s"Monitoring error: ${e.getMessage}")
      }
    }
  }
})

monitoringThread.start()

// Trigger computation
val result = processed.count()
println(s"Final result: $result")

// Stop monitoring
monitoringThread.interrupt()
monitoringThread.join()

Executor Monitoring

import org.apache.spark.SparkStatusTracker

def printExecutorStatus(statusTracker: SparkStatusTracker): Unit = {
  val executors = statusTracker.getExecutorInfos()
  
  println(s"Total executors: ${executors.length}")
  println("-" * 80)
  
  executors.foreach { executor =>
    val memoryUsagePercent = if (executor.maxMemory() > 0) {
      ((executor.usedOnHeapStorageMemory() + executor.usedOffHeapStorageMemory()).toDouble / executor.maxMemory()) * 100
    } else 0.0
    
    println(s"Executor ${executor.executorId()} (${executor.host()}:${executor.port()}):")
    println(s"  Running tasks: ${executor.numRunningTasks()}")
    println(s"  Completed tasks: ${executor.numCompletedTasks()}")
    println(s"  Failed tasks: ${executor.numFailedTasks()}")
    println(f"  Memory usage: $memoryUsagePercent%.1f%% (${formatBytes(executor.usedOnHeapStorageMemory() + executor.usedOffHeapStorageMemory())}/${formatBytes(executor.maxMemory())})")
    println(s"  Cache size: ${formatBytes(executor.cacheSize())}")
    println()
  }
}

def formatBytes(bytes: Long): String = {
  val units = Array("B", "KB", "MB", "GB", "TB")
  var size = bytes.toDouble
  var unitIndex = 0
  
  while (size >= 1024 && unitIndex < units.length - 1) {
    size /= 1024
    unitIndex += 1
  }
  
  f"$size%.1f ${units(unitIndex)}"
}

// Monitor executor status
printExecutorStatus(statusTracker)

Advanced Monitoring Patterns

Progress Tracking with Callbacks

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

class JobProgressMonitor(sc: SparkContext) {
  private val statusTracker = sc.statusTracker
  private val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
  
  def monitorJob(jobGroup: String, callback: Double => Unit): Unit = {
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        try {
          val jobs = statusTracker.getJobIdsForGroup(jobGroup)
          if (jobs.nonEmpty) {
            val totalProgress = jobs.map { jobId =>
              statusTracker.getJobInfo(jobId) match {
                case Some(jobInfo) =>
                  if (jobInfo.numTasks() > 0) {
                    jobInfo.numCompletedTasks().toDouble / jobInfo.numTasks()
                  } else 0.0
                case None => 0.0
              }
            }.sum / jobs.length
            
            callback(totalProgress * 100)
          }
        } catch {
          case e: Exception =>
            println(s"Monitoring error: ${e.getMessage}")
        }
      }
    }, 0, 1, TimeUnit.SECONDS)
  }
  
  def shutdown(): Unit = {
    scheduler.shutdown()
  }
}

// Usage
val monitor = new JobProgressMonitor(sc)

monitor.monitorJob("data-processing", { progress =>
  println(f"Overall progress: $progress%.1f%%")
  
  // Forward progress to an external monitoring system
  // (logProgressToMonitoringSystem is a user-supplied hook, not a Spark API)
  if (progress.toInt % 10 == 0) { // log roughly every 10%
    logProgressToMonitoringSystem("data-processing", progress)
  }
})

// Start processing
sc.setJobGroup("data-processing", "Processing large dataset")
val results = largeDataset.map(complexProcessing).collect() // largeDataset and complexProcessing are placeholders

// Cleanup
monitor.shutdown()

Resource Utilization Tracking

class ResourceMonitor(sc: SparkContext) {
  private val statusTracker = sc.statusTracker
  
  case class ResourceSnapshot(
    timestamp: Long,
    totalExecutors: Int,
    activeExecutors: Int,
    totalMemory: Long,
    usedMemory: Long,
    totalTasks: Int,
    runningTasks: Int,
    completedTasks: Int,
    failedTasks: Int
  )
  
  def takeSnapshot(): ResourceSnapshot = {
    val executors = statusTracker.getExecutorInfos()
    val activeJobs = statusTracker.getActiveJobIds()
    
    val totalMemory = executors.map(_.maxMemory()).sum
    val usedMemory = executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum
    val totalTasks = activeJobs.flatMap(statusTracker.getJobInfo).map(_.numTasks()).sum
    val runningTasks = executors.map(_.numRunningTasks()).sum
    val completedTasks = executors.map(_.numCompletedTasks()).sum
    val failedTasks = executors.map(_.numFailedTasks()).sum
    
    ResourceSnapshot(
      timestamp = System.currentTimeMillis(),
      totalExecutors = executors.length,
      activeExecutors = executors.count(_.numRunningTasks() > 0),
      totalMemory = totalMemory,
      usedMemory = usedMemory,
      totalTasks = totalTasks,
      runningTasks = runningTasks,
      completedTasks = completedTasks,
      failedTasks = failedTasks
    )
  }
  
  def monitorResources(intervalSeconds: Int = 10): Unit = {
    val scheduler = Executors.newScheduledThreadPool(1)
    
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        val snapshot = takeSnapshot()
        
        val memoryUtilization = if (snapshot.totalMemory > 0) {
          (snapshot.usedMemory.toDouble / snapshot.totalMemory) * 100
        } else 0.0
        
        val executorUtilization = if (snapshot.totalExecutors > 0) {
          (snapshot.activeExecutors.toDouble / snapshot.totalExecutors) * 100
        } else 0.0
        
        println(f"Resource Snapshot (${new java.util.Date(snapshot.timestamp)}):")
        println(f"  Executors: ${snapshot.activeExecutors}/${snapshot.totalExecutors} active ($executorUtilization%.1f%%)")
        println(f"  Memory: ${formatBytes(snapshot.usedMemory)}/${formatBytes(snapshot.totalMemory)} used ($memoryUtilization%.1f%%)")
        println(f"  Tasks: ${snapshot.runningTasks} running, ${snapshot.completedTasks} completed, ${snapshot.failedTasks} failed")
        println()
        
        // Send to monitoring system
        sendResourceMetrics(snapshot)
      }
    }, 0, intervalSeconds, TimeUnit.SECONDS)
  }
  
  private def sendResourceMetrics(snapshot: ResourceSnapshot): Unit = {
    // Implementation to send metrics to external monitoring system
    // e.g., Prometheus, Grafana, CloudWatch, etc.
  }
}

// Usage
val resourceMonitor = new ResourceMonitor(sc)
resourceMonitor.monitorResources(intervalSeconds = 5)

Performance Bottleneck Detection

class PerformanceAnalyzer(sc: SparkContext) {
  private val statusTracker = sc.statusTracker
  
  case class PerformanceMetrics(
    stageId: Int,
    stageName: String,
    duration: Long,
    taskCount: Int,
    failureRate: Double,
    avgTaskTime: Double,
    slowTasks: Int
  )
  
  def analyzeStagePerformance(): Array[PerformanceMetrics] = {
    val stages = statusTracker.getStageInfos()
    
    stages.map { stage =>
      val duration = if (stage.completionTime() > 0) {
        stage.completionTime() - stage.submissionTime()
      } else {
        System.currentTimeMillis() - stage.submissionTime()
      }
      
      val failureRate = if (stage.numTasks() > 0) {
        stage.numFailedTasks().toDouble / stage.numTasks()
      } else 0.0
      
      val avgTaskTime = if (stage.numCompleteTasks() > 0) {
        stage.executorRunTime().toDouble / stage.numCompleteTasks()
      } else 0.0
      
      // Detect slow tasks (tasks taking > 2x average time)
      val slowTasks = 0 // Would need more detailed task-level metrics
      
      PerformanceMetrics(
        stageId = stage.stageId(),
        stageName = stage.name(),
        duration = duration,
        taskCount = stage.numTasks(),
        failureRate = failureRate,
        avgTaskTime = avgTaskTime,
        slowTasks = slowTasks
      )
    }
  }
  
  def detectBottlenecks(): Unit = {
    val metrics = analyzeStagePerformance()
    
    println("Performance Analysis:")
    println("=" * 80)
    
    // Sort by duration to find longest-running stages
    val slowestStages = metrics.sortBy(-_.duration).take(5)
    
    println("Slowest Stages:")
    slowestStages.foreach { metric =>
      println(f"  Stage ${metric.stageId} (${metric.stageName}): ${metric.duration}ms")
      println(f"    Tasks: ${metric.taskCount}, Avg time: ${metric.avgTaskTime}%.1fms")
      println(f"    Failure rate: ${metric.failureRate * 100}%.1f%%")
    }
    
    // Find stages with high failure rates
    val failingStages = metrics.filter(_.failureRate > 0.1).sortBy(-_.failureRate)
    
    if (failingStages.nonEmpty) {
      println("\nStages with High Failure Rates:")
      failingStages.foreach { metric =>
        println(f"  Stage ${metric.stageId} (${metric.stageName}): ${metric.failureRate * 100}%.1f%% failure rate")
      }
    }
    
    // Recommendations
    println("\nRecommendations:")
    metrics.foreach { metric =>
      if (metric.failureRate > 0.2) {
        println(s"  Stage ${metric.stageId}: High failure rate - check for data skew or resource constraints")
      }
      if (metric.avgTaskTime > 30000) { // > 30 seconds
        println(s"  Stage ${metric.stageId}: Long average task time - consider repartitioning or optimizing computation")
      }
    }
  }
}

// Usage
val analyzer = new PerformanceAnalyzer(sc)

// Run analysis after job completion
analyzer.detectBottlenecks()

Custom Metrics Dashboard

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

class SparkDashboard(sc: SparkContext) {
  private val statusTracker = sc.statusTracker
  private val metrics = new ConcurrentHashMap[String, Any]()
  private val scheduler = Executors.newScheduledThreadPool(2)
  
  case class DashboardMetrics(
    timestamp: Long,
    activeJobs: Int,
    completedJobs: Int,
    failedJobs: Int,
    activeStages: Int,
    totalExecutors: Int,
    activeExecutors: Int,
    totalMemoryGB: Double,
    usedMemoryGB: Double,
    totalTasks: Int,
    runningTasks: Int,
    completedTasks: Int,
    failedTasks: Int,
    avgTaskDuration: Double
  )
  
  def startMonitoring(): Unit = {
    // Update metrics every 5 seconds
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = updateMetrics()
    }, 0, 5, TimeUnit.SECONDS)
    
    // Print dashboard every 30 seconds
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = printDashboard()
    }, 10, 30, TimeUnit.SECONDS)
  }
  
  private def updateMetrics(): Unit = {
    try {
      val executors = statusTracker.getExecutorInfos()
      val activeJobs = statusTracker.getActiveJobIds()
      val activeStages = statusTracker.getActiveStageIds()
      
      val jobInfos = activeJobs.flatMap(statusTracker.getJobInfo)
      // Note: only jobs that are still active are visible here, so these counts
      // only capture jobs that finished between the lookup above and this check
      val completedJobs = jobInfos.count(_.status() == org.apache.spark.JobExecutionStatus.SUCCEEDED)
      val failedJobs = jobInfos.count(_.status() == org.apache.spark.JobExecutionStatus.FAILED)
      
      val totalMemory = executors.map(_.maxMemory()).sum
      val usedMemory = executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum
      
      val totalTasks = jobInfos.map(_.numTasks()).sum
      val runningTasks = jobInfos.map(_.numActiveTasks()).sum
      val completedTasks = executors.map(_.numCompletedTasks()).sum
      val failedTasks = executors.map(_.numFailedTasks()).sum
      
      val dashboardMetrics = DashboardMetrics(
        timestamp = System.currentTimeMillis(),
        activeJobs = activeJobs.length,
        completedJobs = completedJobs,
        failedJobs = failedJobs,
        activeStages = activeStages.length,
        totalExecutors = executors.length,
        activeExecutors = executors.count(_.numRunningTasks() > 0),
        totalMemoryGB = totalMemory / (1024.0 * 1024 * 1024),
        usedMemoryGB = usedMemory / (1024.0 * 1024 * 1024),
        totalTasks = totalTasks,
        runningTasks = runningTasks,
        completedTasks = completedTasks,
        failedTasks = failedTasks,
        avgTaskDuration = 0.0 // Would need task-level timing data
      )
      
      metrics.put("dashboard", dashboardMetrics)
    } catch {
      case e: Exception =>
        println(s"Error updating metrics: ${e.getMessage}")
    }
  }
  
  private def printDashboard(): Unit = {
    metrics.get("dashboard") match {
      case dashboard: DashboardMetrics =>
        val memoryUtilization = if (dashboard.totalMemoryGB > 0) {
          (dashboard.usedMemoryGB / dashboard.totalMemoryGB) * 100
        } else 0.0
        
        val executorUtilization = if (dashboard.totalExecutors > 0) {
          (dashboard.activeExecutors.toDouble / dashboard.totalExecutors) * 100
        } else 0.0
        
        println("\n" + "=" * 60)
        println(f"SPARK APPLICATION DASHBOARD - ${new java.util.Date(dashboard.timestamp)}")
        println("=" * 60)
        println(f"Jobs:      Active: ${dashboard.activeJobs}%3d | Completed: ${dashboard.completedJobs}%3d | Failed: ${dashboard.failedJobs}%3d")
        println(f"Stages:    Active: ${dashboard.activeStages}%3d")
        println(f"Executors: Active: ${dashboard.activeExecutors}%3d/${dashboard.totalExecutors}%3d ($executorUtilization%.1f%%)")
        println(f"Memory:    Used: ${dashboard.usedMemoryGB}%.1f/${dashboard.totalMemoryGB}%.1f GB ($memoryUtilization%.1f%%)")
        println(f"Tasks:     Running: ${dashboard.runningTasks}%4d | Completed: ${dashboard.completedTasks}%6d | Failed: ${dashboard.failedTasks}%4d")
        
        if (dashboard.failedTasks > 0) {
          val failureRate = (dashboard.failedTasks.toDouble / (dashboard.completedTasks + dashboard.failedTasks)) * 100
          println(f"           Failure Rate: $failureRate%.2f%%")
        }
        
        println("=" * 60)
        
      case _ =>
        println("Dashboard metrics not available")
    }
  }
  
  def getMetrics(): Option[DashboardMetrics] = {
    Option(metrics.get("dashboard")).map(_.asInstanceOf[DashboardMetrics])
  }
  
  def shutdown(): Unit = {
    scheduler.shutdown()
    try {
      if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
        scheduler.shutdownNow()
      }
    } catch {
      case _: InterruptedException =>
        scheduler.shutdownNow()
    }
  }
}

// Usage
val dashboard = new SparkDashboard(sc)
dashboard.startMonitoring()

// Run your Spark jobs
val results = processLargeDataset() // placeholder for your application's workload

// Shutdown monitoring when done
dashboard.shutdown()

Integration with External Monitoring

Prometheus Metrics Export

import java.io.StringWriter
import java.net.{HttpURLConnection, URL}

class PrometheusExporter(sc: SparkContext, prometheusGatewayUrl: String) {
  private val statusTracker = sc.statusTracker
  
  def exportMetrics(jobName: String): Unit = {
    val metrics = collectMetrics()
    val prometheusFormat = formatForPrometheus(metrics, jobName)
    pushToPrometheus(prometheusFormat, jobName)
  }
  
  private def collectMetrics(): Map[String, Double] = {
    val executors = statusTracker.getExecutorInfos()
    val activeJobs = statusTracker.getActiveJobIds()
    val activeStages = statusTracker.getActiveStageIds()
    
    Map(
      "spark_active_jobs" -> activeJobs.length.toDouble,
      "spark_active_stages" -> activeStages.length.toDouble,
      "spark_total_executors" -> executors.length.toDouble,
      "spark_active_executors" -> executors.count(_.numRunningTasks() > 0).toDouble,
      "spark_total_memory_bytes" -> executors.map(_.maxMemory()).sum.toDouble,
      "spark_used_memory_bytes" -> executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum.toDouble,
      "spark_running_tasks" -> executors.map(_.numRunningTasks()).sum.toDouble,
      "spark_completed_tasks" -> executors.map(_.numCompletedTasks()).sum.toDouble,
      "spark_failed_tasks" -> executors.map(_.numFailedTasks()).sum.toDouble
    )
  }
  
  private def formatForPrometheus(metrics: Map[String, Double], jobName: String): String = {
    val writer = new StringWriter()
    
    metrics.foreach { case (metricName, value) =>
      writer.write(s"$metricName{job=\"$jobName\"} $value\n")
    }
    
    writer.toString
  }
  
  private def pushToPrometheus(data: String, jobName: String): Unit = {
    try {
      val url = new URL(s"$prometheusGatewayUrl/metrics/job/$jobName")
      val connection = url.openConnection().asInstanceOf[HttpURLConnection]
      
      connection.setRequestMethod("POST")
      connection.setDoOutput(true)
      connection.setRequestProperty("Content-Type", "text/plain")
      
      val outputStream = connection.getOutputStream
      outputStream.write(data.getBytes("UTF-8"))
      outputStream.close()
      
      val responseCode = connection.getResponseCode
      if (responseCode < 200 || responseCode >= 300) { // the Pushgateway returns 200 or 202 on success
        println(s"Failed to push metrics: HTTP $responseCode")
      }
      connection.disconnect()
      
    } catch {
      case e: Exception =>
        println(s"Error pushing metrics to Prometheus: ${e.getMessage}")
    }
  }
}

// Usage
val exporter = new PrometheusExporter(sc, "http://prometheus-gateway:9091")

// Export metrics periodically during job execution
val scheduler = Executors.newScheduledThreadPool(1)
scheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    exporter.exportMetrics("spark-data-processing")
  }
}, 0, 30, TimeUnit.SECONDS)

Best Practices

Monitoring Strategy

  • Monitor at multiple levels: application, job, stage, and task
  • Set up alerts for high failure rates or resource exhaustion (a minimal alert check is sketched after this list)
  • Track trends over time, not just current values
  • Use sampling for high-frequency metrics to avoid overhead
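
As an example of the alerting bullet above, a threshold check can be layered directly on the tracker. A minimal sketch, assuming the SparkExecutorInfo accessors listed earlier, illustrative 10% failure-rate and 90% memory thresholds, and a caller-supplied alert callback:

import org.apache.spark.SparkContext

def checkAlerts(sc: SparkContext, alert: String => Unit): Unit = {
  val executors = sc.statusTracker.getExecutorInfos()

  // Alert on a high cumulative task failure rate
  val failed = executors.map(_.numFailedTasks()).sum
  val completed = executors.map(_.numCompletedTasks()).sum
  val finished = failed + completed
  if (finished > 0 && failed.toDouble / finished > 0.1)
    alert(f"Task failure rate ${failed.toDouble / finished * 100}%.1f%% exceeds 10%%")

  // Alert when storage memory is nearly exhausted
  val used = executors.map(e => e.usedOnHeapStorageMemory() + e.usedOffHeapStorageMemory()).sum
  val capacity = executors.map(e => e.totalOnHeapStorageMemory() + e.totalOffHeapStorageMemory()).sum
  if (capacity > 0 && used.toDouble / capacity > 0.9)
    alert(f"Storage memory usage ${used.toDouble / capacity * 100}%.1f%% exceeds 90%%")
}

// Usage: checkAlerts(sc, msg => println(s"ALERT: $msg"))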

Performance Optimization

  • Status monitoring has minimal overhead, but avoid excessive polling
  • Cache status information when making multiple queries
  • Use separate threads for monitoring to avoid blocking the main computation
  • Implement circuit breakers for external monitoring system calls (a sketch follows this list)
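
For the circuit-breaker bullet, a small wrapper can stop hammering an unreachable monitoring endpoint. A minimal sketch; MonitoringCircuitBreaker and its maxFailures/cooldownMs defaults are illustrative names, not a Spark API:

// Skip external monitoring calls after repeated failures, retrying after a cooldown
class MonitoringCircuitBreaker(maxFailures: Int = 3, cooldownMs: Long = 60000) {
  private var consecutiveFailures = 0
  private var openedAt = 0L

  def run(push: => Unit): Unit = {
    val now = System.currentTimeMillis()
    if (consecutiveFailures >= maxFailures && now - openedAt < cooldownMs) {
      return // circuit open: skip the call instead of blocking the application
    }
    try {
      push
      consecutiveFailures = 0 // success closes the circuit
    } catch {
      case e: Exception =>
        consecutiveFailures += 1
        if (consecutiveFailures >= maxFailures) openedAt = now
        println(s"Monitoring call failed (${e.getMessage}); failures=$consecutiveFailures")
    }
  }
}

// Usage with the Prometheus exporter above:
// val breaker = new MonitoringCircuitBreaker()
// breaker.run(exporter.exportMetrics("spark-data-processing"))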

Resource Management

// Efficient monitoring pattern
class EfficientMonitor(sc: SparkContext) {
  private val statusTracker = sc.statusTracker
  private var lastUpdate = 0L
  private var cachedSnapshot: Option[ResourceSnapshot] = None
  
  def getResourceStatus(maxAgeMs: Long = 5000): ResourceSnapshot = {
    val now = System.currentTimeMillis()
    
    if (cachedSnapshot.isEmpty || (now - lastUpdate) > maxAgeMs) {
      cachedSnapshot = Some(takeSnapshot())
      lastUpdate = now
    }
    
    cachedSnapshot.get
  }
  
  private def takeSnapshot(): ResourceSnapshot = {
    // Collect metrics as in ResourceMonitor.takeSnapshot above
    // (reuses the ResourceSnapshot case class defined there); elided here
    ???
  }
}
}

Error Handling and Reliability

  • Handle cases where jobs/stages complete between status checks
  • Implement retry logic for transient monitoring failures (a minimal retry wrapper is sketched after this list)
  • Gracefully handle missing or null status information
  • Log monitoring errors separately from application errors
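
A minimal retry wrapper covering the bullets above; withRetry, the three-attempt default, and the fixed one-second backoff are illustrative choices:

import scala.util.{Failure, Success, Try}

// Retry a monitoring call a few times before giving up, returning None on failure
def withRetry[T](maxRetries: Int = 3)(op: => T): Option[T] = {
  var attempt = 0
  while (attempt <= maxRetries) {
    Try(op) match {
      case Success(value) => return Some(value)
      case Failure(e) =>
        attempt += 1
        println(s"Monitoring call failed (attempt $attempt): ${e.getMessage}")
        if (attempt <= maxRetries) Thread.sleep(1000)
    }
  }
  None // treat missing status information as non-fatal
}

// Usage: the inner Option is None if the stage finished between checks
// val stageInfo = withRetry() { statusTracker.getStageInfo(stageId) }.flatten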
