or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcast-accumulators.md  context-management.md  index.md  java-api.md  pair-rdd-operations.md  rdd-operations.md  storage-persistence.md

context-management.mddocs/

0

# Context Management

1

2

Core functionality for creating and managing Spark applications, including cluster connections, resource allocation, and application lifecycle management.

3

4

## Capabilities

5

6

### SparkContext

7

8

The main entry point for Spark functionality that represents the connection to a Spark cluster and coordinates all Spark operations.

9

10

```scala { .api }

11

/**

12

* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark

13

* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

14

* Only one SparkContext may be active per JVM.

15

*

16

* @param config a Spark Config object describing the application configuration

17

*/

18

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

19

20

/** Create an RDD from a local Scala collection */

21

def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

22

23

/** Read a text file from HDFS, local file system, or any Hadoop-supported file system URI */

24

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

25

26

/** Read a directory of text files from HDFS, local file system, or any Hadoop-supported file system URI */

27

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

28

29

/** Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS */

30

def sequenceFile[K, V](path: String,

31

keyClass: Class[K],

32

valueClass: Class[V],

33

minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

34

35

/** Load an RDD saved as a SequenceFile containing serialized objects */

36

def objectFile[T: ClassTag](path: String, minPartitions: Int = defaultMinPartitions): RDD[T]

37

38

/** Get an RDD for a Hadoop file with an arbitrary InputFormat */

39

def hadoopFile[K, V](path: String,

40

inputFormatClass: Class[_ <: InputFormat[K, V]],

41

keyClass: Class[K],

42

valueClass: Class[V],

43

minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

44

45

/** Get RDD for a Hadoop file using the new MapReduce API */

46

def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String,

47

fClass: Class[F],

48

kClass: Class[K],

49

vClass: Class[V],

50

conf: Configuration = hadoopConfiguration): RDD[(K, V)]

51

52

/** Broadcast a read-only variable to all worker nodes */

53

def broadcast[T: ClassTag](value: T): Broadcast[T]

54

55

/** Create an accumulator variable of a given type */

56

def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]

57

58

/** Create an accumulator variable with a name for display in the Spark UI */

59

def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]): Accumulator[T]

60

61

/** Add a file to be downloaded with this Spark job on every node */

62

def addFile(path: String): Unit

63

64

/** Add a JAR dependency for all tasks to be executed on this SparkContext */

65

def addJar(path: String): Unit

66

67

/** Control our logLevel. This overrides any user-defined log settings */

68

def setLogLevel(logLevel: String): Unit

69

70

/** Assign a group ID to all the jobs started by this thread until the group ID is set to a different value */

71

def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit

72

73

/** Clear the current thread's job group ID and its description */

74

def clearJobGroup(): Unit

75

76

/** Set a local property that affects jobs submitted from this thread */

77

def setLocalProperty(key: String, value: String): Unit

78

79

/** Get a local property set in this thread, or null if it is missing */

80

def getLocalProperty(key: String): String

81

82

/** Set the directory under which RDDs are going to be checkpointed */

83

def setCheckpointDir(directory: String): Unit

84

85

/** Return the directory where checkpoints are stored */

86

def getCheckpointDir: Option[String]

87

88

/** Shut down the SparkContext */

89

def stop(): Unit

90

91

/** Default level of parallelism to use when not given by user */

92

def defaultParallelism: Int

93

94

/** Default min number of partitions for Hadoop RDDs when not given by user */

95

def defaultMinPartitions: Int

96

97

/** The version of Spark on which this application is running */

98

def version: String

99

100

/** Return the Spark configuration */

101

def getConf: SparkConf

102

103

/** The application name */

104

def appName: String

105

106

/** A unique identifier for the Spark application */

107

def applicationId: String

108

109

/** Return the URL of the SparkUI instance started by this SparkContext */

110

def uiWebUrl: Option[String]

111

112

/** Low-level status tracker for monitoring the progress of jobs and stages */

113

def statusTracker: SparkStatusTracker

114

}

115

```

116

117

**Usage Examples:**

118

119

