CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-parent-2-13

Apache Spark is a unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R.

Pending
Overview
Eval results
Files

docs/core-engine.md

Core Engine

The Spark Core Engine provides the fundamental distributed computing capabilities through Resilient Distributed Datasets (RDDs), transformations, actions, and distributed variables. This is the foundation upon which all other Spark components are built.

Package Information

Core engine functionality is available through:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.{LongAccumulator, DoubleAccumulator}

Basic Usage

import org.apache.spark.{SparkConf, SparkContext}

// Initialize Spark
val conf = new SparkConf()
  .setAppName("My Spark Application")
  .setMaster("local[*]") // or cluster URL

val sc = new SparkContext(conf)

// Create RDD from collection
val numbers = sc.parallelize(1 to 1000000)

// Transform and compute
val result = numbers
  .filter(_ % 2 == 0)
  .map(_ * 2)
  .reduce(_ + _)

println(s"Result: $result")
sc.stop()

Capabilities

Spark Context

The main entry point for Spark functionality. Represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables.

class SparkContext(config: SparkConf) {
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
  def sequenceFile[K, V](path: String)(implicit km: ClassTag[K], vm: ClassTag[V]): RDD[(K, V)]
  
  // Distributed variables
  def broadcast[T](value: T): Broadcast[T]
  def longAccumulator(): LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator(): DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  
  // File distribution
  def addFile(path: String): Unit
  def addFile(path: String, recursive: Boolean): Unit
  def addJar(path: String): Unit
  
  // Job and resource management
  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
  def clearJobGroup(): Unit
  def addJobTag(tag: String): Unit
  def removeJobTag(tag: String): Unit
  def getJobTags(): Set[String]
  def clearJobTags(): Unit
  def requestExecutors(numExecutors: Int): Boolean
  def killExecutors(executorIds: Seq[String]): Boolean
  def getExecutorMemoryStatus: Map[String, (Long, Long)]
  
  // Control
  def stop(): Unit
  def getConf: SparkConf
  def defaultParallelism: Int
  def version: String
}

Usage example:

val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)

// Create RDD from file
val textRDD = sc.textFile("hdfs://path/to/file.txt")

// Create RDD from collection
val numbersRDD = sc.parallelize(Array(1, 2, 3, 4, 5))

// Create broadcast variable
val broadcastVar = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))

// Create accumulator
val counter = sc.longAccumulator("MyCounter")

Spark Configuration

Configuration for Spark applications, controlling various aspects of execution.

class SparkConf(loadDefaults: Boolean = true) {
  def set(key: String, value: String): SparkConf
  def setIfMissing(key: String, value: String): SparkConf
  def setAppName(name: String): SparkConf
  def setMaster(master: String): SparkConf
  def setJars(jars: Seq[String]): SparkConf
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
  
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  def getOption(key: String): Option[String]
  def getAll: Array[(String, String)]
  def contains(key: String): Boolean
  
  def remove(key: String): SparkConf
  def clone(): SparkConf
}

Usage example:

val conf = new SparkConf()
  .setAppName("My Application")
  .setMaster("yarn")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "4")
  .set("spark.sql.adaptive.enabled", "true")

Resilient Distributed Datasets (RDDs)

The fundamental data structure in Spark. An RDD is an immutable, distributed collection of objects that can be processed in parallel.

abstract class RDD[T: ClassTag] extends Serializable {
  // Transformations (lazy)
  def map[U: ClassTag](f: T => U): RDD[U]
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def distinct(): RDD[T]
  def distinct(numPartitions: Int): RDD[T]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
  def union(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T]): RDD[T]
  def subtract(other: RDD[T]): RDD[T]
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  def zipWithIndex(): RDD[(T, Long)]
  def zipWithUniqueId(): RDD[(T, Long)]
  def zipPartitions[U: ClassTag, V: ClassTag](rdd2: RDD[U])(f: (Iterator[T], Iterator[U]) => Iterator[V]): RDD[V]
  def zipPartitions[U: ClassTag, V: ClassTag, W: ClassTag](rdd2: RDD[U], rdd3: RDD[V])(f: (Iterator[T], Iterator[U], Iterator[V]) => Iterator[W]): RDD[W]
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  def glom(): RDD[Array[T]]
  def keyBy[K](f: T => K): RDD[(K, T)]
  
  // Advanced operations
  def barrier(): RDDBarrier[T]
  def withResources(rp: ResourceProfile): RDD[T]
  
  // Partitioning
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
  def repartition(numPartitions: Int): RDD[T]
  def partitionBy(partitioner: Partitioner): RDD[T] // NOTE: available on key-value RDDs via PairRDDFunctions
  
  // Persistence
  def cache(): RDD[T]
  def persist(): RDD[T]
  def persist(newLevel: StorageLevel): RDD[T]
  def unpersist(blocking: Boolean = true): RDD[T]
  
  // Actions (trigger computation)
  def collect(): Array[T]
  def count(): Long
  def first(): T
  def take(num: Int): Array[T]
  def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  def top(num: Int)(implicit ord: Ordering[T]): Array[T]
  def reduce(f: (T, T) => T): T
  def fold(zeroValue: T)(op: (T, T) => T): T
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit
  
  // Information
  def partitions: Array[Partition]
  def getNumPartitions: Int
  def isEmpty(): Boolean
  def name: String
  def setName(name: String): RDD[T]
}

Pair RDD Functions

