Apache Spark - Unified analytics engine for large-scale data processing
—
Core distributed data processing using Resilient Distributed Datasets (RDDs) and the fundamental Spark execution engine. RDDs provide fault-tolerant, parallel data structures with transformations and actions for large-scale data processing.
Main entry point for Spark functionality, representing the connection to a Spark cluster. Used to create RDDs, broadcast variables, and accumulators.
/**
* Main entry point for Spark functionality
* @param config Spark configuration object
*/
class SparkContext(config: SparkConf) {
/** Create RDD from a collection */
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
/** Create RDD from text file(s) */
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
/** Create RDD from whole text files */
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
/** Create RDD from binary files */
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
/** Create RDD from Hadoop file */
def hadoopRDD[K, V](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
/** Create broadcast variable */
def broadcast[T: ClassTag](value: T): Broadcast[T]
/** Create accumulator */
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
/** Set job group for tracking */
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
/** Stop SparkContext */
def stop(): Unit
}

Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
val sc = new SparkContext(conf)
// Create RDD from collection
val numbersRDD = sc.parallelize(1 to 1000, 4)
// Create RDD from text file
val textRDD = sc.textFile("hdfs://path/to/file.txt")
// Create broadcast variable
val broadcastVar = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))
sc.stop()

Python API:
class SparkContext:
"""
Main entry point for Spark functionality in Python
"""
def __init__(self, conf: SparkConf = None, master: str = None, appName: str = None)
def parallelize(self, c: Iterable, numSlices: int = None) -> RDD
def textFile(self, name: str, minPartitions: int = None, use_unicode: bool = True) -> RDD
def wholeTextFiles(self, path: str, minPartitions: int = None, use_unicode: bool = True) -> RDD
def binaryFiles(self, path: str, minPartitions: int = None) -> RDD
def broadcast(self, value: Any) -> Broadcast
def accumulator(self, value: Any, accum_param: AccumulatorParam = None) -> Accumulator
def setJobGroup(self, groupId: str, description: str, interruptOnCancel: bool = False) -> None
def stop(self) -> None

Python Usage Examples:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("MyApp").setMaster("local[*]")
sc = SparkContext(conf=conf)
# Create RDD from collection
numbers_rdd = sc.parallelize(range(1, 1001), 4)
# Create RDD from text file
text_rdd = sc.textFile("hdfs://path/to/file.txt")
# Create broadcast variable
broadcast_var = sc.broadcast({"key1": "value1", "key2": "value2"})
sc.stop()

Resilient Distributed Dataset - the fundamental data structure of Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.
/**
* Resilient Distributed Dataset - core abstraction for distributed collections
*/
abstract class RDD[T: ClassTag] {
/** Transform each element using a function */
def map[U: ClassTag](f: T => U): RDD[U]
/** Transform each element to zero or more elements */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
/** Keep only elements matching predicate */
def filter(f: T => Boolean): RDD[T]
/** Remove duplicates */
def distinct(numPartitions: Int = partitions.length): RDD[T]
/** Random sample of elements */
def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
/** Union with another RDD */
def union(other: RDD[T]): RDD[T]
/** Intersection with another RDD */
def intersection(other: RDD[T]): RDD[T]
/** Cartesian product with another RDD */
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
/** Group elements by key function */
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
/** Sort elements */
def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
/** Collect all elements to driver */
def collect(): Array[T]
/** Take first n elements */
def take(num: Int): Array[T]
/** Get first element */
def first(): T
/** Count number of elements */
def count(): Long
/** Reduce elements using function */
def reduce(f: (T, T) => T): T
/** Fold elements with zero value */
def fold(zeroValue: T)(op: (T, T) => T): T
/** Aggregate with different result type */
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
/** Cache RDD in memory */
def cache(): RDD[T]
/** Persist with storage level */
def persist(newLevel: StorageLevel): RDD[T]
/** Remove from cache */
def unpersist(blocking: Boolean = false): RDD[T]
}