```scala

120

import org.apache.spark.{SparkContext, SparkConf}

121

122

// Basic context creation

123

val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")

124

val sc = new SparkContext(conf)

125

126

// Create RDDs from various sources

127

val numberRDD = sc.parallelize(1 to 1000, 4) // 4 partitions

128

val textRDD = sc.textFile("hdfs://path/to/file.txt")

129

val wholeFilesRDD = sc.wholeTextFiles("hdfs://path/to/directory")

130

131

// Configure job properties

132

sc.setJobGroup("data-processing", "Processing user data")

133

sc.setLocalProperty("spark.scheduler.pool", "production") // fair-scheduler pool for jobs from this thread

134

135

// Add dependencies

136

sc.addFile("s3://bucket/config.properties")

137

sc.addJar("hdfs://path/to/dependency.jar")

138

139

// Always stop the context

140

sc.stop()

141

```

142

143

### SparkConf

144

145

Configuration object for a Spark application, used to set various Spark parameters as key-value pairs.

146

147

```scala { .api }

148

/**

149

* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.

150

* Most of the time, you would create a SparkConf object with new SparkConf(), which will load

151

* values from any spark.* Java system properties set in your application as well.

152

*

153

* @param loadDefaults whether to also load values from Java system properties

154

*/

155

class SparkConf(loadDefaults: Boolean = true) extends Cloneable with Logging {

156

157

/** Set a configuration variable */

158

def set(key: String, value: String): SparkConf

159

160

/** Set multiple parameters together */

161

def setAll(settings: Traversable[(String, String)]): SparkConf

162

163

/** Set the master URL to connect to */

164

def setMaster(master: String): SparkConf

165

166

/** Set the application name */

167

def setAppName(name: String): SparkConf

168

169

/** Set JAR files to distribute to the cluster */

170

def setJars(jars: Seq[String]): SparkConf

171

172

/** Set Spark's home directory on worker nodes */

173

def setSparkHome(home: String): SparkConf

174

175

/** Set an environment variable to be used when launching executors for this application */

176

def setExecutorEnv(variable: String, value: String): SparkConf

177

178

/** Set multiple environment variables for executors */

179

def setExecutorEnv(variables: Seq[(String, String)]): SparkConf

180

181

/** Get a parameter; throws a NoSuchElementException if it's not set */

182

def get(key: String): String

183

184

/** Get a parameter, falling back to a default if not set */

185

def get(key: String, defaultValue: String): String

186

187

/** Get a parameter as an Option */

188

def getOption(key: String): Option[String]

189

190

/** Get all parameters as a list of (key, value) pairs */

191

def getAll: Array[(String, String)]

192

193

/** Remove a parameter from the configuration */

194

def remove(key: String): SparkConf

195

196

/** Does the configuration contain a given parameter? */

197

def contains(key: String): Boolean

198

199

/** Clone this SparkConf */

200

override def clone: SparkConf

201

}

202

```

203

204

**Usage Examples:**

205

206

```scala

207

import org.apache.spark.SparkConf

208

209

// Basic configuration

210

val conf = new SparkConf()

211

.setAppName("Data Processing Job")

212

.setMaster("spark://cluster:7077")

213

.set("spark.executor.memory", "4g")

214

.set("spark.executor.cores", "2")

215

216

// Advanced configuration

217

val advancedConf = new SparkConf()

218

.setAppName("ML Pipeline")

219

.setMaster("yarn")

220

.set("spark.submit.deployMode", "cluster")

221

.set("spark.executor.instances", "10")

222

.set("spark.executor.memory", "8g")

223

.set("spark.executor.cores", "4")

224

.set("spark.sql.adaptive.enabled", "true")

225

.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

226

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

227

.setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk")

228

229

// Configuration with JAR dependencies

230

val confWithJars = new SparkConf()

231

.setAppName("Spark with Dependencies")

232

.setJars(List("s3://bucket/lib1.jar", "hdfs://path/lib2.jar"))

233

.setSparkHome("/opt/spark")

234

235

// Reading configuration values

236

val master = conf.get("spark.master")

237

val appName = conf.get("spark.app.name")

238

val executorMemory = conf.get("spark.executor.memory", "1g") // with default

239

240

// Check if configuration exists

241

if (conf.contains("spark.sql.adaptive.enabled")) {

242

println("Adaptive query execution is configured")

243

}

244

245

// Clone configuration for modifications

246

val devConf = conf.clone().setMaster("local[*]")

247

```

248

249

### SparkStatusTracker

250

251

Interface for monitoring the status of Spark jobs and stages.

252

253

