Spec Registry

maven-apache-spark

Description
Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
Author
tessl
Last updated

How to use

npx @tessl/cli registry install tessl/maven-apache-spark@1.0.0

docs/python-api.md

# Python API (PySpark)

PySpark is the Python API for Apache Spark. It lets Python developers harness Spark's distributed computing capabilities through a Python interface to Spark's core functionality, including RDDs, Spark SQL, Streaming, and MLlib.

## Core Imports

```python { .api }
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
from pyspark import SparkFiles, StorageLevel
```

## Basic Usage

### Creating a SparkContext

```python { .api }
from pyspark import SparkContext, SparkConf

# Using SparkConf (recommended)
conf = SparkConf() \
    .setAppName("My Python App") \
    .setMaster("local[*]") \
    .set("spark.executor.memory", "2g")

sc = SparkContext(conf=conf)

# Simple constructor (only one SparkContext can be active at a time)
sc = SparkContext("local[*]", "My Python App")

# Remember to stop the context when you are done
sc.stop()
```

## Capabilities

### SparkContext

Main entry point for all Spark functionality.

```python { .api }
class SparkContext(object):
    def __init__(self, master=None, appName=None, sparkHome=None,
                 pyFiles=None, environment=None, batchSize=1024,
                 serializer=PickleSerializer(), conf=None, gateway=None):
        """
        Create a new SparkContext.

        Args:
            master: Cluster URL to connect to (e.g. local[4], spark://host:port)
            appName: A name for your job, to display on cluster web UI
            sparkHome: Location where Spark is installed on cluster nodes
            pyFiles: Collection of .zip or .py files to send to cluster
            environment: Dictionary of environment variables for worker nodes
            batchSize: Number of Python objects represented as single Java object
            serializer: The serializer for RDDs
            conf: A SparkConf object setting Spark properties
            gateway: Use existing gateway and JVM, otherwise create new JVM
        """
```

#### RDD Creation Methods

**parallelize**: Distribute a local collection to form an RDD
```python { .api }
def parallelize(self, c, numSlices=None):
    """
    Distribute a local Python collection to form an RDD.

    Args:
        c: Collection to parallelize (list, tuple, etc.)
        numSlices: Number of partitions to create (optional)

    Returns:
        RDD containing the distributed data
    """
```

```python
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)  # Use default parallelism
rdd_with_partitions = sc.parallelize(data, 4)  # Specify 4 partitions
```

**textFile**: Read text files as RDD of strings
```python { .api }
def textFile(self, name, minPartitions=None):
    """
    Read a text file from HDFS or local filesystem.

    Args:
        name: Path to text file
        minPartitions: Minimum number of partitions (optional)

    Returns:
        RDD where each element is a line from the file
    """
```

```python
lines = sc.textFile("hdfs://namenode:port/path/to/file.txt")
lines_local = sc.textFile("file:///local/path/file.txt")
lines_with_partitions = sc.textFile("hdfs://path/to/file.txt", 8)
```

**wholeTextFiles**: Read directory of text files as key-value pairs
```python { .api }
def wholeTextFiles(self, path, minPartitions=None):
    """
    Read directory of text files as (filename, content) pairs.

    Args:
        path: Directory path containing text files
        minPartitions: Minimum number of partitions (optional)

    Returns:
        RDD of (filename, content) tuples
    """
```

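A brief usage sketch in the style of the examples above (the directory path is a placeholder):

```python
# Each element is a (file path, full file contents) pair
files = sc.wholeTextFiles("file:///local/path/data-dir")
file_lengths = files.mapValues(len)  # (file path, number of characters)
print(file_lengths.take(3))
```
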
#### Shared Variables

**broadcast**: Create a broadcast variable for read-only data
```python { .api }
def broadcast(self, value):
    """
    Broadcast a read-only variable to all nodes.

    Args:
        value: Value to broadcast

    Returns:
        Broadcast object with .value property
    """
```

```python
lookup_table = {"apple": 1, "banana": 2, "orange": 3}
broadcast_table = sc.broadcast(lookup_table)

data = sc.parallelize(["apple", "banana", "apple"])
mapped = data.map(lambda fruit: broadcast_table.value.get(fruit, 0))
```

**accumulator**: Create an accumulator for aggregating information
```python { .api }
def accumulator(self, value, accum_param=None):
    """
    Create an accumulator with the given initial value.

    Args:
        value: Initial value
        accum_param: AccumulatorParam object (optional)

    Returns:
        Accumulator object
    """
```

```python
counter = sc.accumulator(0)

data = sc.parallelize([1, 2, -1, 4, -5])
# Update the accumulator from an action so each element is counted exactly once
data.foreach(lambda x: counter.add(1) if x < 0 else None)
print(f"Negative numbers: {counter.value}")  # 2
```

#### Job Control

**setJobGroup**: Assign group ID to jobs
```python { .api }
def setJobGroup(self, groupId, description, interruptOnCancel=False):
    """Set job group for all jobs started by this thread."""
```

**cancelJobGroup**: Cancel all jobs in a group
```python { .api }
def cancelJobGroup(self, groupId):
    """Cancel all jobs associated with a job group."""
```

**cancelAllJobs**: Cancel all scheduled or running jobs
```python { .api }
def cancelAllJobs(self):
    """Cancel all scheduled or running jobs."""
```

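A sketch of how these fit together: a watchdog thread cancels a long-running group after a timeout. The group name, timeout, and input path are placeholders.

```python
import threading
import time

sc.setJobGroup("nightly-etl", "Nightly ETL run", interruptOnCancel=True)

def watchdog():
    time.sleep(600)                   # allow up to 10 minutes
    sc.cancelJobGroup("nightly-etl")  # then cancel every job in the group

threading.Thread(target=watchdog, daemon=True).start()
row_count = sc.textFile("hdfs://path/to/input").count()  # runs under the group set above
```
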
#### File Management

**addFile**: Add a file to be downloaded on every node
```python { .api }
def addFile(self, path, recursive=False):
    """
    Add a file to be downloaded with this Spark job on every node.

    Args:
        path: Path to file (local or remote)
        recursive: Whether to recursively add files in directories
    """
```

**addPyFile**: Add a Python file to be distributed
```python { .api }
def addPyFile(self, path):
    """
    Add a .py or .zip file to be distributed with this Spark job.

    Args:
        path: Path to Python file or zip archive
    """
```

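A short sketch of the typical pairing of `addFile` with `SparkFiles.get` (documented under SparkFiles below); `lookup.csv` is a placeholder file name:

```python
from pyspark import SparkFiles

sc.addFile("lookup.csv")  # shipped to every worker node

def first_line(_):
    # Each worker resolves its local copy of the distributed file by name
    with open(SparkFiles.get("lookup.csv")) as f:
        return f.readline().strip()

headers = sc.parallelize(range(4)).map(first_line).distinct().collect()
```
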
### RDD Operations

The RDD class provides the fundamental distributed data abstraction.

```python { .api }
class RDD(object):
    """
    Resilient Distributed Dataset - immutable distributed collection.
    """
```

#### Transformations (Lazy)

**map**: Apply function to each element
```python { .api }
def map(self, f, preservesPartitioning=False):
    """
    Apply a function to each element of the RDD.

    Args:
        f: Function to apply to each element
        preservesPartitioning: Whether partitioning should be preserved

    Returns:
        New RDD with transformed elements
    """
```

```python
numbers = sc.parallelize([1, 2, 3, 4, 5])
squared = numbers.map(lambda x: x * x)
# Result: RDD containing [1, 4, 9, 16, 25]
```

**flatMap**: Apply function and flatten results
```python { .api }
def flatMap(self, f, preservesPartitioning=False):
    """
    Apply function and flatten the results.

    Args:
        f: Function that returns iterable for each element
        preservesPartitioning: Whether partitioning should be preserved

    Returns:
        New RDD with flattened results
    """
```

```python
lines = sc.parallelize(["hello world", "spark rdd"])
words = lines.flatMap(lambda line: line.split(" "))
# Result: RDD containing ["hello", "world", "spark", "rdd"]
```

**filter**: Keep elements matching predicate
```python { .api }
def filter(self, f):
    """
    Filter elements using a predicate function.

    Args:
        f: Function that returns boolean for each element

    Returns:
        New RDD containing only matching elements
    """
```

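A minimal example in the same style as the ones above:

```python
numbers = sc.parallelize([1, 2, 3, 4, 5, 6])
evens = numbers.filter(lambda x: x % 2 == 0)
# Result: RDD containing [2, 4, 6]
```
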
**distinct**: Remove duplicate elements
```python { .api }
def distinct(self, numPartitions=None):
    """
    Remove duplicate elements from RDD.

    Args:
        numPartitions: Number of partitions in result (optional)

    Returns:
        New RDD with duplicates removed
    """
```

**union**: Combine with another RDD
```python { .api }
def union(self, other):
    """
    Return union of this RDD and another.

    Args:
        other: Another RDD of the same type

    Returns:
        New RDD containing elements from both RDDs
    """
```

**intersection**: Find common elements
```python { .api }
def intersection(self, other, numPartitions=None):
    """
    Return intersection of this RDD and another.

    Args:
        other: Another RDD
        numPartitions: Number of partitions in result (optional)

    Returns:
        New RDD containing common elements
    """
```

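A combined sketch of these set-style transformations; note that union keeps duplicates while distinct and intersection remove them:

```python
a = sc.parallelize([1, 2, 2, 3])
b = sc.parallelize([3, 4, 4, 5])

a.distinct().collect()       # [1, 2, 3] (order not guaranteed)
a.union(b).collect()         # [1, 2, 2, 3, 3, 4, 4, 5] (duplicates kept)
a.intersection(b).collect()  # [3]
```
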
**sortBy**: Sort elements using key function
```python { .api }
def sortBy(self, keyfunc, ascending=True, numPartitions=None):
    """
    Sort RDD by the given key function.

    Args:
        keyfunc: Function to compute sort key for each element
        ascending: Whether to sort in ascending order
        numPartitions: Number of partitions in result (optional)

    Returns:
        New sorted RDD
    """
```

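For example:

```python
pairs = sc.parallelize([("b", 1), ("a", 2), ("c", 3)])
by_key = pairs.sortBy(lambda kv: kv[0]).collect()
# Result: [("a", 2), ("b", 1), ("c", 3)]
by_value_desc = pairs.sortBy(lambda kv: kv[1], ascending=False).collect()
# Result: [("c", 3), ("a", 2), ("b", 1)]
```
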
#### Actions (Eager)

**collect**: Return all elements as list
```python { .api }
def collect(self):
    """
    Return all elements of the RDD as a list.
    WARNING: Ensure result fits in driver memory.

    Returns:
        List containing all RDD elements
    """
```

**count**: Count number of elements
```python { .api }
def count(self):
    """
    Count the number of elements in the RDD.

    Returns:
        Number of elements as integer
    """
```

**first**: Return first element
```python { .api }
def first(self):
    """
    Return the first element of the RDD.

    Returns:
        First element

    Raises:
        ValueError: If RDD is empty
    """
```

**take**: Return first n elements
```python { .api }
def take(self, num):
    """
    Return first n elements of the RDD.

    Args:
        num: Number of elements to return

    Returns:
        List of first n elements
    """
```

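A quick comparison of these actions on a small RDD:

```python
rdd = sc.parallelize(range(100))

rdd.count()    # 100
rdd.first()    # 0
rdd.take(3)    # [0, 1, 2]
rdd.collect()  # all 100 elements -- only safe because the RDD is small
```
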
**reduce**: Reduce elements using associative function
```python { .api }
def reduce(self, f):
    """
    Reduce elements using associative and commutative function.

    Args:
        f: Binary function that takes two parameters of same type

    Returns:
        Single reduced value
    """
```

```python
numbers = sc.parallelize([1, 2, 3, 4, 5])
sum_result = numbers.reduce(lambda a, b: a + b)
# Result: 15
```

**foreach**: Apply function to each element (for side effects)
```python { .api }
def foreach(self, f):
    """
    Apply function to each element for side effects only.

    Args:
        f: Function to apply to each element
    """
```

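Because foreach runs on the executors and returns nothing to the driver, it pairs naturally with accumulators; a small sketch:

```python
processed = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: processed.add(1))
print(processed.value)  # 4
```
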
#### Persistence Operations

**cache**: Cache RDD in memory
```python { .api }
def cache(self):
    """
    Cache this RDD in memory using default storage level.

    Returns:
        Same RDD for method chaining
    """
```

**persist**: Cache with specific storage level
```python { .api }
def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
    """
    Cache RDD with specified storage level.

    Args:
        storageLevel: Storage level from StorageLevel class

    Returns:
        Same RDD for method chaining
    """
```

```python
from pyspark import StorageLevel

rdd = sc.textFile("large-file.txt")
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.cache()  # Equivalent to persist(StorageLevel.MEMORY_ONLY)
```

**unpersist**: Remove from cache
```python { .api }
def unpersist(self, blocking=False):
    """
    Remove this RDD from cache/storage.

    Args:
        blocking: Whether to block until removal is complete

    Returns:
        Same RDD for method chaining
    """
```

#### Key-Value Operations (PairRDD Functions)

When an RDD contains tuples, additional operations are available:

**reduceByKey**: Combine values by key
```python { .api }
def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
    """
    Combine values with same key using associative function.

    Args:
        func: Binary function to combine values
        numPartitions: Number of partitions in result (optional)
        partitionFunc: Partitioning function (optional)

    Returns:
        New RDD with combined values per key
    """
```

```python
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
sums = pairs.reduceByKey(lambda a, b: a + b)
# Result: [("a", 4), ("b", 2)]
```

**groupByKey**: Group values by key
```python { .api }
def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
    """
    Group values with same key into iterables.

    Args:
        numPartitions: Number of partitions in result (optional)
        partitionFunc: Partitioning function (optional)

    Returns:
        New RDD with grouped values per key
    """
```

**mapValues**: Transform values, preserve keys
```python { .api }
def mapValues(self, f):
    """
    Apply function to values while preserving keys.

    Args:
        f: Function to apply to each value

    Returns:
        New RDD with transformed values
    """
```

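A short example of both; groupByKey yields an iterable per key, so mapValues(list) is used here to make the result printable:

```python
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

grouped = pairs.groupByKey().mapValues(list)
# Result: [("a", [1, 3]), ("b", [2])]

doubled = pairs.mapValues(lambda v: v * 2)
# Result: [("a", 2), ("b", 4), ("a", 6)]
```
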
**join**: Inner join on keys
```python { .api }
def join(self, other, numPartitions=None):
    """
    Inner join with another RDD on keys.

    Args:
        other: Another pair RDD to join with
        numPartitions: Number of partitions in result (optional)

    Returns:
        New RDD with joined key-value pairs
    """
```

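For example (keys present in only one RDD are dropped, as in any inner join):

```python
users = sc.parallelize([(1, "alice"), (2, "bob")])
orders = sc.parallelize([(1, "book"), (1, "pen"), (3, "lamp")])

matched = users.join(orders)
# Result: [(1, ("alice", "book")), (1, ("alice", "pen"))]
```
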
### SparkConf

Configuration class for Spark applications.

```python { .api }
class SparkConf(object):
    """Configuration for a Spark application."""

    def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
        """Create SparkConf object."""

    def set(self, key, value):
        """Set configuration property."""

    def setMaster(self, value):
        """Set master URL."""

    def setAppName(self, value):
        """Set application name."""

    def setSparkHome(self, value):
        """Set Spark installation directory."""

    def setExecutorEnv(self, key=None, value=None, pairs=None):
        """Set environment variables for executors."""

    def get(self, key, defaultValue=None):
        """Get configuration value."""

    def getAll(self):
        """Get all configuration as list of (key, value) pairs."""
```

```python
conf = SparkConf() \
    .setAppName("My Application") \
    .setMaster("local[4]") \
    .set("spark.executor.memory", "4g") \
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```

### Broadcast Variables

Read-only variables distributed to all nodes.

```python { .api }
class Broadcast(object):
    """A broadcast variable created with SparkContext.broadcast()."""

    @property
    def value(self):
        """Get the broadcasted value."""

    def unpersist(self, blocking=False):
        """Delete cached copies of this broadcast on executors."""

    def destroy(self):
        """Destroy all data and metadata related to this broadcast."""
```

### Accumulators

Shared variables for aggregating information.

```python { .api }
class Accumulator(object):
    """Shared variable that can only be added to."""

    def add(self, term):
        """Add a term to this accumulator."""

    @property
    def value(self):
        """Get the accumulator's value (only valid on driver)."""
```

### StorageLevel

Constants for RDD persistence levels.

```python { .api }
class StorageLevel(object):
    """Storage levels for persisting RDDs."""

    DISK_ONLY = StorageLevel(True, False, False, False, 1)
    DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
    MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
    MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
    MEMORY_ONLY_SER = StorageLevel(False, True, False, True, 1)
    MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, True, 2)
    MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
    MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
    MEMORY_AND_DISK_SER = StorageLevel(True, True, False, True, 1)
    MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, True, 2)
```

### SparkFiles

Utility for accessing files distributed with a Spark job.

```python { .api }
class SparkFiles(object):
    """Access files distributed via SparkContext.addFile()."""

    @classmethod
    def get(cls, filename):
        """
        Get path to file added via SparkContext.addFile().

        Args:
            filename: Name of file to locate

        Returns:
            Absolute path to file on current node
        """

    @classmethod
    def getRootDirectory(cls):
        """
        Get root directory for files added via addFile().

        Returns:
            Path to root directory containing distributed files
        """
```

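A small driver-side sketch (the worker-side pairing with addFile is shown under File Management above; `settings.json` is a placeholder file name):

```python
from pyspark import SparkFiles

sc.addFile("settings.json")             # distributed to every node
print(SparkFiles.getRootDirectory())    # directory holding all distributed files
print(SparkFiles.get("settings.json"))  # absolute path to this particular file
```
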
## Usage Examples

### Word Count
```python
text_file = sc.textFile("hdfs://...")
counts = text_file \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

counts.saveAsTextFile("hdfs://output")
```

### Log Analysis
```python
log_file = sc.textFile("access.log")
errors = log_file.filter(lambda line: "ERROR" in line)
# extract_host is a user-supplied parsing function
error_counts = errors \
    .map(lambda line: (extract_host(line), 1)) \
    .reduceByKey(lambda a, b: a + b)

result = error_counts.collect()
```

### Using Broadcast Variables
```python
lookup_table = {"user1": "admin", "user2": "guest"}
broadcast_lookup = sc.broadcast(lookup_table)

# extract_user is a user-supplied parsing function
user_logs = sc.textFile("user_activity.log")
enriched_logs = user_logs.map(lambda log: {
    "log": log,
    "role": broadcast_lookup.value.get(extract_user(log), "unknown")
})
```

### Caching for Performance
```python
large_dataset = sc.textFile("huge_file.txt")
filtered_data = large_dataset.filter(lambda line: "important" in line)

# Cache the filtered data since we'll use it multiple times
filtered_data.cache()

# Multiple operations on cached data
count = filtered_data.count()
sample = filtered_data.sample(False, 0.1).collect()
unique_words = filtered_data.flatMap(lambda line: line.split()).distinct().count()
```

## Error Handling

Common exceptions and error patterns in PySpark:

**Py4JJavaError**: Most common error, indicates a Java exception
```python
from py4j.protocol import Py4JJavaError

try:
    result = rdd.collect()
except Py4JJavaError as e:
    print(f"Java exception occurred: {e}")
```

**SparkContext Errors**: Only one SparkContext per JVM
```python
try:
    sc = SparkContext()
except ValueError as e:
    print("SparkContext already exists")
```

**File Not Found**: When reading non-existent files
```python
try:
    rdd = sc.textFile("nonexistent_file.txt")
    rdd.count()  # Error occurs on action, not creation
except Exception as e:
    print(f"File access error: {e}")
```

The Python API provides a Pythonic interface to Spark's distributed computing capabilities while maintaining compatibility with the underlying Scala/Java implementation.