or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md io-operations.md key-value-operations.md partitioning-shuffling.md rdd-operations.md shared-variables.md spark-context.md storage-persistence.md

spark-context.mddocs/

0

# SparkContext and Configuration

1

2

Core entry point for Spark applications providing configuration management, resource coordination, and the foundation for all Spark operations.

3

4

## Capabilities

5

6

### SparkContext

7

8

Main entry point for Spark functionality representing the connection to a Spark cluster. Used to create RDDs, accumulators, and broadcast variables.

9

10

```scala { .api }

11

/**

12

* Main entry point for Spark functionality

13

* @param config SparkConf object describing application configuration

14

*/

15

class SparkContext(config: SparkConf) extends ExecutorAllocationClient

16

17

// Core RDD creation methods

18

def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

19

def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

20

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]

21

def emptyRDD[T: ClassTag]: RDD[T]

22

def range(start: Long, end: Long, step: Long = 1, numSlices: Int = defaultParallelism): RDD[Long]

23

def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T]

24

def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T]

25

26

// File-based RDD creation

27

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

28

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

29

def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]

30

def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration): RDD[Array[Byte]]

31

def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]

32

33

// Hadoop integration

34

def hadoopRDD[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]],

35

keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

36

def hadoopFile[K, V](path: String, inputFormatClass: Class[_ <: InputFormat[K, V]],

37

keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

38

def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String, fClass: Class[F],

39

kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]

40

def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration = hadoopConfiguration,

41

fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]

42

def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V],

43

minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

44

def hadoopConfiguration: Configuration

45

46

// Shared variables

47

def broadcast[T: ClassTag](value: T): Broadcast[T]

48

def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]

49

def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]

50

def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]

51

def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]): Accumulable[R, T]

52

def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T](initialValue: R): Accumulable[R, T]

53

54

// Resource management

55

def stop(): Unit

56

def setCheckpointDir(directory: String): Unit

57

def getCheckpointDir: Option[String]

58

def requestExecutors(numAdditionalExecutors: Int): Boolean

59

def killExecutors(executorIds: Seq[String]): Boolean

60

def killExecutor(executorId: String): Boolean

61

62

// Job execution and management

63

def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit

64

def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]): Array[U]

65

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U]

66

def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]

67

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]

68

def submitJob[T, U, R](rdd: RDD[T], processPartition: Iterator[T] => U, partitions: Seq[Int],

69

resultHandler: (Int, U) => Unit, resultFunc: => R): SimpleFutureAction[R]

70

def runApproximateJob[T, U, R](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,

71

evaluator: ApproximateEvaluator[U, R], timeout: Long): PartialResult[R]

72

def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit

73

def setJobDescription(value: String): Unit

74

def clearJobGroup(): Unit

75

def cancelJobGroup(groupId: String): Unit

76

def cancelAllJobs(): Unit

77

78

// Files and JARs

79

def addFile(path: String): Unit

80

def addFile(path: String, recursive: Boolean): Unit

81

def addJar(path: String): Unit

82

def setLocalProperty(key: String, value: String): Unit

83

def getLocalProperty(key: String): String

84

85

// Status and monitoring

86

def statusTracker: SparkStatusTracker

87

def getExecutorMemoryStatus: Map[String, (Long, Long)]

88

def getExecutorStorageStatus: Array[StorageStatus]

89

def getRDDStorageInfo: Array[RDDInfo]

90

def getPersistentRDDs: Map[Int, RDD[_]]

91

def getAllPools: Seq[Schedulable]

92

def getPoolForName(pool: String): Option[Schedulable]

93

def getSchedulingMode: SchedulingMode.SchedulingMode

94

def metricsSystem: MetricsSystem

95

def addSparkListener(listener: SparkListener): Unit

96

97

// Configuration and context properties

98

def getConf: SparkConf

99

def setLogLevel(logLevel: String): Unit

100

def master: String

101

def appName: String

102

def applicationId: String

103

def applicationAttemptId: Option[String]

104

def version: String

105

def startTime: Long

106

def isLocal: Boolean

107

def isStopped: Boolean

108

def defaultParallelism: Int

109

def defaultMinPartitions: Int

110

111

// Call site management

112

def setCallSite(shortCallSite: String): Unit

113

def clearCallSite(): Unit

114

```

115

116

### SparkContext Companion Object

117

118

Static methods for SparkContext management and utility functions.

119

120

```scala { .api }

121

object SparkContext {

122

/**

123

* Get existing SparkContext or create a new one with provided configuration

124

*/

125

def getOrCreate(config: SparkConf): SparkContext

126

def getOrCreate(): SparkContext

127

128

/**

129

* Find JAR containing the given class

130

*/

131

def jarOfClass(cls: Class[_]): Option[String]

132

def jarOfObject(obj: AnyRef): Option[String]

133

}

134

```

135

136

**Usage Examples:**

137

138

```scala

139

import org.apache.spark.{SparkContext, SparkConf}

140

141

// Basic SparkContext creation: build a SparkConf, then hand it to the context.
val conf = new SparkConf().setAppName("MyApp").setMaster("local[2]")
val sc = new SparkContext(conf)

// Alternative constructor taking the master URL and application name directly.
// Only one SparkContext may be active per JVM, so use this INSTEAD of the
// SparkConf-based construction above (or call sc.stop() first):
//   val sc = new SparkContext("local[*]", "MyApp")

// Create RDDs from collections
val data = List(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
val rdd2 = sc.parallelize(data, numSlices = 4) // specify partitions

// Create RDDs from files
val textRDD = sc.textFile("hdfs://path/to/file.txt")
val wholeFiles = sc.wholeTextFiles("hdfs://path/to/directory")

// Resource management
sc.setCheckpointDir("hdfs://path/to/checkpoint")
sc.addFile("path/to/config.properties")
sc.addJar("path/to/dependency.jar")

// Job management: jobs submitted between setJobGroup and clearJobGroup
// belong to the "data-processing" group.
sc.setJobGroup("data-processing", "Processing user data", interruptOnCancel = true)
// ... run some jobs
sc.clearJobGroup()

// Clean shutdown
sc.stop()

169

```

170

171

### SparkConf

172

173

Configuration object for Spark applications containing key-value settings that control Spark's behavior.

174

175

```scala { .api }

176

/**

177

* Configuration for a Spark application

178

*/

179

class SparkConf(loadDefaults: Boolean = true)

180

181

// Configuration setting methods

182

def set(key: String, value: String): SparkConf

183

def setMaster(master: String): SparkConf

184

def setAppName(name: String): SparkConf

185

def setJars(jars: Seq[String]): SparkConf

186

def setJars(jars: Array[String]): SparkConf

187

def setExecutorEnv(variable: String, value: String): SparkConf

188

def setExecutorEnv(variables: Seq[(String, String)]): SparkConf

189

def setExecutorEnv(variables: Array[(String, String)]): SparkConf

190

191

// Configuration retrieval methods

192

def get(key: String): String

193

def get(key: String, defaultValue: String): String

194

def getOption(key: String): Option[String]

195

def getInt(key: String, defaultValue: Int): Int

196

def getLong(key: String, defaultValue: Long): Long

197

def getDouble(key: String, defaultValue: Double): Double

198

def getBoolean(key: String, defaultValue: Boolean): Boolean

199

def getSizeAsBytes(key: String, defaultValue: String): Long

200

def getSizeAsKb(key: String, defaultValue: String): Long

201

def getSizeAsMb(key: String, defaultValue: String): Long

202

def getSizeAsGb(key: String, defaultValue: String): Long

203

def getTimeAsMs(key: String, defaultValue: String): Long

204

def getTimeAsSeconds(key: String, defaultValue: String): Long

205

206

// Configuration introspection

207

def contains(key: String): Boolean

208

def getAll: Array[(String, String)]

209

def remove(key: String): SparkConf

210

def clone(): SparkConf

211

212

// Validation

213

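def validateSettings(): Unit // NOTE: declared private[spark] in Spark's source; invoked internally when a SparkContext is created, not callable from application code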

214

```

215

216

**Usage Examples:**

217

218

```scala

219

import org.apache.spark.SparkConf

220

221

// Basic configuration: every setter returns the SparkConf, so calls can be chained.
val conf = new SparkConf()
  .setAppName("Data Processing App")
  .setMaster("spark://master:7077")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "4")

// Advanced configuration
val advancedConf = new SparkConf()
  .setAppName("Advanced App")
  .setMaster("yarn")
  .setJars(Array("path/to/app.jar", "path/to/dependency.jar"))
  // PYTHONPATH (no underscore) is the variable the Python interpreter reads.
  .setExecutorEnv("PYTHONPATH", "/opt/python")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Reading configuration values
val appName = conf.get("spark.app.name") // throws NoSuchElementException if the key is unset
val executorMemory = conf.get("spark.executor.memory", "1g")
val executorCores = conf.getInt("spark.executor.cores", 2)
val isAdaptiveEnabled = conf.getBoolean("spark.sql.adaptive.enabled", false)

// Configuration introspection.
// There is no explicit validation step for application code: Spark checks the
// settings itself when a SparkContext is created from this conf.
val hasExecutorMemory = conf.contains("spark.executor.memory")
val allSettings = conf.getAll
println(s"All settings: ${allSettings.mkString(", ")}")

248

```

249

250

### SparkFiles

251

252

Utility object for resolving paths to files added through SparkContext.addFile().

253

254

```scala { .api }

255

/**

256

* Resolves paths to files added through SparkContext.addFile()

257

*/

258

object SparkFiles {

259

/**

260

* Get the absolute path of a file added through SparkContext.addFile()

261

* @param filename name of the file

262

* @return absolute path to the file

263

*/

264

def get(filename: String): String

265

266

/**

267

* Get the root directory that contains files added through SparkContext.addFile()

268

* @return path to the root directory

269

*/

270

def getRootDirectory(): String

271

}

272

```

273

274

### SparkStatusTracker

275

276

Low-level status reporting APIs for monitoring job and stage progress.

277

278

```scala { .api }

279

/**

280

* Low-level status reporting APIs for monitoring job and stage progress

281

*/

282

class SparkStatusTracker(sc: SparkContext) {

283

/**

284

* Return a list of all known jobs in a particular job group

285

*/

286

def getJobIdsForGroup(jobGroup: String): Array[Int]

287

288

/**

289

* Returns an array containing the IDs of all active jobs

290

*/

291

def getActiveJobIds(): Array[Int]

292

293

/**

294

* Returns an array containing the IDs of all active stages

295

*/

296

def getActiveStageIds(): Array[Int]

297

298

/**

299

* Returns stage information, or None if the stage info is not available

300

*/

301

def getStageInfo(stageId: Int): Option[SparkStageInfo]

302

303

/**

304

* Returns information of all executors known to this SparkContext

305

*/

306

def getExecutorInfos(): Array[SparkExecutorInfo]

307

}

308

```

309

310

## Error Handling

311

312

Common exceptions that may be thrown:

313

314

- **SparkException**: General Spark-related errors

315

- **IllegalArgumentException**: Invalid configuration parameters

316

- **IllegalStateException**: Attempting operations on a stopped SparkContext

317

- **UnsupportedOperationException**: Unsupported operations in certain deployment modes

318

319

**Example Error Handling:**

320

321

```scala

322

import org.apache.spark.{SparkContext, SparkConf, SparkException}

323

324

// The context is held outside `try` so the `finally` block can reach it.
// It stays None when construction itself fails, in which case there is nothing to stop.
var sc: Option[SparkContext] = None

try {
  val conf = new SparkConf().setAppName("MyApp")
  sc = Some(new SparkContext(conf))

  // Your Spark operations here

} catch {
  case e: SparkException =>
    println(s"Spark error: ${e.getMessage}")
  case e: IllegalArgumentException =>
    println(s"Invalid configuration: ${e.getMessage}")
  case e: Exception =>
    println(s"Unexpected error: ${e.getMessage}")
} finally {
  // Stop the context only if it was created and is still running.
  sc.filterNot(_.isStopped).foreach(_.stop())
}

342

```