Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.
This module provides the primary entry points for creating and configuring Spark applications. The SparkContext serves as the main coordination point for distributed computation, while SparkConf handles all configuration parameters.
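The typical lifecycle is: build a SparkConf, construct a SparkContext from it, run jobs, then stop the context. A minimal sketch of that flow (the local master and application name below are placeholder values):
import org.apache.spark.{SparkConf, SparkContext}
// Minimal lifecycle sketch: configure, create, compute, stop
val conf = new SparkConf().setAppName("Quick Start").setMaster("local[*]")
val sc = new SparkContext(conf)
val total = sc.parallelize(1 to 100).reduce(_ + _) // 5050
sc.stop() // release resources; only one active context per JVM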
SparkConf manages all configuration parameters for a Spark application.
class SparkConf(loadDefaults: Boolean = true) {
  def set(key: String, value: String): SparkConf
  def setMaster(master: String): SparkConf
  def setAppName(name: String): SparkConf
  def setJars(jars: Seq[String]): SparkConf
  def setExecutorEnv(variable: String, value: String): SparkConf
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
  def setExecutorEnv(variables: Array[(String, String)]): SparkConf
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  def getOption(key: String): Option[String]
  def getAll: Array[(String, String)]
  def remove(key: String): SparkConf
  def contains(key: String): Boolean
  def clone(): SparkConf
  def setSparkHome(home: String): SparkConf
  def setIfMissing(key: String, value: String): SparkConf
}

import org.apache.spark.SparkConf
// Basic configuration
val conf = new SparkConf()
  .setAppName("My Spark Application")
  .setMaster("local[4]")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// Configure executor settings
conf.set("spark.executor.memory", "4g")
.set("spark.executor.cores", "2")
.set("spark.executor.instances", "10")
// Set environment variables for executors
conf.setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk")
.setExecutorEnv(Seq(
("SPARK_LOCAL_DIRS", "/tmp/spark"),
("SPARK_WORKER_DIR", "/tmp/spark-worker")
))
// Conditional configuration
if (!conf.contains("spark.master")) {
  conf.setMaster("local[*]")
}
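// setIfMissing is a one-call equivalent: it writes only when the key is absent
conf.setIfMissing("spark.master", "local[*]")
// Read settings back; getOption returns None instead of throwing for unset keys
val executorMemory: Option[String] = conf.getOption("spark.executor.memory")
conf.getAll.foreach { case (key, value) => println(s"$key=$value") }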
// Clone configuration for different contexts
val testConf = conf.clone()
  .setAppName("Test Application")
  .set("spark.sql.execution.arrow.pyspark.enabled", "false")

SparkContext is the main entry point for Spark functionality. Only one SparkContext should be active per JVM.
class SparkContext(config: SparkConf) {
  // RDD Creation
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
  def emptyRDD[T: ClassTag]: RDD[T]
  def range(start: Long, end: Long, step: Long = 1, numSlices: Int = defaultParallelism): RDD[Long]

  // File Input
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
  def hadoopRDD[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]

  // Broadcast and Accumulators
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  def longAccumulator(): LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator(): DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T](): CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]

  // Application Control
  def stop(): Unit
  def addFile(path: String): Unit
  def addFile(path: String, recursive: Boolean): Unit
  def addJar(path: String): Unit
  def clearFiles(): Unit
  def clearJars(): Unit

  // Configuration and Environment
  def setLogLevel(logLevel: String): Unit
  def setLocalProperty(key: String, value: String): Unit
  def getLocalProperty(key: String): String
  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
  def clearJobGroup(): Unit
  def setJobDescription(value: String): Unit

  // Dynamic Resource Allocation
  def requestTotalExecutors(requestedTotal: Int, localityAwareTasks: Int = 0, hostToLocalTaskCount: Map[String, Int] = Map.empty): Boolean
  def requestExecutors(numAdditionalExecutors: Int): Boolean
  def killExecutors(executorIds: Seq[String]): Boolean
  def killExecutor(executorId: String): Boolean

  // Properties
  def master: String
  def appName: String
  def jars: Seq[String]
  def files: Seq[String]
  def startTime: Long
  def version: String
  def defaultParallelism: Int
  def defaultMinPartitions: Int
  def conf: SparkConf
  def statusTracker: SparkStatusTracker
}

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
// Create configuration and context
val conf = new SparkConf()
  .setAppName("Data Processing Pipeline")
  .setMaster("spark://master:7077")
val sc = new SparkContext(conf)
// Create RDDs from various sources
val numbers = sc.parallelize(1 to 1000000, numSlices = 100)
val lines = sc.textFile("hdfs://data/input.txt", minPartitions = 50)
val keyValueData = sc.sequenceFile[String, Int]("hdfs://data/sequence")
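// Other creation helpers from the signature block above (sketch; paths are
// placeholders): range builds an RDD[Long] without materializing the sequence
// on the driver, and wholeTextFiles pairs each file path with its full contents
val ids = sc.range(0L, 1000000L, step = 1, numSlices = 100)
val docs = sc.wholeTextFiles("hdfs://data/docs")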
// Set job properties for monitoring
sc.setJobGroup("data-processing", "Main data processing pipeline")
sc.setJobDescription("Processing customer transaction data")
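// Optionally reduce log verbosity for this application
// (valid levels include ALL, DEBUG, INFO, WARN, ERROR, OFF)
sc.setLogLevel("WARN")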
// Add application files and JARs
sc.addFile("hdfs://shared/config.properties")
sc.addJar("s3://libs/custom-transformations.jar")
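// Files shipped with addFile can be resolved on executors (and the driver)
// via SparkFiles; the name here matches the file added above
val configPath = org.apache.spark.SparkFiles.get("config.properties")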
// Create broadcast variables for lookup tables
val lookupTable = Map("A" -> 1, "B" -> 2, "C" -> 3)
val broadcastLookup = sc.broadcast(lookupTable)
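// Tasks read the shared table with broadcastLookup.value; once no future job
// needs it, broadcastLookup.unpersist() releases executor memory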
// Create accumulators for metrics
val errorCount = sc.longAccumulator("Processing Errors")
val processingTime = sc.doubleAccumulator("Total Processing Time")
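// Note: accumulator updates made inside transformations may be double-counted
// when tasks are retried; only updates performed within actions are guaranteed
// to be applied exactly once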
// Dynamic resource management (requires spark.dynamicAllocation.enabled and a
// cluster manager that supports it; the call returns false if the request
// could not be submitted)
if (sc.defaultParallelism < 100) {
  sc.requestExecutors(20)
}
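// Executors can likewise be released when load drops (sketch; executor IDs
// are runtime values, so this line is illustrative only):
// sc.killExecutors(Seq("1", "2"))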
// Process data using broadcast and accumulators
// (processLine is an application-specific parsing function, not shown here)
val results = lines.map { line =>
  try {
    val startTime = System.currentTimeMillis()
    val processed = processLine(line, broadcastLookup.value)
    processingTime.add(System.currentTimeMillis() - startTime)
    processed
  } catch {
    case e: Exception =>
      errorCount.add(1)
      throw e
  }
}
// Persist intermediate results
results.persist(StorageLevel.MEMORY_AND_DISK_SER)
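// Serialized storage trades CPU for memory; call results.unpersist() when the
// cached data is no longer needed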
// Trigger computation and collect metrics
val finalCount = results.count()
println(s"Processed $finalCount records")
println(s"Error count: ${errorCount.value}")
println(s"Average processing time: ${processingTime.value / finalCount}ms")
// Clean up
sc.stop()

Access to Spark version and build information.
// Available in org.apache.spark package object
val SPARK_VERSION: String
val SPARK_VERSION_SHORT: String
val SPARK_BRANCH: String
val SPARK_REVISION: String
val SPARK_BUILD_USER: String
val SPARK_REPO_URL: String
val SPARK_BUILD_DATE: String

import org.apache.spark._
println(s"Running Spark version: $SPARK_VERSION")
println(s"Built by: $SPARK_BUILD_USER on $SPARK_BUILD_DATE")
println(s"Git revision: $SPARK_REVISION")Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-core-2-11