
# SparkContext API

The SparkContext is the main entry point for Spark functionality. It represents the connection to a Spark cluster and coordinates the execution of operations across that cluster. Each Spark application uses one SparkContext, and only one may be active per JVM: call `stop()` on the active context before creating another.

## SparkContext Class

```scala { .api }
class SparkContext(config: SparkConf) extends Logging {
  // The primary constructor takes a SparkConf; the constructors below are auxiliary

  // Reads settings from spark.* Java system properties
  def this() = this(new SparkConf())

  def this(master: String, appName: String, conf: SparkConf) = { /* ... */ }

  def this(
      master: String,
      appName: String,
      sparkHome: String = null,
      jars: Seq[String] = Nil,
      environment: Map[String, String] = Map()) = { /* ... */ }
}
```

## Creating SparkContext

### Basic Construction

```scala { .api }
import org.apache.spark.{SparkContext, SparkConf}

// Using SparkConf (recommended)
val conf = new SparkConf()
  .setAppName("My Application")
  .setMaster("local[*]")
  .set("spark.executor.memory", "2g")

val sc = new SparkContext(conf)
```

### Alternative Constructors

```scala { .api }
// These are alternatives: pick one, because only one SparkContext may be active per JVM

// Default constructor (reads spark.* Java system properties, e.g. those set by spark-submit)
val sc1 = new SparkContext()

// With master URL and application name
val sc2 = new SparkContext("local[*]", "My App")

// Full constructor with all parameters
val sc3 = new SparkContext(
  master = "local[*]",
  appName = "My App",
  sparkHome = "/path/to/spark",
  jars = Seq("myapp.jar"),
  environment = Map("SPARK_ENV_VAR" -> "value")
)
```

### Developer API Constructor (for YARN)

```scala { .api }
import org.apache.spark.scheduler.SplitInfo

// @DeveloperApi: for internal use, typically in YARN mode (Spark 1.x).
// Deprecated since Spark 1.5, where the location data no longer has any effect.
val sc = new SparkContext(
  config = conf,
  preferredNodeLocationData = Map[String, Set[SplitInfo]]()
)
```

## RDD Creation Methods

### From Collections

**parallelize**: Distribute a local collection to form an RDD

```scala { .api }
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] // alias
```

```scala
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)                  // Use default parallelism
val rddWithPartitions = sc.parallelize(data, 4) // Specify 4 partitions
```
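A minimal sketch of how `numSlices` splits a collection, assuming Spark 2.x or later on the classpath and a local master (`local[2]`); the object and application names are illustrative. `glom()` turns each partition into an array so the slice boundaries become visible.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ParallelizeSlices {
  // Returns the contents of each partition produced by parallelize
  def run(): Array[Array[Int]] = {
    val sc = new SparkContext(new SparkConf().setAppName("parallelize-slices").setMaster("local[2]"))
    try {
      // Split ten integers into three slices
      val rdd = sc.parallelize(1 to 10, numSlices = 3)
      // glom() turns each partition into an array, exposing the slice boundaries
      rdd.glom().collect()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    run().zipWithIndex.foreach { case (slice, i) =>
      println(s"partition $i: ${slice.mkString(", ")}")
    }
}
```

For a `Range`, Spark cuts contiguous chunks of roughly equal size, so ten elements in three slices come out as 3, 3 and 4 elements.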

**With location preferences**: Place each element in its own partition, with the given hostnames as that partition's preferred locations

```scala { .api }
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
```

```scala
// Create RDD with preferred locations for each element
val dataWithPrefs = Seq(
  (1, Seq("host1", "host2")),
  (2, Seq("host3")),
  (3, Seq("host1"))
)
val rdd = sc.makeRDD(dataWithPrefs)
```
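The location preferences can be inspected without running a job. This sketch (local master, illustrative object and app names) reads them back with `RDD.preferredLocations`; `host1` to `host3` are placeholder hostnames that need not exist because no tasks are scheduled.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocationPrefs {
  // Returns the preferred hosts recorded for each partition
  def run(): List[List[String]] = {
    val sc = new SparkContext(new SparkConf().setAppName("location-prefs").setMaster("local[2]"))
    try {
      // Placeholder hostnames: no job runs, so they do not need to exist
      val rdd = sc.makeRDD(Seq(
        (1, Seq("host1", "host2")),
        (2, Seq("host3")),
        (3, Seq("host1"))
      ))
      // One partition per element, each carrying its own preferred hosts
      rdd.partitions.toList.map(p => rdd.preferredLocations(p).toList)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    run().zipWithIndex.foreach { case (hosts, i) =>
      println(s"partition $i prefers ${hosts.mkString(", ")}")
    }
}
```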

### From Files

**textFile**: Read a text file from HDFS, a local filesystem (the path must be available on all nodes), or any Hadoop-supported URI, and return it as an RDD of lines; `path` may also be a directory or a glob pattern

```scala { .api }
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
```

```scala
val lines = sc.textFile("hdfs://namenode:port/path/to/file.txt")
val linesLocal = sc.textFile("file:///local/path/file.txt")
val linesWithPartitions = sc.textFile("hdfs://path/to/file.txt", 8) // Ask for at least 8 partitions
```

**wholeTextFiles**: Read a directory of text files as (file path, file content) pairs, one record per file; best suited to many small files, since each file is loaded as a single record

```scala { .api }
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
```

```scala
// Returns RDD[(filePath, content)]
val files = sc.wholeTextFiles("hdfs://path/to/directory/")

// Compute sizes on the executors, then print on the driver
files
  .map { case (filePath, content) => (filePath, content.length) }
  .collect()
  .foreach { case (filePath, size) => println(s"File: $filePath, Size: $size") }
```
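To contrast the two readers on the same input, this sketch writes two small files into a temporary directory (file names and contents are made up for illustration) and reads that directory with both `textFile` and `wholeTextFiles` under a local master.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object TextReaders {
  // Returns (all lines via textFile, file name -> content via wholeTextFiles)
  def run(): (List[String], Map[String, String]) = {
    // Two small sample files in a fresh temporary directory
    val dir = Files.createTempDirectory("spark-text-demo")
    Files.write(dir.resolve("a.txt"), "alpha\nbeta\n".getBytes(UTF_8))
    Files.write(dir.resolve("b.txt"), "gamma\n".getBytes(UTF_8))

    val sc = new SparkContext(new SparkConf().setAppName("text-readers").setMaster("local[2]"))
    try {
      // textFile: one record per line, across every file in the directory
      val lines = sc.textFile(dir.toString).collect().toList.sorted
      // wholeTextFiles: one (path, content) record per file; keep only the file name
      val contents = sc.wholeTextFiles(dir.toString)
        .map { case (path, content) => (path.split("/").last, content) }
        .collect()
        .toMap
      (lines, contents)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (lines, contents) = run()
    println(s"textFile lines: $lines")
    println(s"wholeTextFiles records: ${contents.keys.toList.sorted}")
  }
}
```

`textFile` yields three records (one per line), while `wholeTextFiles` yields two (one per file).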

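### Hadoop Files

**sequenceFile**: Read Hadoop SequenceFiles. Hadoop's RecordReader reuses the same Writable object for every record, so convert keys and values to immutable values (for example with `map`) before caching or collecting the RDD.

```scala { .api }
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
```

```scala
import org.apache.hadoop.io.{IntWritable, Text}

val seqFile = sc.sequenceFile[IntWritable, Text]("path/to/sequencefile",
  classOf[IntWritable], classOf[Text])

// Copy the reused Writables into plain Scala values
val pairs = seqFile.map { case (k, v) => (k.get, v.toString) }
```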
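A round trip shows how to turn `IntWritable`/`Text` pairs into plain Scala values. This sketch, assuming Spark 1.3 or later, a local master, and a scratch directory under the system temp folder, writes a small pair RDD with `saveAsSequenceFile` (Spark converts `Int` and `String` to `IntWritable` and `Text`) and reads it back with `sequenceFile`. Because Hadoop reuses Writable objects across records, each key and value is copied with `map` before collecting.

```scala
import java.nio.file.Files
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileRoundTrip {
  def run(): List[(Int, String)] = {
    val sc = new SparkContext(new SparkConf().setAppName("seqfile-roundtrip").setMaster("local[2]"))
    try {
      // The output directory must not exist yet, so use a fresh subdirectory
      val out = Files.createTempDirectory("spark-seq-demo").resolve("pairs").toString
      // Int and String are converted to IntWritable and Text on write
      sc.parallelize(Seq(1 -> "one", 2 -> "two", 3 -> "three")).saveAsSequenceFile(out)

      // Copy each reused Writable into an immutable Scala value before collecting
      sc.sequenceFile(out, classOf[IntWritable], classOf[Text])
        .map { case (k, v) => (k.get, v.toString) }
        .collect()
        .toList
        .sortBy(_._1)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```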

**hadoopFile**: Read files with an arbitrary Hadoop InputFormat from the old `org.apache.hadoop.mapred` API (use `newAPIHadoopFile` for `org.apache.hadoop.mapreduce` input formats)

```scala { .api }
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
```

```scala
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val hadoopRDD = sc.hadoopFile[LongWritable, Text](
  "hdfs://path/to/input",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text]
)
```

**objectFile**: Load an RDD that was saved with `saveAsObjectFile` as a SequenceFile of serialized objects

```scala { .api }
def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]
```

```scala
import org.apache.spark.rdd.RDD

// Load an RDD previously saved with saveAsObjectFile
// (MyClass stands for your own serializable element type)
val restored: RDD[MyClass] = sc.objectFile[MyClass]("path/to/objects")
```
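A minimal round trip, assuming a local master and a temporary output path: `saveAsObjectFile` writes Java-serialized elements, and `objectFile` needs the element type (here `(String, Int)`, chosen for illustration) to read them back. The object and app names are illustrative.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ObjectFileRoundTrip {
  def run(): Map[String, Int] = {
    val sc = new SparkContext(new SparkConf().setAppName("objectfile-roundtrip").setMaster("local[2]"))
    try {
      // saveAsObjectFile refuses to write into an existing directory
      val out = Files.createTempDirectory("spark-obj-demo").resolve("scores").toString
      sc.parallelize(Seq(("alice", 3), ("bob", 5)), 2).saveAsObjectFile(out)

      // The type parameter must match the element type that was saved
      val restored: RDD[(String, Int)] = sc.objectFile[(String, Int)](out)
      restored.collect().toMap
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```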

### RDD Manipulation

**union**: Build the union of a list of RDDs

```scala { .api }
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]
```

```scala
val rdd1 = sc.parallelize(Array(1, 2, 3))
val rdd2 = sc.parallelize(Array(4, 5, 6))
val rdd3 = sc.parallelize(Array(7, 8, 9))

val unionRDD = sc.union(Seq(rdd1, rdd2, rdd3))
```
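`union` neither removes duplicates nor shuffles: for RDDs without a common partitioner, the result simply concatenates the input partitions. This sketch (local master, toy data, illustrative names) checks both properties and applies `distinct()` where set semantics are wanted.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object UnionDemo {
  // Returns (sorted union values, union partition count, sorted distinct values)
  def run(): (List[Int], Int, List[Int]) = {
    val sc = new SparkContext(new SparkConf().setAppName("union-demo").setMaster("local[2]"))
    try {
      val a = sc.parallelize(Seq(1, 2, 3), 2) // 2 partitions
      val b = sc.parallelize(Seq(3, 4), 3)    // 3 partitions, one of them empty
      val combined = sc.union(Seq(a, b))

      val values = combined.collect().toList.sorted    // duplicates are kept
      val partitionCount = combined.partitions.length // input partitions are concatenated
      val distinctValues = combined.distinct().collect().toList.sorted
      (values, partitionCount, distinctValues)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```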

**emptyRDD**: Create an empty RDD with no partitions

```scala { .api }
def emptyRDD[T: ClassTag]: RDD[T]
```

```scala
val empty: RDD[String] = sc.emptyRDD[String]
```

## Shared Variables

Spark provides two types of shared variables: broadcast variables and accumulators.

### Broadcast Variables

**broadcast**: Create a read-only broadcast variable, which is cached on each machine instead of being shipped with every task

```scala { .api }
def broadcast[T: ClassTag](value: T): Broadcast[T]
```

```scala
val lookupTable = Map("apple" -> 1, "banana" -> 2, "orange" -> 3)
val broadcastTable = sc.broadcast(lookupTable)

val data = sc.parallelize(Array("apple", "banana", "apple"))
// Tasks read the broadcast data through .value
val mapped = data.map(fruit => broadcastTable.value.getOrElse(fruit, 0))
```

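### Accumulators

**accumulator**: Create a simple accumulator, optionally with a display name

```scala { .api }
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]
```

**accumulable**: Create an accumulable whose result type differs from the type of the elements added to it

```scala { .api }
def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]): Accumulable[T, R]
```

These methods are deprecated as of Spark 2.0 in favor of the `AccumulatorV2` API (for example `sc.longAccumulator`). Updates made inside transformations such as `filter` may be applied more than once if a task or stage is re-executed; only updates made inside actions are guaranteed to be applied exactly once.

```scala
import scala.collection.mutable

// Simple counter; the name is shown in the web UI
val counter = sc.accumulator(0, "Negative numbers")

val data = sc.parallelize(Array(1, 2, -1, 4, -5))
val positive = data.filter { x =>
  if (x < 0) counter += 1 // Count negative numbers
  x > 0
}
positive.count() // Trigger an action; the accumulator is only updated when tasks run
println(s"Negative numbers: ${counter.value}")

// Collection accumulator
val errorList = sc.accumulableCollection(mutable.Set[String]())
```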

#### Built-in AccumulatorParam Types

```scala { .api }
// Available in the SparkContext companion object
// (since Spark 1.3 they are also provided as implicits in the AccumulatorParam companion object)
DoubleAccumulatorParam // For Double values
IntAccumulatorParam    // For Int values
LongAccumulatorParam   // For Long values
FloatAccumulatorParam  // For Float values
```
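On Spark 2.0 or later, where `accumulator` is deprecated, the negative-number counter can be sketched with the `AccumulatorV2` API instead. This version, assuming a local master and illustrative names, creates a `LongAccumulator` with `sc.longAccumulator` and updates it inside `foreach`, an action, for which Spark applies each task's update exactly once even if tasks are retried.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.LongAccumulator

object NegativeCounter {
  def run(): Long = {
    val sc = new SparkContext(new SparkConf().setAppName("accumulator-v2").setMaster("local[2]"))
    try {
      // A named accumulator is also displayed in the web UI
      val negatives: LongAccumulator = sc.longAccumulator("Negative numbers")
      val data = sc.parallelize(Seq(1, 2, -1, 4, -5))

      // foreach is an action, so each task's update is applied exactly once
      data.foreach { x => if (x < 0) negatives.add(1L) }

      // The merged value is read on the driver
      negatives.sum
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(s"Negative numbers: ${run()}")
}
```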

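## Job Control and Execution

### Running Jobs

**runJob**: Run a function on some or all partitions of an RDD and return the results to the driver

```scala { .api }
// Run on selected partitions, passing each result to resultHandler
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit
): Unit

// Run on selected partitions and return the results in the order of `partitions`
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]
): Array[U]

// Simplified versions (run on all partitions)
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]
```

In Spark 1.x, the first two overloads also take an `allowLocal: Boolean` parameter after `partitions`; it was deprecated in Spark 1.5 and later removed.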

```scala
import org.apache.spark.TaskContext

val data = sc.parallelize(1 to 100, 4)

// Run a custom function on each partition
val results = sc.runJob(data, (iter: Iterator[Int]) => iter.sum)
println(s"Partition sums: ${results.mkString(", ")}")

// With task context: pair each partition ID with its element count
val results2 = sc.runJob(data, (context: TaskContext, iter: Iterator[Int]) => {
  (context.partitionId, iter.size)
})
```
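The `partitions` and `resultHandler` parameters let a job touch only some partitions and hand each result to the driver as its task finishes. A minimal sketch, assuming Spark 2.0 or later (no `allowLocal` argument) and a local master, that sums only partitions 0 and 2. The handler's first argument is the position within the `partitions` sequence, not the partition ID, so the sketch maps it back.

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import scala.collection.mutable

object SelectedPartitionSums {
  // Sums only partitions 0 and 2 of a 4-partition RDD; returns partition ID -> sum
  def run(): Map[Int, Int] = {
    val sc = new SparkContext(new SparkConf().setAppName("runjob-partitions").setMaster("local[2]"))
    try {
      val data = sc.parallelize(1 to 100, 4)
      val partitions = Seq(0, 2)
      val sums = mutable.Map.empty[Int, Int]

      sc.runJob(
        data,
        (context: TaskContext, iter: Iterator[Int]) => iter.sum,
        partitions,
        // Runs on the driver; `index` is the position within `partitions`
        (index: Int, sum: Int) => sums(partitions(index)) = sum
      )
      sums.toMap
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```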

**submitJob**: Submit a job asynchronously and return a future that completes with `resultFunc` once every requested partition has been processed (Experimental)

```scala { .api }
def submitJob[T, U, R](
    rdd: RDD[T],
    processPartition: Iterator[T] => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit,
    resultFunc: => R
): SimpleFutureAction[R]
```
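A minimal sketch of `submitJob`, assuming a local master and illustrative names: the call returns a `SimpleFutureAction` immediately, the result handler fills a driver-side array as partitions finish, and the by-name `resultFunc` is evaluated once the job completes. Because `FutureAction` extends `scala.concurrent.Future`, the driver can wait with `Await.result`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncTotal {
  def run(): Int = {
    val sc = new SparkContext(new SparkConf().setAppName("submitjob-demo").setMaster("local[2]"))
    try {
      val data = sc.parallelize(1 to 100, 4)
      val partitionSums = new Array[Int](data.partitions.length)

      // submitJob returns immediately with a FutureAction
      val future = sc.submitJob(
        data,
        (iter: Iterator[Int]) => iter.sum,                    // runs on executors
        0 until data.partitions.length,                       // all partitions
        (index: Int, sum: Int) => partitionSums(index) = sum, // runs on the driver
        partitionSums.sum                                     // evaluated when the job completes
      )

      // The driver could do other work here; block only when the result is needed
      Await.result(future, 1.minute)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(s"Total: ${run()}")
}
```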

### Job Groups and Cancellation

**setJobGroup**: Assign a group ID to all jobs started by this thread until the group is changed or cleared. If `interruptOnCancel` is true, cancelling the group calls `Thread.interrupt()` on the job's executor threads; it is off by default because HDFS may react to interrupts by marking nodes as dead.

```scala { .api }
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
```

**clearJobGroup**: Clear the job group for this thread

```scala { .api }
def clearJobGroup(): Unit
```

**cancelJobGroup**: Cancel all active jobs in the given group

```scala { .api }
def cancelJobGroup(groupId: String): Unit
```

**cancelAllJobs**: Cancel all scheduled or running jobs

```scala { .api }
def cancelAllJobs(): Unit
```

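```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// The job group is thread-local, so set it on the thread that submits the jobs
val etl = Future {
  sc.setJobGroup("etl-jobs", "ETL Processing", interruptOnCancel = true)
  try {
    sc.textFile("large-file.txt")
      .map(processLine) // processLine: your own String => String function
      .saveAsTextFile("output")
  } finally {
    sc.clearJobGroup()
  }
}

// Cancel the whole group from another thread (here, the main thread)
sc.cancelJobGroup("etl-jobs")
```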

## Configuration and Properties

### Configuration Access

**getConf**: Get a copy of the SparkContext's configuration (changing the copy does not affect the running context)

```scala { .api }
def getConf: SparkConf
```

```scala
val conf = sc.getConf
val appName = conf.get("spark.app.name")
val executorMemory = conf.get("spark.executor.memory", "1g") // With a default value
```

### Local Properties

**setLocalProperty**: Set a thread-local property that affects jobs submitted from this thread, such as the fair-scheduler pool; passing `null` as the value removes the property

```scala { .api }
def setLocalProperty(key: String, value: String): Unit
```

**getLocalProperty**: Get a local property set in this thread, or `null` if it is not set

```scala { .api }
def getLocalProperty(key: String): String
```

```scala
// Send jobs submitted from this thread to the "reporting" fair-scheduler pool
sc.setLocalProperty("spark.scheduler.pool", "reporting")

val pool = sc.getLocalProperty("spark.scheduler.pool") // "reporting"

// Remove the property so later jobs from this thread use the default pool
sc.setLocalProperty("spark.scheduler.pool", null)
```

### File and JAR Management

**addFile**: Add a file to be downloaded with this Spark job on every node. With `recursive = true`, `path` may be a directory (directories are supported only on Hadoop-supported filesystems).

```scala { .api }
def addFile(path: String): Unit
def addFile(path: String, recursive: Boolean): Unit
```

**addJar**: Add a JAR dependency for all tasks to be executed on this SparkContext

```scala { .api }
def addJar(path: String): Unit
```

```scala
// Add files that tasks can locate with SparkFiles.get(fileName)
sc.addFile("/path/to/config.properties")
sc.addFile("hdfs://path/to/lookup-table.csv")

// Add JARs for task execution
sc.addJar("/path/to/dependencies.jar")
sc.addJar("hdfs://path/to/libs/mylib.jar")
```
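Tasks locate files registered with `addFile` through `SparkFiles.get`, which resolves a file name to that node's downloaded copy. This sketch, assuming a local master, ships a small hypothetical `greeting.txt` written to a temporary directory and reads it once per partition; the object and app names are illustrative.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths}
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object AddFileDemo {
  // Ships a small lookup file with addFile and reads it inside tasks
  def run(): List[String] = {
    // Hypothetical lookup file written to a temporary location
    val lookup = Files.createTempDirectory("spark-files-demo").resolve("greeting.txt")
    Files.write(lookup, "hello".getBytes(UTF_8))

    val sc = new SparkContext(new SparkConf().setAppName("addfile-demo").setMaster("local[2]"))
    try {
      sc.addFile(lookup.toString)

      sc.parallelize(Seq("alice", "bob"), 2)
        .mapPartitions { names =>
          // SparkFiles.get resolves the file name to this node's local copy
          val greeting = new String(Files.readAllBytes(Paths.get(SparkFiles.get("greeting.txt"))), UTF_8)
          names.map(name => s"$greeting, $name")
        }
        .collect()
        .toList
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```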

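### Checkpointing

**setCheckpointDir**: Set the directory under which RDDs are checkpointed (it must be an HDFS path when running on a cluster)

```scala { .api }
def setCheckpointDir(directory: String): Unit
```

```scala
sc.setCheckpointDir("hdfs://namenode/checkpoints")

val data = sc.textFile("large-dataset.txt")
// complexProcessing and isValid stand for your own functions
val processed = data.map(complexProcessing).filter(isValid)
processed.cache()      // Avoid recomputing the RDD when the checkpoint is written
processed.checkpoint() // Must be called before any job runs on this RDD
processed.count()      // The first action writes the checkpoint
```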
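To confirm when checkpointing happens, this sketch (local master, temporary checkpoint directory, illustrative names) checks `isCheckpointed` before and after the first action. The RDD is cached first because, otherwise, writing the checkpoint would recompute it.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointDemo {
  // Returns (checkpointed before the action, checkpointed after the action, element count)
  def run(): (Boolean, Boolean, Long) = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[2]"))
    try {
      // A local temp directory is enough in local mode; a cluster needs an HDFS path
      sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoints").toString)

      val processed = sc.parallelize(1 to 1000, 4).map(_ * 2).filter(_ % 3 == 0)
      processed.cache()      // avoids recomputing the RDD when the checkpoint is written
      processed.checkpoint() // must be called before any job runs on this RDD

      val before = processed.isCheckpointed
      val count = processed.count() // the first action also writes the checkpoint
      val after = processed.isCheckpointed
      (before, after, count)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```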

## Monitoring and Information

### Memory and Storage Status

**getExecutorMemoryStatus**: For each block manager (keyed by `host:port`), get the maximum memory available for caching and the memory that remains available

```scala { .api }
def getExecutorMemoryStatus: Map[String, (Long, Long)]
```

**getExecutorStorageStatus**: Get storage status from all executors

```scala { .api }
def getExecutorStorageStatus: Array[StorageStatus]
```

**getRDDStorageInfo**: Get information about cached/persisted RDDs

```scala { .api }
def getRDDStorageInfo: Array[RDDInfo]
```

**getPersistentRDDs**: Get all currently persisted RDDs, keyed by RDD ID

```scala { .api }
def getPersistentRDDs: Map[Int, RDD[_]]
```

```scala
// Check memory available for caching on each block manager (including the driver's)
val memoryStatus = sc.getExecutorMemoryStatus
memoryStatus.foreach { case (blockManager, (maxMemory, remainingMemory)) =>
  println(s"Block manager $blockManager: $remainingMemory/$maxMemory bytes available")
}

// Check cached RDDs
val cachedRDDs = sc.getRDDStorageInfo
cachedRDDs.foreach { rddInfo =>
  println(s"RDD ${rddInfo.id} (${rddInfo.name}): ${rddInfo.memSize} bytes in memory")
}
```

### System Properties

**version**: Get the version of Spark on which this application is running

```scala { .api }
def version: String
```

**defaultParallelism**: Default level of parallelism, used when the caller gives no partition count (for example in `parallelize`)

```scala { .api }
def defaultParallelism: Int
```

**defaultMinPartitions**: Default minimum number of partitions for Hadoop RDDs when the caller gives none; it equals `min(defaultParallelism, 2)`

```scala { .api }
def defaultMinPartitions: Int
```

```scala
println(s"Spark Version: ${sc.version}")
println(s"Default Parallelism: ${sc.defaultParallelism}")
println(s"Default Min Partitions: ${sc.defaultMinPartitions}")
```
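Spark derives `defaultMinPartitions` as `min(defaultParallelism, 2)`, and `parallelize` falls back to `defaultParallelism` when no slice count is given. This sketch checks both relations on a `local[4]` master (names are illustrative); when `spark.default.parallelism` is not set, local mode reports the number of local cores, so the values are typically 4 and 2.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionDefaults {
  // Returns (defaultParallelism, defaultMinPartitions, partitions of a default parallelize)
  def run(): (Int, Int, Int) = {
    val sc = new SparkContext(new SparkConf().setAppName("partition-defaults").setMaster("local[4]"))
    try {
      val parallelizeDefault = sc.parallelize(1 to 100).partitions.length
      (sc.defaultParallelism, sc.defaultMinPartitions, parallelizeDefault)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (parallelism, minPartitions, parallelizeDefault) = run()
    println(s"defaultParallelism=$parallelism, defaultMinPartitions=$minPartitions, " +
      s"parallelize default=$parallelizeDefault")
  }
}
```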

### Hadoop Configuration

**hadoopConfiguration**: The Hadoop `Configuration` that this SparkContext reuses for its Hadoop-based operations (such as `textFile` and `hadoopFile`)

```scala { .api }
def hadoopConfiguration: Configuration
```

```scala
import org.apache.hadoop.conf.Configuration

val hadoopConf: Configuration = sc.hadoopConfiguration
// Placeholder credentials; avoid hard-coding real keys in source code
hadoopConf.set("fs.s3a.access.key", "your-access-key")
hadoopConf.set("fs.s3a.secret.key", "your-secret-key")

// Now S3 operations will use these credentials
val s3Data = sc.textFile("s3a://bucket/path/to/file")
```

## SparkContext Lifecycle

### Stopping SparkContext

**stop**: Shut down the SparkContext and release its resources

```scala { .api }
def stop(): Unit
```

```scala
// Create the context outside the try block so it is still in scope in finally
val sc = new SparkContext(conf)

try {
  // Perform Spark operations
  val data = sc.textFile("input.txt")
  val result = data.map(_.toUpperCase).collect()
} finally {
  sc.stop() // Always stop the context
}
```

### Best Practices

1. **Single SparkContext**: Only one SparkContext should be active per JVM
2. **Proper Shutdown**: Always call `stop()` to release resources
3. **Configuration**: Use SparkConf for all configuration rather than constructor parameters
4. **Shared Variables**: Use broadcast variables for large read-only data
5. **Accumulators**: Use accumulators mainly for debugging and monitoring counters, and update them inside actions when exact counts matter

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Proper pattern for SparkContext usage
val conf = new SparkConf()
  .setAppName("My Spark Application")
  .setMaster("local[*]")

val sc = new SparkContext(conf)

try {
  // All Spark operations here
} finally {
  sc.stop()
}
```
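The single-context rule is enforced when a context is constructed. This sketch, assuming Spark 2.x or later with default settings and illustrative names, shows that a second `new SparkContext` fails while one is active, and that `SparkContext.getOrCreate` returns the running context instead of creating another.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

object SingleContext {
  // Returns (second constructor call failed, getOrCreate returned the active context)
  def run(): (Boolean, Boolean) = {
    val conf = new SparkConf().setAppName("single-context").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // A second constructor call while `sc` is active throws with default settings
      val secondFailed = Try(new SparkContext(conf)).isFailure
      // getOrCreate hands back the context that is already running
      val reused = SparkContext.getOrCreate(conf) eq sc
      (secondFailed, reused)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```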


The SparkContext is the foundation of all Spark applications and understanding its API is essential for effective Spark programming.