Application setup and cluster connection management through SparkContext and SparkConf, the entry points for coordinating distributed Spark applications.
Main entry point for Spark functionality, representing the connection to a Spark cluster. Only one SparkContext should be active per JVM.
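Because only one context may be active, code that might run where a context already exists typically goes through SparkContext.getOrCreate rather than constructing a second one. A minimal sketch (the app name and master are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("SingleContextApp").setMaster("local[*]")
// Reuses the active SparkContext if one exists, otherwise creates a new one
val sc = SparkContext.getOrCreate(conf)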
/**
* Main entry point for Spark functionality
* @param config Spark configuration object
*/
class SparkContext(config: SparkConf) {
/** Create RDD from Scala collection */
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
/** Read text files from HDFS, local filesystem, or any Hadoop-supported URI */
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
/** Create RDD from Hadoop InputFormat */
def hadoopRDD[K, V](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
/** Create RDD from new Hadoop API InputFormat */
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]
): RDD[(K, V)]
/** Create broadcast variable */
def broadcast[T: ClassTag](value: T): Broadcast[T]
/** Create named accumulators */
def longAccumulator(name: String): LongAccumulator
def doubleAccumulator(name: String): DoubleAccumulator
def collectionAccumulator[T](name: String): CollectionAccumulator[T]
/** Add file to be downloaded with this Spark job */
def addFile(path: String, recursive: Boolean = false): Unit
/** Add JAR file to be distributed to cluster */
def addJar(path: String): Unit
/** Set checkpoint directory for RDD checkpointing */
def setCheckpointDir(directory: String): Unit
/** Stop SparkContext and release resources */
def stop(): Unit
/** Get current status tracker */
def statusTracker: SparkStatusTracker
/** Default parallelism level */
def defaultParallelism: Int
/** Spark version */
def version: String
}

Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
// Basic setup
val conf = new SparkConf()
.setAppName("MyApp")
.setMaster("local[2]")
val sc = new SparkContext(conf)
// Create RDD from collection
val numbers = sc.parallelize(1 to 100)
// Read text file
val lines = sc.textFile("hdfs:///data/input.txt")
// Add dependencies
sc.addFile("config.properties")
sc.addJar("lib/custom.jar")
// Cleanup
sc.stop()
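The API above also covers broadcast variables and accumulators, which the basic example does not exercise. A separate hedged sketch, assuming an active context sc (the lookup table and counter names are illustrative):

// Ship a read-only lookup table to every executor once
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
// Count records that miss the lookup, aggregated across all tasks
val misses = sc.longAccumulator("lookupMisses")

val codes = sc.parallelize(Seq("a", "b", "x")).map { key =>
  if (!lookup.value.contains(key)) misses.add(1)
  lookup.value.getOrElse(key, 0)
}
codes.count()          // running an action populates the accumulator
println(misses.value)  // read the aggregated count on the driver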

Configuration for Spark applications with key-value pairs for runtime settings.

/**
* Configuration for Spark applications
* @param loadDefaults whether to load default configurations
*/
class SparkConf(loadDefaults: Boolean = true) {
/** Set configuration property */
def set(key: String, value: String): SparkConf
/** Set application name */
def setAppName(name: String): SparkConf
/** Set master URL */
def setMaster(master: String): SparkConf
/** Set Spark home directory */
def setSparkHome(home: String): SparkConf
/** Set JAR files to distribute to cluster */
def setJars(jars: Seq[String]): SparkConf
/** Set executor environment variable */
def setExecutorEnv(variable: String, value: String): SparkConf
/** Get configuration value */
def get(key: String): String
def get(key: String, defaultValue: String): String
/** Get all configuration as key-value pairs */
def getAll: Array[(String, String)]
/** Check if configuration contains key */
def contains(key: String): Boolean
/** Remove configuration property */
def remove(key: String): SparkConf
/** Clone configuration */
def clone(): SparkConf
}

Usage Examples:
val conf = new SparkConf()
.setAppName("Data Processing")
.setMaster("yarn")
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "4")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Advanced configuration
conf.set("spark.hadoop.fs.s3a.access.key", accessKey)
.set("spark.hadoop.fs.s3a.secret.key", secretKey)
.set("spark.sql.adaptive.coalescePartitions.enabled", "true")Low-level status API for monitoring Spark applications and job execution.

Low-level status API for monitoring Spark applications and job execution.

/**
* Low-level status tracking API
*/
class SparkStatusTracker {
/** Get IDs of all known jobs in the given job group */
def getJobIdsForGroup(jobGroup: String): Array[Int]
/** Get information for specific job */
def getJobInfo(jobId: Int): Option[SparkJobInfo]
/** Get information for specific stage */
def getStageInfo(stageId: Int): Option[SparkStageInfo]
/** Get active job IDs */
def getActiveJobIds(): Array[Int]
/** Get active stage IDs */
def getActiveStageIds(): Array[Int]
/** Get executor information */
def getExecutorInfos(): Array[SparkExecutorInfo]
}
case class SparkJobInfo(
jobId: Int,
stageIds: Array[Int],
status: JobExecutionStatus
)
case class SparkStageInfo(
stageId: Int,
currentAttemptId: Int,
name: String,
numTasks: Int,
numActiveTasks: Int,
numCompleteTasks: Int,
numFailedTasks: Int
)
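This section has no usage examples of its own; a hedged sketch of polling coarse job progress from the driver, assuming an active context sc:

val tracker = sc.statusTracker

// Report progress for every active job
tracker.getActiveJobIds().foreach { jobId =>
  tracker.getJobInfo(jobId).foreach { job =>
    val stages = job.stageIds.flatMap(id => tracker.getStageInfo(id))
    val done   = stages.map(_.numCompleteTasks).sum
    val total  = stages.map(_.numTasks).sum
    println(s"Job $jobId (${job.status}): $done/$total tasks complete")
  }
}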

Utility for resolving paths to files added through SparkContext.addFile().

/**
* Resolves paths to files added through SparkContext.addFile()
*/
object SparkFiles {
/** Get local path to file added with SparkContext.addFile() */
def get(filename: String): String
/** Get root directory containing files added with SparkContext.addFile() */
def getRootDirectory(): String
}
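A hedged sketch of the usual pairing with SparkContext.addFile, assuming an active context sc: the driver registers a file and each task resolves its local copy through SparkFiles.get.

import org.apache.spark.SparkFiles

// Driver side: ship a small config file to every node
sc.addFile("config.properties")

// Executor side: resolve the local copy inside a task
val checked = sc.parallelize(1 to 4).map { _ =>
  val localPath = SparkFiles.get("config.properties")
  new java.io.File(localPath).exists()
}
checked.collect()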

class SparkException(message: String, cause: Throwable = null)
extends Exception(message, cause)
class TaskNotSerializableException(taskName: String, cause: NotSerializableException)
extends SparkException(s"Task not serializable: $taskName", cause)
class SparkFileAlreadyExistsException(outputPath: String)
extends SparkException(s"Output path $outputPath already exists")Common exceptions include serialization errors when tasks contain non-serializable closures, and file system errors when output paths already exist.