Apache Spark Core provides the foundational execution engine and API for distributed data processing with RDDs, task scheduling, and cluster management.
Application setup and cluster connection management through SparkContext and SparkConf.
Main entry point for Spark functionality, representing the connection to a Spark cluster. Only one SparkContext should be active per JVM.
/**
* Main entry point for Spark functionality
* @param config Spark configuration object
*/
class SparkContext(config: SparkConf) {
  /** Create RDD from Scala collection */
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  /** Read text files from HDFS, local filesystem, or any Hadoop-supported URI */
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  /** Create RDD from Hadoop InputFormat */
  def hadoopRDD[K, V](
    conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
  ): RDD[(K, V)]
  /** Create RDD from new Hadoop API InputFormat */
  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
    conf: Configuration,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V]
  ): RDD[(K, V)]
  /** Create broadcast variable */
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  /** Create accumulator */
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]
  /** Add file to be downloaded with this Spark job */
  def addFile(path: String, recursive: Boolean = false): Unit
  /** Add JAR file to be distributed to cluster */
  def addJar(path: String): Unit
  /** Set checkpoint directory for RDD checkpointing */
  def setCheckpointDir(directory: String): Unit
  /** Stop SparkContext and release resources */
  def stop(): Unit
  /** Get current status tracker */
  def statusTracker: SparkStatusTracker
  /** Default parallelism level */
  def defaultParallelism: Int
  /** Spark version */
  def version: String
}
Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
// Basic setup
val conf = new SparkConf()
.setAppName("MyApp")
.setMaster("local[2]")
val sc = new SparkContext(conf)
// Create RDD from collection
val numbers = sc.parallelize(1 to 100)
// Read text file
val lines = sc.textFile("hdfs://data/input.txt")
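// Read via the new Hadoop API (illustrative sketch; reuses the input path above
// and assumes the Hadoop client classes are on the classpath)
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val hadoopConf = new Configuration()
hadoopConf.set("mapreduce.input.fileinputformat.inputdir", "hdfs://data/input.txt")
val records = sc.newAPIHadoopRDD(hadoopConf, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text])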
// Add dependencies
sc.addFile("config.properties")
sc.addJar("lib/custom.jar")
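// Shared variables (illustrative sketch): broadcast a read-only lookup table
// to every executor, and count lookup misses with an accumulator
val lookup = sc.broadcast(Map(0 -> "zero", 1 -> "one"))
val misses = sc.longAccumulator("lookup-misses")
val labeled = numbers.map { n =>
  lookup.value.get(n % 3) match {
    case Some(label) => label
    case None        => misses.add(1L); "unknown" // n % 3 == 2 has no entry
  }
}
labeled.collect()         // run the job
println(misses.value)     // accumulator values are read on the driver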
// Cleanup
sc.stop()
Configuration for Spark applications with key-value pairs for runtime settings.
/**
* Configuration for Spark applications
* @param loadDefaults whether to load default configurations
*/
class SparkConf(loadDefaults: Boolean = true) {
  /** Set configuration property */
  def set(key: String, value: String): SparkConf
  /** Set application name */
  def setAppName(name: String): SparkConf
  /** Set master URL */
  def setMaster(master: String): SparkConf
  /** Set Spark home directory */
  def setSparkHome(home: String): SparkConf
  /** Set JAR files to distribute to cluster */
  def setJars(jars: Seq[String]): SparkConf
  /** Set an environment variable to use when launching executors */
  def setExecutorEnv(variable: String, value: String): SparkConf
  /** Get configuration value */
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  /** Get all configuration as key-value pairs */
  def getAll: Array[(String, String)]
  /** Check if configuration contains key */
  def contains(key: String): Boolean
  /** Remove configuration property */
  def remove(key: String): SparkConf
  /** Clone configuration */
  def clone(): SparkConf
}
Usage Examples:
val conf = new SparkConf()
.setAppName("Data Processing")
.setMaster("yarn")
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "4")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Advanced configuration
conf.set("spark.hadoop.fs.s3a.access.key", accessKey)
.set("spark.hadoop.fs.s3a.secret.key", secretKey)
.set("spark.sql.adaptive.coalescePartitions.enabled", "true")Low-level status API for monitoring Spark applications and job execution.
Low-level status API for monitoring Spark applications and job execution.
/**
* Low-level status tracking API
*/
class SparkStatusTracker {
  /** Get job IDs for a specific job group */
  def getJobIdsForGroup(jobGroup: String): Array[Int]
  /** Get information for specific job */
  def getJobInfo(jobId: Int): Option[SparkJobInfo]
  /** Get information for specific stage */
  def getStageInfo(stageId: Int): Option[SparkStageInfo]
  /** Get active job IDs */
  def getActiveJobIds(): Array[Int]
  /** Get active stage IDs */
  def getActiveStageIds(): Array[Int]
  /** Get executor information */
  def getExecutorInfos(): Array[SparkExecutorInfo]
}
case class SparkJobInfo(
  jobId: Int,
  stageIds: Array[Int],
  status: JobExecutionStatus
)
case class SparkStageInfo(
  stageId: Int,
  currentAttemptId: Int,
  name: String,
  numTasks: Int,
  numActiveTasks: Int,
  numCompleteTasks: Int,
  numFailedTasks: Int
)
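A minimal monitoring sketch against the signatures above, polling from the driver while a job runs concurrently (for example, submitted from another thread; there is nothing to report once all jobs have finished):
val tracker = sc.statusTracker
tracker.getActiveJobIds().foreach { jobId =>
  tracker.getJobInfo(jobId).foreach { job =>
    println(s"Job $jobId: ${job.status}")
    // walk the job's stages and report task-level progress
    job.stageIds.flatMap(tracker.getStageInfo).foreach { stage =>
      println(s"  stage ${stage.stageId} '${stage.name}': " +
        s"${stage.numCompleteTasks}/${stage.numTasks} complete, " +
        s"${stage.numFailedTasks} failed")
    }
  }
}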
Utility for resolving paths to files added through SparkContext.addFile().
/**
* Resolves paths to files added through SparkContext.addFile()
*/
object SparkFiles {
  /** Get local path to file added with SparkContext.addFile() */
  def get(filename: String): String
  /** Get root directory containing files added with SparkContext.addFile() */
  def getRootDirectory(): String
}
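A short sketch of the round trip with addFile (the file name is illustrative): the driver registers the file, and tasks resolve their local copy through SparkFiles.get:
import org.apache.spark.SparkFiles

sc.addFile("config.properties")      // driver side: ship the file with the job
val headers = sc.parallelize(1 to 2).map { _ =>
  // executor side: resolve the local copy downloaded for this job
  val src = scala.io.Source.fromFile(SparkFiles.get("config.properties"))
  try src.getLines().next() finally src.close()
}
headers.collect()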
class SparkException(message: String, cause: Throwable = null)
  extends Exception(message, cause)
class TaskNotSerializableException(taskName: String, cause: NotSerializableException)
  extends SparkException(s"Task not serializable: $taskName", cause)
class SparkFileAlreadyExistsException(outputPath: String)
  extends SparkException(s"Output path $outputPath already exists")
Common exceptions include serialization errors, raised when a task's closure captures a non-serializable object, and filesystem errors, raised when an output path already exists.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-core-2-13