Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
npx @tessl/cli install tessl/maven-apache-spark@1.0.0
Apache Spark is a lightning-fast unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis.
Maven Coordinates:
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.0.0</version>
Scala Version: 2.10.x
Java Version: Java 6+
Scala:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._ // for implicit conversions
Java:
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.SparkConf;
Python:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
from pyspark import StorageLevel, SparkFiles
import org.apache.spark.{SparkContext, SparkConf}
val conf = new SparkConf()
.setAppName("My Spark Application")
.setMaster("local[*]") // Use all available cores locally
val sc = new SparkContext(conf)
// Remember to stop the context when done
sc.stop()
Scala:
// Create an RDD from a collection
val data = Array(1, 2, 3, 4, 5)
val distData: RDD[Int] = sc.parallelize(data)
// Transform and action
val doubled = distData.map(_ * 2)
val result = doubled.collect() // Returns Array(2, 4, 6, 8, 10)
Python:
# Create an RDD from a collection
data = [1, 2, 3, 4, 5]
dist_data = sc.parallelize(data)
# Transform and action
doubled = dist_data.map(lambda x: x * 2)
result = doubled.collect() # Returns [2, 4, 6, 8, 10]
Spark's core components work together to provide a unified analytics platform:
SparkContext:
The main entry point that coordinates distributed processing across a cluster. It creates RDDs, manages shared variables (broadcast variables and accumulators), and controls job execution.
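As an illustration, a broadcast variable ships a read-only value to every executor once, while an accumulator aggregates updates from tasks back to the driver. A minimal sketch, reusing the `sc` created above (the lookup map and input values are invented for the example):
// Ship a small lookup table to every executor once, rather than with each task
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
// Count keys missing from the table; the total is visible on the driver after an action runs
val missing = sc.accumulator(0)
val resolved = sc.parallelize(Seq("a", "b", "x")).map { key =>
  if (!lookup.value.contains(key)) missing += 1
  lookup.value.getOrElse(key, 0)
}
resolved.collect() // Array(1, 2, 0); missing.value is now 1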
RDD (Resilient Distributed Dataset):
The fundamental data abstraction - an immutable, fault-tolerant collection of elements that can be operated on in parallel. RDDs support two types of operations: transformations, which lazily define a new RDD, and actions, which trigger computation and return a result to the driver.
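A short sketch of the laziness this implies - nothing executes until an action is invoked (again assuming the `sc` from above):
val numbers = sc.parallelize(1 to 1000000)
val evens = numbers.filter(_ % 2 == 0) // transformation: only records lineage, no work happens yet
val squares = evens.map(n => n * n)    // still no work
val total = squares.count()            // action: runs the whole pipeline and returns 500000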
Cluster Managers:
Spark can run on various cluster managers, including its own standalone cluster manager, Apache Mesos, and Hadoop YARN; a local mode is also available for development and testing.
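The cluster manager is selected by the master URL passed to SparkConf. A sketch of the common forms (the host names below are placeholders; the ports are the usual defaults):
val clusterConf = new SparkConf().setAppName("Cluster Example")
clusterConf.setMaster("local[4]")                    // local mode with 4 worker threads
// clusterConf.setMaster("spark://master-host:7077") // Spark standalone cluster
// clusterConf.setMaster("mesos://mesos-host:5050")  // Apache Mesos
// clusterConf.setMaster("yarn-client")              // Hadoop YARN in client mode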
Essential transformations and actions for data processing:
// Transformations (lazy)
rdd.map(f) // Apply function to each element
rdd.filter(f) // Keep elements matching predicate
rdd.flatMap(f) // Apply function and flatten results
rdd.distinct() // Remove duplicates
// Actions (eager)
rdd.collect() // Return all elements as array
rdd.count() // Count number of elements
rdd.reduce(f) // Reduce using associative function
rdd.take(n) // Return first n elements
Comprehensive cluster management and RDD creation:
class SparkContext(conf: SparkConf) {
// RDD Creation
def parallelize[T: ClassTag](seq: Seq[T]): RDD[T]
def textFile(path: String): RDD[String]
def hadoopFile[K, V](path: String, ...): RDD[(K, V)]
// Shared Variables
def broadcast[T: ClassTag](value: T): Broadcast[T]
def accumulator[T](initialValue: T): Accumulator[T]
// Job Control
def runJob[T, U](rdd: RDD[T], func: Iterator[T] => U): Array[U]
def stop(): Unit
}
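A brief usage sketch of the methods above (data.txt is a placeholder path, and `sc` is the active SparkContext):
val lines = sc.textFile("data.txt")
// runJob evaluates a function on each partition and returns one result per partition
val charsPerPartition: Array[Int] = sc.runJob(lines, (iter: Iterator[String]) => iter.map(_.length).sum)
val totalChars = charsPerPartition.sum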
Powerful operations on RDDs of (key, value) pairs:
// Import for PairRDDFunctions
import org.apache.spark.SparkContext._
val pairRDD: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 2)))
val otherPairRDD: RDD[(String, Int)] = sc.parallelize(Seq(("a", 3), ("c", 4)))
// Key-value transformations
pairRDD.reduceByKey(_ + _) // Combine values by key
pairRDD.groupByKey() // Group values by key
pairRDD.mapValues(_ * 2) // Transform values, preserve keys
pairRDD.join(otherPairRDD) // Inner join on keys
Read and write data from various sources:
// Reading data
sc.textFile("hdfs://path/to/file")
sc.sequenceFile[K, V]("path")
sc.objectFile[T]("path")
sc.hadoopFile[K, V]("path", inputFormat, keyClass, valueClass)
// Writing data
rdd.saveAsTextFile("path")
rdd.saveAsObjectFile("path")
pairRDD.saveAsSequenceFile("path")
Optimize performance by caching frequently accessed RDDs:
import org.apache.spark.storage.StorageLevel
rdd.cache() // Cache in memory
rdd.persist(StorageLevel.MEMORY_ONLY) // Explicit storage level
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) // Memory + disk with serialization
rdd.unpersist() // Remove from cache
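Caching pays off when the same RDD feeds several actions; a small sketch (the log path is a placeholder):
val appLog = sc.textFile("app.log").map(_.toLowerCase).cache()
val totalLines = appLog.count()                             // first action computes and caches the RDD
val errorLines = appLog.filter(_.contains("error")).count() // later actions reuse the cached data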
Process live data streams in micro-batches:
import org.apache.spark.streaming.{StreamingContext, Seconds}
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("hostname", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
Scalable machine learning algorithms:
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
// trainingData is assumed to be a Seq[LabeledPoint] prepared elsewhere
val data: RDD[LabeledPoint] = sc.parallelize(trainingData)
val model = LogisticRegressionWithSGD.train(data, numIterations = 100)
// testData is assumed to be an RDD[LabeledPoint] held out for evaluation
val predictions = model.predict(testData.map(_.features))
Large-scale graph analytics:
import org.apache.spark.graphx._
// Create graph from edge list
// edgeArray is assumed to be a Seq[Edge[Double]], e.g. Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0))
val edges: RDD[Edge[Double]] = sc.parallelize(edgeArray)
val graph = Graph.fromEdges(edges, defaultValue = 1.0)
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
Structured data processing with SQL:
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD // implicitly converts RDDs of case classes to SchemaRDDs
case class Person(name: String, age: Int)
// people.txt contains lines of the form "name,age"
val people = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerAsTable("people")
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
Python interface for Spark with Pythonic APIs:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("My Python App").setMaster("local[*]")
sc = SparkContext(conf=conf)
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
squared = rdd.map(lambda x: x * x)
result = squared.collect()
Java-friendly wrappers with proper type safety:
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(String::length); // Java 8 method reference; on Java 6/7 use an anonymous Function
Use cache() or persist() for RDDs accessed multiple times
Prefer reduceByKey() over groupByKey() for aggregations
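For example, both lines below compute per-key sums, but reduceByKey combines values within each partition before shuffling, so far less data crosses the network (a sketch reusing the pairRDD defined earlier):
val sumsViaGroup = pairRDD.groupByKey().mapValues(_.sum) // ships every individual value to the reducers
val sumsViaReduce = pairRDD.reduceByKey(_ + _)           // shuffles only per-partition partial sums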
Word Count:
val textFile = sc.textFile("hdfs://...")
val counts = textFile
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
Log Analysis:
val logFile = sc.textFile("access.log")
val errors = logFile.filter(_.contains("ERROR"))
val errorsByHost = errors
.map(line => (extractHost(line), 1)) // extractHost is a user-defined function that parses the host from a log line
.reduceByKey(_ + _)