Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
—
The SparkContext is the main entry point for all Spark functionality. It represents the connection to a Spark cluster and coordinates the execution of operations across the cluster. Every Spark application must create exactly one active SparkContext.
class SparkContext(config: SparkConf) extends Logging {
// Primary constructor
def this() = this(new SparkConf())
def this(master: String, appName: String, conf: SparkConf) = { /* ... */ }
def this(master: String, appName: String, sparkHome: String, jars: Seq[String], environment: Map[String, String] = Map()) = { /* ... */ }
}

import org.apache.spark.{SparkContext, SparkConf}
// Using SparkConf (recommended)
val conf = new SparkConf()
.setAppName("My Application")
.setMaster("local[*]")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)

// Default constructor (loads from system properties)
val sc = new SparkContext()
// With master and app name
val sc = new SparkContext("local[*]", "My App")
// Full constructor with all parameters
val sc = new SparkContext(
master = "local[*]",
appName = "My App",
sparkHome = "/path/to/spark",
jars = Seq("myapp.jar"),
environment = Map("SPARK_ENV_VAR" -> "value")
)

// @DeveloperApi - for internal use, typically in YARN mode
val sc = new SparkContext(
config = conf,
preferredNodeLocationData = Map[String, Set[SplitInfo]]()
)

parallelize: Distribute a local collection to form an RDD
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] // alias

val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data) // Use default parallelism
val rddWithPartitions = sc.parallelize(data, 4) // Specify 4 partitions

With location preferences:
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]

// Create RDD with preferred locations for each element
val dataWithPrefs = Seq(
(1, Seq("host1", "host2")),
(2, Seq("host3")),
(3, Seq("host1"))
)
val rdd = sc.makeRDD(dataWithPrefs)

textFile: Read text files from HDFS or local filesystem
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

val lines = sc.textFile("hdfs://namenode:port/path/to/file.txt")
val linesLocal = sc.textFile("file:///local/path/file.txt")
val linesWithPartitions = sc.textFile("hdfs://path/to/file.txt", 8)

wholeTextFiles: Read directory of text files as key-value pairs
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

// Returns RDD[(filename, content)]
val files = sc.wholeTextFiles("hdfs://path/to/directory/")
files.foreach { case (filename, content) =>
println(s"File: $filename, Size: ${content.length}")
}

sequenceFile: Read Hadoop SequenceFiles
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

import org.apache.hadoop.io.{IntWritable, Text}
val seqFile = sc.sequenceFile[IntWritable, Text]("path/to/sequencefile",
  classOf[IntWritable], classOf[Text])

hadoopFile: Read files with arbitrary Hadoop InputFormat
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]

import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
val hadoopRDD = sc.hadoopFile[LongWritable, Text](
"hdfs://path/to/input",
classOf[TextInputFormat],
classOf[LongWritable],
classOf[Text]
)

objectFile: Load RDD saved as SequenceFile of serialized objects
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]

// Load RDD that was previously saved with saveAsObjectFile
val restored: RDD[MyClass] = sc.objectFile[MyClass]("path/to/objects")

union: Build union of a list of RDDs
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]

val rdd1 = sc.parallelize(Array(1, 2, 3))
val rdd2 = sc.parallelize(Array(4, 5, 6))
val rdd3 = sc.parallelize(Array(7, 8, 9))
val unionRDD = sc.union(Seq(rdd1, rdd2, rdd3))

emptyRDD: Create empty RDD with no partitions
def emptyRDD[T: ClassTag]: RDD[T]

val empty: RDD[String] = sc.emptyRDD[String]

Spark provides two types of shared variables: broadcast variables and accumulators.
broadcast: Create a broadcast variable for read-only data
def broadcast[T: ClassTag](value: T): Broadcast[T]

val lookupTable = Map("apple" -> 1, "banana" -> 2, "orange" -> 3)
val broadcastTable = sc.broadcast(lookupTable)
val data = sc.parallelize(Array("apple", "banana", "apple"))
val mapped = data.map(fruit => broadcastTable.value.getOrElse(fruit, 0))

accumulator: Create a simple accumulator
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]

accumulable: Create an accumulable with different result/element types
def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]): Accumulable[T, R]

// Simple counter
val counter = sc.accumulator(0, "Error Counter")
val data = sc.parallelize(Array(1, 2, -1, 4, -5))
val positive = data.filter { x =>
if (x < 0) counter += 1 // Count negative numbers
x > 0
}
positive.count() // Trigger action
println(s"Negative numbers: ${counter.value}")
// Collection accumulator
val errorList = sc.accumulableCollection(mutable.Set[String]())

// Available in SparkContext companion object
DoubleAccumulatorParam // For Double values
IntAccumulatorParam // For Int values
LongAccumulatorParam // For Long values
FloatAccumulatorParam // For Float values

runJob: Run a function on RDD partitions
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean = false,
resultHandler: (Int, U) => Unit = null
): Array[U]
// Simplified versions
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]

