# SparkContext and Configuration

SparkContext serves as the main entry point to Spark functionality, coordinating with cluster managers and providing methods to create RDDs, manage resources, and control the application lifecycle. SparkConf manages application configuration and cluster settings.

## Capabilities

### SparkContext Creation and Lifecycle

SparkContext initialization and termination for application lifecycle management.

```scala { .api }
class SparkContext(config: SparkConf) {
  def this() = this(new SparkConf())
}

object SparkContext {
  def getOrCreate(): SparkContext
  def getOrCreate(conf: SparkConf): SparkContext
}

// Lifecycle methods
def stop(): Unit
def version: String
def applicationId: String
def applicationAttemptId: Option[String]
```

**Usage Example:**

```scala
val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
val sc = new SparkContext(conf)
try {
  // Application logic
} finally {
  sc.stop()
}
```

### RDD Creation

Methods to create RDDs from various data sources.

```scala { .api }
// From collections
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
def range(start: Long, end: Long, step: Long = 1, numSlices: Int = defaultParallelism): RDD[Long]
def emptyRDD[T: ClassTag]: RDD[T]

// From files
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]

// From Hadoop
def hadoopRDD[K, V](conf: JobConf, inputFormat: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K], valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration,
    fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]

def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
```

**Usage Example:**

```scala
// From a collection
val data = sc.parallelize(1 to 100, 4)

// Create a numeric range RDD
val numbers = sc.range(1, 1000000, step = 2, numSlices = 8)

// Create an empty RDD
val empty = sc.emptyRDD[String]

// From a text file
val lines = sc.textFile("hdfs://path/to/file.txt")

// Multiple files paired with their names
val files = sc.wholeTextFiles("hdfs://path/to/directory")

// Binary files as PortableDataStream
val binaries = sc.binaryFiles("hdfs://path/to/images/*.jpg")
```

### Shared Variables

Create broadcast variables and accumulators for efficient cluster-wide data sharing.

```scala { .api }
// Broadcast variables
def broadcast[T: ClassTag](value: T): Broadcast[T]

// Accumulators
def longAccumulator(): LongAccumulator
def longAccumulator(name: String): LongAccumulator
def doubleAccumulator(): DoubleAccumulator
def doubleAccumulator(name: String): DoubleAccumulator
def collectionAccumulator[T](): CollectionAccumulator[T]
def collectionAccumulator[T](name: String): CollectionAccumulator[T]
```

**Usage Example:**

```scala
// Broadcast variable
val lookupTable = Map("key1" -> "value1", "key2" -> "value2")
val broadcastVar = sc.broadcast(lookupTable)

// Long accumulator
val errorCount = sc.longAccumulator("Errors")
```

### Configuration Management

Access and modify Spark configuration at runtime.

```scala { .api }
def getConf: SparkConf
def setLocalProperty(key: String, value: String): Unit
def getLocalProperty(key: String): String
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit
def clearJobGroup(): Unit
```
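
**Usage Example** (a sketch of combining these methods; the group id and pool name are illustrative, and `spark.scheduler.pool` assumes the fair scheduler is in use):

```scala
// Group subsequent jobs so they can be cancelled as a unit;
// interruptOnCancel = true also interrupts running tasks on cancellation.
sc.setJobGroup("nightly-etl", "Nightly ETL pipeline", interruptOnCancel = true)

// Thread-local properties propagate to jobs submitted from this thread.
sc.setLocalProperty("spark.scheduler.pool", "etl")
val pool = sc.getLocalProperty("spark.scheduler.pool")

// Read the effective configuration back through the context.
val master = sc.getConf.get("spark.master")

// Clear the group once the related jobs finish.
sc.clearJobGroup()
```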


### Cluster Resource Management

Control cluster resources and executor allocation.

```scala { .api }
def requestTotalExecutors(numExecutors: Int, localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int]): Boolean
def requestExecutors(numAdditionalExecutors: Int): Boolean
def killExecutors(executorIds: Seq[String]): Boolean
def killExecutor(executorId: String): Boolean
def getExecutorIds: Seq[String]
```
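
**Usage Example** (a sketch; these calls only take effect on cluster managers that support dynamic executor allocation, such as YARN, and return `false` otherwise):

```scala
// Inspect the currently registered executors.
val executorIds = sc.getExecutorIds

// Request two executors beyond the current target.
val granted = sc.requestExecutors(2)

// Release one specific executor back to the cluster.
executorIds.headOption.foreach(id => sc.killExecutor(id))
```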


### Job Control

Monitor and control running jobs.

```scala { .api }
def cancelJob(jobId: Int): Unit
def cancelJobGroup(groupId: String): Unit
def cancelJobsWithTag(tag: String): Unit
def cancelAllJobs(): Unit
def setJobDescription(value: String): Unit
def statusTracker: SparkStatusTracker
```
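
**Usage Example** (a sketch; the group id is illustrative):

```scala
// Label jobs submitted from this thread for the UI and status APIs.
sc.setJobDescription("Aggregate daily metrics")

// Inspect running work via the status tracker.
val activeJobIds = sc.statusTracker.getActiveJobIds()

// Cancel a previously set job group, a single job, or everything.
sc.cancelJobGroup("nightly-etl")
activeJobIds.headOption.foreach(sc.cancelJob)
sc.cancelAllJobs()
```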


## SparkConf Configuration

### Basic Configuration

Core application and cluster configuration methods.

```scala { .api }
class SparkConf(loadDefaults: Boolean = true) {
  def set(key: String, value: String): SparkConf
  def setAppName(name: String): SparkConf
  def setMaster(master: String): SparkConf
  def setJars(jars: Seq[String]): SparkConf
  def setIfMissing(key: String, value: String): SparkConf
}
```

**Usage Example:**

```scala
val conf = new SparkConf()
  .setAppName("Data Processing Job")
  .setMaster("yarn")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")
  .set("spark.sql.adaptive.enabled", "true")
```

### Configuration Access

Retrieve configuration values and metadata.

```scala { .api }
def get(key: String): String
def get(key: String, defaultValue: String): String
def getOption(key: String): Option[String]
def getAll: Array[(String, String)]
def contains(key: String): Boolean
def getAppId: String
```
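
**Usage Example** (a sketch; the keys shown are standard Spark settings):

```scala
val conf = new SparkConf().setAppName("MyApp")

// get without a default throws if the key is unset; prefer a default
// value or getOption for keys that may be absent.
val memory = conf.get("spark.executor.memory", "1g")
val serializer: Option[String] = conf.getOption("spark.serializer")

// Enumerate every explicitly set entry.
conf.getAll.foreach { case (k, v) => println(s"$k = $v") }

if (conf.contains("spark.master")) {
  // ...
}
```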


### Configuration Validation

Validate and clone configuration instances.

```scala { .api }
def clone(): SparkConf
def validateSettings(): SparkConf
def setAll(settings: Traversable[(String, String)]): SparkConf
def remove(key: String): SparkConf
```
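
**Usage Example** (a sketch of layering job-specific overrides on a shared base configuration):

```scala
// clone() yields an independent copy, so the base template is untouched.
val base = new SparkConf().setAppName("Template").set("spark.ui.enabled", "true")

val jobConf = base.clone()
  .setAll(Seq(
    "spark.executor.memory" -> "4g",
    "spark.executor.cores" -> "2"))
  .remove("spark.ui.enabled")
```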


## Types

```scala { .api }
// Configuration entry point
class SparkConf(loadDefaults: Boolean = true)

// Main application context
class SparkContext(config: SparkConf) extends Logging

// Status tracking
class SparkStatusTracker private[spark] (sc: SparkContext)

// Accumulator types
class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long]
class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double]
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]]
```