
tessl/maven-apache-spark

Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R


SparkContext API

The SparkContext is the main entry point for all Spark functionality. It represents the connection to a Spark cluster and coordinates the execution of operations across the cluster. Only one SparkContext may be active per JVM, so an application creates one context and must stop() it before creating another.

SparkContext Class

class SparkContext(config: SparkConf) extends Logging {  // primary constructor takes a SparkConf
  // Auxiliary constructors
  def this() = this(new SparkConf())
  def this(master: String, appName: String, conf: SparkConf) = { /* ... */ }
  def this(master: String, appName: String, sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map()) = { /* ... */ }
}

Creating SparkContext

Basic Construction

import org.apache.spark.{SparkContext, SparkConf}

// Using SparkConf (recommended)
val conf = new SparkConf()
  .setAppName("My Application")
  .setMaster("local[*]")
  .set("spark.executor.memory", "2g")

val sc = new SparkContext(conf)

Alternative Constructors

// Default constructor (loads from system properties)
val sc = new SparkContext()

// With master and app name
val sc = new SparkContext("local[*]", "My App")

// Full constructor with all parameters
val sc = new SparkContext(
  master = "local[*]",
  appName = "My App", 
  sparkHome = "/path/to/spark",
  jars = Seq("myapp.jar"),
  environment = Map("SPARK_ENV_VAR" -> "value")
)

Developer API Constructor (for YARN)

import org.apache.spark.scheduler.SplitInfo

// @DeveloperApi - for internal use, typically in YARN mode
val sc = new SparkContext(
  config = conf,
  preferredNodeLocationData = Map[String, Set[SplitInfo]]()
)

RDD Creation Methods

From Collections

parallelize: Distribute a local collection to form an RDD

def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] // alias
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)                    // Use default parallelism
val rddWithPartitions = sc.parallelize(data, 4)   // Specify 4 partitions

With location preferences:

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
// Create RDD with preferred locations for each element
val dataWithPrefs = Seq(
  (1, Seq("host1", "host2")),
  (2, Seq("host3")),
  (3, Seq("host1"))
)
val rdd = sc.makeRDD(dataWithPrefs)
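
To make the effect of location preferences visible, the following is a minimal sketch that reads the preferences back with RDD.preferredLocations. It assumes a local[2] master so it runs without a cluster, and the host names are the same placeholders used above. With this overload each element is placed in its own partition, so the sketch reports one list of hosts per element.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local two-thread master, so no cluster is needed.
val conf = new SparkConf().setAppName("makeRDD-prefs-sketch").setMaster("local[2]")
val sc = new SparkContext(conf)

val locationsByPartition: Seq[Seq[String]] = try {
  // "host1".."host3" are placeholder host names.
  val rdd = sc.makeRDD(Seq(
    (1, Seq("host1", "host2")),
    (2, Seq("host3")),
    (3, Seq("host1"))
  ))
  // Ask the RDD which hosts it prefers for each of its partitions.
  rdd.partitions.toSeq.map(p => rdd.preferredLocations(p))
} finally {
  sc.stop()
}

locationsByPartition.zipWithIndex.foreach { case (hosts, i) =>
  println(s"Partition $i prefers: ${hosts.mkString(", ")}")
}
```

The preferences are hints for the scheduler; on a local master they have no effect on where tasks actually run.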

From Files

textFile: Read text files from HDFS or local filesystem

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
val lines = sc.textFile("hdfs://namenode:port/path/to/file.txt")
val linesLocal = sc.textFile("file:///local/path/file.txt")
val linesWithPartitions = sc.textFile("hdfs://namenode:port/path/to/file.txt", 8)  // at least 8 partitions

wholeTextFiles: Read directory of text files as key-value pairs

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
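// Returns RDD[(path, content)]: the key is the full path of each file
val files = sc.wholeTextFiles("hdfs://namenode:port/path/to/directory/")
// The println runs inside the tasks, so on a cluster the output goes to the executor logs
files.foreach { case (path, content) =>
  println(s"File: $path, Size: ${content.length}")
}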

Hadoop Files

sequenceFile: Read Hadoop SequenceFiles

def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
import org.apache.hadoop.io.{IntWritable, Text}

val seqFile = sc.sequenceFile[IntWritable, Text]("path/to/sequencefile", 
  classOf[IntWritable], classOf[Text])

hadoopFile: Read files with arbitrary Hadoop InputFormat

def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val hadoopRDD = sc.hadoopFile[LongWritable, Text](
  "hdfs://path/to/input",
  classOf[TextInputFormat],
  classOf[LongWritable], 
  classOf[Text]
)

objectFile: Load RDD saved as SequenceFile of serialized objects

def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
// Load RDD that was previously saved with saveAsObjectFile
val restored: RDD[MyClass] = sc.objectFile[MyClass]("path/to/objects")

RDD Manipulation

union: Build union of a list of RDDs

def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
val rdd1 = sc.parallelize(Array(1, 2, 3))
val rdd2 = sc.parallelize(Array(4, 5, 6))
val rdd3 = sc.parallelize(Array(7, 8, 9))

val unionRDD = sc.union(Seq(rdd1, rdd2, rdd3))

emptyRDD: Create empty RDD with no partitions

def emptyRDD[T: ClassTag]: RDD[T]
val empty: RDD[String] = sc.emptyRDD[String]

Shared Variables

Spark provides two types of shared variables: broadcast variables and accumulators.

Broadcast Variables

broadcast: Create a broadcast variable for read-only data

def broadcast[T: ClassTag](value: T): Broadcast[T]
val lookupTable = Map("apple" -> 1, "banana" -> 2, "orange" -> 3)
val broadcastTable = sc.broadcast(lookupTable)

val data = sc.parallelize(Array("apple", "banana", "apple"))
val mapped = data.map(fruit => broadcastTable.value.getOrElse(fruit, 0))

Accumulators

accumulator: Create a simple accumulator

def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]

accumulable: Create an accumulable with different result/element types

def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]): Accumulable[T, R]
import scala.collection.mutable

// Simple named counter
val counter = sc.accumulator(0, "Error Counter")

val data = sc.parallelize(Array(1, 2, -1, 4, -5))
val positive = data.filter { x =>
  if (x < 0) counter += 1  // Count negative numbers
  x > 0
}
positive.count()  // Trigger action; the accumulator is only updated when a job runs
println(s"Negative numbers: ${counter.value}")

// Collection accumulator
val errorList = sc.accumulableCollection(mutable.Set[String]())

Built-in AccumulatorParam Types

// Available in SparkContext companion object
DoubleAccumulatorParam    // For Double values
IntAccumulatorParam       // For Int values
LongAccumulatorParam      // For Long values
FloatAccumulatorParam     // For Float values

Job Control and Execution

Running Jobs

runJob: Run a function on RDD partitions

def runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  allowLocal: Boolean = false,
  resultHandler: (Int, U) => Unit = null
): Array[U]

// Simplified versions
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]
import org.apache.spark.TaskContext

val data = sc.parallelize(1 to 100, 4)

// Run custom function on each partition
val results = sc.runJob(data, (iter: Iterator[Int]) => iter.sum)
println(s"Partition sums: ${results.mkString(", ")}")

// With task context
val results2 = sc.runJob(data, (context: TaskContext, iter: Iterator[Int]) => {
  (context.partitionId, iter.size)
})

submitJob: Submit job asynchronously (Experimental)

def submitJob[T, U, R](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit,
  resultFunc: => R
): SimpleFutureAction[R]
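
Since submitJob has no example here, the following is a minimal sketch of one way to use it, assuming a local[2] master. The buffer name partitionSums and the 60-second timeout are illustrative choices, not part of the API. The resultHandler runs on the driver as each partition finishes, and resultFunc is passed by name, so it is evaluated only after the whole job has completed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumption: a local two-thread master, so no cluster is needed.
val conf = new SparkConf().setAppName("submitJob-sketch").setMaster("local[2]")
val sc = new SparkContext(conf)

val total: Int = try {
  val rdd = sc.parallelize(1 to 100, 4)
  // Driver-side buffer filled by resultHandler, one slot per partition.
  val partitionSums = new Array[Int](rdd.partitions.length)

  val future = sc.submitJob(
    rdd,
    (iter: Iterator[Int]) => iter.sum,                     // processPartition: runs in the tasks
    0 until rdd.partitions.length,                         // partitions: all of them
    (index: Int, sum: Int) => partitionSums(index) = sum,  // resultHandler: runs on the driver
    partitionSums.sum                                      // resultFunc: evaluated after the job completes
  )

  // submitJob returns immediately; block here only to read the result.
  Await.result(future, 60.seconds)
} finally {
  sc.stop()
}

println(s"Total: $total")
```

The returned future can also be cancelled or combined with other futures instead of being awaited.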

Job Groups and Cancellation

setJobGroup: Assign group ID to all jobs started by this thread

def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit

clearJobGroup: Clear the job group for this thread

def clearJobGroup(): Unit

cancelJobGroup: Cancel all jobs for the given group

def cancelJobGroup(groupId: String): Unit

cancelAllJobs: Cancel all scheduled or running jobs

def cancelAllJobs(): Unit
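// Thread A: tag every job submitted from this thread with a group ID
sc.setJobGroup("etl-jobs", "ETL Processing", interruptOnCancel = true)

val data = sc.textFile("large-file.txt")
val processed = data.map(processLine)  // processLine is a user-defined function
processed.saveAsTextFile("output")     // Blocks until the job finishes or is cancelled

// Thread B: cancel the group while the job above is still running
sc.cancelJobGroup("etl-jobs")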
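
Because the job group is a per-thread setting and actions block the calling thread, cancellation has to come from a different thread than the one running the job. The following is a rough sketch of that arrangement, assuming a local[2] master. The group name "slow-group", the sleeping map function, and the listener-based latch are all illustrative; the latch is only there so the cancel is not issued before the job has started.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Assumption: a local two-thread master, so no cluster is needed.
val conf = new SparkConf().setAppName("cancelJobGroup-sketch").setMaster("local[2]")
val sc = new SparkContext(conf)

val outcome: Try[Long] = try {
  // Released once the scheduler reports that a job has started.
  val jobStarted = new CountDownLatch(1)
  sc.addSparkListener(new SparkListener {
    override def onJobStart(jobStart: SparkListenerJobStart): Unit = jobStarted.countDown()
  })

  // Worker thread: set the group here, because it applies to the submitting thread.
  val slowJob = Future {
    sc.setJobGroup("slow-group", "Deliberately slow job", interruptOnCancel = true)
    sc.parallelize(1 to 4, 4).map { x => Thread.sleep(60000L); x }.count()
  }

  // Main thread: wait until the job is running, then cancel its group.
  jobStarted.await(30, TimeUnit.SECONDS)
  sc.cancelJobGroup("slow-group")

  // The blocked count() fails with an exception once the job is cancelled.
  Try(Await.result(slowJob, 30.seconds))
} finally {
  sc.stop()
}

println(s"Job was cancelled: ${outcome.isFailure}")
```

With interruptOnCancel = true the running task threads are interrupted, which is what lets the sleeping tasks stop early in this sketch.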

Configuration and Properties

Configuration Access

getConf: Get a copy of the SparkContext's configuration

def getConf: SparkConf
val conf = sc.getConf
val appName = conf.get("spark.app.name")
val executorMemory = conf.get("spark.executor.memory", "1g")

Local Properties

setLocalProperty: Set local property that affects jobs submitted from this thread

def setLocalProperty(key: String, value: String): Unit

getLocalProperty: Get local property set in this thread

def getLocalProperty(key: String): String
// Set properties that will be passed to tasks
sc.setLocalProperty("spark.sql.execution.id", "query-123")
sc.setLocalProperty("callSite.short", "MyApp.process")

val value = sc.getLocalProperty("spark.sql.execution.id")

File and JAR Management

addFile: Add a file to be downloaded with this Spark job on every node

def addFile(path: String): Unit
def addFile(path: String, recursive: Boolean): Unit

addJar: Add a JAR dependency for all tasks to be executed on this SparkContext

def addJar(path: String): Unit
// Add files that tasks can access via SparkFiles.get()
sc.addFile("/path/to/config.properties")
sc.addFile("hdfs://path/to/lookup-table.csv")

// Add JARs for task execution
sc.addJar("/path/to/dependencies.jar")
sc.addJar("hdfs://path/to/libs/mylib.jar")
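
The comment above mentions SparkFiles.get() without showing it, so here is a minimal sketch of the round trip, assuming a local[2] master. The temporary lookup file and its contents are made up so the example is self-contained. Tasks resolve the file by its bare name, not by the path originally passed to addFile.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import scala.io.Source
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

// Assumption: a local two-thread master, so no cluster is needed.
val conf = new SparkConf().setAppName("addFile-sketch").setMaster("local[2]")
val sc = new SparkContext(conf)

val lineCounts: Array[Int] = try {
  // Hypothetical two-line lookup file, created here for the sketch.
  val lookup = Files.createTempFile("lookup-", ".txt")
  Files.write(lookup, "alpha\nbeta\n".getBytes(StandardCharsets.UTF_8))
  sc.addFile(lookup.toString)

  // Tasks look the file up by name; SparkFiles.get returns its local path on the node.
  val fileName = lookup.getFileName.toString
  sc.parallelize(1 to 2, 2).map { _ =>
    val source = Source.fromFile(SparkFiles.get(fileName), "UTF-8")
    try source.getLines().size finally source.close()
  }.collect()
} finally {
  sc.stop()
}

println(s"Lines seen by each task: ${lineCounts.mkString(", ")}")
```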

Checkpointing

setCheckpointDir: Set directory for RDD checkpointing

def setCheckpointDir(directory: String): Unit
sc.setCheckpointDir("hdfs://namenode/checkpoints")

val data = sc.textFile("large-dataset.txt")
val processed = data.map(complexProcessing).filter(isValid)
processed.checkpoint()  // Marks the RDD; the data is written when the first action runs on it
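
The timing of checkpointing is easy to get wrong, so the following minimal sketch checks isCheckpointed before and after the first action. It assumes a local[2] master and uses a temporary local directory; on a cluster the checkpoint directory should be on a shared file system such as HDFS.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local two-thread master, so no cluster is needed.
val conf = new SparkConf().setAppName("checkpoint-sketch").setMaster("local[2]")
val sc = new SparkContext(conf)

val (markedOnly, afterAction, total) = try {
  // Temporary local directory, used only for this sketch.
  sc.setCheckpointDir(Files.createTempDirectory("checkpoints-").toString)

  val doubled = sc.parallelize(1 to 10, 2).map(_ * 2)
  doubled.checkpoint()                  // only marks the RDD for checkpointing
  val before = doubled.isCheckpointed   // nothing has been written yet
  val sum = doubled.reduce(_ + _)       // first action: runs the job, then writes the checkpoint
  (before, doubled.isCheckpointed, sum)
} finally {
  sc.stop()
}

println(s"Checkpointed before action: $markedOnly, after action: $afterAction, sum: $total")
```

checkpoint() has to be called before any job has run on the RDD, which is why it comes directly after the RDD is defined.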

Monitoring and Information

Memory and Storage Status

getExecutorMemoryStatus: Get the maximum and remaining memory available for caching on each executor, keyed by block manager address (host:port)

def getExecutorMemoryStatus: Map[String, (Long, Long)]

getExecutorStorageStatus: Get storage status from all executors

def getExecutorStorageStatus: Array[StorageStatus]

getRDDStorageInfo: Get information about cached/persisted RDDs

def getRDDStorageInfo: Array[RDDInfo]

getPersistentRDDs: Get all currently persisted RDDs

def getPersistentRDDs: Map[Int, RDD[_]]
// Check memory available for caching on each executor
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (address, (maxMemory, remainingMemory)) =>
  println(s"Executor at $address: ${remainingMemory}/${maxMemory} bytes available")
}

// Check cached RDDs
val cachedRDDs = sc.getRDDStorageInfo
cachedRDDs.foreach { rddInfo =>
  println(s"RDD ${rddInfo.id} (${rddInfo.name}): ${rddInfo.memSize} bytes in memory")
}
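
getPersistentRDDs is not covered by the example above, so here is a minimal sketch of how it tracks an RDD across cache() and unpersist(), assuming a local[2] master. The RDD name "squares" is illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local two-thread master, so no cluster is needed.
val conf = new SparkConf().setAppName("persistent-rdds-sketch").setMaster("local[2]")
val sc = new SparkContext(conf)

val (whileCached, afterUnpersist) = try {
  val squares = sc.parallelize(1 to 1000, 4).map(x => x * x).setName("squares").cache()
  squares.count()  // materialize the cached partitions

  // The map is keyed by RDD id.
  val tracked = sc.getPersistentRDDs.contains(squares.id)

  squares.unpersist(blocking = true)
  (tracked, sc.getPersistentRDDs.contains(squares.id))
} finally {
  sc.stop()
}

println(s"Tracked while cached: $whileCached, after unpersist: $afterUnpersist")
```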

System Properties

version: Get the version of Spark on which this application is running

def version: String

defaultParallelism: Default level of parallelism for operations

def defaultParallelism: Int

defaultMinPartitions: Default min number of partitions for Hadoop RDDs

def defaultMinPartitions: Int
println(s"Spark Version: ${sc.version}")
println(s"Default Parallelism: ${sc.defaultParallelism}")
println(s"Default Min Partitions: ${sc.defaultMinPartitions}")

Hadoop Configuration

hadoopConfiguration: Access to Hadoop Configuration for reuse across operations

def hadoopConfiguration: Configuration
import org.apache.hadoop.conf.Configuration

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "your-access-key")
hadoopConf.set("fs.s3a.secret.key", "your-secret-key")

// Now S3 operations will use these credentials
val s3Data = sc.textFile("s3a://bucket/path/to/file")

SparkContext Lifecycle

Stopping SparkContext

stop: Shut down the SparkContext

def stop(): Unit
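// Create the context outside the try block so it is in scope in finally
val sc = new SparkContext(conf)
try {
  // Perform Spark operations
  val data = sc.textFile("input.txt")
  val result = data.map(_.toUpperCase).collect()
} finally {
  sc.stop()  // Always stop the context, even if an operation throws
}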

Best Practices

  1. Single SparkContext: Only one SparkContext should be active per JVM
  2. Proper Shutdown: Always call stop() to release resources
  3. Configuration: Use SparkConf for all configuration rather than constructor parameters
  4. Shared Variables: Use broadcast variables for large read-only data
  5. Accumulators: Use accumulators for counters, debugging, and monitoring; updates made inside transformations may be applied more than once if tasks are re-executed
// Proper pattern for SparkContext usage
val conf = new SparkConf()
  .setAppName("My Spark Application")
  .setMaster("local[*]")

val sc = new SparkContext(conf)

try {
  // All Spark operations here
  
} finally {
  sc.stop()
}

The SparkContext is the foundation of all Spark applications and understanding its API is essential for effective Spark programming.

Install with Tessl CLI

npx tessl i tessl/maven-apache-spark
