or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md io-operations.md key-value-operations.md partitioning-shuffling.md rdd-operations.md shared-variables.md spark-context.md storage-persistence.md
tile.json

docs/spark-context.md

SparkContext and Configuration

Core entry point for Spark applications providing configuration management, resource coordination, and the foundation for all Spark operations.

Capabilities

SparkContext

Main entry point for Spark functionality representing the connection to a Spark cluster. Used to create RDDs, accumulators, and broadcast variables.

/**
 * Main entry point for Spark functionality; represents the connection to a
 * Spark cluster and is used to create RDDs, accumulators, and broadcast variables.
 *
 * The members listed below are signatures only (no bodies), grouped by purpose.
 *
 * @param config SparkConf object describing application configuration
 */
class SparkContext(config: SparkConf) extends ExecutorAllocationClient

// Core RDD creation methods
// Wherever numSlices is accepted it defaults to defaultParallelism.
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
// Overload without numSlices whose elements are each paired with a Seq[String].
// NOTE(review): the Seq[String] presumably lists preferred locations for the element — confirm.
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
def emptyRDD[T: ClassTag]: RDD[T]
def range(start: Long, end: Long, step: Long = 1, numSlices: Int = defaultParallelism): RDD[Long]
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T]

// File-based RDD creation
// minPartitions defaults to defaultMinPartitions; binaryRecords instead takes a
// recordLength and a Configuration that defaults to hadoopConfiguration.
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration): RDD[Array[Byte]]
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]

// Hadoop integration
// The RDD-producing methods in this group all return RDD[(K, V)].
def hadoopRDD[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], 
                    keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
def hadoopFile[K, V](path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], 
                     keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String, fClass: Class[F], 
                     kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration = hadoopConfiguration, 
                    fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], 
                       minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
def hadoopConfiguration: Configuration

// Shared variables
// The accumulator/accumulable overloads differ only in the extra name: String parameter.
// NOTE(review): the Accumulator/Accumulable APIs are believed to be deprecated since
// Spark 2.0 in favour of AccumulatorV2 — confirm against the targeted Spark version.
def broadcast[T: ClassTag](value: T): Broadcast[T]
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T](initialValue: R): Accumulable[R, T]

// Resource management
// NOTE(review): the Boolean returned by the executor methods presumably reports
// whether the request was acknowledged — confirm.
def stop(): Unit
def setCheckpointDir(directory: String): Unit
def getCheckpointDir: Option[String]
def requestExecutors(numAdditionalExecutors: Int): Boolean
def killExecutors(executorIds: Seq[String]): Boolean
def killExecutor(executorId: String): Boolean

// Job execution and management
// The runJob overloads differ in whether func receives the TaskContext, whether
// partitions are passed explicitly, and whether results are handed to
// resultHandler (returning Unit) or returned as Array[U].
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]): Array[U]
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U]
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
def submitJob[T, U, R](rdd: RDD[T], processPartition: Iterator[T] => U, partitions: Seq[Int], 
                       resultHandler: (Int, U) => Unit, resultFunc: => R): SimpleFutureAction[R]
def runApproximateJob[T, U, R](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, 
                               evaluator: ApproximateEvaluator[U, R], timeout: Long): PartialResult[R]
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
def setJobDescription(value: String): Unit
def clearJobGroup(): Unit
def cancelJobGroup(groupId: String): Unit
def cancelAllJobs(): Unit

// Files and JARs
def addFile(path: String): Unit
def addFile(path: String, recursive: Boolean): Unit
def addJar(path: String): Unit

// Local properties (String key/value pairs; not file- or JAR-related)
def setLocalProperty(key: String, value: String): Unit
def getLocalProperty(key: String): String

// Status and monitoring
def statusTracker: SparkStatusTracker
def getExecutorMemoryStatus: Map[String, (Long, Long)]
def getExecutorStorageStatus: Array[StorageStatus]
def getRDDStorageInfo: Array[RDDInfo]
def getPersistentRDDs: Map[Int, RDD[_]]
def getAllPools: Seq[Schedulable]
def getPoolForName(pool: String): Option[Schedulable]
def getSchedulingMode: SchedulingMode.SchedulingMode
def metricsSystem: MetricsSystem
def addSparkListener(listener: SparkListener): Unit

// Configuration and context properties
def getConf: SparkConf
def setLogLevel(logLevel: String): Unit
def master: String
def appName: String
def applicationId: String
def applicationAttemptId: Option[String]
def version: String
def startTime: Long
def isLocal: Boolean
def isStopped: Boolean
def defaultParallelism: Int
def defaultMinPartitions: Int

