tessl/maven-org-apache-spark--spark-parent-2-12

Apache Spark - Unified analytics engine for large-scale data processing

Core Data Processing

Core distributed data processing using Resilient Distributed Datasets (RDDs) and the fundamental Spark execution engine. RDDs provide fault-tolerant, parallel data structures with transformations and actions for large-scale data processing.

Capabilities

SparkContext

Main entry point for Spark functionality, representing the connection to a Spark cluster. Used to create RDDs, broadcast variables, and accumulators.

/**
 * Main entry point for Spark functionality
 * @param config Spark configuration object
 */
class SparkContext(config: SparkConf) {
  /** Create RDD from a collection */
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  /** Create RDD from text file(s) */
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  /** Create RDD from whole text files */
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
  /** Create RDD from binary files */
  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]
  /** Create RDD from Hadoop file */
  def hadoopRDD[K, V](
    conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
  ): RDD[(K, V)]
  /** Create broadcast variable */
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  /** Create accumulator */
  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
  /** Set job group for tracking */
  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
  /** Stop SparkContext */
  def stop(): Unit
}

Usage Examples:

import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
val sc = new SparkContext(conf)

// Create RDD from collection
val numbersRDD = sc.parallelize(1 to 1000, 4)

// Create RDD from text file
val textRDD = sc.textFile("hdfs://path/to/file.txt")

// Create broadcast variable
val broadcastVar = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))

sc.stop()

Python API:

class SparkContext:
    """
    Main entry point for Spark functionality in Python
    """
    def __init__(self, master: str = None, appName: str = None, conf: SparkConf = None)
    def parallelize(self, c: Iterable, numSlices: int = None) -> RDD
    def textFile(self, name: str, minPartitions: int = None, use_unicode: bool = True) -> RDD
    def wholeTextFiles(self, path: str, minPartitions: int = None, use_unicode: bool = True) -> RDD
    def binaryFiles(self, path: str, minPartitions: int = None) -> RDD
    def broadcast(self, value: Any) -> Broadcast
    def accumulator(self, value: Any, accum_param: AccumulatorParam = None) -> Accumulator
    def setJobGroup(self, groupId: str, description: str, interruptOnCancel: bool = False) -> None
    def stop(self) -> None

Python Usage Examples:

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("MyApp").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Create RDD from collection
numbers_rdd = sc.parallelize(range(1, 1001), 4)

# Create RDD from text file
text_rdd = sc.textFile("hdfs://path/to/file.txt")

# Create broadcast variable
broadcast_var = sc.broadcast({"key1": "value1", "key2": "value2"})

sc.stop()

RDD[T]

Resilient Distributed Dataset - the fundamental data structure of Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.

/**
 * Resilient Distributed Dataset - core abstraction for distributed collections
 */
abstract class RDD[T: ClassTag] {
  /** Transform each element using a function */
  def map[U: ClassTag](f: T => U): RDD[U]
  /** Transform each element to zero or more elements */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  /** Keep only elements matching predicate */
  def filter(f: T => Boolean): RDD[T]
  /** Remove duplicates */
  def distinct(numPartitions: Int = partitions.length): RDD[T]
  /** Random sample of elements */
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
  /** Union with another RDD */
  def union(other: RDD[T]): RDD[T]
  /** Intersection with another RDD */
  def intersection(other: RDD[T]): RDD[T]
  /** Cartesian product with another RDD */
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  /** Group elements by key function */
  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
  /** Sort elements */
  def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
  
  /** Collect all elements to driver */
  def collect(): Array[T]
  /** Take first n elements */
  def take(num: Int): Array[T]
  /** Get first element */
  def first(): T
  /** Count number of elements */
  def count(): Long
  /** Reduce elements using function */
  def reduce(f: (T, T) => T): T
  /** Fold elements with zero value */
  def fold(zeroValue: T)(op: (T, T) => T): T
  /** Aggregate with different result type */
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  
  /** Cache RDD in memory */
  def cache(): RDD[T]
  /** Persist with storage level */
  def persist(newLevel: StorageLevel): RDD[T]
  /** Remove from cache */
  def unpersist(blocking: Boolean = false): RDD[T]
}

Usage Examples:

val data = sc.parallelize(1 to 100, 4)

// Transformations (lazy)
val doubled = data.map(_ * 2)
val filtered = doubled.filter(_ > 50)
val unique = filtered.distinct()

// Actions (trigger computation)
val result = unique.collect()
val count = unique.count()
val first10 = unique.take(10)

// Persist for reuse
unique.cache()

// Complex operations
val grouped = data.groupBy(_ % 10)  // Group by remainder
val aggregated = data.aggregate(0)(_ + _, _ + _)  // Sum all elements
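The comments above note that transformations are lazy while actions trigger computation. A minimal plain-Python sketch of that evaluation model (no Spark required; `LazyPipeline` is a hypothetical stand-in, not a Spark API):

```python
# Hypothetical stand-in mimicking Spark's lazy transformation / eager action split.
class LazyPipeline:
    def __init__(self, data):
        self.data = data
        self.ops = []          # recorded transformations, not yet executed

    def map(self, f):          # transformation: just record it
        self.ops.append(("map", f))
        return self

    def filter(self, f):       # transformation: just record it
        self.ops.append(("filter", f))
        return self

    def collect(self):         # action: now run the whole recorded pipeline
        out = list(self.data)
        for kind, f in self.ops:
            if kind == "map":
                out = [f(x) for x in out]
            else:
                out = [x for x in out if f(x)]
        return out

pipeline = LazyPipeline(range(1, 11)).map(lambda x: x * 2).filter(lambda x: x > 10)
# Nothing has been computed yet; collect() triggers the chain end to end.
print(pipeline.collect())  # [12, 14, 16, 18, 20]
```

Spark additionally uses the recorded lineage for fault tolerance: a lost partition is recomputed by replaying the transformations that produced it.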

Python RDD API:

class RDD:
    """
    Resilient Distributed Dataset - Python implementation
    """
    def map(self, f: Callable) -> RDD
    def flatMap(self, f: Callable) -> RDD
    def filter(self, f: Callable) -> RDD
    def distinct(self, numPartitions: int = None) -> RDD
    def sample(self, withReplacement: bool, fraction: float, seed: int = None) -> RDD
    def union(self, other: RDD) -> RDD
    def intersection(self, other: RDD) -> RDD
    def cartesian(self, other: RDD) -> RDD
    def groupBy(self, f: Callable, numPartitions: int = None) -> RDD
    def sortBy(self, keyfunc: Callable, ascending: bool = True, numPartitions: int = None) -> RDD
    
    # Actions
    def collect(self) -> List
    def take(self, num: int) -> List
    def first(self) -> Any
    def count(self) -> int
    def reduce(self, f: Callable) -> Any
    def fold(self, zeroValue: Any, op: Callable) -> Any
    def aggregate(self, zeroValue: Any, seqOp: Callable, combOp: Callable) -> Any
    def foreach(self, f: Callable) -> None
    def foreachPartition(self, f: Callable) -> None
    
    # Persistence
    def cache(self) -> RDD
    def persist(self, storageLevel: StorageLevel = None) -> RDD
    def unpersist(self, blocking: bool = False) -> RDD

Python Usage Examples:

data = sc.parallelize(range(1, 101), 4)

# Transformations (lazy)
doubled = data.map(lambda x: x * 2)
filtered = doubled.filter(lambda x: x > 50)
unique = filtered.distinct()

# Actions (trigger computation)
result = unique.collect()
count = unique.count()
first_10 = unique.take(10)

# Persist for reuse
unique.cache()

# Complex operations
grouped = data.groupBy(lambda x: x % 10)  # Group by remainder  
aggregated = data.aggregate(0, lambda acc, x: acc + x, lambda acc1, acc2: acc1 + acc2)  # Sum
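In `aggregate`, the two functions split the work: `seqOp` folds elements within each partition, and `combOp` merges the per-partition results on the driver. A plain-Python simulation of that contract (partitions modeled as lists; `simulate_aggregate` is a hypothetical helper, not a PySpark API):

```python
from functools import reduce

def simulate_aggregate(partitions, zero_value, seq_op, comb_op):
    """Mimic RDD.aggregate: fold each partition with seq_op,
    then merge the per-partition results with comb_op."""
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, partials, zero_value)

# Four partitions of 1..100, matching sc.parallelize(range(1, 101), 4)
parts = [list(range(1, 26)), list(range(26, 51)),
         list(range(51, 76)), list(range(76, 101))]
total = simulate_aggregate(parts, 0,
                           lambda acc, x: acc + x,    # seqOp: within a partition
                           lambda a, b: a + b)        # combOp: across partitions
print(total)  # 5050
```

This also shows why `zeroValue` must be an identity for both functions: it is applied once per partition, not once overall.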

PairRDDFunctions

Additional operations available on RDDs of key-value pairs through implicit conversions.

/**
 * Additional operations for RDDs of key-value pairs
 */
class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
  /** Get keys only */
  def keys: RDD[K]
  /** Get values only */
  def values: RDD[V]
  /** Transform values keeping keys */
  def mapValues[U](f: V => U): RDD[(K, U)]
  /** Group values by key */
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  /** Reduce values by key */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  /** Aggregate values by key */
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  /** Sort by keys */
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  /** Inner join with another pair RDD */
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
  /** Left outer join */
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  /** Right outer join */
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  /** Full outer join */
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  /** Cogroup with another pair RDD */
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  /** Save as text file */
  def saveAsTextFile(path: String): Unit
  /** Save as Hadoop sequence file */
  def saveAsSequenceFile(path: String): Unit
}

Usage Examples:

val pairs = sc.parallelize(Seq(("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 1)))

// Basic pair operations
val keys = pairs.keys.collect()  // Array("apple", "banana", "apple", "cherry")
val values = pairs.values.collect()  // Array(1, 2, 3, 1)

// Group and reduce
val grouped = pairs.groupByKey().collect()
val sums = pairs.reduceByKey(_ + _).collect()  // e.g. Array(("apple", 4), ("banana", 2), ("cherry", 1)); ordering not guaranteed

// Joins
val other = sc.parallelize(Seq(("apple", "red"), ("banana", "yellow")))
val joined = pairs.join(other).collect()

Python Pair RDD Operations:

In Python, RDDs of key-value pairs (tuples) automatically have these operations available:

# Pair RDD operations available on RDD of tuples
class RDD:  # When containing (key, value) tuples
    def keys(self) -> RDD
    def values(self) -> RDD
    def mapValues(self, f: Callable) -> RDD
    def groupByKey(self, numPartitions: int = None) -> RDD
    def reduceByKey(self, func: Callable, numPartitions: int = None) -> RDD
    def aggregateByKey(self, zeroValue: Any, seqFunc: Callable, combFunc: Callable, numPartitions: int = None) -> RDD
    def sortByKey(self, ascending: bool = True, numPartitions: int = None, keyfunc: Callable = None) -> RDD
    def join(self, other: RDD, numPartitions: int = None) -> RDD
    def leftOuterJoin(self, other: RDD, numPartitions: int = None) -> RDD
    def rightOuterJoin(self, other: RDD, numPartitions: int = None) -> RDD
    def fullOuterJoin(self, other: RDD, numPartitions: int = None) -> RDD
    def cogroup(self, other: RDD, numPartitions: int = None) -> RDD
    def saveAsTextFile(self, path: str) -> None

Python Usage Examples:

pairs = sc.parallelize([("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 1)])

# Basic pair operations
keys = pairs.keys().collect()  # ['apple', 'banana', 'apple', 'cherry']
values = pairs.values().collect()  # [1, 2, 3, 1]

# Group and reduce
grouped = pairs.groupByKey().collect()
sums = pairs.reduceByKey(lambda a, b: a + b).collect()  # e.g. [('apple', 4), ('banana', 2), ('cherry', 1)]; ordering not guaranteed

# Joins
other = sc.parallelize([("apple", "red"), ("banana", "yellow")])
joined = pairs.join(other).collect()
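`reduceByKey` combines values per key within each partition before any data moves, then merges the partial results, which is why it usually shuffles far less than `groupByKey` for aggregations. A plain-Python sketch of that two-phase merge (partitions modeled as lists of tuples; `simulate_reduce_by_key` is a hypothetical helper, not a PySpark API):

```python
def simulate_reduce_by_key(partitions, func):
    """Phase 1: combine values per key inside each partition (map-side combine).
       Phase 2: merge the per-partition maps, as the shuffle would."""
    partials = []
    for part in partitions:
        local = {}
        for k, v in part:
            local[k] = func(local[k], v) if k in local else v
        partials.append(local)
    merged = {}
    for local in partials:
        for k, v in local.items():
            merged[k] = func(merged[k], v) if k in merged else v
    return sorted(merged.items())

parts = [[("apple", 1), ("banana", 2)], [("apple", 3), ("cherry", 1)]]
print(simulate_reduce_by_key(parts, lambda a, b: a + b))
# [('apple', 4), ('banana', 2), ('cherry', 1)]
```

Only the small per-partition maps cross the network in phase 2, whereas `groupByKey` ships every individual value.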

Configuration

Configuration management for Spark applications.

/**
 * Configuration for Spark applications
 * @param loadDefaults whether to load default configurations
 */
class SparkConf(loadDefaults: Boolean = true) {
  /** Set configuration property */
  def set(key: String, value: String): SparkConf
  /** Set master URL */
  def setMaster(master: String): SparkConf
  /** Set application name */
  def setAppName(name: String): SparkConf
  /** Set JAR files */
  def setJars(jars: Seq[String]): SparkConf
  /** Set executor environment variable */
  def setExecutorEnv(variable: String, value: String): SparkConf
  /** Set Spark home directory */
  def setSparkHome(home: String): SparkConf
  /** Set all properties from iterable */
  def setAll(settings: Iterable[(String, String)]): SparkConf
  /** Get configuration value */
  def get(key: String): String
  /** Get configuration value with default */
  def get(key: String, defaultValue: String): String
  /** Get boolean configuration value */
  def getBoolean(key: String, defaultValue: Boolean): Boolean
}
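Each setter returns the SparkConf itself, which is what makes the chained style in the earlier examples (`new SparkConf().setAppName(...).setMaster(...)`) work. A plain-Python sketch of that builder pattern (`Conf` is a toy stand-in, not the real SparkConf; the `spark.*` keys shown are the standard property names the real setters write):

```python
class Conf:
    """Toy stand-in for SparkConf: setters mutate and return self so calls chain."""
    def __init__(self):
        self.settings = {}

    def set(self, key, value):
        self.settings[key] = value
        return self                      # returning self enables chaining

    def set_app_name(self, name):        # mirrors setAppName
        return self.set("spark.app.name", name)

    def set_master(self, master):        # mirrors setMaster
        return self.set("spark.master", master)

    def get(self, key, default=None):
        return self.settings.get(key, default)

conf = (Conf()
        .set_app_name("MyApp")
        .set_master("local[*]")
        .set("spark.executor.memory", "2g"))
print(conf.get("spark.app.name"))        # MyApp
print(conf.get("spark.missing", "n/a"))  # n/a
```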

Shared Variables

Variables that can be shared across cluster nodes efficiently.

/**
 * Broadcast variable for efficient data sharing
 */
abstract class Broadcast[T: ClassTag] {
  /** Access broadcast value */
  def value: T
  /** Remove from executors but keep in driver */
  def unpersist(): Unit
  /** Destroy completely */
  def destroy(): Unit
}

/**
 * Accumulator for collecting information from executors
 */
class Accumulator[T] {
  /** Get current value (driver only) */
  def value: T
  /** Add to accumulator */
  def +=(term: T): Unit
  /** Add to accumulator (alternative syntax) */
  def add(term: T): Unit
}
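Accumulators exist because each task works on its own copy of the variables captured in its closure: mutations made on executors never reach the driver unless they are explicitly merged back, which is what `add` / `+=` does. A plain-Python simulation of both behaviors (partitions modeled as lists; no Spark required):

```python
import copy

partitions = [[1, 2, 3], [4, 5, 6]]

# Naive closure counter: each "task" mutates its own copy; the driver never sees it.
counter = {"n": 0}
for part in partitions:
    local = copy.deepcopy(counter)   # shipping the closure copies the variable
    for x in part:
        local["n"] += x              # update is lost: it stays on the "executor"
print(counter["n"])  # 0

# Accumulator-style: tasks report partial results, and the driver merges them.
partials = [sum(part) for part in partitions]
total = sum(partials)
print(total)  # 21
```

Broadcast variables solve the mirror-image problem: instead of collecting results from executors, they ship one read-only copy of driver-side data to each node efficiently.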

Storage Levels

Storage levels for RDD persistence.

/**
 * Storage levels for RDD caching and persistence
 */
object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}

Usage Examples:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 1000000)

// Different persistence levels
rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd.persist(StorageLevel.DISK_ONLY_2)  // 2x replication

// Shorthand for memory-only
rdd.cache()

Error Handling

Common exceptions in core Spark operations:

  • SparkException - General Spark runtime exceptions
  • TaskFailedException - Task execution failures
  • TaskKilledException - Task cancellation
  • SparkOutOfMemoryError - Memory-related errors

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-parent-2-12
