# Python API (PySpark)

PySpark is the Python API for Apache Spark. It lets Python developers harness Spark's distributed computing capabilities, providing a Python interface to Spark's core functionality, including RDDs, SQL, Streaming, and MLlib.

## Core Imports

```python { .api }
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
from pyspark import SparkFiles, StorageLevel
```
## Basic Usage

### Creating a SparkContext

```python { .api }
from pyspark import SparkContext, SparkConf

# Using SparkConf (recommended)
conf = SparkConf() \
    .setAppName("My Python App") \
    .setMaster("local[*]") \
    .set("spark.executor.memory", "2g")

sc = SparkContext(conf=conf)

# Simple constructor
sc = SparkContext("local[*]", "My Python App")

# Remember to stop the context
sc.stop()
```
## Capabilities

### SparkContext

Main entry point for all Spark functionality.

```python { .api }
class SparkContext(object):
    def __init__(self, master=None, appName=None, sparkHome=None,
                 pyFiles=None, environment=None, batchSize=1024,
                 serializer=PickleSerializer(), conf=None, gateway=None):
        """
        Create a new SparkContext.

        Args:
            master: Cluster URL to connect to (e.g. local[4], spark://host:port)
            appName: A name for your job, to display on cluster web UI
            sparkHome: Location where Spark is installed on cluster nodes
            pyFiles: Collection of .zip or .py files to send to cluster
            environment: Dictionary of environment variables for worker nodes
            batchSize: Number of Python objects represented as single Java object
            serializer: The serializer for RDDs
            conf: A SparkConf object setting Spark properties
            gateway: Use existing gateway and JVM, otherwise create new JVM
        """
```
#### RDD Creation Methods

**parallelize**: Distribute a local collection to form an RDD
```python { .api }
def parallelize(self, c, numSlices=None):
    """
    Distribute a local Python collection to form an RDD.

    Args:
        c: Collection to parallelize (list, tuple, etc.)
        numSlices: Number of partitions to create (optional)

    Returns:
        RDD containing the distributed data
    """
```

```python
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)  # Use default parallelism
rdd_with_partitions = sc.parallelize(data, 4)  # Specify 4 partitions
```
**textFile**: Read text files as RDD of strings
```python { .api }
def textFile(self, name, minPartitions=None):
    """
    Read a text file from HDFS or local filesystem.

    Args:
        name: Path to text file
        minPartitions: Minimum number of partitions (optional)

    Returns:
        RDD where each element is a line from the file
    """
```

```python
lines = sc.textFile("hdfs://namenode:port/path/to/file.txt")
lines_local = sc.textFile("file:///local/path/file.txt")
lines_with_partitions = sc.textFile("hdfs://path/to/file.txt", 8)
```
**wholeTextFiles**: Read directory of text files as key-value pairs
```python { .api }
def wholeTextFiles(self, path, minPartitions=None):
    """
    Read directory of text files as (filename, content) pairs.

    Args:
        path: Directory path containing text files
        minPartitions: Minimum number of partitions (optional)

    Returns:
        RDD of (filename, content) tuples
    """
```
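For example, a minimal sketch (the directory path is illustrative, and each file should be small enough to hold in memory as a single value):
```python
files = sc.wholeTextFiles("hdfs://namenode:port/path/to/dir")
first_name, first_content = files.first()
word_counts = files.mapValues(lambda content: len(content.split()))
```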
#### Shared Variables

**broadcast**: Create a broadcast variable for read-only data
```python { .api }
def broadcast(self, value):
    """
    Broadcast a read-only variable to all nodes.

    Args:
        value: Value to broadcast

    Returns:
        Broadcast object with .value property
    """
```

```python
lookup_table = {"apple": 1, "banana": 2, "orange": 3}
broadcast_table = sc.broadcast(lookup_table)

data = sc.parallelize(["apple", "banana", "apple"])
mapped = data.map(lambda fruit: broadcast_table.value.get(fruit, 0))
```
**accumulator**: Create an accumulator for aggregating information
```python { .api }
def accumulator(self, value, accum_param=None):
    """
    Create an accumulator with the given initial value.

    Args:
        value: Initial value
        accum_param: AccumulatorParam object (optional)

    Returns:
        Accumulator object
    """
```

```python
counter = sc.accumulator(0)

data = sc.parallelize([1, 2, -1, 4, -5])
data.foreach(lambda x: counter.add(1) if x < 0 else None)  # Count negatives
print(f"Negative numbers: {counter.value}")
```
#### Job Control

**setJobGroup**: Assign group ID to jobs
```python { .api }
def setJobGroup(self, groupId, description, interruptOnCancel=False):
    """Set job group for all jobs started by this thread."""
```

**cancelJobGroup**: Cancel all jobs in a group
```python { .api }
def cancelJobGroup(self, groupId):
    """Cancel all jobs associated with a job group."""
```

**cancelAllJobs**: Cancel all scheduled or running jobs
```python { .api }
def cancelAllJobs(self):
    """Cancel all scheduled or running jobs."""
```
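A minimal sketch of how these fit together (the group ID and file path are illustrative):
```python
sc.setJobGroup("nightly-etl", "Nightly ETL jobs", interruptOnCancel=True)
events = sc.textFile("events.log")
events.filter(lambda line: "purchase" in line).count()  # Runs under "nightly-etl"

# From another thread, everything in the group can be cancelled:
sc.cancelJobGroup("nightly-etl")
```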
#### File Management

**addFile**: Add a file to be downloaded on every node
```python { .api }
def addFile(self, path, recursive=False):
    """
    Add a file to be downloaded with this Spark job on every node.

    Args:
        path: Path to file (local or remote)
        recursive: Whether to recursively add files in directories
    """
```

**addPyFile**: Add a Python file to be distributed
```python { .api }
def addPyFile(self, path):
    """
    Add a .py or .zip file to be distributed with this Spark job.

    Args:
        path: Path to Python file or zip archive
    """
```
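A sketch of distributing a data file and resolving it on the workers via SparkFiles (the file names are illustrative):
```python
from pyspark import SparkFiles

sc.addFile("hdfs://namenode:port/config/settings.json")
sc.addPyFile("helpers.zip")  # Extra Python dependencies for the workers

def tag_with_settings_path(record):
    # Each worker resolves its local copy of the distributed file
    return (record, SparkFiles.get("settings.json"))

tagged = sc.parallelize(["a", "b"]).map(tag_with_settings_path).collect()
```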
### RDD Operations

The RDD class provides the fundamental distributed data abstraction.

```python { .api }
class RDD(object):
    """
    Resilient Distributed Dataset - immutable distributed collection.
    """
```

#### Transformations (Lazy)

**map**: Apply function to each element
```python { .api }
def map(self, f, preservesPartitioning=False):
    """
    Apply a function to each element of the RDD.

    Args:
        f: Function to apply to each element
        preservesPartitioning: Whether partitioning should be preserved

    Returns:
        New RDD with transformed elements
    """
```

```python
numbers = sc.parallelize([1, 2, 3, 4, 5])
squared = numbers.map(lambda x: x * x)
# Result: RDD containing [1, 4, 9, 16, 25]
```
**flatMap**: Apply function and flatten results
```python { .api }
def flatMap(self, f, preservesPartitioning=False):
    """
    Apply function and flatten the results.

    Args:
        f: Function that returns iterable for each element
        preservesPartitioning: Whether partitioning should be preserved

    Returns:
        New RDD with flattened results
    """
```

```python
lines = sc.parallelize(["hello world", "spark rdd"])
words = lines.flatMap(lambda line: line.split(" "))
# Result: RDD containing ["hello", "world", "spark", "rdd"]
```
**filter**: Keep elements matching predicate
```python { .api }
def filter(self, f):
    """
    Filter elements using a predicate function.

    Args:
        f: Function that returns boolean for each element

    Returns:
        New RDD containing only matching elements
    """
```
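For example:
```python
numbers = sc.parallelize([1, 2, 3, 4, 5, 6])
evens = numbers.filter(lambda x: x % 2 == 0)
# Result: RDD containing [2, 4, 6]
```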
**distinct**: Remove duplicate elements
```python { .api }
def distinct(self, numPartitions=None):
    """
    Remove duplicate elements from RDD.

    Args:
        numPartitions: Number of partitions in result (optional)

    Returns:
        New RDD with duplicates removed
    """
```
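For example:
```python
values = sc.parallelize([1, 2, 2, 3, 3, 3])
unique = values.distinct()
# Result: RDD containing [1, 2, 3] (order not guaranteed)
```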
**union**: Combine with another RDD
```python { .api }
def union(self, other):
    """
    Return union of this RDD and another.

    Args:
        other: Another RDD of the same type

    Returns:
        New RDD containing elements from both RDDs
    """
```

**intersection**: Find common elements
```python { .api }
def intersection(self, other, numPartitions=None):
    """
    Return intersection of this RDD and another.

    Args:
        other: Another RDD
        numPartitions: Number of partitions in result (optional)

    Returns:
        New RDD containing common elements
    """
```
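A small example contrasting the two (union keeps duplicates, intersection removes them):
```python
a = sc.parallelize([1, 2, 3])
b = sc.parallelize([3, 4, 5])

combined = a.union(b)       # Result: [1, 2, 3, 3, 4, 5]
common = a.intersection(b)  # Result: [3]
```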
**sortBy**: Sort elements using key function
```python { .api }
def sortBy(self, keyfunc, ascending=True, numPartitions=None):
    """
    Sort RDD by the given key function.

    Args:
        keyfunc: Function to compute sort key for each element
        ascending: Whether to sort in ascending order
        numPartitions: Number of partitions in result (optional)

    Returns:
        New sorted RDD
    """
```
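For example, sorting pairs by their numeric value in descending order:
```python
pairs = sc.parallelize([("b", 2), ("a", 3), ("c", 1)])
by_value_desc = pairs.sortBy(lambda kv: kv[1], ascending=False)
# Result: [("a", 3), ("b", 2), ("c", 1)]
```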
#### Actions (Eager)

**collect**: Return all elements as list
```python { .api }
def collect(self):
    """
    Return all elements of the RDD as a list.
    WARNING: Ensure result fits in driver memory.

    Returns:
        List containing all RDD elements
    """
```

**count**: Count number of elements
```python { .api }
def count(self):
    """
    Count the number of elements in the RDD.

    Returns:
        Number of elements as integer
    """
```

**first**: Return first element
```python { .api }
def first(self):
    """
    Return the first element of the RDD.

    Returns:
        First element

    Raises:
        ValueError: If RDD is empty
    """
```

**take**: Return first n elements
```python { .api }
def take(self, num):
    """
    Return first n elements of the RDD.

    Args:
        num: Number of elements to return

    Returns:
        List of first n elements
    """
```
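A quick illustration of these actions on a small RDD:
```python
numbers = sc.parallelize([5, 3, 1, 4, 2])

numbers.collect()  # [5, 3, 1, 4, 2]
numbers.count()    # 5
numbers.first()    # 5
numbers.take(3)    # [5, 3, 1]
```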
**reduce**: Reduce elements using associative function
```python { .api }
def reduce(self, f):
    """
    Reduce elements using associative and commutative function.

    Args:
        f: Binary function that takes two parameters of same type

    Returns:
        Single reduced value
    """
```

```python
numbers = sc.parallelize([1, 2, 3, 4, 5])
sum_result = numbers.reduce(lambda a, b: a + b)
# Result: 15
```
**foreach**: Apply function to each element (for side effects)
```python { .api }
def foreach(self, f):
    """
    Apply function to each element for side effects only.

    Args:
        f: Function to apply to each element
    """
```
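Typical uses are side effects such as updating an accumulator or writing to an external system; print output from the function appears in executor logs, not on the driver. A minimal sketch:
```python
processed = sc.accumulator(0)
sc.parallelize([1, 2, 3]).foreach(lambda x: processed.add(1))
print(processed.value)  # 3
```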
#### Persistence Operations

**cache**: Cache RDD in memory
```python { .api }
def cache(self):
    """
    Cache this RDD in memory using default storage level.

    Returns:
        Same RDD for method chaining
    """
```

**persist**: Cache with specific storage level
```python { .api }
def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
    """
    Cache RDD with specified storage level.

    Args:
        storageLevel: Storage level from StorageLevel class

    Returns:
        Same RDD for method chaining
    """
```

```python
from pyspark import StorageLevel

rdd = sc.textFile("large-file.txt")
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.cache()  # Equivalent to persist(StorageLevel.MEMORY_ONLY)
```
**unpersist**: Remove from cache
```python { .api }
def unpersist(self, blocking=False):
    """
    Remove this RDD from cache/storage.

    Args:
        blocking: Whether to block until removal is complete

    Returns:
        Same RDD for method chaining
    """
```
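For example, releasing cached blocks once they are no longer needed:
```python
rdd = sc.textFile("large-file.txt").cache()
rdd.count()      # First action materializes the cache
rdd.unpersist()  # Free the cached blocks
```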
#### Key-Value Operations (PairRDD Functions)

When an RDD contains tuples, additional operations are available:

**reduceByKey**: Combine values by key
```python { .api }
def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
    """
    Combine values with same key using associative function.

    Args:
        func: Binary function to combine values
        numPartitions: Number of partitions in result (optional)
        partitionFunc: Partitioning function (optional)

    Returns:
        New RDD with combined values per key
    """
```

```python
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
sums = pairs.reduceByKey(lambda a, b: a + b)
# Result: [("a", 4), ("b", 2)]
```
**groupByKey**: Group values by key
```python { .api }
def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
    """
    Group values with same key into iterables.

    Args:
        numPartitions: Number of partitions in result (optional)
        partitionFunc: Partitioning function (optional)

    Returns:
        New RDD with grouped values per key
    """
```
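For example:
```python
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
grouped = pairs.groupByKey().mapValues(list)
# Result: [("a", [1, 3]), ("b", [2])]
```
For simple aggregations, reduceByKey is usually preferable, since it combines values on each partition before shuffling.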
**mapValues**: Transform values, preserve keys
```python { .api }
def mapValues(self, f):
    """
    Apply function to values while preserving keys.

    Args:
        f: Function to apply to each value

    Returns:
        New RDD with transformed values
    """
```
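For example:
```python
inventory = sc.parallelize([("apples", 3), ("oranges", 7)])
doubled = inventory.mapValues(lambda count: count * 2)
# Result: [("apples", 6), ("oranges", 14)]
```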
**join**: Inner join on keys
```python { .api }
def join(self, other, numPartitions=None):
    """
    Inner join with another RDD on keys.

    Args:
        other: Another pair RDD to join with
        numPartitions: Number of partitions in result (optional)

    Returns:
        New RDD with joined key-value pairs
    """
```
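For example (keys present in only one RDD are dropped, as in a SQL inner join):
```python
names = sc.parallelize([(1, "alice"), (2, "bob")])
scores = sc.parallelize([(1, 95), (2, 87), (3, 72)])

joined = names.join(scores)
# Result: [(1, ("alice", 95)), (2, ("bob", 87))]
```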
### SparkConf

Configuration class for Spark applications.

```python { .api }
class SparkConf(object):
    """Configuration for a Spark application."""

    def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
        """Create SparkConf object."""

    def set(self, key, value):
        """Set configuration property."""

    def setMaster(self, value):
        """Set master URL."""

    def setAppName(self, value):
        """Set application name."""

    def setSparkHome(self, value):
        """Set Spark installation directory."""

    def setExecutorEnv(self, key=None, value=None, pairs=None):
        """Set environment variables for executors."""

    def get(self, key, defaultValue=None):
        """Get configuration value."""

    def getAll(self):
        """Get all configuration as list of (key, value) pairs."""
```

```python
conf = SparkConf() \
    .setAppName("My Application") \
    .setMaster("local[4]") \
    .set("spark.executor.memory", "4g") \
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```
### Broadcast Variables

Read-only variables distributed to all nodes.

```python { .api }
class Broadcast(object):
    """A broadcast variable created with SparkContext.broadcast()."""

    @property
    def value(self):
        """Get the broadcasted value."""

    def unpersist(self, blocking=False):
        """Delete cached copies of this broadcast on executors."""

    def destroy(self):
        """Destroy all data and metadata related to this broadcast."""
```
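A minimal sketch of the broadcast lifecycle:
```python
lookup = sc.broadcast({"US": "United States", "DE": "Germany"})

codes = sc.parallelize(["US", "DE", "US"])
names = codes.map(lambda c: lookup.value.get(c, "unknown")).collect()

lookup.unpersist()  # Drop executor copies; re-sent if the variable is used again
lookup.destroy()    # Release all data; the variable cannot be used after this
```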
### Accumulators

Shared variables for aggregating information.

```python { .api }
class Accumulator(object):
    """Shared variable that can only be added to."""

    def add(self, term):
        """Add a term to this accumulator."""

    @property
    def value(self):
        """Get the accumulator's value (only valid on driver)."""
```
### StorageLevel

Constants for RDD persistence levels.

```python { .api }
class StorageLevel(object):
    """Storage levels for persisting RDDs."""

    DISK_ONLY = StorageLevel(True, False, False, False, 1)
    DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
    MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
    MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
    MEMORY_ONLY_SER = StorageLevel(False, True, False, True, 1)
    MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, True, 2)
    MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
    MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
    MEMORY_AND_DISK_SER = StorageLevel(True, True, False, True, 1)
    MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, True, 2)
```
### SparkFiles

Utility for accessing files distributed with a Spark job.

```python { .api }
class SparkFiles(object):
    """Access files distributed via SparkContext.addFile()."""

    @classmethod
    def get(cls, filename):
        """
        Get path to file added via SparkContext.addFile().

        Args:
            filename: Name of file to locate

        Returns:
            Absolute path to file on current node
        """

    @classmethod
    def getRootDirectory(cls):
        """
        Get root directory for files added via addFile().

        Returns:
            Path to root directory containing distributed files
        """
```
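For example (the file name is illustrative):
```python
from pyspark import SparkFiles

sc.addFile("lookup.csv")

print(SparkFiles.getRootDirectory())  # Directory holding distributed files
print(SparkFiles.get("lookup.csv"))   # Absolute path valid on the current node
```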
## Usage Examples

### Word Count
```python
text_file = sc.textFile("hdfs://...")
counts = text_file \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

counts.saveAsTextFile("hdfs://output")
```

### Log Analysis
```python
log_file = sc.textFile("access.log")
errors = log_file.filter(lambda line: "ERROR" in line)
error_counts = errors \
    .map(lambda line: (extract_host(line), 1)) \
    .reduceByKey(lambda a, b: a + b)

result = error_counts.collect()
```

### Using Broadcast Variables
```python
lookup_table = {"user1": "admin", "user2": "guest"}
broadcast_lookup = sc.broadcast(lookup_table)

user_logs = sc.textFile("user_activity.log")
enriched_logs = user_logs.map(lambda log: {
    "log": log,
    "role": broadcast_lookup.value.get(extract_user(log), "unknown")
})
```

### Caching for Performance
```python
large_dataset = sc.textFile("huge_file.txt")
filtered_data = large_dataset.filter(lambda line: "important" in line)

# Cache the filtered data since we'll use it multiple times
filtered_data.cache()

# Multiple operations on cached data
count = filtered_data.count()
sample = filtered_data.sample(False, 0.1).collect()
unique_words = filtered_data.flatMap(lambda line: line.split()).distinct().count()
```
## Error Handling

Common exceptions and error patterns in PySpark:

**Py4JJavaError**: Most common error, indicates a Java exception
```python
from py4j.protocol import Py4JJavaError

try:
    result = rdd.collect()
except Py4JJavaError as e:
    print(f"Java exception occurred: {e}")
```

**SparkContext Errors**: Only one SparkContext per JVM
```python
try:
    sc = SparkContext()
except ValueError as e:
    print("SparkContext already exists")
```

**File Not Found**: When reading non-existent files
```python
try:
    rdd = sc.textFile("nonexistent_file.txt")
    rdd.count()  # Error occurs on action, not creation
except Exception as e:
    print(f"File access error: {e}")
```

The Python API provides a Pythonic interface to Spark's distributed computing capabilities while maintaining compatibility with the underlying Scala/Java implementation.