# Apache Spark

Apache Spark is a fast, unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, and Python (an R API, SparkR, was only added later, in Spark 1.4), and an optimized engine that supports general computation graphs for data analysis.

## Package Information

**Maven Coordinates:**
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.0.0</version>
</dependency>
```

**Scala Version:** 2.10.x

**Java Version:** Java 6+ (Java 8 is needed for the lambda and method-reference syntax in the Java examples)

## Core Imports

**Scala:**
```scala { .api }
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._ // for implicit conversions
```

**Java:**
```java { .api }
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.SparkConf;
```

**Python:**
```python { .api }
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
from pyspark import StorageLevel, SparkFiles
```

## Basic Usage

### Creating a SparkContext

```scala { .api }
import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf()
.setAppName("My Spark Application")
.setMaster("local[*]") // Use all available cores locally

val sc = new SparkContext(conf)

// Remember to stop the context when done
sc.stop()
```

### Simple RDD Operations

The snippets below assume an active SparkContext `sc` that has not been stopped.

**Scala:**
```scala { .api }
import org.apache.spark.rdd.RDD

// Create an RDD from a collection
val data = Array(1, 2, 3, 4, 5)
val distData: RDD[Int] = sc.parallelize(data)

// Transform and action
val doubled = distData.map(_ * 2)
val result = doubled.collect() // Returns Array(2, 4, 6, 8, 10)
```

**Python:**
```python { .api }
# Create an RDD from a collection
data = [1, 2, 3, 4, 5]
dist_data = sc.parallelize(data)

# Transform and action
doubled = dist_data.map(lambda x: x * 2)
result = doubled.collect() # Returns [2, 4, 6, 8, 10]
```

## Architecture

Spark's core components work together to provide a unified analytics platform:

### SparkContext
The main entry point that coordinates distributed processing across a cluster. It creates RDDs, manages shared variables (broadcast variables and accumulators), and controls job execution.

### RDD (Resilient Distributed Dataset)
The fundamental data abstraction: an immutable, fault-tolerant collection of elements, partitioned across the cluster, that can be operated on in parallel. RDDs support two types of operations:
- **Transformations**: Create new RDDs from existing ones; they are lazy and only record the lineage of the computation
- **Actions**: Return a value to the driver program or write data to storage; calling an action triggers execution of the pending transformations
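
The laziness and lineage described above can be modelled on a single machine. The sketch below is plain Python with no Spark involved; `LocalRDD` is a hypothetical toy class, not PySpark's `RDD`. Transformations only append a step to the recorded lineage, and actions replay that lineage over the source data:

```python
class LocalRDD:
    """Toy, single-machine model of RDD laziness (hypothetical; not PySpark's RDD class)."""

    def __init__(self, source, steps=()):
        self._source = list(source)  # base data, copied so the object stays immutable
        self._steps = tuple(steps)   # recorded lineage of ("map" | "filter", fn) steps

    # Transformations: return a new LocalRDD and do no work
    def map(self, fn):
        return LocalRDD(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        return LocalRDD(self._source, self._steps + (("filter", pred),))

    # Actions: replay the lineage over the source data
    def collect(self):
        out = []
        for x in self._source:
            keep = True
            for kind, fn in self._steps:
                if kind == "map":
                    x = fn(x)
                elif not fn(x):
                    keep = False
                    break
            if keep:
                out.append(x)
        return out

    def count(self):
        return len(self.collect())


calls = []

def double(x):
    calls.append(x)  # record every time the function really runs
    return x * 2

base = LocalRDD([1, 2, 3, 4, 5])
big = base.map(double).filter(lambda x: x > 4)  # transformations only
print(calls)          # []
print(big.collect())  # [6, 8, 10]
print(calls)          # [1, 2, 3, 4, 5]
```

Because each action replays the lineage, calling `collect()` twice runs `double` twice; that repeated work is what caching avoids.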

### Cluster Manager
Spark can run on various cluster managers including:
- Standalone cluster manager
- Apache Mesos
- Hadoop YARN

## Core Capabilities

### [RDD Operations](./core-rdd.md)

Essential transformations and actions for data processing:

```scala { .api }
// Transformations (lazy)
rdd.map(f) // Apply function to each element
rdd.filter(f) // Keep elements matching predicate
rdd.flatMap(f) // Apply function and flatten results
rdd.distinct() // Remove duplicates

// Actions (eager)
rdd.collect() // Return all elements to the driver as an array (must fit in driver memory)
rdd.count() // Count number of elements
rdd.reduce(f) // Reduce using a commutative and associative function
rdd.take(n) // Return the first n elements as an array
```
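
`reduce` works by reducing each partition locally and then merging the per-partition results, which is why the function must be commutative and associative. The plain-Python sketch below (no Spark involved; `partitioned_reduce` and the three-way split are hypothetical) shows that `+` gives the same answer however the data is partitioned, while `-` does not:

```python
from functools import reduce

def partitioned_reduce(partitions, f):
    """Reduce each non-empty partition locally, then merge the partial results."""
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)

data = [1, 2, 3, 4, 5, 6]
partitions = [data[0:2], data[2:4], data[4:6]]  # three hypothetical partitions

def add(a, b):
    return a + b

def sub(a, b):
    return a - b

print(reduce(add, data), partitioned_reduce(partitions, add))  # 21 21
print(reduce(sub, data), partitioned_reduce(partitions, sub))  # -19 1
```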

### [SparkContext API](./spark-context.md)

Comprehensive cluster management and RDD creation (simplified signatures; some optional parameters are omitted):

```scala { .api }
class SparkContext(conf: SparkConf) {
  // RDD Creation
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def textFile(path: String): RDD[String]
  def hadoopFile[K, V](path: String, ...): RDD[(K, V)]

  // Shared Variables
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]

  // Job Control
  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
  def stop(): Unit
}
```
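
`parallelize` splits a local collection into `numSlices` contiguous partitions, and `runJob` applies a function to an iterator over each partition, returning one result per partition. The stdlib-only sketch below models both with hypothetical helpers; the even-split arithmetic approximates how Spark slices a plain sequence and should not be read as its exact implementation:

```python
def slice_positions(length, num_slices):
    """(start, end) index pairs splitting `length` items into `num_slices` contiguous runs."""
    return [((i * length) // num_slices, ((i + 1) * length) // num_slices)
            for i in range(num_slices)]

def parallelize(seq, num_slices):
    """Model of splitting a local sequence into partitions (plain lists)."""
    seq = list(seq)
    return [seq[start:end] for start, end in slice_positions(len(seq), num_slices)]

def run_job(partitions, func):
    """Model of runJob: call func on an iterator over each partition, one result each."""
    return [func(iter(part)) for part in partitions]

parts = parallelize(range(1, 11), 3)
print(parts)                # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
print(run_job(parts, sum))  # [6, 15, 34]
```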

### [Key-Value Operations](./key-value-operations.md)

Powerful operations on RDDs of (key, value) pairs:

```scala { .api }
import org.apache.spark.rdd.RDD
// Brings the implicit conversion to PairRDDFunctions into scope (required before Spark 1.3)
import org.apache.spark.SparkContext._

val pairRDD: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 2)))
val otherPairRDD: RDD[(String, String)] = sc.parallelize(Seq(("a", "x"), ("c", "y")))

// Key-value transformations
pairRDD.reduceByKey(_ + _) // Combine values by key
pairRDD.groupByKey() // Group values by key
pairRDD.mapValues(_ * 2) // Transform values, preserve keys
pairRDD.join(otherPairRDD) // Inner join on keys: RDD[(String, (Int, String))]
```
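
An inner `join` keeps only keys present on both sides and emits one output pair for every combination of matching values. The stdlib sketch below models that semantics on in-memory lists; `inner_join` is a hypothetical helper, and unlike this model Spark does not guarantee any output order:

```python
from collections import defaultdict

def inner_join(left, right):
    """Model of a pair-RDD join: (k, (v, w)) for every v in left[k] and w in right[k]."""
    right_by_key = defaultdict(list)
    for k, w in right:
        right_by_key[k].append(w)
    return [(k, (v, w)) for k, v in left for w in right_by_key.get(k, [])]

left = [("a", 1), ("b", 2)]
right = [("a", "x"), ("a", "y"), ("c", "z")]
print(inner_join(left, right))  # [('a', (1, 'x')), ('a', (1, 'y'))]
```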

### [Data Sources](./data-sources.md)

Read and write data from various sources:

```scala { .api }
// Reading data (K, V and T stand for your key, value and element types)
sc.textFile("hdfs://path/to/file")
sc.sequenceFile[K, V]("path")
sc.objectFile[T]("path")
sc.hadoopFile[K, V]("path", inputFormat, keyClass, valueClass)

// Writing data: each path is an output directory with one part file per partition
rdd.saveAsTextFile("path")
rdd.saveAsObjectFile("path")
pairRDD.saveAsSequenceFile("path") // keys/values must be convertible to Hadoop Writables
```
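
`saveAsTextFile` treats its path as a directory and writes one `part-NNNNN` file per partition, with each element on its own line. The stdlib sketch below models that layout with hypothetical helpers writing to a temporary directory; it is not Spark's Hadoop-based writer and omits extras such as the `_SUCCESS` marker file:

```python
import os
import tempfile

def save_as_text_file(partitions, path):
    """Model: create the output directory, then one part-NNNNN file per partition."""
    os.makedirs(path)
    for i, part in enumerate(partitions):
        with open(os.path.join(path, f"part-{i:05d}"), "w") as fh:
            for element in part:
                fh.write(f"{element}\n")  # one element per line, via str()

def text_file(path):
    """Model of reading the directory back: all lines of all part files, in name order."""
    lines = []
    for name in sorted(os.listdir(path)):
        if name.startswith("part-"):
            with open(os.path.join(path, name)) as fh:
                lines.extend(line.rstrip("\n") for line in fh)
    return lines

out_dir = os.path.join(tempfile.mkdtemp(), "output")
save_as_text_file([[1, 2], [3], []], out_dir)  # an empty partition still gets a file
print(sorted(os.listdir(out_dir)))  # ['part-00000', 'part-00001', 'part-00002']
print(text_file(out_dir))           # ['1', '2', '3']
```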

### [Caching & Persistence](./caching-persistence.md)

Optimize performance by caching frequently accessed RDDs:

```scala { .api }
import org.apache.spark.storage.StorageLevel

// Pick one: an RDD's storage level cannot be changed once assigned (until unpersist())
rdd.cache() // Cache in memory; shorthand for persist(StorageLevel.MEMORY_ONLY)
rdd.persist(StorageLevel.MEMORY_ONLY) // Explicit storage level
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) // Memory + disk with serialization

rdd.unpersist() // Remove from cache
```
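
Without caching, every action recomputes an RDD from its lineage; after `cache()`, the first action materializes the data and later actions reuse it. The toy model below is plain Python (`CachedDataset` is a hypothetical class, not a Spark type) and counts how often the underlying computation runs. Real caching works per partition, and cached partitions can be evicted and recomputed under memory pressure:

```python
class CachedDataset:
    """Toy model (hypothetical class): recompute on every action unless cached."""

    def __init__(self, compute):
        self._compute = compute  # rebuilds the data, standing in for replaying lineage
        self._use_cache = False
        self._cached = None

    def cache(self):
        self._use_cache = True  # lazy: nothing is computed yet
        return self

    def _materialize(self):
        if not self._use_cache:
            return self._compute()
        if self._cached is None:
            self._cached = self._compute()  # the first action fills the cache
        return self._cached

    def count(self):
        return len(self._materialize())

    def collect(self):
        return list(self._materialize())


runs = {"n": 0}

def expensive():
    runs["n"] += 1  # count how often the data is (re)computed
    return [x * x for x in range(5)]

plain = CachedDataset(expensive)
plain.count()
plain.collect()
print(runs["n"])  # 2: each action recomputed the data

runs["n"] = 0
cached = CachedDataset(expensive).cache()
cached.count()
cached.collect()
print(runs["n"])  # 1: the second action reused the cached result
```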

### [Streaming](./streaming.md)

Process live data streams in micro-batches:

```scala { .api }
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.StreamingContext._ // pair DStream operations such as reduceByKey (needed before Spark 1.3)

val ssc = new StreamingContext(sc, Seconds(1)) // 1-second batch interval

val lines = ssc.socketTextStream("hostname", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

wordCounts.print() // Print the first ten elements of each batch's result
ssc.start() // Start receiving and processing data
ssc.awaitTermination() // Block until the streaming computation is stopped
```
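
Conceptually, the input received during each batch interval forms one small batch, and the same word-count logic runs on every batch independently. The stdlib sketch below models that with hypothetical `(timestamp_seconds, line)` records and a hypothetical `micro_batch_word_counts` helper; real DStreams receive data continuously and schedule batches on a timer:

```python
from collections import Counter, defaultdict

def micro_batch_word_counts(records, batch_interval=1.0):
    """Group (timestamp, line) records into fixed intervals and word-count each batch."""
    batches = defaultdict(list)
    for ts, line in records:
        batches[int(ts // batch_interval)].append(line)
    return {
        batch: Counter(word for line in lines for word in line.split(" "))
        for batch, lines in sorted(batches.items())
    }

records = [(0.1, "a b"), (0.7, "a"), (1.2, "b b"), (2.5, "c")]
for batch, counts in micro_batch_word_counts(records).items():
    print(batch, dict(counts))
# 0 {'a': 2, 'b': 1}
# 1 {'b': 2}
# 2 {'c': 1}
```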

### [Machine Learning (MLlib)](./mllib.md)

Scalable machine learning algorithms:

```scala { .api }
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// trainingData: Seq[LabeledPoint] and testData: RDD[LabeledPoint] are assumed to exist,
// e.g. LabeledPoint(1.0, Vectors.dense(0.5, 1.2))
val data: RDD[LabeledPoint] = sc.parallelize(trainingData)
val model = LogisticRegressionWithSGD.train(data, 100) // 100 iterations of SGD
val predictions = model.predict(testData.map(_.features)) // RDD[Double] of predicted labels
```
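
`LogisticRegressionWithSGD` fits a weight vector by gradient descent on the logistic loss, and the resulting model predicts 1.0 when the sigmoid of the weighted sum exceeds a threshold (0.5 by default). The plain-Python sketch below shows the idea with full-batch gradient descent on a tiny hypothetical dataset; it is not MLlib's implementation, which samples mini-batches across the cluster, and `train_logistic`/`predict` are hypothetical helpers:

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def train_logistic(points, num_iterations=100, step_size=1.0):
    """Full-batch gradient descent on logistic loss; points are (label, features) pairs."""
    dim = len(points[0][1])
    w = [0.0] * dim
    for _ in range(num_iterations):
        grad = [0.0] * dim
        for label, x in points:
            err = sigmoid(sum(wi * xi for wi, xi in zip(w, x))) - label
            for j in range(dim):
                grad[j] += err * x[j]
        w = [wi - step_size * g / len(points) for wi, g in zip(w, grad)]
    return w

def predict(w, x, threshold=0.5):
    return 1.0 if sigmoid(sum(wi * xi for wi, xi in zip(w, x))) > threshold else 0.0

# Hypothetical data: the label is 1.0 when the single feature is positive
points = [(0.0, [-2.0]), (0.0, [-1.0]), (1.0, [1.0]), (1.0, [2.0])]
w = train_logistic(points)
print([predict(w, x) for _, x in points])  # [0.0, 0.0, 1.0, 1.0]
```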

### [Graph Processing (GraphX)](./graphx.md)

Large-scale graph analytics:

```scala { .api }
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

// Create graph from edge list; edgeArray is a hypothetical Array[Edge[Double]],
// e.g. Array(Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0))
val edges: RDD[Edge[Double]] = sc.parallelize(edgeArray)
val graph = Graph.fromEdges(edges, defaultValue = 1.0) // every vertex gets attribute 1.0

// Run PageRank until ranks change by less than the tolerance 0.0001
val ranks = graph.pageRank(0.0001).vertices // (vertexId, rank) pairs
```
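
The tolerance-based `pageRank(tol)` keeps iterating until rank changes fall below `tol`. The plain-Python sketch below implements a simple unnormalized PageRank with a hypothetical `page_rank` helper, where each vertex's rank is `reset_prob + (1 - reset_prob) * sum(rank[src] / outDegree[src])` over its in-edges and `reset_prob = 0.15`. This is close in spirit to GraphX's formulation but is an illustration, not its code:

```python
def page_rank(edges, tol=1e-4, reset_prob=0.15, max_iter=1000):
    """Unnormalized PageRank over (src, dst) edges, iterated until max change < tol."""
    vertices = {v for edge in edges for v in edge}
    out_deg = {v: 0 for v in vertices}
    for src, _ in edges:
        out_deg[src] += 1
    ranks = {v: 1.0 for v in vertices}
    for _ in range(max_iter):
        incoming = {v: 0.0 for v in vertices}
        for src, dst in edges:
            incoming[dst] += ranks[src] / out_deg[src]
        new_ranks = {v: reset_prob + (1 - reset_prob) * incoming[v] for v in vertices}
        delta = max(abs(new_ranks[v] - ranks[v]) for v in vertices)
        ranks = new_ranks
        if delta < tol:
            break
    return ranks

# Hypothetical graph: a and b both link to c, and c links back to a
edges = [("a", "c"), ("b", "c"), ("c", "a")]
ranks = page_rank(edges, tol=1e-10)
print({v: round(r, 3) for v, r in sorted(ranks.items())})  # {'a': 1.391, 'b': 0.15, 'c': 1.459}
```

With no dangling vertices, the ranks in this formulation always sum to the number of vertices (3 here).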

### [SQL](./sql.md)

Structured data processing with SQL. The DataFrame-based snippet below (`sqlContext.read`, `registerTempTable`) requires Spark 1.4 or later; it does not compile against the 1.0.0 artifact listed under Package Information:

```scala { .api }
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("path/to/people.json") // one JSON object per line

df.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
```
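
The query itself is plain SQL. To see what it returns without a cluster, the stdlib sketch below runs the same statement through Python's built-in `sqlite3` on three hypothetical rows. A person with no `age` is stored as `NULL`, and the `WHERE` clause filters that row out, as Spark SQL would:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO people VALUES (?, ?)",
    [("Michael", None), ("Andy", 30), ("Justin", 19)],  # hypothetical sample rows
)
rows = conn.execute(
    "SELECT name FROM people WHERE age >= 13 AND age <= 19"
).fetchall()
print(rows)  # [('Justin',)]
conn.close()
```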

### [Python API (PySpark)](./python-api.md)

Python interface for Spark with Pythonic APIs:

```python { .api }
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("My Python App").setMaster("local[*]")
sc = SparkContext(conf=conf)

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
squared = rdd.map(lambda x: x * x)
result = squared.collect()
```

### [Java API](./java-api.md)

Java-friendly wrappers with proper type safety:

```java { .api }
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;

SparkConf conf = new SparkConf().setAppName("My Java App").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(String::length); // method reference requires Java 8
```
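
## Performance Considerations

- **Caching**: Use `cache()` or `persist()` for RDDs that are reused across multiple actions
- **Partitioning**: Control data partitioning (for example with `partitionBy` and a `HashPartitioner`) so that repeated key-based operations such as joins avoid reshuffling data
- **Serialization**: Use the Kryo serializer (`spark.serializer=org.apache.spark.serializer.KryoSerializer`), which is typically faster and more compact than Java serialization
- **Memory Management**: Configure executor memory (`spark.executor.memory`) and storage levels appropriately
- **Shuffle Operations**: Minimize expensive shuffles; for aggregations prefer `reduceByKey()` over `groupByKey()`, because it combines values within each partition before the shuffle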
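
To make the `reduceByKey()` versus `groupByKey()` point concrete, the stdlib sketch below counts how many records each approach would send through the shuffle for the same hypothetical partitioned input. It models only the map-side combine (the helper names are hypothetical) and ignores serialization and spill costs:

```python
from collections import Counter

partitions = [
    [("a", 1), ("b", 1), ("a", 1), ("a", 1)],
    [("a", 1), ("b", 1), ("b", 1)],
]

def group_by_key_shuffle(parts):
    """groupByKey: every (key, value) pair is shuffled as-is."""
    return [pair for part in parts for pair in part]

def reduce_by_key_shuffle(parts):
    """reduceByKey: values are summed per key inside each partition first."""
    shuffled = []
    for part in parts:
        local = Counter()
        for key, value in part:
            local[key] += value
        shuffled.extend(local.items())
    return shuffled

def final_counts(shuffled):
    totals = Counter()
    for key, value in shuffled:
        totals[key] += value
    return dict(totals)

g = group_by_key_shuffle(partitions)
r = reduce_by_key_shuffle(partitions)
print(len(g), len(r))                      # 7 4
print(final_counts(g) == final_counts(r))  # True
```

The savings grow with the number of repeated keys per partition, which is why the difference matters most for skewed or highly repetitive data.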

## Common Patterns

**Word Count:**
```scala { .api }
val textFile = sc.textFile("hdfs://...")
val counts = textFile
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
```

**Log Analysis:**
```scala { .api }
val logFile = sc.textFile("access.log")
val errors = logFile.filter(_.contains("ERROR"))
// extractHost is a hypothetical helper (String => String) that parses the host from a log line
val errorsByHost = errors
.map(line => (extractHost(line), 1))
.reduceByKey(_ + _)
```
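
`extractHost` is left undefined above. One possible version is sketched below in plain Python together with the same filter-and-count logic on a local list. It assumes (hypothetically) Common Log Format lines whose first whitespace-separated field is the client host; the sample lines and their trailing `ERROR` marker are invented for illustration:

```python
from collections import Counter

def extract_host(line):
    """Hypothetical parser: the first whitespace-separated field is the host."""
    return line.split(None, 1)[0]

log_lines = [
    '10.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /a HTTP/1.0" 500 0 ERROR',
    '10.0.0.2 - - [10/Oct/2000:13:55:37 -0700] "GET /b HTTP/1.0" 200 512',
    '10.0.0.1 - - [10/Oct/2000:13:55:38 -0700] "GET /c HTTP/1.0" 500 0 ERROR',
]

# Same shape as the Scala pipeline: filter on "ERROR", key by host, sum the counts
errors_by_host = Counter(
    extract_host(line) for line in log_lines if "ERROR" in line
)
print(dict(errors_by_host))  # {'10.0.0.1': 2}
```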