Apache Spark Core - The foundational component of Apache Spark providing distributed computing capabilities including RDDs, transformations, actions, and cluster management.
npx @tessl/cli install tessl/maven-org-apache-spark--spark-core-2-11@2.4.0

Apache Spark Core is the foundational component of the Apache Spark unified analytics engine for large-scale data processing. It provides core functionality including distributed task scheduling, memory management, fault recovery, and interaction with storage systems. The library implements resilient distributed datasets (RDDs) as its fundamental data abstraction: fault-tolerant collections that can be operated on in parallel across a cluster.
pom.xml:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.8</version>
</dependency>

For SBT:
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.8"

Scala imports:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.storage.StorageLevel

Java imports:
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.storage.StorageLevel;

Scala example:
import org.apache.spark.{SparkContext, SparkConf}
// Create Spark configuration
val conf = new SparkConf()
.setAppName("MySparkApp")
.setMaster("local[*]")
// Create Spark context
val sc = new SparkContext(conf)
// Create RDD from a collection
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))
// Transform and action
val result = data
.map(_ * 2)
.filter(_ > 4)
.collect()
// Clean up
sc.stop()

Java example:
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import java.util.Arrays;
import java.util.List;
// Create configuration
SparkConf conf = new SparkConf()
.setAppName("MySparkApp")
.setMaster("local[*]");
// Create context
JavaSparkContext jsc = new JavaSparkContext(conf);
// Create RDD
JavaRDD<Integer> data = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
// Transform and collect
List<Integer> result = data
.map(x -> x * 2)
.filter(x -> x > 4)
.collect();
// Clean up
jsc.stop();

Spark Core is built around several key abstractions:
The primary entry points for configuring and initializing Spark applications.
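For illustration, a minimal sketch of configuring and starting a context, reading a text file, and shutting down; the app name, master URL, config value, and input path are placeholders rather than prescribed values:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("config-sketch")
  .setMaster("local[2]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val sc = new SparkContext(conf)

// textFile returns an RDD of lines; the path is a placeholder
val lines = sc.textFile("/tmp/input.txt")
println(s"partitions: ${lines.getNumPartitions}")

sc.stop()

The entry-point signatures: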
class SparkConf() {
def set(key: String, value: String): SparkConf
def setMaster(master: String): SparkConf
def setAppName(name: String): SparkConf
def get(key: String): String
def get(key: String, defaultValue: String): String
}
class SparkContext(config: SparkConf) {
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
def stop(): Unit
def broadcast[T: ClassTag](value: T): Broadcast[T]
def longAccumulator(): LongAccumulator
def doubleAccumulator(): DoubleAccumulator
}

The core distributed data abstraction with comprehensive transformation and action operations.
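For example, a hedged sketch of chaining transformations and actions; it assumes an existing SparkContext named sc, created as in the quick start above:

val nums = sc.parallelize(1 to 10)
val more = sc.parallelize(8 to 15)

// Transformations are lazy: they only describe the computation
val evens   = nums.filter(_ % 2 == 0)
val doubled = evens.map(_ * 2)
val merged  = doubled.union(more).distinct()

// Actions trigger execution and return results to the driver
println(merged.count())
println(merged.take(3).mkString(", "))
println(merged.reduce(_ + _))

The core RDD interface (abridged):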
abstract class RDD[T: ClassTag] {
// Transformations
def map[U: ClassTag](f: T => U): RDD[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
def filter(f: T => Boolean): RDD[T]
def distinct(numPartitions: Int = partitions.length): RDD[T]
def union(other: RDD[T]): RDD[T]
def intersection(other: RDD[T]): RDD[T]
// Actions
def collect(): Array[T]
def count(): Long
def first(): T
def take(num: Int): Array[T]
def reduce(f: (T, T) => T): T
def foreach(f: T => Unit): Unit
// Persistence
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): this.type
def cache(): this.type
def unpersist(blocking: Boolean = true): this.type
}

Specialized operations available on RDDs of key-value pairs for aggregation and joining.
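For example, a word-count style sketch; it assumes an existing SparkContext named sc. Pair operations become available on any RDD of tuples through an implicit conversion to PairRDDFunctions:

val lines = sc.parallelize(Seq("spark core", "spark rdd", "core api"))

// Build (word, 1) pairs and aggregate the counts per key
val counts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .sortByKey()

// Join the counts against another keyed data set
val roles  = sc.parallelize(Seq(("spark", "engine"), ("core", "module")))
val joined = counts.join(roles)   // RDD[(String, (Int, String))]

counts.collect().foreach(println)

The key-value operations: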
class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
def keys: RDD[K]
def values: RDD[V]
def groupByKey(): RDD[(K, Iterable[V])]
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def sortByKey(ascending: Boolean = true): RDD[(K, V)]
}

Java-friendly wrappers that provide type-safe operations and integrate with Java collections.
public class JavaSparkContext {
public <T> JavaRDD<T> parallelize(List<T> list);
public JavaRDD<String> textFile(String path);
public <T> Broadcast<T> broadcast(T value);
public void stop();
}
public class JavaRDD<T> {
public <R> JavaRDD<R> map(Function<T, R> f);
public <U> JavaRDD<U> flatMap(FlatMapFunction<T, U> f);
public JavaRDD<T> filter(Function<T, Boolean> f);
public List<T> collect();
public long count();
public T first();
}
public class JavaPairRDD<K, V> {
public JavaRDD<K> keys();
public JavaRDD<V> values();
public JavaPairRDD<K, Iterable<V>> groupByKey();
public JavaPairRDD<K, V> reduceByKey(Function2<V, V, V> func);
public <W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other);
}

Shared variables for efficient data distribution and aggregation across cluster nodes.
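For example, a hedged sketch of a broadcast lookup table plus a LongAccumulator used as a miss counter; it assumes an existing SparkContext named sc. Note that accumulator updates made inside transformations are only guaranteed once an action runs:

// Ship a small, read-only lookup table to every executor once
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

// Driver-visible counter for records that miss the lookup
val misses = sc.longAccumulator("misses")

val keys = sc.parallelize(Seq("a", "b", "x", "a"))
val resolved = keys.map { k =>
  lookup.value.getOrElse(k, { misses.add(1); -1 })
}

resolved.collect()
println(s"lookup misses: ${misses.value}")

lookup.unpersist()

The shared-variable APIs: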
abstract class Broadcast[T] {
def value: T
def unpersist(): Unit
def unpersist(blocking: Boolean): Unit
def destroy(): Unit
def id: Long
}
abstract class AccumulatorV2[IN, OUT] {
def isZero: Boolean
def copy(): AccumulatorV2[IN, OUT]
def reset(): Unit
def add(v: IN): Unit
def merge(other: AccumulatorV2[IN, OUT]): Unit
def value: OUT
}
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
def add(v: Long): Unit
def add(v: java.lang.Long): Unit
def sum: Long
def count: Long
def avg: Double
}

Fine-grained control over RDD caching and persistence strategies across memory and disk.
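For example, a hedged sketch of caching a parsed data set for reuse across several actions; it assumes an existing SparkContext named sc, and the input path is a placeholder:

import org.apache.spark.storage.StorageLevel

val parsed = sc.textFile("/tmp/events.log").map(_.split(","))

// Keep the parsed data for multiple downstream actions;
// partitions that do not fit in memory spill to local disk
parsed.persist(StorageLevel.MEMORY_AND_DISK)

val total  = parsed.count()
val sample = parsed.take(5)

// Release the cached blocks when finished
parsed.unpersist()

The available levels and their flags: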
object StorageLevel {
val NONE: StorageLevel
val DISK_ONLY: StorageLevel
val DISK_ONLY_2: StorageLevel
val MEMORY_ONLY: StorageLevel
val MEMORY_ONLY_2: StorageLevel
val MEMORY_ONLY_SER: StorageLevel
val MEMORY_ONLY_SER_2: StorageLevel
val MEMORY_AND_DISK: StorageLevel
val MEMORY_AND_DISK_2: StorageLevel
val MEMORY_AND_DISK_SER: StorageLevel
val MEMORY_AND_DISK_SER_2: StorageLevel
val OFF_HEAP: StorageLevel
}
class StorageLevel {
def useDisk: Boolean
def useMemory: Boolean
def useOffHeap: Boolean
def deserialized: Boolean
def replication: Int
}

Runtime information and control for tasks executing on cluster nodes.
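For example, a hedged sketch that tags each record with metadata about the task that processed it; it assumes an existing SparkContext named sc. TaskContext.get() is only meaningful inside code running on executors:

import org.apache.spark.TaskContext

val data = sc.parallelize(1 to 100, numSlices = 4)

val tagged = data.mapPartitions { iter =>
  val ctx  = TaskContext.get()
  val info = s"stage=${ctx.stageId()} partition=${ctx.partitionId()} attempt=${ctx.attemptNumber()}"
  iter.map(x => (info, x))
}

tagged.take(3).foreach(println)

The task-level API: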
abstract class TaskContext {
def isCompleted(): Boolean
def isInterrupted(): Boolean
def stageId(): Int
def partitionId(): Int
def attemptNumber(): Int
def taskAttemptId(): Long
def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
def addTaskFailureListener(listener: TaskFailureListener): TaskContext
def getLocalProperty(key: String): String
}
object TaskContext {
def get(): TaskContext
def getPartitionId(): Int
}

APIs for monitoring job and stage progress, executor status, and application metrics.
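For example, a hedged sketch of polling progress from the driver while a job runs in a background thread; it assumes an existing SparkContext named sc, whose statusTracker field exposes this API, and the sleep is purely illustrative:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Run a job asynchronously so the main thread is free to poll
Future { sc.parallelize(1 to 1000000, 8).map(_ * 2).count() }

val tracker = sc.statusTracker
Thread.sleep(500)

for (jobId <- tracker.getActiveJobIds(); job <- tracker.getJobInfo(jobId)) {
  println(s"job $jobId: ${job.status()}")
  for (stageId <- job.stageIds(); stage <- tracker.getStageInfo(stageId)) {
    println(s"  stage ${stage.stageId()}: ${stage.numActiveTasks()} of ${stage.numTasks()} tasks active")
  }
}

The monitoring API: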
class SparkStatusTracker {
def getJobIdsForGroup(jobGroup: String): Array[Int]
def getActiveStageIds(): Array[Int]
def getActiveJobIds(): Array[Int]
def getJobInfo(jobId: Int): Option[SparkJobInfo]
def getStageInfo(stageId: Int): Option[SparkStageInfo]
def getExecutorInfos: Array[SparkExecutorInfo]
}
class SparkJobInfo {
def jobId(): Int
def stageIds(): Array[Int]
def status(): JobExecutionStatus
}
class SparkStageInfo {
def stageId(): Int
def name(): String
def numTasks(): Int
def numActiveTasks(): Int
def numCompletedTasks(): Int
def numFailedTasks(): Int
}

// Core type constraints
type ClassTag[T] = scala.reflect.ClassTag[T]
// Partitioning
abstract class Partitioner {
def numPartitions: Int
def getPartition(key: Any): Int
}
class HashPartitioner(partitions: Int) extends Partitioner
class RangePartitioner[K: Ordering: ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]]
) extends Partitioner
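// (a usage sketch for these partitioner types appears after this listing)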
// Function types for Java API
@FunctionalInterface
trait Function[T1, R] extends Serializable {
def call(v1: T1): R
}
@FunctionalInterface
trait Function2[T1, T2, R] extends Serializable {
def call(v1: T1, v2: T2): R
}
@FunctionalInterface
trait VoidFunction[T] extends Serializable {
def call(t: T): Unit
}
@FunctionalInterface
trait FlatMapFunction[T, R] extends Serializable {
def call(t: T): java.util.Iterator[R]
}
@FunctionalInterface
trait PairFunction[T, K, V] extends Serializable {
def call(t: T): Tuple2[K, V]
}
// Exception types
class SparkException(message: String, cause: Throwable) extends Exception
class TaskKilledException(reason: String) extends RuntimeException
class TaskNotSerializableException(className: String) extends SparkException
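Finally, a hedged sketch of the Partitioner abstractions listed above: hash-partitioning a pair RDD and defining a minimal custom partitioner. HotKeyPartitioner is a hypothetical name, and an existing SparkContext named sc is assumed:

import org.apache.spark.{HashPartitioner, Partitioner}

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)))

// Hash-partition so that equal keys are colocated in the same partition
val hashed = pairs.partitionBy(new HashPartitioner(4))
println(hashed.partitioner)   // prints Some(...)

// A minimal custom partitioner: route one hot key to a dedicated partition.
// Define it at top level so it serializes cleanly to executors.
class HotKeyPartitioner(override val numPartitions: Int, hotKey: String) extends Partitioner {
  def getPartition(key: Any): Int =
    if (key == hotKey) 0
    else 1 + math.abs(key.hashCode % (numPartitions - 1))
}

val custom = pairs.partitionBy(new HotKeyPartitioner(4, "a"))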