or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

broadcast-accumulators.md context-configuration.md index.md java-api.md key-value-operations.md rdd-operations.md status-monitoring.md storage-persistence.md task-context.md

docs/context-configuration.md

0

# Context and Configuration

1

2

This module provides the primary entry points for creating and configuring Spark applications. The SparkContext serves as the main coordination point for distributed computation, while SparkConf handles all configuration parameters.

3

4

## SparkConf

5

6

SparkConf manages all configuration parameters for a Spark application.

7

8

```scala { .api }

9

class SparkConf(loadDefaults: Boolean = true) {

10

def set(key: String, value: String): SparkConf

11

def setMaster(master: String): SparkConf

12

def setAppName(name: String): SparkConf

13

def setJars(jars: Seq[String]): SparkConf

14

def setExecutorEnv(variable: String, value: String): SparkConf

15

def setExecutorEnv(variables: Seq[(String, String)]): SparkConf

16

def setExecutorEnv(variables: Array[(String, String)]): SparkConf

17

def get(key: String): String

18

def get(key: String, defaultValue: String): String

19

def getOption(key: String): Option[String]

20

def getAll: Array[(String, String)]

21

def remove(key: String): SparkConf

22

def contains(key: String): Boolean

23

def clone(): SparkConf

24

def setSparkHome(home: String): SparkConf

25

def setIfMissing(key: String, value: String): SparkConf

26

}

27

```

28

29

### Usage Examples

30

31

```scala

32

import org.apache.spark.SparkConf

33

34

// Basic configuration

35

val conf = new SparkConf()

36

.setAppName("My Spark Application")

37

.setMaster("local[4]")

38

.set("spark.sql.adaptive.enabled", "true")

39

.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

40

41

// Configure executor settings

42

conf.set("spark.executor.memory", "4g")

43

.set("spark.executor.cores", "2")

44

.set("spark.executor.instances", "10")

45

46

// Set environment variables for executors

47

conf.setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk")

48

.setExecutorEnv(Seq(

49

("SPARK_LOCAL_DIRS", "/tmp/spark"),

50

("SPARK_WORKER_DIR", "/tmp/spark-worker")

51

))

52

53

// Conditional configuration

54

if (!conf.contains("spark.master")) {

55

conf.setMaster("local[*]")

56

}

57

58

// Clone configuration for different contexts

59

val testConf = conf.clone()

60

.setAppName("Test Application")

61

.set("spark.sql.execution.arrow.pyspark.enabled", "false")

62

```

63

64

## SparkContext

65

66

SparkContext is the main entry point for Spark functionality. Only one SparkContext should be active per JVM.

67

68

```scala { .api }

69

class SparkContext(config: SparkConf) {

70

// RDD Creation

71

def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

72

def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

73

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]

74

def emptyRDD[T: ClassTag]: RDD[T]

75

def range(start: Long, end: Long, step: Long = 1, numSlices: Int = defaultParallelism): RDD[Long]

76

77

// File Input

78

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

79

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

80

def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]

81

def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

82

def hadoopRDD[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

83

def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]

84

85

// Broadcast and Accumulators

86

def broadcast[T: ClassTag](value: T): Broadcast[T]

87

def longAccumulator(): LongAccumulator

88

def longAccumulator(name: String): LongAccumulator

89

def doubleAccumulator(): DoubleAccumulator

90

def doubleAccumulator(name: String): DoubleAccumulator

91

def collectionAccumulator[T](): CollectionAccumulator[T]

92

def collectionAccumulator[T](name: String): CollectionAccumulator[T]

93

94

// Application Control

95

def stop(): Unit

96

def addFile(path: String): Unit

97

def addFile(path: String, recursive: Boolean): Unit

98

def addJar(path: String): Unit

99

def clearFiles(): Unit

100

def clearJars(): Unit

101

102

// Configuration and Environment

103

def setLogLevel(logLevel: String): Unit

104

def setLocalProperty(key: String, value: String): Unit

105

def getLocalProperty(key: String): String

106

def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit

107

def clearJobGroup(): Unit

108

def setJobDescription(value: String): Unit

109

110

// Dynamic Resource Allocation

111

def requestTotalExecutors(requestedTotal: Int, localityAwareTasks: Int = 0, hostToLocalTaskCount: Map[String, Int] = Map.empty): Boolean

112

def requestExecutors(numAdditionalExecutors: Int): Boolean

113

def killExecutors(executorIds: Seq[String]): Boolean

114

def killExecutor(executorId: String): Boolean

115

116

// Properties

117

def master: String

118

def appName: String

119

def jars: Seq[String]

120

def files: Seq[String]

121

def startTime: Long

122

def version: String

123

def defaultParallelism: Int

124

def defaultMinPartitions: Int

125

def conf: SparkConf

126

def statusTracker: SparkStatusTracker

127

}

128

```

129

130

### Usage Examples

131

132

```scala

133

import org.apache.spark.{SparkContext, SparkConf}

134

import org.apache.spark.storage.StorageLevel

135

136

// Create configuration and context

137

val conf = new SparkConf()

138

.setAppName("Data Processing Pipeline")

139

.setMaster("spark://master:7077")

140

141

val sc = new SparkContext(conf)

142

143

// Create RDDs from various sources

144

val numbers = sc.parallelize(1 to 1000000, numSlices = 100)

145

val lines = sc.textFile("hdfs://data/input.txt", minPartitions = 50)

146

val keyValueData = sc.sequenceFile[String, Int]("hdfs://data/sequence")

147

148

// Set job properties for monitoring

149

sc.setJobGroup("data-processing", "Main data processing pipeline")

150

sc.setJobDescription("Processing customer transaction data")

151

152

// Add application files and JARs

153

sc.addFile("hdfs://shared/config.properties")

154

sc.addJar("s3://libs/custom-transformations.jar")

155

156

// Create broadcast variables for lookup tables

157

val lookupTable = Map("A" -> 1, "B" -> 2, "C" -> 3)

158

val broadcastLookup = sc.broadcast(lookupTable)

159

160

// Create accumulators for metrics

161

val errorCount = sc.longAccumulator("Processing Errors")

162

val processingTime = sc.doubleAccumulator("Total Processing Time")

163

164

// Dynamic resource management

165

if (sc.defaultParallelism < 100) {

166

sc.requestExecutors(20)

167

}

168

169

// Process data using broadcast and accumulators

170

val results = lines.map { line =>

171

try {

172

val startTime = System.currentTimeMillis()

173

val processed = processLine(line, broadcastLookup.value)

174

processingTime.add(System.currentTimeMillis() - startTime)

175

processed

176

} catch {

177

case e: Exception =>

178

errorCount.add(1)

179

throw e

180

}

181

}

182

183

// Persist intermediate results

184

results.persist(StorageLevel.MEMORY_AND_DISK_SER)

185

186

// Trigger computation and collect metrics

187

val finalCount = results.count()

188

println(s"Processed $finalCount records")

189

println(s"Error count: ${errorCount.value}")

190

println(s"Average processing time: ${processingTime.value / finalCount}ms")

191

192

// Clean up

193

sc.stop()

194

```

195

196

## Package Constants

197

198

The `org.apache.spark` package object exposes constants describing the Spark version and build information.

199

200

```scala { .api }

201

// Available in org.apache.spark package object

202

val SPARK_VERSION: String

203

val SPARK_VERSION_SHORT: String

204

val SPARK_BRANCH: String

205

val SPARK_REVISION: String

206

val SPARK_BUILD_USER: String

207

val SPARK_REPO_URL: String

208

val SPARK_BUILD_DATE: String

209

```

210

211

### Usage Example

212

213

```scala

214

import org.apache.spark._

215

216

println(s"Running Spark version: $SPARK_VERSION")

217

println(s"Built by: $SPARK_BUILD_USER on $SPARK_BUILD_DATE")

218

println(s"Git revision: $SPARK_REVISION")

219

```

220

221

## Best Practices

222

223

### Configuration Management

224

- Use configuration files or environment variables for deployment-specific settings

225

- Set appropriate memory and core allocations based on cluster resources

226

- Enable adaptive query execution for SQL workloads

227

- Configure serialization (prefer Kryo over Java serialization)

228

229

### Resource Management

230

- Monitor executor resource utilization and adjust allocations

231

- Use dynamic resource allocation in multi-tenant environments

232

- Set appropriate timeouts for network operations

233

- Configure storage levels based on data access patterns

234

235

### Error Handling

236

- Implement proper exception handling in transformations and actions

237

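- Use accumulator variables to collect error metrics; where exact counts matter, update them inside actions, because updates made inside transformations can be applied more than once if a task or stage is re-executed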

238

- Set up comprehensive logging and monitoring

239

- Handle node failures gracefully with appropriate retry policies