// Call site management
def setCallSite(shortCallSite: String): Unit
def clearCallSite(): Unit

SparkContext Companion Object

Static methods for SparkContext management and utility functions.

object SparkContext {
  /**
   * Get existing SparkContext or create a new one with provided configuration
   *
   * @param config SparkConf supplied for the case where a new context is created
   */
  def getOrCreate(config: SparkConf): SparkContext

  /**
   * Overload of getOrCreate that takes no explicit configuration.
   * NOTE(review): presumably falls back to a default SparkConf when it has to create one — confirm.
   */
  def getOrCreate(): SparkContext
  
  /**
   * Find JAR containing the given class
   *
   * @param cls class to look up
   * @return the JAR as Option[String]; NOTE(review): presumably None when no JAR is found — confirm
   */
  def jarOfClass(cls: Class[_]): Option[String]

  /**
   * Counterpart of jarOfClass that takes an object instance instead of a Class.
   * NOTE(review): presumably resolves the JAR of the object's runtime class — confirm.
   */
  def jarOfObject(obj: AnyRef): Option[String]
}

Usage Examples:

import org.apache.spark.{SparkContext, SparkConf}

// End-to-end SparkContext walkthrough: create the context, build RDDs from
// collections and files, register resources, group jobs, then shut down.

// Basic SparkContext creation
val conf = new SparkConf().setAppName("MyApp").setMaster("local[2]")
val sc = new SparkContext(conf)

// Alternative constructor taking the master URL and application name directly.
// Shown commented out: only one SparkContext may be active per JVM, so building
// a second one while `sc` is still running fails. Use it INSTEAD of the form above.
// val sc = new SparkContext("local[*]", "MyApp")

// Create RDDs from collections
val data = List(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
val rdd2 = sc.parallelize(data, numSlices = 4) // specify partitions

// Create RDDs from files
// "hdfs:///..." (empty authority) targets the default NameNode. Written as
// "hdfs://path/to/...", the first segment ("path") would be parsed as the host.
val textRDD = sc.textFile("hdfs:///path/to/file.txt")
val wholeFiles = sc.wholeTextFiles("hdfs:///path/to/directory")

// Resource management
sc.setCheckpointDir("hdfs:///path/to/checkpoint")
sc.addFile("path/to/config.properties")
sc.addJar("path/to/dependency.jar")

// Job management
sc.setJobGroup("data-processing", "Processing user data", interruptOnCancel = true)
// ... run some jobs
sc.clearJobGroup()

// Clean shutdown
sc.stop()

SparkConf

Configuration object for Spark applications containing key-value settings that control Spark's behavior.

/**
 * Configuration for a Spark application: key-value settings that control
 * Spark's behavior. The members listed below are signatures only (no bodies).
 *
 * @param loadDefaults defaults to true.
 *                     NOTE(review): presumably controls whether default values are
 *                     loaded from the environment/system properties — confirm.
 */
class SparkConf(loadDefaults: Boolean = true)

// Configuration setting methods
// Every setter returns a SparkConf, so calls can be chained.
def set(key: String, value: String): SparkConf
def setMaster(master: String): SparkConf
def setAppName(name: String): SparkConf
def setJars(jars: Seq[String]): SparkConf
def setJars(jars: Array[String]): SparkConf
def setExecutorEnv(variable: String, value: String): SparkConf
def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
def setExecutorEnv(variables: Array[(String, String)]): SparkConf

// Configuration retrieval methods
// get(key) takes no default, whereas getOption returns Option[String].
// NOTE(review): get(key) presumably throws when the key is absent — confirm.
// The typed getters take a default of the result type; the size and time getters
// take their default as a String and return a Long.
def get(key: String): String
def get(key: String, defaultValue: String): String
def getOption(key: String): Option[String]
def getInt(key: String, defaultValue: Int): Int
def getLong(key: String, defaultValue: Long): Long
def getDouble(key: String, defaultValue: Double): Double
def getBoolean(key: String, defaultValue: Boolean): Boolean
def getSizeAsBytes(key: String, defaultValue: String): Long
def getSizeAsKb(key: String, defaultValue: String): Long
def getSizeAsMb(key: String, defaultValue: String): Long
def getSizeAsGb(key: String, defaultValue: String): Long
def getTimeAsMs(key: String, defaultValue: String): Long
def getTimeAsSeconds(key: String, defaultValue: String): Long

// Configuration introspection
def contains(key: String): Boolean
def getAll: Array[(String, String)]
def remove(key: String): SparkConf
def clone(): SparkConf

// Validation
// NOTE(review): in Spark's sources validateSettings appears to be declared
// private[spark] (and to return Unit), i.e. not callable from application code —
// confirm before presenting it as public API.
def validateSettings(): SparkConf

Usage Examples:

import org.apache.spark.SparkConf

// SparkConf walkthrough: build configurations with chained setters, read values
// back with typed getters, and inspect what has been set.

// Basic configuration
val conf = new SparkConf()
  .setAppName("Data Processing App")
  .setMaster("spark://master:7077")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "4")

