Core entry point for Spark applications, providing configuration management, resource coordination, and the foundation for all Spark operations.
SparkContext is the main entry point for Spark functionality and represents the connection to a Spark cluster. It is used to create RDDs, accumulators, and broadcast variables on that cluster.
/**
* Main entry point for Spark functionality
* @param config SparkConf object describing application configuration
*/
class SparkContext(config: SparkConf) extends ExecutorAllocationClient
// Core RDD creation methods
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
def emptyRDD[T: ClassTag]: RDD[T]
def range(start: Long, end: Long, step: Long = 1, numSlices: Int = defaultParallelism): RDD[Long]
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T]
// File-based RDD creation
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration): RDD[Array[Byte]]
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
// Hadoop integration
def hadoopRDD[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
def hadoopFile[K, V](path: String, inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String, fClass: Class[F],
kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration = hadoopConfiguration,
fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
def hadoopConfiguration: Configuration
// Shared variables
def broadcast[T: ClassTag](value: T): Broadcast[T]
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T](initialValue: R): Accumulable[R, T]
// Resource management
def stop(): Unit
def setCheckpointDir(directory: String): Unit
def getCheckpointDir: Option[String]
def requestExecutors(numAdditionalExecutors: Int): Boolean
def killExecutors(executorIds: Seq[String]): Boolean
def killExecutor(executorId: String): Boolean
// Job execution and management
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]): Array[U]
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U]
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
def submitJob[T, U, R](rdd: RDD[T], processPartition: Iterator[T] => U, partitions: Seq[Int],
resultHandler: (Int, U) => Unit, resultFunc: => R): SimpleFutureAction[R]
def runApproximateJob[T, U, R](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R], timeout: Long): PartialResult[R]
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
def setJobDescription(value: String): Unit
def clearJobGroup(): Unit
def cancelJobGroup(groupId: String): Unit
def cancelAllJobs(): Unit
// Files and JARs
def addFile(path: String): Unit
def addFile(path: String, recursive: Boolean): Unit
def addJar(path: String): Unit
def setLocalProperty(key: String, value: String): Unit
def getLocalProperty(key: String): String
// Status and monitoring
def statusTracker: SparkStatusTracker
def getExecutorMemoryStatus: Map[String, (Long, Long)]
def getExecutorStorageStatus: Array[StorageStatus]
def getRDDStorageInfo: Array[RDDInfo]
def getPersistentRDDs: Map[Int, RDD[_]]
def getAllPools: Seq[Schedulable]
def getPoolForName(pool: String): Option[Schedulable]
def getSchedulingMode: SchedulingMode.SchedulingMode
def metricsSystem: MetricsSystem
def addSparkListener(listener: SparkListener): Unit
// Configuration and context properties
def getConf: SparkConf
def setLogLevel(logLevel: String): Unit
def master: String
def appName: String
def applicationId: String
def applicationAttemptId: Option[String]
def version: String
def startTime: Long
def isLocal: Boolean
def isStopped: Boolean
def defaultParallelism: Int
def defaultMinPartitions: Int
// Call site management
def setCallSite(shortCallSite: String): Unit
def clearCallSite(): Unit
Static methods for SparkContext management and utility functions.
object SparkContext {
/**
* Get existing SparkContext or create a new one with provided configuration
*/
def getOrCreate(config: SparkConf): SparkContext
def getOrCreate(): SparkContext
/**
* Find JAR containing the given class
*/
def jarOfClass(cls: Class[_]): Option[String]
def jarOfObject(obj: AnyRef): Option[String]
}
Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
// Basic SparkContext creation
val conf = new SparkConf().setAppName("MyApp").setMaster("local[2]")
val sc = new SparkContext(conf)
// Alternative constructor
val sc2 = new SparkContext("local[*]", "MyApp")
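// Reuse an existing context if one is already running (illustrative sketch;
// only one SparkContext may be active per JVM)
val sc3 = SparkContext.getOrCreate(conf)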
// Create RDDs from collections
val data = List(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
val rdd2 = sc.parallelize(data, numSlices = 4) // specify partitions
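// Additional creation helpers (illustrative sketch)
val longs = sc.range(0, 1000, step = 2)    // RDD[Long] from a numeric range
val combined = sc.union(rdd, rdd2)         // union of RDDs with the same element type
// Shared variables: broadcast read-only lookup data, accumulate counts across tasks
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
val scaled = rdd.map(x => x * lookup.value("a"))   // broadcast value read inside a task
val evenCount = sc.accumulator(0, "even values")   // pre-2.0 accumulator API listed above
rdd.foreach(x => if (x % 2 == 0) evenCount += 1)
println(s"even values: ${evenCount.value}")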
// Create RDDs from files
val textRDD = sc.textFile("hdfs://path/to/file.txt")
val wholeFiles = sc.wholeTextFiles("hdfs://path/to/directory")
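// Hadoop-format reads (sketch; paths are placeholders)
import org.apache.hadoop.io.{IntWritable, Text}
val seqRDD = sc.sequenceFile("hdfs://path/to/data.seq", classOf[Text], classOf[IntWritable])
val binRDD = sc.binaryFiles("hdfs://path/to/binaries")   // (path, PortableDataStream) pairs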
// Resource management
sc.setCheckpointDir("hdfs://path/to/checkpoint")
sc.addFile("path/to/config.properties")
sc.addJar("path/to/dependency.jar")
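// Files added with addFile can be resolved on the driver and executors via SparkFiles (sketch)
import org.apache.spark.SparkFiles
val localPath = SparkFiles.get("config.properties")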
// Job management
sc.setJobGroup("data-processing", "Processing user data", interruptOnCancel = true)
// ... run some jobs
sc.clearJobGroup()
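// Running a job directly and checking status (illustrative sketch)
val partitionSums = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)  // one result per partition
val activeJobs = sc.statusTracker.getActiveJobIds()
val memoryByExecutor = sc.getExecutorMemoryStatus   // executor -> (max memory, remaining memory)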
// Clean shutdown
sc.stop()
Configuration object for Spark applications containing key-value settings that control Spark's behavior.
/**
* Configuration for a Spark application
*/
class SparkConf(loadDefaults: Boolean = true)
// Configuration setting methods
def set(key: String, value: String): SparkConf
def setMaster(master: String): SparkConf
def setAppName(name: String): SparkConf
def setJars(jars: Seq[String]): SparkConf
def setJars(jars: Array[String]): SparkConf
def setExecutorEnv(variable: String, value: String): SparkConf
def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
def setExecutorEnv(variables: Array[(String, String)]): SparkConf
// Configuration retrieval methods
def get(key: String): String
def get(key: String, defaultValue: String): String
def getOption(key: String): Option[String]
def getInt(key: String, defaultValue: Int): Int
def getLong(key: String, defaultValue: Long): Long
def getDouble(key: String, defaultValue: Double): Double
def getBoolean(key: String, defaultValue: Boolean): Boolean
def getSizeAsBytes(key: String, defaultValue: String): Long
def getSizeAsKb(key: String, defaultValue: String): Long
def getSizeAsMb(key: String, defaultValue: String): Long
def getSizeAsGb(key: String, defaultValue: String): Long
def getTimeAsMs(key: String, defaultValue: String): Long
def getTimeAsSeconds(key: String, defaultValue: String): Long
// Configuration introspection
def contains(key: String): Boolean
def getAll: Array[(String, String)]
def remove(key: String): SparkConf
def clone(): SparkConf
// Validation
def validateSettings(): SparkConf
Usage Examples:
import org.apache.spark.SparkConf
// Basic configuration
val conf = new SparkConf()
.setAppName("Data Processing App")
.setMaster("spark://master:7077")
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "4")
// Advanced configuration
val advancedConf = new SparkConf()
.setAppName("Advanced App")
.setMaster("yarn")
.setJars(Array("path/to/app.jar", "path/to/dependency.jar"))
.setExecutorEnv("PYTHON_PATH", "/opt/python")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Reading configuration values
val appName = conf.get("spark.app.name")
val executorMemory = conf.get("spark.executor.memory", "1g")
val executorCores = conf.getInt("spark.executor.cores", 2)
val isAdaptiveEnabled = conf.getBoolean("spark.sql.adaptive.enabled", false)
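// Size and time getters parse suffixed strings into numeric values (sketch; defaults are examples)
val executorMemMb = conf.getSizeAsMb("spark.executor.memory", "1g")
val networkTimeoutSec = conf.getTimeAsSeconds("spark.network.timeout", "120s")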
// Configuration validation and introspection
conf.validateSettings()
val allSettings = conf.getAll
println(s"All settings: ${allSettings.mkString(", ")}")Utility object for resolving paths to files added through SparkContext.addFile().
/**
* Resolves paths to files added through SparkContext.addFile()
*/
object SparkFiles {
/**
* Get the absolute path of a file added through SparkContext.addFile()
* @param filename name of the file
* @return absolute path to the file
*/
def get(filename: String): String
/**
* Get the root directory that contains files added through SparkContext.addFile()
* @return path to the root directory
*/
def getRootDirectory(): String
}
Low-level status reporting APIs for monitoring job and stage progress.
/**
* Low-level status reporting APIs for monitoring job and stage progress
*/
class SparkStatusTracker(sc: SparkContext) {
/**
* Return a list of all known jobs in a particular job group
*/
def getJobIdsForGroup(jobGroup: String): Array[Int]
/**
* Returns an array containing the IDs of all active jobs
*/
def getActiveJobIds(): Array[Int]
/**
* Returns an array containing the IDs of all active stages
*/
def getActiveStageIds(): Array[Int]
/**
* Returns stage information, or None if the stage info is not available
*/
def getStageInfo(stageId: Int): Option[SparkStageInfo]
/**
* Returns information of all executors known to this SparkContext
*/
def getExecutorInfos(): Array[SparkExecutorInfo]
}
Common exceptions that may be thrown include SparkException (general Spark failures such as a failed or aborted job) and IllegalArgumentException (invalid or missing configuration values).
Example Error Handling:
import org.apache.spark.{SparkContext, SparkConf, SparkException}
var sc: SparkContext = null
try {
val conf = new SparkConf().setAppName("MyApp")
sc = new SparkContext(conf)
// Your Spark operations here
} catch {
case e: SparkException =>
println(s"Spark error: ${e.getMessage}")
case e: IllegalArgumentException =>
println(s"Invalid configuration: ${e.getMessage}")
case e: Exception =>
println(s"Unexpected error: ${e.getMessage}")
} finally {
if (sc != null && !sc.isStopped) {
sc.stop()
}
}