Additional operations available on RDDs of key-value pairs through implicit conversion.

class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
  // Transformations
  def keys: RDD[K]
  def values: RDD[V]
  def mapValues[U](f: V => U): RDD[(K, U)]
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  
  // Joins
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  
  // Actions
  def countByKey(): Map[K, Long]
  def collectAsMap(): Map[K, V]
  def lookup(key: K): Seq[V]
  
  // Output
  def saveAsTextFile(path: String): Unit
  def saveAsSequenceFile(path: String): Unit
}

Usage example:

val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

// Group by key
val grouped = pairs.groupByKey()
// Result: Array(("a", Iterable(1, 3)), ("b", Iterable(2, 4)))

// Reduce by key
val sums = pairs.reduceByKey(_ + _)
// Result: Array(("a", 4), ("b", 6))

// Join with another RDD
val other = sc.parallelize(Array(("a", "apple"), ("b", "banana")))
val joined = pairs.join(other)
// Result: Array(("a", (1, "apple")), ("a", (3, "apple")), ("b", (2, "banana")), ("b", (4, "banana")))

Numeric RDD Functions

Additional operations available on RDDs of numeric values.

class DoubleRDDFunctions(self: RDD[Double]) {
  def sum(): Double
  def mean(): Double
  def variance(): Double
  def sampleVariance(): Double
  def stdev(): Double
  def sampleStdev(): Double
  def stats(): StatCounter
  def histogram(buckets: Array[Double]): Array[Long]
  def histogram(bucketCount: Int): (Array[Double], Array[Long])
}

class StatCounter extends Serializable {
  def count: Long
  def mean: Double
  def sum: Double
  def min: Double
  def max: Double
  def variance: Double
  def sampleVariance: Double
  def stdev: Double
  def sampleStdev: Double
}

Distributed Variables

Variables that can be shared across cluster nodes efficiently.

Broadcast Variables

Read-only variables that are cached once on each machine, rather than a copy being shipped with every task.

abstract class Broadcast[T] extends Serializable {
  def value: T
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit
  def destroy(): Unit
  def destroy(blocking: Boolean): Unit
  def id: Long
}

Usage example:

// Create broadcast variable
val broadcastMap = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))

// Use in RDD operations
val rdd = sc.parallelize(Array("key1", "key2", "key3"))
val result = rdd.map(key => broadcastMap.value.getOrElse(key, "default"))

// Clean up
broadcastMap.unpersist()

Accumulators

Variables that can be "added" to from parallel operations and are only readable by the driver.

abstract class AccumulatorV2[IN, OUT] extends Serializable {
  def isZero: Boolean
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: AccumulatorV2[IN, OUT]): Unit
  def value: OUT
  def name: Option[String]
}

class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  def add(v: Long): Unit
  def add(v: java.lang.Long): Unit
  def count: Long
  def sum: Long
  def avg: Double
}

class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
  def add(v: Double): Unit
  def add(v: java.lang.Double): Unit
  def count: Long
  def sum: Double
  def avg: Double
}

class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  def add(v: T): Unit
  def value: java.util.List[T]
}

Usage example:

// Create accumulators
val counter = sc.longAccumulator("My Counter")
val errors = sc.collectionAccumulator[String]("Error Messages")

// Use in RDD operations
val data = sc.parallelize(1 to 1000)
data.foreach { x =>
  counter.add(1)
  if (x % 100 == 0) {
    errors.add(s"Processed $x items")
  }
}

println(s"Processed ${counter.value} items")
println(s"Errors: ${errors.value}")

Storage Levels

Control how RDDs are stored: in memory, on disk, or both.

class StorageLevel private(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int
) extends Externalizable {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}

object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}

Usage example:

val rdd = sc.textFile("large-file.txt")

// Cache in memory only
rdd.persist(StorageLevel.MEMORY_ONLY)

// Cache in memory and disk with replication
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

// Use serialized storage to save memory
rdd.persist(StorageLevel.MEMORY_ONLY_SER)

Partitioning

Control how data is distributed across cluster nodes.

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

class HashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
  def equals(other: Any): Boolean
  def hashCode: Int
}

class RangePartitioner[K : Ordering : ClassTag, V](
  partitions: Int,
  rdd: RDD[_ <: Product2[K, V]],
  ascending: Boolean = true
) extends Partitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

Task Context

Contextual information and utilities available to running tasks.

abstract class TaskContext extends Serializable {
  def isCompleted(): Boolean
  def isInterrupted(): Boolean
  def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
  def addTaskFailureListener(listener: TaskFailureListener): TaskContext
  def stageId(): Int
  def stageAttemptNumber(): Int
  def partitionId(): Int
  def attemptNumber(): Int
  def taskAttemptId(): Long
  def getLocalProperty(key: String): String
  def taskMetrics(): TaskMetrics
  def getMetricsSources(sourceName: String): Seq[Source]
}

object TaskContext {
  def get(): TaskContext
  def getPartitionId(): Int
}

Usage example:

val rdd = sc.parallelize(1 to 100, 4)
val result = rdd.mapPartitionsWithIndex { (partitionIndex, iterator) =>
  val context = TaskContext.get()
  println(s"Processing partition ${context.partitionId()} on stage ${context.stageId()}")
  iterator.map(_ * 2)
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-parent-2-13

docs

core-engine.md

graph-processing.md

index.md

machine-learning.md

sql-dataframes.md

stream-processing.md

tile.json