val data = sc.parallelize(1 to 100, 4)
// Run custom function on each partition
val results = sc.runJob(data, (iter: Iterator[Int]) => iter.sum)
println(s"Partition sums: ${results.mkString(", ")}")
// With task context
val results2 = sc.runJob(data, (context: TaskContext, iter: Iterator[Int]) => {
(context.partitionId, iter.size)
})

submitJob: Submit job asynchronously (Experimental)
def submitJob[T, U, R](
rdd: RDD[T],
processPartition: Iterator[T] => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit,
resultFunc: => R
): SimpleFutureAction[R]

setJobGroup: Assign group ID to all jobs started by this thread
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit

clearJobGroup: Clear the job group for this thread
def clearJobGroup(): Unit

cancelJobGroup: Cancel all jobs for the given group
def cancelJobGroup(groupId: String): Unit

cancelAllJobs: Cancel all scheduled or running jobs
def cancelAllJobs(): Unit

// Set job group
sc.setJobGroup("etl-jobs", "ETL Processing", interruptOnCancel = true)
val data = sc.textFile("large-file.txt")
val processed = data.map(processLine)
processed.saveAsTextFile("output")
// Cancel specific job group from another thread
sc.cancelJobGroup("etl-jobs")

getConf: Get a copy of the SparkContext's configuration
def getConf: SparkConf

val conf = sc.getConf
val appName = conf.get("spark.app.name")
val executorMemory = conf.get("spark.executor.memory", "1g")

setLocalProperty: Set local property that affects jobs submitted from this thread
def setLocalProperty(key: String, value: String): Unit

getLocalProperty: Get local property set in this thread
def getLocalProperty(key: String): String

// Set properties that will be passed to tasks
sc.setLocalProperty("spark.sql.execution.id", "query-123")
sc.setLocalProperty("callSite.short", "MyApp.process")
val value = sc.getLocalProperty("spark.sql.execution.id")

addFile: Add a file to be downloaded with this Spark job on every node
def addFile(path: String): Unit
def addFile(path: String, recursive: Boolean): Unit

addJar: Add a JAR dependency for all tasks to be executed on this SparkContext
def addJar(path: String): Unit

// Add files that tasks can access via SparkFiles.get()
sc.addFile("/path/to/config.properties")
sc.addFile("hdfs://path/to/lookup-table.csv")
// Add JARs for task execution
sc.addJar("/path/to/dependencies.jar")
sc.addJar("hdfs://path/to/libs/mylib.jar")

setCheckpointDir: Set directory for RDD checkpointing
def setCheckpointDir(directory: String): Unit

sc.setCheckpointDir("hdfs://namenode/checkpoints")
val data = sc.textFile("large-dataset.txt")
val processed = data.map(complexProcessing).filter(isValid)
processed.checkpoint() // Checkpoint this RDD

getExecutorMemoryStatus: Get memory status of all executors
def getExecutorMemoryStatus: Map[String, (Long, Long)]

getExecutorStorageStatus: Get storage status from all executors
def getExecutorStorageStatus: Array[StorageStatus]

getRDDStorageInfo: Get information about cached/persisted RDDs
def getRDDStorageInfo: Array[RDDInfo]

getPersistentRDDs: Get all currently persisted RDDs
def getPersistentRDDs: Map[Int, RDD[_]]

// Check memory usage across executors
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (executorId, (maxMemory, remainingMemory)) =>
println(s"Executor $executorId: ${remainingMemory}/${maxMemory} bytes available")
}
// Check cached RDDs
val cachedRDDs = sc.getRDDStorageInfo
cachedRDDs.foreach { rddInfo =>
println(s"RDD ${rddInfo.id} (${rddInfo.name}): ${rddInfo.memSize} bytes in memory")
}

version: Get the version of Spark on which this application is running
def version: String

defaultParallelism: Default level of parallelism for operations
def defaultParallelism: Int

defaultMinPartitions: Default min number of partitions for Hadoop RDDs
def defaultMinPartitions: Int

println(s"Spark Version: ${sc.version}")
println(s"Default Parallelism: ${sc.defaultParallelism}")
println(s"Default Min Partitions: ${sc.defaultMinPartitions}")

hadoopConfiguration: Access to Hadoop Configuration for reuse across operations
def hadoopConfiguration: Configuration

import org.apache.hadoop.conf.Configuration
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "your-access-key")
hadoopConf.set("fs.s3a.secret.key", "your-secret-key")
// Now S3 operations will use these credentials
val s3Data = sc.textFile("s3a://bucket/path/to/file")

stop: Shut down the SparkContext
def stop(): Unit

try {
val sc = new SparkContext(conf)
// Perform Spark operations
val data = sc.textFile("input.txt")
val result = data.map(_.toUpperCase).collect()
} finally {
sc.stop() // Always stop the context
}

Always call stop() to release resources.

// Proper pattern for SparkContext usage
val conf = new SparkConf()
.setAppName("My Spark Application")
.setMaster("local[*]")
val sc = new SparkContext(conf)
try {
// All Spark operations here
} finally {
sc.stop()
}

The SparkContext is the foundation of all Spark applications and understanding its API is essential for effective Spark programming.
Install with Tessl CLI
npx tessl i tessl/maven-apache-spark