```scala { .api }

254

/**

255

* Low-level status reporting APIs for monitoring job and stage progress.

256

*/

257

class SparkStatusTracker private[spark] (sc: SparkContext) {

258

259

/** Return the IDs of all known jobs in the given job group */

260

def getJobIdsForGroup(jobGroup: String): Array[Int]

261

262

/** Get information about a specific job */

263

def getJobInfo(jobId: Int): Option[SparkJobInfo]

264

265

/** Get information about a specific stage */

266

def getStageInfo(stageId: Int): Option[SparkStageInfo]

267

268

/** Get information about all active stages */

269

def getActiveStageInfos(): Array[SparkStageInfo]

270

271

/** Get information about all active jobs */

272

def getActiveJobsInfos(): Array[SparkJobInfo]

273

274

/** Get executor information for all executors */

275

def getExecutorInfos(): Array[SparkExecutorInfo]

276

}

277

278

/** Information about a Spark job */

279

class SparkJobInfo(

280

val jobId: Int,

281

val stageIds: Array[Int],

282

val status: JobExecutionStatus)

283

284

/** Information about a Spark stage */

285

class SparkStageInfo(

286

val stageId: Int,

287

val currentAttemptId: Int,

288

val name: String,

289

val numTasks: Int,

290

val numActiveTasks: Int,

291

val numCompletedTasks: Int,

292

val numFailedTasks: Int)

293

```

294

295

**Usage Examples:**

296

297

```scala

298

val sc = new SparkContext(conf)

299

val statusTracker = sc.statusTracker

300

301

// Monitor active jobs

302

val activeJobs = statusTracker.getActiveJobsInfos()

303

activeJobs.foreach { job =>

304

println(s"Job ${job.jobId}: ${job.status}")

305

}

306

307

// Monitor stages

308

val activeStages = statusTracker.getActiveStageInfos()

309

activeStages.foreach { stage =>

310

val progress = stage.numCompletedTasks.toDouble / stage.numTasks

311

println(s"Stage ${stage.stageId}: ${(progress * 100).toInt}% complete")

312

}

313

314

// Monitor executors

315

val executors = statusTracker.getExecutorInfos()

316

executors.foreach { executor =>

317

println(s"Executor ${executor.host()}:${executor.port()}: ${executor.numRunningTasks()} running tasks")

318

}

319

```

320

321

## Configuration Properties

322

323

Key configuration properties for SparkConf:

324

325

### Application Properties

326

- `spark.app.name` - Application name displayed in UI and logs

327

- `spark.master` - Cluster manager to connect to (local, yarn, spark://host:port)

328

- `spark.submit.deployMode` - Deploy mode: client or cluster

329

330

### Runtime Environment

331

- `spark.driver.memory` - Amount of memory for driver process (e.g., "1g", "2048m")

332

- `spark.driver.cores` - Number of cores for driver in cluster mode

333

- `spark.executor.memory` - Amount of memory per executor process

334

- `spark.executor.cores` - Number of cores per executor

335

- `spark.executor.instances` - Number of executor instances (static allocation)

336

337

### Dynamic Allocation

338

- `spark.dynamicAllocation.enabled` - Enable dynamic allocation of executors

339

- `spark.dynamicAllocation.minExecutors` - Minimum number of executors

340

- `spark.dynamicAllocation.maxExecutors` - Maximum number of executors

341

- `spark.dynamicAllocation.initialExecutors` - Initial number of executors

342

343

### Memory and Networking

344

- `spark.driver.maxResultSize` - Maximum size of results that can be collected to driver

345

- `spark.executor.heartbeatInterval` - Interval between executor heartbeats

346

- `spark.network.timeout` - Default timeout for network interactions

347

348

### Spark UI

349

- `spark.ui.enabled` - Whether to run the web UI (default: true)

350

- `spark.ui.port` - Port for web UI (default: 4040)

351

- `spark.eventLog.enabled` - Whether to log events for replay in UI

352

- `spark.eventLog.dir` - Directory for event logs

353

354

## Error Handling

355

356

Common exceptions thrown by SparkContext operations:

357

358

- `SparkException` - General Spark-related errors

359

- `IllegalStateException` - When trying to use a stopped SparkContext

360

- `IllegalArgumentException` - Invalid configuration parameters

361

- `FileNotFoundException` - When input files don't exist

362

- `OutOfMemoryError` - Insufficient memory for operations