The main entry point for Spark functionality. SparkContext represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables.
class SparkContext(config: SparkConf) {

  // RDD Creation
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

  // Hadoop Integration
  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
      path: String,
      fClass: Class[F],
      kClass: Class[K],
      vClass: Class[V],
      conf: Configuration = hadoopConfiguration
  ): RDD[(K, V)]
  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions
  ): RDD[(K, V)]

  // Shared Variables
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  def longAccumulator: LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator: DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T]: CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]

  // Configuration and Properties
  def getConf: SparkConf
  def master: String
  def appName: String
  def applicationId: String
  def defaultParallelism: Int
  val startTime: Long
  def version: String

  // File and Jar Management
  def addFile(path: String): Unit
  def addFile(path: String, recursive: Boolean): Unit
  def addJar(path: String): Unit

  // Job Management
  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
  def clearJobGroup(): Unit
  def cancelJobGroup(groupId: String): Unit

  // Additional RDD Creation
  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
  def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T]

  // Lifecycle Management
  def stop(): Unit
  def statusTracker: SparkStatusTracker
}

Creating a SparkContext:
import org.apache.spark.{SparkContext, SparkConf}
val conf = new SparkConf()
.setAppName("My Application")
.setMaster("local[4]") // Use 4 local threads
val sc = new SparkContext(conf)
// Use SparkContext...
sc.stop() // Always stop when done
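The shared-variable methods in the listing above (broadcast and the accumulator factories) are not covered by the examples that follow. A minimal sketch, reusing the sc created above; the lookup map, the accumulator name, and the negative-value check are illustrative placeholders, not part of Spark's API:

// Broadcast a small read-only lookup table and count bad records with an accumulator.
// The lookup map and the "negative value" rule are made-up for illustration.
val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))
val badRecords = sc.longAccumulator("badRecords")

val data = sc.parallelize(Seq(1, -1, 2, 3, -5))
data.foreach(n => if (n < 0) badRecords.add(1))   // updates happen on executors
val named = data.map(n => lookup.value.getOrElse(n, "unknown")).collect()
println(s"Bad records: ${badRecords.value}")      // accumulator value is read on the driver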
Reading data:
// From local collection
val numbers = sc.parallelize(1 to 1000, 8) // 8 partitions
// From text file
val lines = sc.textFile("hdfs://path/to/file.txt")
// Multiple text files as (filename, content) pairs
val files = sc.wholeTextFiles("hdfs://path/to/directory/")
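The Hadoop integration methods accept any Hadoop InputFormat together with its key and value classes. A sketch using the new-API TextInputFormat from hadoop-mapreduce (the path is the same placeholder used above):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Read a text file through the new Hadoop API as (byte offset, line) pairs.
val records = sc.newAPIHadoopFile(
  "hdfs://path/to/file.txt",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)
// Convert Text to String immediately, since Hadoop reuses Writable objects.
val hadoopLines = records.map { case (_, line) => line.toString }

hadoopFile works the same way, but with old-API (org.apache.hadoop.mapred) input formats.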
Configuration object for Spark applications. Supports method chaining for easy configuration setup.
class SparkConf(loadDefaults: Boolean = true) {

  // Core Configuration
  def set(key: String, value: String): SparkConf
  def setMaster(master: String): SparkConf
  def setAppName(name: String): SparkConf
  def setJars(jars: Seq[String]): SparkConf
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
  def setExecutorEnv(variable: String, value: String): SparkConf

  // Retrieval
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  def getAll: Array[(String, String)]
  def contains(key: String): Boolean
  def getOption(key: String): Option[String]

  // Spark-specific Settings
  def setSparkHome(home: String): SparkConf
  def setExecutorMemory(mem: String): SparkConf
  def setAll(settings: Traversable[(String, String)]): SparkConf

  // Management
  def clone(): SparkConf
  def remove(key: String): SparkConf
}

Basic configuration:
val conf = new SparkConf()
.setAppName("Data Processing Job")
.setMaster("spark://master:7077")
.set("spark.executor.memory", "4g")
.set("spark.executor.cores", "4")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")Loading from properties file:
// Loads from spark-defaults.conf and system properties
val conf = new SparkConf() // loadDefaults = true by default
.setAppName("My App")Resource configuration:
Resource configuration:
val conf = new SparkConf()
.setAppName("Resource Heavy Job")
.set("spark.executor.memory", "8g")
.set("spark.executor.cores", "8")
.set("spark.executor.instances", "10")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.sql.shuffle.partitions", "400")spark.master - Master URL (local[*], spark://host:port, yarn, etc.)spark.app.name - Application namespark.executor.memory - Executor memory (e.g., "4g", "2048m")spark.executor.cores - CPU cores per executorspark.executor.instances - Number of executors (for YARN/K8s)spark.default.parallelism - Default number of partitionsspark.serializer - Serializer class (Java or Kryo)spark.kryo.registrator - Kryo registrator classspark.kryo.classesToRegister - Classes to register with Kryospark.storage.level - Default storage level for RDDsspark.storage.memoryFraction - Fraction of JVM heap for storagespark.storage.safetyFraction - Safety fraction for storage memoryspark.shuffle.service.enabled - Enable external shuffle servicespark.shuffle.compress - Compress shuffle outputspark.shuffle.spill.compress - Compress spilled data