Usage Examples:
val data = sc.parallelize(1 to 100, 4)
// Transformations (lazy)
val doubled = data.map(_ * 2)
val filtered = doubled.filter(_ > 50)
val unique = filtered.distinct()
// Actions (trigger computation)
val result = unique.collect()
val count = unique.count()
val first10 = unique.take(10)
// Persist for reuse
unique.cache()
// Complex operations
val grouped = data.groupBy(_ % 10) // Group by remainder
val aggregated = data.aggregate(0)(_ + _, _ + _) // Sum all elements

Python RDD API:
class RDD:
"""
Resilient Distributed Dataset - Python implementation
"""
def map(self, f: Callable) -> RDD
def flatMap(self, f: Callable) -> RDD
def filter(self, f: Callable) -> RDD
def distinct(self, numPartitions: int = None) -> RDD
def sample(self, withReplacement: bool, fraction: float, seed: int = None) -> RDD
def union(self, other: RDD) -> RDD
def intersection(self, other: RDD) -> RDD
def cartesian(self, other: RDD) -> RDD
def groupBy(self, f: Callable, numPartitions: int = None) -> RDD
def sortBy(self, keyfunc: Callable, ascending: bool = True, numPartitions: int = None) -> RDD
# Actions
def collect(self) -> List
def take(self, num: int) -> List
def first(self) -> Any
def count(self) -> int
def reduce(self, f: Callable) -> Any
def fold(self, zeroValue: Any, op: Callable) -> Any
def aggregate(self, zeroValue: Any, seqOp: Callable, combOp: Callable) -> Any
def foreach(self, f: Callable) -> None
def foreachPartition(self, f: Callable) -> None
# Persistence
def cache(self) -> RDD
def persist(self, storageLevel: StorageLevel = None) -> RDD
def unpersist(self, blocking: bool = False) -> RDD

Python Usage Examples:
data = sc.parallelize(range(1, 101), 4)
# Transformations (lazy)
doubled = data.map(lambda x: x * 2)
filtered = doubled.filter(lambda x: x > 50)
unique = filtered.distinct()
# Actions (trigger computation)
result = unique.collect()
count = unique.count()
first_10 = unique.take(10)
# Persist for reuse
unique.cache()
# Complex operations
grouped = data.groupBy(lambda x: x % 10) # Group by remainder
aggregated = data.aggregate(0, lambda acc, x: acc + x, lambda acc1, acc2: acc1 + acc2) # Sum

Additional operations available on RDDs of key-value pairs through implicit conversions.
/**
* Additional operations for RDDs of key-value pairs
*/
implicit class PairRDDFunctions[K, V](self: RDD[(K, V)]) {
/** Get keys only */
def keys: RDD[K]
/** Get values only */
def values: RDD[V]
/** Transform values keeping keys */
def mapValues[U](f: V => U): RDD[(K, U)]
/** Group values by key */
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
/** Reduce values by key */
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
/** Aggregate values by key */
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
/** Sort by keys */
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
/** Inner join with another pair RDD */
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
/** Left outer join */
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
/** Right outer join */
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
/** Full outer join */
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
/** Cogroup with another pair RDD */
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
/** Save as text file */
def saveAsTextFile(path: String): Unit
/** Save as Hadoop sequence file */
def saveAsSequenceFile(path: String): Unit
}

