or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcasting-accumulators.md index.md java-api.md rdd-operations.md spark-context.md storage-persistence.md

docs/spark-context.md

0

# Core SparkContext and Configuration

1

2

The SparkContext and SparkConf classes provide the foundation for all Spark applications, handling cluster connections, resource management, and application configuration.

3

4

## SparkContext

5

6

`SparkContext` is the main entry point for Spark functionality; it represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables on that cluster.

7

8

### Constructors

9

10

```scala { .api }

11

class SparkContext(config: SparkConf)

12

class SparkContext() // Load from system properties

13

class SparkContext(master: String, appName: String, conf: SparkConf)

14

class SparkContext(

15

master: String,

16

appName: String,

17

sparkHome: String,

18

jars: Seq[String],

19

environment: Map[String, String]

20

)

21

```

22

23

### Configuration & Context Management

24

25

```scala { .api }

26

class SparkContext {

27

def getConf: SparkConf

28

def master: String

29

def appName: String

30

def applicationId: String

31

def version: String

32

def isLocal: Boolean

33

def isStopped: Boolean

34

def stop(): Unit

35

def defaultParallelism: Int

36

}

37

```

38

39

### RDD Creation Methods

40

41

```scala { .api }

42

class SparkContext {

43

def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

44

def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

45

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] // With locality preferences

46

def range(start: Long, end: Long, step: Long = 1, numSlices: Int = defaultParallelism): RDD[Long]

47

def emptyRDD[T: ClassTag]: RDD[T]

48

def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]

49

}

50

```

51

52

### File I/O Methods

53

54

```scala { .api }

55

class SparkContext {

56

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

57

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

58

def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]

59

def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration): RDD[Array[Byte]]

60

def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]

61

def sequenceFile[K, V](

62

path: String,

63

keyClass: Class[K],

64

valueClass: Class[V],

65

minPartitions: Int = defaultMinPartitions

66

): RDD[(K, V)]

67

}

68

```

69

70

### Hadoop Integration Methods

71

72

```scala { .api }

73

class SparkContext {

74

def hadoopRDD[K, V](

75

conf: JobConf,

76

inputFormatClass: Class[_ <: InputFormat[K, V]],

77

keyClass: Class[K],

78

valueClass: Class[V],

79

minPartitions: Int = defaultMinPartitions

80

): RDD[(K, V)]

81

82

def hadoopFile[K, V](

83

path: String,

84

inputFormatClass: Class[_ <: InputFormat[K, V]],

85

keyClass: Class[K],

86

valueClass: Class[V],

87

minPartitions: Int = defaultMinPartitions

88

): RDD[(K, V)]

89

90

def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](

91

path: String,

92

fClass: Class[F],

93

kClass: Class[K],

94

vClass: Class[V],

95

conf: Configuration = hadoopConfiguration

96

): RDD[(K, V)]

97

98

def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](

99

conf: Configuration,

100

fClass: Class[F],

101

kClass: Class[K],

102

vClass: Class[V]

103

): RDD[(K, V)]

104

}

105

```

106

107

### Job Control

108

109

```scala { .api }

110

class SparkContext {

111

def runJob[T, U: ClassTag](

112

rdd: RDD[T],

113

func: (TaskContext, Iterator[T]) => U,

114

partitions: Seq[Int],

115

resultHandler: (Int, U) => Unit

116

): Unit

117

118

def runJob[T, U: ClassTag](

119

rdd: RDD[T],

120

func: (TaskContext, Iterator[T]) => U,

121

partitions: Seq[Int]

122

): Array[U]

123

124

def runJob[T, U: ClassTag](

125

rdd: RDD[T],

126

func: Iterator[T] => U,

127

partitions: Seq[Int]

128

): Array[U]

129

130

def runJob[T, U: ClassTag](

131

rdd: RDD[T],

132

func: (TaskContext, Iterator[T]) => U

133

): Array[U]

134

135

def runApproximateJob[T, U, R](

136

rdd: RDD[T],

137

func: (TaskContext, Iterator[T]) => U,

138

evaluator: ApproximateEvaluator[U, R],

139

timeout: Long

140

): PartialResult[R]

141

142

def submitJob[T, U, R](

143

rdd: RDD[T],

144

processPartition: Iterator[T] => U,

145

partitions: Seq[Int],

146

resultHandler: (Int, U) => Unit,

147

resultFunc: => R

148

): SimpleFutureAction[R]

149

150

def cancelJobGroup(groupId: String): Unit

151

def cancelAllJobs(): Unit

152

def cancelJob(jobId: Int): Unit

153

def cancelStage(stageId: Int): Unit

154

}

155

```

156

157

### Thread-Local Properties & Job Management

158

159

```scala { .api }

160

class SparkContext {

161

def setLocalProperty(key: String, value: String): Unit

162

def getLocalProperty(key: String): String

163

def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit

164

def clearJobGroup(): Unit

165

def setJobDescription(value: String): Unit

166

}

167

```

168

169

### Storage & Persistence Information

170

171

```scala { .api }

172

class SparkContext {

173

def getPersistentRDDs: Map[Int, RDD[_]]

174

def getRDDStorageInfo: Array[RDDInfo]

175

def getExecutorMemoryStatus: Map[String, (Long, Long)]

176

}

177

```

178

179

### Files & JARs

180

181

```scala { .api }

182

class SparkContext {

183

def addFile(path: String): Unit

184

def addFile(path: String, recursive: Boolean): Unit

185

def listFiles(): Seq[String]

186

def addJar(path: String): Unit

187

def listJars(): Seq[String]

188

}

189

```

190

191

### Checkpointing

192

193

```scala { .api }

194

class SparkContext {

195

def setCheckpointDir(directory: String): Unit

196

def getCheckpointDir: Option[String]

197

}

198

```

199

200

### Event Listeners

201

202

```scala { .api }

203

class SparkContext {

204

def addSparkListener(listener: SparkListenerInterface): Unit

205

def removeSparkListener(listener: SparkListenerInterface): Unit

206

}

207

```

208

209

### Utility Methods

210

211

```scala { .api }

212

class SparkContext {

213

def setLogLevel(logLevel: String): Unit

214

}

215

```

216

217

## SparkConf

218

219

Configuration for a Spark application with key-value pairs for various Spark parameters.

220

221

### Constructors

222

223

```scala { .api }

224

class SparkConf(loadDefaults: Boolean = true)

225

```

226

227

### Configuration Setting

228

229

```scala { .api }

230

class SparkConf {

231

def set(key: String, value: String): SparkConf

232

def setMaster(master: String): SparkConf

233

def setAppName(name: String): SparkConf

234

def setJars(jars: Seq[String]): SparkConf

235

def setJars(jars: Array[String]): SparkConf

236

def setExecutorEnv(variable: String, value: String): SparkConf

237

def setExecutorEnv(variables: Seq[(String, String)]): SparkConf

238

def setExecutorEnv(variables: Array[(String, String)]): SparkConf

239

def setSparkHome(home: String): SparkConf

240

def setAll(settings: Traversable[(String, String)]): SparkConf

241

def setIfMissing(key: String, value: String): SparkConf

242

}

243

```

244

245

### Configuration Retrieval

246

247

```scala { .api }

248

class SparkConf {

249

def get(key: String): String

250

def get(key: String, defaultValue: String): String

251

def getOption(key: String): Option[String]

252

def getAll: Array[(String, String)]

253

def getAllWithPrefix(prefix: String): Array[(String, String)]

254

def getInt(key: String, defaultValue: Int): Int

255

def getLong(key: String, defaultValue: Long): Long

256

def getDouble(key: String, defaultValue: Double): Double

257

def getBoolean(key: String, defaultValue: Boolean): Boolean

258

def getExecutorEnv: Seq[(String, String)]

259

def getAppId: String

260

}

261

```

262

263

### Time & Size Configuration

264

265

```scala { .api }

266

class SparkConf {

267

def getTimeAsSeconds(key: String): Long

268

def getTimeAsSeconds(key: String, defaultValue: String): Long

269

def getTimeAsMs(key: String): Long

270

def getTimeAsMs(key: String, defaultValue: String): Long

271

def getSizeAsBytes(key: String): Long

272

def getSizeAsBytes(key: String, defaultValue: String): Long

273

def getSizeAsBytes(key: String, defaultValue: Long): Long

274

def getSizeAsKb(key: String): Long

275

def getSizeAsMb(key: String): Long

276

def getSizeAsGb(key: String): Long

277

}

278

```

279

280

### Kryo Serialization

281

282

```scala { .api }

283

class SparkConf {

284

def registerKryoClasses(classes: Array[Class[_]]): SparkConf

285

def registerAvroSchemas(schemas: Schema*): SparkConf

286

def getAvroSchema: Map[Long, String]

287

}

288

```

289

290

### Configuration Management

291

292

```scala { .api }

293

class SparkConf {

294

def remove(key: String): SparkConf

295

def contains(key: String): Boolean

296

def clone: SparkConf

297

def toDebugString: String

298

}

299

```

300

301

## Usage Examples

302

303

### Basic SparkContext Setup

304

305

```scala

306

import org.apache.spark.{SparkContext, SparkConf}

307

308

val conf = new SparkConf()

309

.setAppName("My Application")

310

.setMaster("local[4]")

311

.set("spark.executor.memory", "2g")

312

313

val sc = new SparkContext(conf)

314

315

// Use SparkContext

316

val rdd = sc.parallelize(1 to 100)

317

318

// Always stop the context

319

sc.stop()

320

```

321

322

### Advanced Configuration

323

324

```scala

325

val conf = new SparkConf()

326

.setAppName("Advanced App")

327

.setMaster("yarn")

328

.set("spark.executor.instances", "10")

329

.set("spark.executor.memory", "4g")

330

.set("spark.executor.cores", "2")

331

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

332

.setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-8-oracle")

333

.registerKryoClasses(Array(classOf[MyCustomClass]))

334

335

val sc = new SparkContext(conf)

336

```

337

338

### File Input/Output

339

340

```scala

341

// Read text files

342

val textRDD = sc.textFile("hdfs://path/to/input")

343

344

// Read whole text files (filename, content)

345

val wholeFilesRDD = sc.wholeTextFiles("hdfs://path/to/directory")

346

347

// Read binary files

348

val binaryRDD = sc.binaryFiles("hdfs://path/to/binary")

349

350

// Read Hadoop SequenceFile

351

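// Key/value classes passed explicitly must be Hadoop Writable types, not String/Int
val sequenceRDD = sc.sequenceFile(

352

"hdfs://path/to/sequence",

353

classOf[org.apache.hadoop.io.Text],

354

classOf[org.apache.hadoop.io.IntWritable]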

355

)

356

```

357

358

### Job Management

359

360

```scala

361

// Set job group for related jobs

362

sc.setJobGroup("group1", "Processing user data", interruptOnCancel = true)

363

364

// Set job description

365

sc.setJobDescription("Computing user statistics")

366

367

// Execute jobs

368

val result1 = rdd1.collect()

369

val result2 = rdd2.count()

370

371

// Cancel all jobs in group

372

sc.cancelJobGroup("group1")

373

```

374

375

## Configuration Properties

376

377

### Core Application Settings

378

379

- `spark.app.name` - Application name

380

- `spark.master` - Master URL (local[*], yarn, spark://host:port)

381

- `spark.executor.memory` - Memory per executor (e.g., "1g", "2g")

382

- `spark.executor.cores` - CPU cores per executor

383

- `spark.executor.instances` - Number of executors (for YARN/Kubernetes)

384

- `spark.driver.memory` - Driver memory (e.g., "1g")

385

- `spark.driver.cores` - Driver CPU cores

386

387

### Performance Tuning

388

389

- `spark.default.parallelism` - Default number of partitions

390

- `spark.serializer` - Serializer class (KryoSerializer recommended)

391

- `spark.kryo.registrationRequired` - Require Kryo class registration

392

- `spark.rdd.compress` - Compress serialized RDD partitions (e.g., for `StorageLevel.MEMORY_ONLY_SER`), trading CPU time for space

393

- `spark.broadcast.compress` - Compress broadcast variables

394

395

### Memory Management

396

397

- `spark.memory.fraction` - Fraction of (heap space - 300MB reserved) used for execution and storage

398

- `spark.memory.storageFraction` - Portion of the `spark.memory.fraction` region reserved for storage that is immune to eviction by execution

399

- `spark.memory.offHeap.enabled` - Use off-heap memory

400

- `spark.memory.offHeap.size` - Off-heap memory size

401

402

## Important Notes

403

404

- Only one SparkContext can be active per JVM at a time

405

- Always call `stop()` on SparkContext before creating a new one

406

- SparkContext is not thread-safe for modifications

407

- The `SparkConf` is cloned when the SparkContext is created; changes made to it afterwards have no effect on the running context

408

- Use `setIfMissing()` to provide default values without overriding existing settings