Spec Registry

Help your agents use open-source better.

Find usage specs for your project’s dependencies


maven-apache-spark

Description
Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
Author
tessl
Last updated

How to use

npx @tessl/cli registry install tessl/maven-apache-spark@1.0.0

docs/spark-context.md

# SparkContext API

The SparkContext is the main entry point for all Spark functionality. It represents the connection to a Spark cluster and coordinates the execution of operations across the cluster. Every Spark application must create exactly one active SparkContext.

## SparkContext Class

```scala { .api }
class SparkContext(config: SparkConf) extends Logging {
  // Auxiliary constructors
  def this() = this(new SparkConf())
  def this(master: String, appName: String, conf: SparkConf) = { /* ... */ }
  def this(master: String, appName: String, sparkHome: String, jars: Seq[String], environment: Map[String, String] = Map()) = { /* ... */ }
}
```

## Creating SparkContext

### Basic Construction

```scala { .api }
import org.apache.spark.{SparkContext, SparkConf}

// Using SparkConf (recommended)
val conf = new SparkConf()
  .setAppName("My Application")
  .setMaster("local[*]")
  .set("spark.executor.memory", "2g")

val sc = new SparkContext(conf)
```

### Alternative Constructors

```scala { .api }
// Default constructor (loads from system properties)
val sc = new SparkContext()

// With master and app name
val sc = new SparkContext("local[*]", "My App")

// Full constructor with all parameters
val sc = new SparkContext(
  master = "local[*]",
  appName = "My App",
  sparkHome = "/path/to/spark",
  jars = Seq("myapp.jar"),
  environment = Map("SPARK_ENV_VAR" -> "value")
)
```

### Developer API Constructor (for YARN)

```scala { .api }
// @DeveloperApi - for internal use, typically in YARN mode
val sc = new SparkContext(
  config = conf,
  preferredNodeLocationData = Map[String, Set[SplitInfo]]()
)
```

## RDD Creation Methods

### From Collections

**parallelize**: Distribute a local collection to form an RDD
```scala { .api }
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] // alias
```

```scala
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data) // Use default parallelism
val rddWithPartitions = sc.parallelize(data, 4) // Specify 4 partitions
```

**With location preferences**:
```scala { .api }
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
```

```scala
// Create RDD with preferred locations for each element
val dataWithPrefs = Seq(
  (1, Seq("host1", "host2")),
  (2, Seq("host3")),
  (3, Seq("host1"))
)
val rdd = sc.makeRDD(dataWithPrefs)
```

### From Files

**textFile**: Read text files from HDFS or local filesystem
```scala { .api }
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
```

```scala
val lines = sc.textFile("hdfs://namenode:port/path/to/file.txt")
val linesLocal = sc.textFile("file:///local/path/file.txt")
val linesWithPartitions = sc.textFile("hdfs://path/to/file.txt", 8)
```

**wholeTextFiles**: Read directory of text files as key-value pairs
```scala { .api }
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
```

```scala
// Returns RDD[(filename, content)]
val files = sc.wholeTextFiles("hdfs://path/to/directory/")
files.foreach { case (filename, content) =>
  println(s"File: $filename, Size: ${content.length}")
}
```

### Hadoop Files

**sequenceFile**: Read Hadoop SequenceFiles
```scala { .api }
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
```

```scala
import org.apache.hadoop.io.{IntWritable, Text}

val seqFile = sc.sequenceFile[IntWritable, Text]("path/to/sequencefile",
  classOf[IntWritable], classOf[Text])
```

**hadoopFile**: Read files with arbitrary Hadoop InputFormat
```scala { .api }
def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val hadoopRDD = sc.hadoopFile[LongWritable, Text](
  "hdfs://path/to/input",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)
```

**objectFile**: Load RDD saved as SequenceFile of serialized objects
```scala { .api }
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
```

```scala
// Load RDD that was previously saved with saveAsObjectFile
val restored: RDD[MyClass] = sc.objectFile[MyClass]("path/to/objects")
```
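
For context, a minimal round-trip sketch (with a hypothetical serializable `MyClass`): the data is written with `RDD.saveAsObjectFile` and then reloaded with `objectFile`.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical case class used only for illustration (case classes are Serializable)
case class MyClass(id: Int, name: String)

// Save, then reload the same data as an RDD[MyClass]
val original = sc.parallelize(Seq(MyClass(1, "a"), MyClass(2, "b")))
original.saveAsObjectFile("path/to/objects")

val restored: RDD[MyClass] = sc.objectFile[MyClass]("path/to/objects")
```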

### RDD Manipulation

**union**: Build union of a list of RDDs
```scala { .api }
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
```

```scala
val rdd1 = sc.parallelize(Array(1, 2, 3))
val rdd2 = sc.parallelize(Array(4, 5, 6))
val rdd3 = sc.parallelize(Array(7, 8, 9))

val unionRDD = sc.union(Seq(rdd1, rdd2, rdd3))
```

**emptyRDD**: Create empty RDD with no partitions
```scala { .api }
def emptyRDD[T: ClassTag]: RDD[T]
```

```scala
val empty: RDD[String] = sc.emptyRDD[String]
```

## Shared Variables

Spark provides two types of shared variables: broadcast variables and accumulators.

### Broadcast Variables

**broadcast**: Create a broadcast variable for read-only data
```scala { .api }
def broadcast[T: ClassTag](value: T): Broadcast[T]
```

```scala
val lookupTable = Map("apple" -> 1, "banana" -> 2, "orange" -> 3)
val broadcastTable = sc.broadcast(lookupTable)

val data = sc.parallelize(Array("apple", "banana", "apple"))
val mapped = data.map(fruit => broadcastTable.value.getOrElse(fruit, 0))
```

### Accumulators

**accumulator**: Create a simple accumulator
```scala { .api }
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
```

**accumulable**: Create an accumulable with different result/element types
```scala { .api }
def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]): Accumulable[T, R]
```

```scala
import scala.collection.mutable

// Simple named counter
val counter = sc.accumulator(0, "Negative Number Counter")

val data = sc.parallelize(Array(1, 2, -1, 4, -5))
val positive = data.filter { x =>
  if (x < 0) counter += 1 // Count negative numbers
  x > 0
}
positive.count() // Trigger action
println(s"Negative numbers: ${counter.value}")

// Collection accumulator
val errorList = sc.accumulableCollection(mutable.Set[String]())
```

#### Built-in AccumulatorParam Types

```scala { .api }
// Available in SparkContext companion object
DoubleAccumulatorParam // For Double values
IntAccumulatorParam    // For Int values
LongAccumulatorParam   // For Long values
FloatAccumulatorParam  // For Float values
```
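
These implicits are resolved from the type of the initial value, so an explicit `AccumulatorParam` rarely needs to be passed; a brief sketch:

```scala
// The companion-object implicits are selected by the type of the initial value
val doubleSum = sc.accumulator(0.0) // uses DoubleAccumulatorParam
val longSum   = sc.accumulator(0L)  // uses LongAccumulatorParam

sc.parallelize(Seq(1.5, 2.5, 3.0)).foreach(x => doubleSum += x)
println(doubleSum.value) // 7.0 once the action completes
```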

## Job Control and Execution

### Running Jobs

**runJob**: Run a function on RDD partitions
```scala { .api }
def runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  allowLocal: Boolean = false,
  resultHandler: (Int, U) => Unit = null
): Array[U]

// Simplified versions
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]
```

```scala
val data = sc.parallelize(1 to 100, 4)

// Run custom function on each partition
val results = sc.runJob(data, (iter: Iterator[Int]) => iter.sum)
println(s"Partition sums: ${results.mkString(", ")}")

// With task context
val results2 = sc.runJob(data, (context: TaskContext, iter: Iterator[Int]) => {
  (context.partitionId, iter.size)
})
```

**submitJob**: Submit job asynchronously (Experimental)
```scala { .api }
def submitJob[T, U, R](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit,
  resultFunc: => R
): SimpleFutureAction[R]
```
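
Based only on the signature above, a hedged sketch of asynchronous submission: per-partition results arrive via `resultHandler`, and the by-name `resultFunc` supplies the final value once all partitions finish (`partialSums` is just an illustrative name). The returned action can be awaited like a standard Scala `Future`, or cancelled.

```scala
import scala.concurrent.Await
import scala.concurrent.duration.Duration

val data = sc.parallelize(1 to 100, 4)
val partialSums = new Array[Int](data.partitions.length)

val future = sc.submitJob(
  data,
  (iter: Iterator[Int]) => iter.sum,                  // run on each partition
  0 until data.partitions.length,                     // all partitions
  (index: Int, sum: Int) => partialSums(index) = sum, // collect per-partition results
  partialSums.sum                                     // combined result, evaluated on completion
)

val total = Await.result(future, Duration.Inf) // or future.cancel() to abort
```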

### Job Groups and Cancellation

**setJobGroup**: Assign group ID to all jobs started by this thread
```scala { .api }
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
```

**clearJobGroup**: Clear the job group for this thread
```scala { .api }
def clearJobGroup(): Unit
```

**cancelJobGroup**: Cancel all jobs for the given group
```scala { .api }
def cancelJobGroup(groupId: String): Unit
```

**cancelAllJobs**: Cancel all scheduled or running jobs
```scala { .api }
def cancelAllJobs(): Unit
```

```scala
// Set job group
sc.setJobGroup("etl-jobs", "ETL Processing", interruptOnCancel = true)

val data = sc.textFile("large-file.txt")
val processed = data.map(processLine)
processed.saveAsTextFile("output")

// Cancel specific job group from another thread
sc.cancelJobGroup("etl-jobs")
```

## Configuration and Properties

### Configuration Access

**getConf**: Get a copy of the SparkContext's configuration
```scala { .api }
def getConf: SparkConf
```

```scala
val conf = sc.getConf
val appName = conf.get("spark.app.name")
val executorMemory = conf.get("spark.executor.memory", "1g")
```

### Local Properties

**setLocalProperty**: Set local property that affects jobs submitted from this thread
```scala { .api }
def setLocalProperty(key: String, value: String): Unit
```

**getLocalProperty**: Get local property set in this thread
```scala { .api }
def getLocalProperty(key: String): String
```

```scala
// Set properties that will be passed to tasks
sc.setLocalProperty("spark.sql.execution.id", "query-123")
sc.setLocalProperty("callSite.short", "MyApp.process")

val value = sc.getLocalProperty("spark.sql.execution.id")
```

### File and JAR Management

**addFile**: Add a file to be downloaded with this Spark job on every node
```scala { .api }
def addFile(path: String): Unit
def addFile(path: String, recursive: Boolean): Unit
```

**addJar**: Add a JAR dependency for all tasks to be executed on this SparkContext
```scala { .api }
def addJar(path: String): Unit
```

```scala
// Add files that tasks can access via SparkFiles.get()
sc.addFile("/path/to/config.properties")
sc.addFile("hdfs://path/to/lookup-table.csv")

// Add JARs for task execution
sc.addJar("/path/to/dependencies.jar")
sc.addJar("hdfs://path/to/libs/mylib.jar")
```
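
On executors, files shipped with `addFile` are resolved to a node-local path through `SparkFiles.get`; a brief sketch of reading a distributed file inside a task:

```scala
import org.apache.spark.SparkFiles

val rdd = sc.parallelize(1 to 4)
rdd.foreach { _ =>
  // Resolve this node's local copy of the distributed file by its file name
  val localPath = SparkFiles.get("config.properties")
  // ... open and read localPath as needed
}
```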

### Checkpointing

**setCheckpointDir**: Set directory for RDD checkpointing
```scala { .api }
def setCheckpointDir(directory: String): Unit
```

```scala
sc.setCheckpointDir("hdfs://namenode/checkpoints")

val data = sc.textFile("large-dataset.txt")
val processed = data.map(complexProcessing).filter(isValid)
processed.checkpoint() // Checkpoint this RDD
```

## Monitoring and Information

### Memory and Storage Status

**getExecutorMemoryStatus**: Get memory status of all executors
```scala { .api }
def getExecutorMemoryStatus: Map[String, (Long, Long)]
```

**getExecutorStorageStatus**: Get storage status from all executors
```scala { .api }
def getExecutorStorageStatus: Array[StorageStatus]
```

**getRDDStorageInfo**: Get information about cached/persisted RDDs
```scala { .api }
def getRDDStorageInfo: Array[RDDInfo]
```

**getPersistentRDDs**: Get all currently persisted RDDs
```scala { .api }
def getPersistentRDDs: Map[Int, RDD[_]]
```

```scala
// Check memory usage across executors
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (executorId, (maxMemory, remainingMemory)) =>
  println(s"Executor $executorId: ${remainingMemory}/${maxMemory} bytes available")
}

// Check cached RDDs
val cachedRDDs = sc.getRDDStorageInfo
cachedRDDs.foreach { rddInfo =>
  println(s"RDD ${rddInfo.id} (${rddInfo.name}): ${rddInfo.memSize} bytes in memory")
}
```
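
`getPersistentRDDs` is also handy for cleanup, for example a short sketch that unpersists everything still cached on the context:

```scala
// Unpersist every RDD that is still cached, without blocking on block removal
sc.getPersistentRDDs.values.foreach(_.unpersist(blocking = false))
```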

### System Properties

**version**: Get the version of Spark on which this application is running
```scala { .api }
def version: String
```

**defaultParallelism**: Default level of parallelism for operations
```scala { .api }
def defaultParallelism: Int
```

**defaultMinPartitions**: Default min number of partitions for Hadoop RDDs
```scala { .api }
def defaultMinPartitions: Int
```

```scala
println(s"Spark Version: ${sc.version}")
println(s"Default Parallelism: ${sc.defaultParallelism}")
println(s"Default Min Partitions: ${sc.defaultMinPartitions}")
```

### Hadoop Configuration

**hadoopConfiguration**: Access to Hadoop Configuration for reuse across operations
```scala { .api }
def hadoopConfiguration: Configuration
```

```scala
import org.apache.hadoop.conf.Configuration

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "your-access-key")
hadoopConf.set("fs.s3a.secret.key", "your-secret-key")

// Now S3 operations will use these credentials
val s3Data = sc.textFile("s3a://bucket/path/to/file")
```

## SparkContext Lifecycle

### Stopping SparkContext

**stop**: Shut down the SparkContext
```scala { .api }
def stop(): Unit
```

```scala
val sc = new SparkContext(conf)

try {
  // Perform Spark operations
  val data = sc.textFile("input.txt")
  val result = data.map(_.toUpperCase).collect()
} finally {
  sc.stop() // Always stop the context
}
```

### Best Practices

1. **Single SparkContext**: Only one SparkContext should be active per JVM
2. **Proper Shutdown**: Always call `stop()` to release resources
3. **Configuration**: Use SparkConf for all configuration rather than constructor parameters
4. **Shared Variables**: Use broadcast variables for large read-only data
5. **Accumulators**: Only use accumulators for debugging and monitoring

```scala
// Proper pattern for SparkContext usage
val conf = new SparkConf()
  .setAppName("My Spark Application")
  .setMaster("local[*]")

val sc = new SparkContext(conf)

try {
  // All Spark operations here

} finally {
  sc.stop()
}
```

The SparkContext is the foundation of every Spark application, and understanding its API is essential for effective Spark programming.