Usage Examples:
val pairs = sc.parallelize(Seq(("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 1)))
// Basic pair operations
val keys = pairs.keys.collect() // Array("apple", "banana", "apple", "cherry")
val values = pairs.values.collect() // Array(1, 2, 3, 1)
// Group and reduce
val grouped = pairs.groupByKey().collect()
val sums = pairs.reduceByKey(_ + _).collect() // Array(("apple", 4), ("banana", 2), ("cherry", 1))
// Joins
val other = sc.parallelize(Seq(("apple", "red"), ("banana", "yellow")))
val joined = pairs.join(other).collect()

Python Pair RDD Operations:
In Python, RDDs of key-value pairs (tuples) automatically have these operations available:
# Pair RDD operations available on RDD of tuples
class RDD: # When containing (key, value) tuples
def keys(self) -> RDD
def values(self) -> RDD
def mapValues(self, f: Callable) -> RDD
def groupByKey(self, numPartitions: int = None) -> RDD
def reduceByKey(self, func: Callable, numPartitions: int = None) -> RDD
def aggregateByKey(self, zeroValue: Any, seqFunc: Callable, combFunc: Callable, numPartitions: int = None) -> RDD
def sortByKey(self, ascending: bool = True, numPartitions: int = None, keyfunc: Callable = None) -> RDD
def join(self, other: RDD, numPartitions: int = None) -> RDD
def leftOuterJoin(self, other: RDD, numPartitions: int = None) -> RDD
def rightOuterJoin(self, other: RDD, numPartitions: int = None) -> RDD
def fullOuterJoin(self, other: RDD, numPartitions: int = None) -> RDD
def cogroup(self, other: RDD, numPartitions: int = None) -> RDD
def saveAsTextFile(self, path: str) -> None

Python Usage Examples:
pairs = sc.parallelize([("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 1)])
# Basic pair operations
keys = pairs.keys().collect() # ['apple', 'banana', 'apple', 'cherry']
values = pairs.values().collect() # [1, 2, 3, 1]
# Group and reduce
grouped = pairs.groupByKey().collect()
sums = pairs.reduceByKey(lambda a, b: a + b).collect() # [('apple', 4), ('banana', 2), ('cherry', 1)]
# Joins
other = sc.parallelize([("apple", "red"), ("banana", "yellow")])
joined = pairs.join(other).collect()

Configuration management for Spark applications.
/**
* Configuration for Spark applications
* @param loadDefaults whether to load default configurations
*/
class SparkConf(loadDefaults: Boolean = true) {
/** Set configuration property */
def set(key: String, value: String): SparkConf
/** Set master URL */
def setMaster(master: String): SparkConf
/** Set application name */
def setAppName(name: String): SparkConf
/** Set JAR files */
def setJars(jars: Seq[String]): SparkConf
/** Set executor environment variable */
def setExecutorEnv(variable: String, value: String): SparkConf
/** Set Spark home directory */
def setSparkHome(home: String): SparkConf
/** Set all properties from iterable */
def setAll(settings: Iterable[(String, String)]): SparkConf
/** Get configuration value */
def get(key: String): String
/** Get configuration value with default */
def get(key: String, defaultValue: String): String
/** Get boolean configuration value */
def getBoolean(key: String, defaultValue: Boolean): Boolean
}

Variables that can be shared across cluster nodes efficiently.
/**
* Broadcast variable for efficient data sharing
*/
abstract class Broadcast[T: ClassTag] {
/** Access broadcast value */
def value: T
/** Remove from executors but keep in driver */
def unpersist(): Unit
/** Destroy completely */
def destroy(): Unit
}
/**
* Accumulator for collecting information from executors
*/
class Accumulator[T] {
/** Get current value (driver only) */
def value: T
/** Add to accumulator */
def +=(term: T): Unit
/** Add to accumulator (alternative syntax) */
def add(term: T): Unit
}

Storage levels for RDD persistence.
/**
* Storage levels for RDD caching and persistence
*/
object StorageLevel {
val NONE: StorageLevel
val DISK_ONLY: StorageLevel
val DISK_ONLY_2: StorageLevel
val MEMORY_ONLY: StorageLevel
val MEMORY_ONLY_2: StorageLevel
val MEMORY_ONLY_SER: StorageLevel
val MEMORY_ONLY_SER_2: StorageLevel
val MEMORY_AND_DISK: StorageLevel
val MEMORY_AND_DISK_2: StorageLevel
val MEMORY_AND_DISK_SER: StorageLevel
val MEMORY_AND_DISK_SER_2: StorageLevel
val OFF_HEAP: StorageLevel
}

Usage Examples:
import org.apache.spark.storage.StorageLevel
val rdd = sc.parallelize(1 to 1000000)
// Different persistence levels
rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd.persist(StorageLevel.DISK_ONLY_2) // 2x replication
// Shorthand for memory-only
rdd.cache()

Common exceptions in core Spark operations:
SparkException - General Spark runtime exceptions
TaskFailedException - Task execution failures
TaskKilledException - Task cancellation
SparkOutOfMemoryError - Memory-related errors

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-parent-2-12