Apache Spark is a unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R.
—
The Spark Core Engine provides the fundamental distributed computing capabilities through Resilient Distributed Datasets (RDDs), transformations, actions, and distributed variables. This is the foundation upon which all other Spark components are built.
Core engine functionality is available through:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.{LongAccumulator, DoubleAccumulator}

import org.apache.spark.{SparkConf, SparkContext}
// Initialize Spark
val conf = new SparkConf()
.setAppName("My Spark Application")
.setMaster("local[*]") // or cluster URL
val sc = new SparkContext(conf)
// Create RDD from collection
val numbers = sc.parallelize(1 to 1000000)
// Transform and compute
val result = numbers
.filter(_ % 2 == 0)
.map(_ * 2)
.reduce(_ + _)
println(s"Result: $result")
sc.stop()

The main entry point for Spark functionality. Represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables.
class SparkContext(config: SparkConf) {
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def parallelize[T: ClassTag](seq: Seq[T]): RDD[T]
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
def sequenceFile[K, V](path: String)(implicit km: ClassTag[K], vm: ClassTag[V]): RDD[(K, V)]
// Distributed variables
def broadcast[T](value: T): Broadcast[T]
def longAccumulator(): LongAccumulator
def longAccumulator(name: String): LongAccumulator
def doubleAccumulator(): DoubleAccumulator
def doubleAccumulator(name: String): DoubleAccumulator
// File distribution
def addFile(path: String): Unit
def addFile(path: String, recursive: Boolean): Unit
def addJar(path: String): Unit
// Job and resource management
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
def clearJobGroup(): Unit
def addJobTag(tag: String): Unit
def removeJobTag(tag: String): Unit
def getJobTags(): Set[String]
def clearJobTags(): Unit
def requestExecutors(numExecutors: Int): Boolean
def killExecutors(executorIds: Seq[String]): Boolean
def getExecutorMemoryStatus: Map[String, (Long, Long)]
// Control
def stop(): Unit
def getConf: SparkConf
def defaultParallelism: Int
def version: String
}

Usage example:
val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
// Create RDD from file
val textRDD = sc.textFile("hdfs://path/to/file.txt")
// Create RDD from collection
val numbersRDD = sc.parallelize(Array(1, 2, 3, 4, 5))
// Create broadcast variable
val broadcastVar = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))
// Create accumulator
val counter = sc.longAccumulator("MyCounter")

Configuration for Spark applications, controlling various aspects of execution.
class SparkConf(loadDefaults: Boolean = true) {
def set(key: String, value: String): SparkConf
def setIfMissing(key: String, value: String): SparkConf
def setAppName(name: String): SparkConf
def setMaster(master: String): SparkConf
def setJars(jars: Seq[String]): SparkConf
def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
def get(key: String): String
def get(key: String, defaultValue: String): String
def getOption(key: String): Option[String]
def getAll: Array[(String, String)]
def contains(key: String): Boolean
def remove(key: String): SparkConf
def clone(): SparkConf
}

Usage example:
val conf = new SparkConf()
.setAppName("My Application")
.setMaster("yarn")
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "4")
.set("spark.sql.adaptive.enabled", "true")

The fundamental data structure in Spark. An RDD is an immutable, distributed collection of objects that can be processed in parallel.
abstract class RDD[T: ClassTag] extends Serializable {
// Transformations (lazy)
def map[U: ClassTag](f: T => U): RDD[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
def filter(f: T => Boolean): RDD[T]
def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
def union(other: RDD[T]): RDD[T]
def intersection(other: RDD[T]): RDD[T]
def subtract(other: RDD[T]): RDD[T]
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
def zipWithIndex(): RDD[(T, Long)]
def zipWithUniqueId(): RDD[(T, Long)]
def zipPartitions[U: ClassTag, V: ClassTag](rdd2: RDD[U])(f: (Iterator[T], Iterator[U]) => Iterator[V]): RDD[V]
def zipPartitions[U: ClassTag, V: ClassTag, W: ClassTag](rdd2: RDD[U], rdd3: RDD[V])(f: (Iterator[T], Iterator[U], Iterator[V]) => Iterator[W]): RDD[W]
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
def glom(): RDD[Array[T]]
def keyBy[K](f: T => K): RDD[(K, T)]
// Advanced operations
def barrier(): RDDBarrier[T]
def withResources[U: ClassTag](func: Iterator[T] => Iterator[U]): RDD[U]
// Partitioning
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
def repartition(numPartitions: Int): RDD[T]
def partitionBy(partitioner: Partitioner): RDD[T]
// Persistence
def cache(): RDD[T]
def persist(): RDD[T]
def persist(newLevel: StorageLevel): RDD[T]
def unpersist(blocking: Boolean = true): RDD[T]
// Actions (trigger computation)
def collect(): Array[T]
def count(): Long
def first(): T
def take(num: Int): Array[T]
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
def reduce(f: (T, T) => T): T
def fold(zeroValue: T)(op: (T, T) => T): T
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
def foreach(f: T => Unit): Unit
def foreachPartition(f: Iterator[T] => Unit): Unit
// Information
def partitions: Array[Partition]
def getNumPartitions: Int
def isEmpty(): Boolean
def name: String
def setName(name: String): RDD[T]
}

Additional operations available on RDDs of key-value pairs through implicit conversion.
class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
// Transformations
def keys: RDD[K]
def values: RDD[V]
def mapValues[U](f: V => U): RDD[(K, U)]
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
// Joins
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
// Actions
def countByKey(): Map[K, Long]
def collectAsMap(): Map[K, V]
def lookup(key: K): Seq[V]
// Output
def saveAsTextFile(path: String): Unit
def saveAsSequenceFile(path: String): Unit
}