// Advanced configuration
val advancedConf = new SparkConf()
  .setAppName("Advanced App")
  .setMaster("yarn")
  .setJars(Array("path/to/app.jar", "path/to/dependency.jar"))
  // PYTHONPATH (no underscore) is the variable the Python interpreter actually reads.
  .setExecutorEnv("PYTHONPATH", "/opt/python")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Reading configuration values
val appName = conf.get("spark.app.name")
val executorMemory = conf.get("spark.executor.memory", "1g")
val executorCores = conf.getInt("spark.executor.cores", 2)
val isAdaptiveEnabled = conf.getBoolean("spark.sql.adaptive.enabled", false)

// Configuration introspection
// validateSettings() is private[spark], so application code cannot call it;
// Spark validates the configuration itself when the SparkContext is created.
val hasExecutorMemory = conf.contains("spark.executor.memory")
val allSettings = conf.getAll
println(s"All settings: ${allSettings.mkString(", ")}")

SparkFiles

Utility object for resolving paths to files added through SparkContext.addFile().

/**
 * Resolves paths to files added through SparkContext.addFile().
 * Both members return the location as a String.
 */
object SparkFiles {
  /**
   * Get the absolute path of a file added through SparkContext.addFile()
   * @param filename name of the file
   *                 (NOTE(review): presumably the bare file name rather than the
   *                 original path passed to addFile — confirm)
   * @return absolute path to the file
   */
  def get(filename: String): String
  
  /**
   * Get the root directory that contains files added through SparkContext.addFile()
   * @return path to the root directory
   */
  def getRootDirectory(): String
}

SparkStatusTracker

Low-level status reporting APIs for monitoring job and stage progress.

/**
 * Low-level status reporting APIs for monitoring job and stage progress
 *
 * NOTE(review): SparkContext exposes an instance through its `statusTracker`
 * member; confirm whether callers are expected to use that accessor rather
 * than invoking this constructor directly.
 *
 * @param sc the SparkContext this tracker is constructed with
 */
class SparkStatusTracker(sc: SparkContext) {
  /**
   * Returns an array with the IDs of all known jobs in a particular job group
   *
   * @param jobGroup the job group to look up
   */
  def getJobIdsForGroup(jobGroup: String): Array[Int]
  
  /**
   * Returns an array containing the IDs of all active jobs
   */
  def getActiveJobIds(): Array[Int]
  
  /**
   * Returns an array containing the IDs of all active stages
   */
  def getActiveStageIds(): Array[Int]
  
  /**
   * Returns stage information, or None if the stage info is not available
   *
   * @param stageId ID of the stage to look up
   */
  def getStageInfo(stageId: Int): Option[SparkStageInfo]
  
  /**
   * Returns information of all executors known to this SparkContext
   */
  def getExecutorInfos(): Array[SparkExecutorInfo]
}

Error Handling

Common exceptions that may be thrown:

  • SparkException: General Spark-related errors
  • IllegalArgumentException: Invalid configuration parameters
  • IllegalStateException: Attempting operations on stopped SparkContext
  • UnsupportedOperationException: Unsupported operations in certain deployment modes

Example Error Handling:

import org.apache.spark.{SparkContext, SparkConf, SparkException}

// Error-handling pattern: create the context, run work, report failures by
// category, and always stop the context if it was created.

// Declared outside `try` so the `finally` block can see it (a `val` declared
// inside `try` is out of scope there). Stays None if construction itself fails.
var sc: Option[SparkContext] = None

try {
  val conf = new SparkConf().setAppName("MyApp")
  val context = new SparkContext(conf)
  sc = Some(context)

  // Your Spark operations here (use `context`)

} catch {
  case e: SparkException =>
    println(s"Spark error: ${e.getMessage}")
  case e: IllegalArgumentException =>
    println(s"Invalid configuration: ${e.getMessage}")
  case e: Exception =>
    println(s"Unexpected error: ${e.getMessage}")
} finally {
  // Stop the context only if it was created and is still running.
  sc.filterNot(_.isStopped).foreach(_.stop())
}