# Core Spark Context and RDDs

Low-level distributed computing functionality providing the foundational building blocks for Spark applications. This includes SparkContext for cluster coordination, RDDs for distributed data processing, broadcast variables for efficient data sharing, and accumulators for distributed counters and sums.

## Capabilities

### Spark Context

Main entry point for Spark functionality that coordinates the Spark application and manages cluster resources.

```python { .api }
class SparkContext:
    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 environment=None, batchSize=0, serializer=CPickleSerializer(),
                 conf=None, gateway=None, jsc=None, profiler_cls=BasicProfiler):
        """
        Create a new SparkContext.

        Parameters:
        - master (str): Cluster URL to connect to (e.g. "local", "local[4]", or "spark://master:7077")
        - appName (str): Name of the application
        - sparkHome (str): Spark installation directory on cluster nodes
        - pyFiles (list): Python files to send to the cluster
        - environment (dict): Environment variables to set on worker nodes
        - batchSize (int): Number of Python objects represented as a single Java object
        - serializer: Serializer for RDDs
        - conf (SparkConf): SparkConf object with configuration
        - profiler_cls: Profiler class to use for profiling
        """

    def parallelize(self, c, numSlices=None):
        """
        Distribute a local Python collection to form an RDD.

        Parameters:
        - c: Collection to distribute (list, tuple, etc.)
        - numSlices (int): Number of partitions to create

        Returns:
        RDD containing the elements of the collection
        """

    def textFile(self, name, minPartitions=None, use_unicode=True):
        """
        Read a text file from HDFS/local filesystem/any Hadoop-supported filesystem.

        Parameters:
        - name (str): Path to the text file
        - minPartitions (int): Minimum number of partitions
        - use_unicode (bool): Whether to convert to unicode

        Returns:
        RDD of strings
        """

    def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
        """
        Read text files from a directory, returning each file as a (filename, content) pair.

        Parameters:
        - path (str): Directory path
        - minPartitions (int): Minimum number of partitions
        - use_unicode (bool): Whether to convert to unicode

        Returns:
        RDD of (filename, content) pairs
        """

    def broadcast(self, value):
        """
        Broadcast a read-only variable to the cluster.

        Parameters:
        - value: Value to broadcast

        Returns:
        Broadcast variable
        """

    def accumulator(self, value, accum_param=None):
        """
        Create an accumulator with the given initial value.

        Parameters:
        - value: Initial value
        - accum_param: AccumulatorParam object

        Returns:
        Accumulator
        """

    def stop(self):
        """Shut down the SparkContext."""

    def setCheckpointDir(self, dirName):
        """
        Set the directory under which RDDs are going to be checkpointed.

        Parameters:
        - dirName (str): Checkpoint directory path
        """

    def setLogLevel(self, logLevel):
        """
        Control the global logging level.

        Parameters:
        - logLevel (str): Log level ("ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN")
        """
```
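
A minimal end-to-end sketch of driver-side usage. It assumes a local Spark installation with `pyspark` importable; names like `word-count-demo` are illustrative only.

```python
from pyspark import SparkConf, SparkContext

# Configure and start a local context with two worker threads.
conf = SparkConf().setAppName("word-count-demo").setMaster("local[2]")
sc = SparkContext(conf=conf)

try:
    # Distribute a local collection and run a small pipeline.
    lines = sc.parallelize(["hello world", "hello spark"], numSlices=2)
    counts = (lines.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b)
                   .collect())
    print(counts)  # e.g. [('hello', 2), ('world', 1), ('spark', 1)]
finally:
    sc.stop()  # always release cluster resources
```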

### Resilient Distributed Datasets (RDDs)

Fundamental distributed data abstraction that represents an immutable, partitioned collection of elements.

```python { .api }
class RDD:
    def map(self, f):
        """
        Return a new RDD by applying a function to each element.

        Parameters:
        - f: Function to apply to each element

        Returns:
        New RDD with transformed elements
        """

    def filter(self, f):
        """
        Return a new RDD containing only elements that satisfy a predicate.

        Parameters:
        - f: Predicate function

        Returns:
        Filtered RDD
        """

    def flatMap(self, f):
        """
        Return a new RDD by first applying a function and then flattening the results.

        Parameters:
        - f: Function that returns a sequence

        Returns:
        Flattened RDD
        """

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition.

        Parameters:
        - f: Function to apply to each partition iterator
        - preservesPartitioning (bool): Whether partitioning is preserved

        Returns:
        New RDD
        """

    def reduce(self, f):
        """
        Reduce the elements of the RDD using the specified commutative and associative binary operator.

        Parameters:
        - f: Binary function for reduction

        Returns:
        Single reduced value
        """

    def fold(self, zeroValue, op):
        """
        Aggregate the elements using a given associative function and a neutral "zero value".

        Parameters:
        - zeroValue: Neutral zero value
        - op: Associative function

        Returns:
        Aggregated result
        """

    def aggregate(self, zeroValue, seqOp, combOp):
        """
        Aggregate elements using given combine functions and a neutral "zero value".

        Parameters:
        - zeroValue: Neutral zero value
        - seqOp: Function to combine elements within partitions
        - combOp: Function to combine results from partitions

        Returns:
        Aggregated result
        """

    def collect(self):
        """
        Return all elements of the RDD as a list.

        Returns:
        List containing all RDD elements
        """

    def take(self, num):
        """
        Take the first num elements of the RDD.

        Parameters:
        - num (int): Number of elements to take

        Returns:
        List of first num elements
        """

    def first(self):
        """
        Return the first element of the RDD.

        Returns:
        First element
        """

    def count(self):
        """
        Return the number of elements in the RDD.

        Returns:
        Number of elements
        """

    def distinct(self, numPartitions=None):
        """
        Return a new RDD containing distinct elements.

        Parameters:
        - numPartitions (int): Number of partitions in result

        Returns:
        RDD with distinct elements
        """

    def union(self, other):
        """
        Return the union of this RDD and another one.

        Parameters:
        - other (RDD): Another RDD

        Returns:
        Union RDD
        """

    def intersection(self, other):
        """
        Return the intersection of this RDD and another one.

        Parameters:
        - other (RDD): Another RDD

        Returns:
        Intersection RDD
        """

    def groupBy(self, f, numPartitions=None):
        """
        Group RDD elements by a key function.

        Parameters:
        - f: Key function
        - numPartitions (int): Number of partitions

        Returns:
        RDD of grouped elements
        """

    def sortBy(self, keyfunc, ascending=True, numPartitions=None):
        """
        Sort the RDD by a key function.

        Parameters:
        - keyfunc: Function to compute key for sorting
        - ascending (bool): Sort in ascending order
        - numPartitions (int): Number of partitions

        Returns:
        Sorted RDD
        """

    def cache(self):
        """
        Persist this RDD with the default storage level (MEMORY_ONLY).

        Returns:
        This RDD
        """

    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
        """
        Persist this RDD with the specified storage level.

        Parameters:
        - storageLevel (StorageLevel): Storage level

        Returns:
        This RDD
        """

    def checkpoint(self):
        """Mark this RDD for checkpointing; it will be saved under the directory set with SparkContext.setCheckpointDir() and its lineage will be truncated."""

    def getNumPartitions(self):
        """
        Return the number of partitions of this RDD.

        Returns:
        Number of partitions
        """

    def coalesce(self, numPartitions, shuffle=False):
        """
        Return a new RDD with a reduced number of partitions.

        Parameters:
        - numPartitions (int): Target number of partitions
        - shuffle (bool): Whether to shuffle data

        Returns:
        Coalesced RDD
        """

    def repartition(self, numPartitions):
        """
        Return a new RDD with exactly numPartitions partitions.

        Parameters:
        - numPartitions (int): Number of partitions

        Returns:
        Repartitioned RDD
        """
```
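
A brief sketch of chaining transformations and actions, assuming an active SparkContext `sc` as created above. Transformations are lazy; work happens only when an action such as `collect()` or `count()` runs.

```python
nums = sc.parallelize(range(10), numSlices=4)

# Build a lazy pipeline: nothing executes yet.
evens_squared = nums.filter(lambda x: x % 2 == 0).map(lambda x: x * x)

# Cache the result, since we run several actions over it.
evens_squared.cache()

print(evens_squared.collect())                    # [0, 4, 16, 36, 64]
print(evens_squared.count())                      # 5
print(evens_squared.reduce(lambda a, b: a + b))   # 120
print(evens_squared.getNumPartitions())           # 4
```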

### Paired RDD Operations

Operations available on RDDs of key-value pairs.

```python { .api }
class RDD:
    def groupByKey(self, numPartitions=None):
        """
        Group values for each key in the RDD into a single sequence.

        Parameters:
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, iterable of values) pairs
        """

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge values for each key using an associative reduce function.

        Parameters:
        - func: Associative reduce function
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, reduced value) pairs
        """

    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
        """
        Aggregate values for each key using given combine functions.

        Parameters:
        - zeroValue: Initial value for each key
        - seqFunc: Function to combine values within partitions
        - combFunc: Function to combine results across partitions
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, aggregated value) pairs
        """

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sort RDD by keys.

        Parameters:
        - ascending (bool): Sort in ascending order
        - numPartitions (int): Number of partitions
        - keyfunc: Function to compute sort key

        Returns:
        Sorted RDD
        """

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys.

        Parameters:
        - other (RDD): Another RDD to join with
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, (value1, value2)) pairs
        """

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of this RDD and another one.

        Parameters:
        - other (RDD): Another RDD to join with
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, (value1, Optional[value2])) pairs
        """

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of this RDD and another one.

        Parameters:
        - other (RDD): Another RDD to join with
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, (Optional[value1], value2)) pairs
        """

    def fullOuterJoin(self, other, numPartitions=None):
        """
        Perform a full outer join of this RDD and another one.

        Parameters:
        - other (RDD): Another RDD to join with
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, (Optional[value1], Optional[value2])) pairs
        """
```
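
A sketch of key-value operations on pair RDDs, again assuming an active `sc`. Note that outer joins fill the missing side with `None`.

```python
sales = sc.parallelize([("apples", 3), ("pears", 2), ("apples", 4)])
prices = sc.parallelize([("apples", 1.5), ("pears", 2.0), ("plums", 0.5)])

# Sum quantities per key; reduceByKey combines within partitions before shuffling.
totals = sales.reduceByKey(lambda a, b: a + b)      # ('apples', 7), ('pears', 2)

# Inner join on key yields (key, (total, price)).
revenue = totals.join(prices).map(lambda kv: (kv[0], kv[1][0] * kv[1][1]))
print(sorted(revenue.collect()))                    # [('apples', 10.5), ('pears', 4.0)]

# Right outer join keeps unmatched right-side keys with None on the left.
print(sorted(totals.rightOuterJoin(prices).collect()))
# [('apples', (7, 1.5)), ('pears', (2, 2.0)), ('plums', (None, 0.5))]
```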

### Broadcast Variables

Read-only variables cached on each machine for efficient data sharing.

```python { .api }
class Broadcast:
    @property
    def value(self):
        """
        Return the broadcasted value.

        Returns:
        The broadcasted value
        """

    def destroy(self):
        """Destroy all data and metadata related to this broadcast variable."""

    def unpersist(self, blocking=False):
        """
        Delete cached copies of this broadcast on the executors.

        Parameters:
        - blocking (bool): Whether to block until unpersisting is complete
        """
```
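
A minimal sketch of sharing a lookup table through a broadcast variable instead of capturing it in every task closure, assuming an active `sc`.

```python
lookup = {"a": 1, "b": 2, "c": 3}
bc = sc.broadcast(lookup)  # shipped to each executor once, not once per task

codes = sc.parallelize(["a", "b", "c", "b"])
print(codes.map(lambda k: bc.value[k]).collect())  # [1, 2, 3, 2]

bc.unpersist()  # drop executor-side copies; they are re-sent on next use
```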

### Accumulators

Shared variables that tasks can only add to, typically used to implement distributed counters and sums.

```python { .api }
class Accumulator:
    def add(self, term):
        """
        Add a term to this accumulator.

        Parameters:
        - term: Value to add
        """

    @property
    def value(self):
        """
        Get the accumulator's value; only usable on the driver program.

        Returns:
        Current accumulator value
        """

class AccumulatorParam:
    def zero(self, value):
        """
        Provide a "zero value" for the accumulator type.

        Parameters:
        - value: Sample value

        Returns:
        Zero value
        """

    def addInPlace(self, value1, value2):
        """
        Add two values of the accumulator's data type.

        Parameters:
        - value1: First value
        - value2: Second value

        Returns:
        Sum of the values
        """
```
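
A sketch of accumulator usage, assuming an active `sc`. Numeric initial values use a built-in AccumulatorParam; the `ListParam` class below is a hypothetical custom param for element-wise list addition. Workers may only `add`; read `value` on the driver after an action completes.

```python
from pyspark.accumulators import AccumulatorParam

# Built-in numeric accumulator.
errors = sc.accumulator(0)
logs = sc.parallelize(["ok", "ERROR: x", "ok", "ERROR: y"])
logs.foreach(lambda line: errors.add(1) if "ERROR" in line else None)
print(errors.value)  # 2

# Hypothetical custom AccumulatorParam summing lists element-wise.
class ListParam(AccumulatorParam):
    def zero(self, value):
        return [0.0] * len(value)

    def addInPlace(self, v1, v2):
        return [a + b for a, b in zip(v1, v2)]

vec = sc.accumulator([0.0, 0.0], ListParam())
sc.parallelize([[1.0, 2.0], [3.0, 4.0]]).foreach(lambda x: vec.add(x))
print(vec.value)  # [4.0, 6.0]
```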

### Spark Configuration

Configuration settings for Spark applications.

```python { .api }
class SparkConf:
    def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
        """
        Create a new Spark configuration.

        Parameters:
        - loadDefaults (bool): Whether to load default values
        """

    def setAppName(self, value):
        """
        Set application name.

        Parameters:
        - value (str): Application name

        Returns:
        This SparkConf object
        """

    def setMaster(self, value):
        """
        Set master URL.

        Parameters:
        - value (str): Master URL

        Returns:
        This SparkConf object
        """

    def set(self, key, value):
        """
        Set a configuration property.

        Parameters:
        - key (str): Configuration key
        - value (str): Configuration value

        Returns:
        This SparkConf object
        """

    def get(self, key, defaultValue=None):
        """
        Get a configuration value.

        Parameters:
        - key (str): Configuration key
        - defaultValue (str): Default value if key not found

        Returns:
        Configuration value
        """

    def setSparkHome(self, value):
        """
        Set Spark installation path.

        Parameters:
        - value (str): Spark home directory

        Returns:
        This SparkConf object
        """

    def setExecutorEnv(self, key=None, value=None, pairs=None):
        """
        Set environment variables for executor processes.

        Parameters:
        - key (str): Environment variable name
        - value (str): Environment variable value
        - pairs (list): List of (key, value) pairs

        Returns:
        This SparkConf object
        """
```
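
Because every setter returns the SparkConf itself, configuration chains fluently. A short sketch; `spark.executor.memory` is a standard Spark property, and the remaining names are illustrative.

```python
from pyspark import SparkConf

conf = (SparkConf()
        .setAppName("configured-app")
        .setMaster("local[4]")
        .set("spark.executor.memory", "2g")
        .setExecutorEnv("DATA_DIR", "/tmp/data"))

print(conf.get("spark.executor.memory"))         # '2g'
print(conf.get("spark.missing.key", "default"))  # falls back to the default
```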

## Types

```python { .api }
class StorageLevel:
    """Storage levels for RDD persistence."""
    DISK_ONLY: StorageLevel
    DISK_ONLY_2: StorageLevel
    MEMORY_ONLY: StorageLevel
    MEMORY_ONLY_2: StorageLevel
    MEMORY_ONLY_SER: StorageLevel
    MEMORY_ONLY_SER_2: StorageLevel
    MEMORY_AND_DISK: StorageLevel
    MEMORY_AND_DISK_2: StorageLevel
    MEMORY_AND_DISK_SER: StorageLevel
    MEMORY_AND_DISK_SER_2: StorageLevel
    OFF_HEAP: StorageLevel

class TaskContext:
    """Information about the task currently being executed."""

    def attemptNumber(self):
        """How many times this task has been attempted."""

    def partitionId(self):
        """The ID of the RDD partition that is computed by this task."""

    def stageId(self):
        """The ID of the stage that this task belongs to."""

    def taskAttemptId(self):
        """An ID that is unique to this task attempt."""

class StatusTracker:
    """Low-level status reporting APIs for monitoring job and stage progress."""

    def getJobIdsForGroup(self, jobGroup):
        """Return a list of all known jobs in a particular job group."""

    def getActiveStageIds(self):
        """Return a list of the IDs of all active stages."""

    def getExecutorInfos(self):
        """Return information about all known executors."""
```
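
A sketch combining these types, assuming an active `sc`: persist with an explicit StorageLevel, read the TaskContext inside a running task, and poll the StatusTracker via the context's `statusTracker()` accessor.

```python
from pyspark import StorageLevel, TaskContext

rdd = sc.parallelize(range(100), numSlices=4)
rdd.persist(StorageLevel.MEMORY_AND_DISK)  # spill to disk when memory is tight

def tag_partition(it):
    # TaskContext.get() is only meaningful inside a running task.
    pid = TaskContext.get().partitionId()
    return ((pid, x) for x in it)

print(rdd.mapPartitions(tag_partition).take(3))  # [(0, 0), (0, 1), (0, 2)]

# Empty here, since the jobs above have already finished.
print(sc.statusTracker().getActiveStageIds())    # []
```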