Usage example:
val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
// Group by key
val grouped = pairs.groupByKey()
// Result: Array(("a", Iterable(1, 3)), ("b", Iterable(2, 4)))
// Reduce by key
val sums = pairs.reduceByKey(_ + _)
// Result: Array(("a", 4), ("b", 6))
// Join with another RDD
val other = sc.parallelize(Array(("a", "apple"), ("b", "banana")))
val joined = pairs.join(other)
// Result: Array(("a", (1, "apple")), ("a", (3, "apple")), ("b", (2, "banana")), ("b", (4, "banana")))

Additional operations available on RDDs of numeric values.
class DoubleRDDFunctions(self: RDD[Double]) {
def sum(): Double
def mean(): Double
def variance(): Double
def sampleVariance(): Double
def stdev(): Double
def sampleStdev(): Double
def stats(): StatCounter
def histogram(buckets: Array[Double]): Array[Long]
def histogram(bucketCount: Int): (Array[Double], Array[Long])
}
class StatCounter extends Serializable {
def count: Long
def mean: Double
def sum: Double
def min: Double
def max: Double
def variance: Double
def sampleVariance: Double
def stdev: Double
def sampleStdev: Double
}

Variables that can be shared across cluster nodes efficiently.
Read-only variables cached on each machine rather than shipping a copy with tasks.
abstract class Broadcast[T] extends Serializable {
def value: T
def unpersist(): Unit
def unpersist(blocking: Boolean): Unit
def destroy(): Unit
def destroy(blocking: Boolean): Unit
def id: Long
}

Usage example:
// Create broadcast variable
val broadcastMap = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))
// Use in RDD operations
val rdd = sc.parallelize(Array("key1", "key2", "key3"))
val result = rdd.map(key => broadcastMap.value.getOrElse(key, "default"))
// Clean up
broadcastMap.unpersist()

Variables that can be "added" to from parallel operations and are only readable by the driver.
abstract class AccumulatorV2[IN, OUT] extends Serializable {
def isZero: Boolean
def copy(): AccumulatorV2[IN, OUT]
def reset(): Unit
def add(v: IN): Unit
def merge(other: AccumulatorV2[IN, OUT]): Unit
def value: OUT
def name: Option[String]
}
class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
def add(v: Long): Unit
def add(v: java.lang.Long): Unit
def count: Long
def sum: Long
def avg: Double
}
class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
def add(v: Double): Unit
def add(v: java.lang.Double): Unit
def count: Long
def sum: Double
def avg: Double
}
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
def add(v: T): Unit
def value: java.util.List[T]
}

Usage example:
// Create accumulators
val counter = sc.longAccumulator("My Counter")
val errors = sc.collectionAccumulator[String]("Error Messages")
// Use in RDD operations
val data = sc.parallelize(1 to 1000)
data.foreach { x =>
counter.add(1)
if (x % 100 == 0) {
errors.add(s"Processed $x items")
}
}
println(s"Processed ${counter.value} items")
println(s"Errors: ${errors.value}")

Control how RDDs are stored in memory and/or disk.
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int
) extends Externalizable {
def useDisk: Boolean
def useMemory: Boolean
def useOffHeap: Boolean
def deserialized: Boolean
def replication: Int
}
object StorageLevel {
val NONE: StorageLevel
val DISK_ONLY: StorageLevel
val DISK_ONLY_2: StorageLevel
val MEMORY_ONLY: StorageLevel
val MEMORY_ONLY_2: StorageLevel
val MEMORY_ONLY_SER: StorageLevel
val MEMORY_ONLY_SER_2: StorageLevel
val MEMORY_AND_DISK: StorageLevel
val MEMORY_AND_DISK_2: StorageLevel
val MEMORY_AND_DISK_SER: StorageLevel
val MEMORY_AND_DISK_SER_2: StorageLevel
val OFF_HEAP: StorageLevel
}

Usage example:
val rdd = sc.textFile("large-file.txt")
// Cache in memory only
rdd.persist(StorageLevel.MEMORY_ONLY)
// Cache in memory and disk with replication
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
// Use serialized storage to save memory
rdd.persist(StorageLevel.MEMORY_ONLY_SER)

Control how data is distributed across cluster nodes.
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions: Int
def getPartition(key: Any): Int
def equals(other: Any): Boolean
def hashCode: Int
}
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
ascending: Boolean = true
) extends Partitioner {
def numPartitions: Int
def getPartition(key: Any): Int
}

Contextual information and utilities available to running tasks.
abstract class TaskContext extends Serializable {
def isCompleted(): Boolean
def isInterrupted(): Boolean
def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
def addTaskFailureListener(listener: TaskFailureListener): TaskContext
def stageId(): Int
def stageAttemptNumber(): Int
def partitionId(): Int
def attemptNumber(): Int
def taskAttemptId(): Long
def getLocalProperty(key: String): String
def taskMetrics(): TaskMetrics
def getMetricsSources(sourceName: String): Seq[Source]
}
object TaskContext {
def get(): TaskContext
def getPartitionId(): Int
}

Usage example:
val rdd = sc.parallelize(1 to 100, 4)
val result = rdd.mapPartitionsWithIndex { (partitionIndex, iterator) =>
val context = TaskContext.get()
println(s"Processing partition ${context.partitionId()} on stage ${context.stageId()}")
iterator.map(_ * 2)
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-parent-2-13