maven-apache-spark
- Describes: maven/org.apache.spark/spark-parent
- Description: Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
- Author: tessl
- Spec file: docs/spark-context.md
# SparkContext API

The SparkContext is the main entry point for all Spark functionality. It represents the connection to a Spark cluster and coordinates the execution of operations across the cluster. Every Spark application must create exactly one active SparkContext.

## SparkContext Class

```scala { .api }
class SparkContext(config: SparkConf) extends Logging {
  // Primary constructor
  def this() = this(new SparkConf())
  def this(master: String, appName: String, conf: SparkConf) = { /* ... */ }
  def this(master: String, appName: String, sparkHome: String, jars: Seq[String], environment: Map[String, String] = Map()) = { /* ... */ }
}
```

## Creating SparkContext

### Basic Construction

```scala { .api }
import org.apache.spark.{SparkContext, SparkConf}

// Using SparkConf (recommended)
val conf = new SparkConf()
  .setAppName("My Application")
  .setMaster("local[*]")
  .set("spark.executor.memory", "2g")

val sc = new SparkContext(conf)
```

### Alternative Constructors

```scala { .api }
// Default constructor (loads from system properties)
val sc = new SparkContext()

// With master and app name
val sc = new SparkContext("local[*]", "My App")

// Full constructor with all parameters
val sc = new SparkContext(
  master = "local[*]",
  appName = "My App",
  sparkHome = "/path/to/spark",
  jars = Seq("myapp.jar"),
  environment = Map("SPARK_ENV_VAR" -> "value")
)
```

### Developer API Constructor (for YARN)

```scala { .api }
// @DeveloperApi - for internal use, typically in YARN mode
val sc = new SparkContext(
  config = conf,
  preferredNodeLocationData = Map[String, Set[SplitInfo]]()
)
```

## RDD Creation Methods

### From Collections

**parallelize**: Distribute a local collection to form an RDD
```scala { .api }
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] // alias
```

```scala
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)                  // Use default parallelism
val rddWithPartitions = sc.parallelize(data, 4) // Specify 4 partitions
```

**With location preferences**:
```scala { .api }
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
```

```scala
// Create RDD with preferred locations for each element
val dataWithPrefs = Seq(
  (1, Seq("host1", "host2")),
  (2, Seq("host3")),
  (3, Seq("host1"))
)
val rdd = sc.makeRDD(dataWithPrefs)
```

### From Files

**textFile**: Read text files from HDFS or local filesystem
```scala { .api }
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
```

```scala
val lines = sc.textFile("hdfs://namenode:port/path/to/file.txt")
val linesLocal = sc.textFile("file:///local/path/file.txt")
val linesWithPartitions = sc.textFile("hdfs://path/to/file.txt", 8)
```

**wholeTextFiles**: Read directory of text files as key-value pairs
```scala { .api }
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
```

```scala
// Returns RDD[(filename, content)]
val files = sc.wholeTextFiles("hdfs://path/to/directory/")
files.foreach { case (filename, content) =>
  println(s"File: $filename, Size: ${content.length}")
}
```
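To make the difference between the two readers concrete, here is a small usage sketch (the directory path is hypothetical): `textFile` yields one record per line, while `wholeTextFiles` yields one `(path, content)` record per file.

```scala
// Hypothetical input directory containing several small text files
val byLine = sc.textFile("hdfs://namenode:8020/data/reports/")       // RDD[String]: one element per line
val byFile = sc.wholeTextFiles("hdfs://namenode:8020/data/reports/") // RDD[(String, String)]: one element per file

val totalLines = byLine.count()
val linesPerFile = byFile.map { case (path, content) => (path, content.split("\n").length) }
linesPerFile.collect().foreach { case (path, n) => println(s"$path: $n lines") }
println(s"Total lines across all files: $totalLines")
```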
### Hadoop Files

**sequenceFile**: Read Hadoop SequenceFiles
```scala { .api }
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
```

```scala
import org.apache.hadoop.io.{IntWritable, Text}

val seqFile = sc.sequenceFile[IntWritable, Text]("path/to/sequencefile",
  classOf[IntWritable], classOf[Text])
```

**hadoopFile**: Read files with arbitrary Hadoop InputFormat
```scala { .api }
def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val hadoopRDD = sc.hadoopFile[LongWritable, Text](
  "hdfs://path/to/input",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)
```

**objectFile**: Load RDD saved as SequenceFile of serialized objects
```scala { .api }
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
```

```scala
// Load RDD that was previously saved with saveAsObjectFile
val restored: RDD[MyClass] = sc.objectFile[MyClass]("path/to/objects")
```

### RDD Manipulation

**union**: Build union of a list of RDDs
```scala { .api }
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
```

```scala
val rdd1 = sc.parallelize(Array(1, 2, 3))
val rdd2 = sc.parallelize(Array(4, 5, 6))
val rdd3 = sc.parallelize(Array(7, 8, 9))

val unionRDD = sc.union(Seq(rdd1, rdd2, rdd3))
```

**emptyRDD**: Create empty RDD with no partitions
```scala { .api }
def emptyRDD[T: ClassTag]: RDD[T]
```

```scala
val empty: RDD[String] = sc.emptyRDD[String]
```

## Shared Variables

Spark provides two types of shared variables: broadcast variables and accumulators.

### Broadcast Variables

**broadcast**: Create a broadcast variable for read-only data
```scala { .api }
def broadcast[T: ClassTag](value: T): Broadcast[T]
```

```scala
val lookupTable = Map("apple" -> 1, "banana" -> 2, "orange" -> 3)
val broadcastTable = sc.broadcast(lookupTable)

val data = sc.parallelize(Array("apple", "banana", "apple"))
val mapped = data.map(fruit => broadcastTable.value.getOrElse(fruit, 0))
```

### Accumulators

**accumulator**: Create a simple accumulator
```scala { .api }
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
```

**accumulable**: Create an accumulable with different result/element types
```scala { .api }
def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]): Accumulable[T, R]
```

```scala
import scala.collection.mutable

// Simple counter for negative values
val negativeCount = sc.accumulator(0, "Negative Counter")

val data = sc.parallelize(Array(1, 2, -1, 4, -5))
val positive = data.filter { x =>
  if (x < 0) negativeCount += 1 // Count negative numbers as a side effect
  x > 0
}
positive.count() // Trigger an action so the accumulator is updated
println(s"Negative numbers: ${negativeCount.value}")

// Collection accumulator (requires scala.collection.mutable)
val errorList = sc.accumulableCollection(mutable.Set[String]())
```

#### Built-in AccumulatorParam Types

```scala { .api }
// Available in SparkContext companion object
DoubleAccumulatorParam // For Double values
IntAccumulatorParam    // For Int values
LongAccumulatorParam   // For Long values
FloatAccumulatorParam  // For Float values
```
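As a minimal sketch of how these built-ins are picked up: the implicit `AccumulatorParam` is resolved from the type of the initial value, so nothing needs to be passed explicitly for the types listed above. The values and names here are illustrative only.

```scala
// The implicit AccumulatorParam is chosen from the initial value's type
// (on older Spark versions you may need `import org.apache.spark.SparkContext._`
// to bring these implicits into scope)
val doubleSum = sc.accumulator(0.0) // resolves DoubleAccumulatorParam
val longCount = sc.accumulator(0L)  // resolves LongAccumulatorParam

sc.parallelize(Seq(1.5, 2.5, 3.0)).foreach { x =>
  doubleSum += x
  longCount += 1L
}

// Read the merged values on the driver after the action has completed
println(s"sum = ${doubleSum.value}, count = ${longCount.value}")
```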
## Job Control and Execution

### Running Jobs

**runJob**: Run a function on RDD partitions
```scala { .api }
def runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  allowLocal: Boolean = false,
  resultHandler: (Int, U) => Unit = null
): Array[U]

// Simplified versions
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]
```

```scala
val data = sc.parallelize(1 to 100, 4)

// Run custom function on each partition
val results = sc.runJob(data, (iter: Iterator[Int]) => iter.sum)
println(s"Partition sums: ${results.mkString(", ")}")

// With task context
val results2 = sc.runJob(data, (context: TaskContext, iter: Iterator[Int]) => {
  (context.partitionId, iter.size)
})
```

**submitJob**: Submit job asynchronously (Experimental)
```scala { .api }
def submitJob[T, U, R](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit,
  resultFunc: => R
): SimpleFutureAction[R]
```
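Since the asynchronous variant has no usage example above, here is a minimal sketch of submitting a job and waiting for its result; the per-partition sum and the `Await` timeout are illustrative choices, not part of the API.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

val rdd = sc.parallelize(1 to 100, 4)
val partialSums = new Array[Int](rdd.partitions.length)

// Submit asynchronously: sum each partition, store the partial sums as they
// arrive, and compute the grand total once all partitions have finished.
val future = sc.submitJob(
  rdd,
  (iter: Iterator[Int]) => iter.sum,                          // processPartition
  0 until rdd.partitions.length,                              // run on every partition
  (index: Int, partial: Int) => partialSums(index) = partial, // resultHandler
  partialSums.sum                                             // resultFunc, evaluated on completion
)

// SimpleFutureAction behaves like a regular Future; block here for simplicity
val total = Await.result(future, 1.minute)
println(s"Total: $total")
```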
### Job Groups and Cancellation

**setJobGroup**: Assign group ID to all jobs started by this thread
```scala { .api }
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
```

**clearJobGroup**: Clear the job group for this thread
```scala { .api }
def clearJobGroup(): Unit
```

**cancelJobGroup**: Cancel all jobs for the given group
```scala { .api }
def cancelJobGroup(groupId: String): Unit
```

**cancelAllJobs**: Cancel all scheduled or running jobs
```scala { .api }
def cancelAllJobs(): Unit
```

```scala
// Set job group
sc.setJobGroup("etl-jobs", "ETL Processing", interruptOnCancel = true)

val data = sc.textFile("large-file.txt")
val processed = data.map(processLine)
processed.saveAsTextFile("output")

// Cancel specific job group from another thread
sc.cancelJobGroup("etl-jobs")
```

## Configuration and Properties

### Configuration Access

**getConf**: Get a copy of the SparkContext's configuration
```scala { .api }
def getConf: SparkConf
```

```scala
val conf = sc.getConf
val appName = conf.get("spark.app.name")
val executorMemory = conf.get("spark.executor.memory", "1g")
```

### Local Properties

**setLocalProperty**: Set local property that affects jobs submitted from this thread
```scala { .api }
def setLocalProperty(key: String, value: String): Unit
```

**getLocalProperty**: Get local property set in this thread
```scala { .api }
def getLocalProperty(key: String): String
```

```scala
// Set properties that will be passed to tasks
sc.setLocalProperty("spark.sql.execution.id", "query-123")
sc.setLocalProperty("callSite.short", "MyApp.process")

val value = sc.getLocalProperty("spark.sql.execution.id")
```

### File and JAR Management

**addFile**: Add a file to be downloaded with this Spark job on every node
```scala { .api }
def addFile(path: String): Unit
def addFile(path: String, recursive: Boolean): Unit
```

**addJar**: Add a JAR dependency for all tasks to be executed on this SparkContext
```scala { .api }
def addJar(path: String): Unit
```

```scala
// Add files that tasks can access via SparkFiles.get()
sc.addFile("/path/to/config.properties")
sc.addFile("hdfs://path/to/lookup-table.csv")

// Add JARs for task execution
sc.addJar("/path/to/dependencies.jar")
sc.addJar("hdfs://path/to/libs/mylib.jar")
```

### Checkpointing

**setCheckpointDir**: Set directory for RDD checkpointing
```scala { .api }
def setCheckpointDir(directory: String): Unit
```

```scala
sc.setCheckpointDir("hdfs://namenode/checkpoints")

val data = sc.textFile("large-dataset.txt")
val processed = data.map(complexProcessing).filter(isValid)
processed.checkpoint() // Checkpoint this RDD
```

## Monitoring and Information

### Memory and Storage Status

**getExecutorMemoryStatus**: Get memory status of all executors
```scala { .api }
def getExecutorMemoryStatus: Map[String, (Long, Long)]
```

**getExecutorStorageStatus**: Get storage status from all executors
```scala { .api }
def getExecutorStorageStatus: Array[StorageStatus]
```

**getRDDStorageInfo**: Get information about cached/persisted RDDs
```scala { .api }
def getRDDStorageInfo: Array[RDDInfo]
```

**getPersistentRDDs**: Get all currently persisted RDDs
```scala { .api }
def getPersistentRDDs: Map[Int, RDD[_]]
```

```scala
// Check memory usage across executors
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (executorId, (maxMemory, remainingMemory)) =>
  println(s"Executor $executorId: ${remainingMemory}/${maxMemory} bytes available")
}

// Check cached RDDs
val cachedRDDs = sc.getRDDStorageInfo
cachedRDDs.foreach { rddInfo =>
  println(s"RDD ${rddInfo.id} (${rddInfo.name}): ${rddInfo.memSize} bytes in memory")
}
```
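`getPersistentRDDs` has no usage example above; here is a small sketch that walks the currently persisted RDDs and releases them (unpersisting everything is an illustrative choice, not a recommendation).

```scala
// List everything currently persisted on this context...
val persistent = sc.getPersistentRDDs // Map of RDD id -> RDD
persistent.foreach { case (id, rdd) =>
  println(s"RDD $id is persisted at level ${rdd.getStorageLevel}")
}

// ...and, if desired, release the cached blocks without blocking
persistent.values.foreach(_.unpersist(blocking = false))
```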
### System Properties

**version**: Get the version of Spark on which this application is running
```scala { .api }
def version: String
```

**defaultParallelism**: Default level of parallelism for operations
```scala { .api }
def defaultParallelism: Int
```

**defaultMinPartitions**: Default min number of partitions for Hadoop RDDs
```scala { .api }
def defaultMinPartitions: Int
```

```scala
println(s"Spark Version: ${sc.version}")
println(s"Default Parallelism: ${sc.defaultParallelism}")
println(s"Default Min Partitions: ${sc.defaultMinPartitions}")
```

### Hadoop Configuration

**hadoopConfiguration**: Access to Hadoop Configuration for reuse across operations
```scala { .api }
def hadoopConfiguration: Configuration
```

```scala
import org.apache.hadoop.conf.Configuration

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "your-access-key")
hadoopConf.set("fs.s3a.secret.key", "your-secret-key")

// Now S3 operations will use these credentials
val s3Data = sc.textFile("s3a://bucket/path/to/file")
```

## SparkContext Lifecycle

### Stopping SparkContext

**stop**: Shut down the SparkContext
```scala { .api }
def stop(): Unit
```

```scala
val sc = new SparkContext(conf)
try {
  // Perform Spark operations
  val data = sc.textFile("input.txt")
  val result = data.map(_.toUpperCase).collect()
} finally {
  sc.stop() // Always stop the context
}
```

### Best Practices

1. **Single SparkContext**: Only one SparkContext should be active per JVM
2. **Proper Shutdown**: Always call `stop()` to release resources
3. **Configuration**: Use SparkConf for all configuration rather than constructor parameters
4. **Shared Variables**: Use broadcast variables for large read-only data
5. **Accumulators**: Only use accumulators for debugging and monitoring

```scala
// Proper pattern for SparkContext usage
val conf = new SparkConf()
  .setAppName("My Spark Application")
  .setMaster("local[*]")

val sc = new SparkContext(conf)

try {
  // All Spark operations here

} finally {
  sc.stop()
}
```

The SparkContext is the foundation of all Spark applications, and understanding its API is essential for effective Spark programming.