or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

accumulators.md · application-context.md · broadcast-variables.md · index.md · java-api.md · partitioning.md · rdd-operations.md · serialization.md · storage-persistence.md

docs/application-context.md

0

# Application Context

1

2

Application setup and cluster connection management through SparkContext and SparkConf for coordinating distributed Spark applications.

3

4

## Capabilities

5

6

### SparkContext

7

8

Main entry point for Spark functionality, representing the connection to a Spark cluster. Only one SparkContext should be active per JVM.

9

10

```scala { .api }

11

/**

12

* Main entry point for Spark functionality

13

* @param config Spark configuration object

14

*/

15

class SparkContext(config: SparkConf) {

16

/** Create RDD from Scala collection */

17

def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

18

19

/** Read text files from HDFS, local filesystem, or any Hadoop-supported URI */

20

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

21

22

/** Create RDD from Hadoop InputFormat */

23

def hadoopRDD[K, V](

24

conf: JobConf,

25

inputFormatClass: Class[_ <: InputFormat[K, V]],

26

keyClass: Class[K],

27

valueClass: Class[V],

28

minPartitions: Int = defaultMinPartitions

29

): RDD[(K, V)]

30

31

/** Create RDD from new Hadoop API InputFormat */

32

def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](

33

conf: Configuration,

34

fClass: Class[F],

35

kClass: Class[K],

36

vClass: Class[V]

37

): RDD[(K, V)]

38

39

/** Create broadcast variable */

40

def broadcast[T: ClassTag](value: T): Broadcast[T]

41

42

/** Create accumulator */

43

def longAccumulator(name: String): LongAccumulator

44

def doubleAccumulator(name: String): DoubleAccumulator

45

def collectionAccumulator[T](name: String): CollectionAccumulator[T]

46

47

/** Add file to be downloaded with this Spark job */

48

def addFile(path: String, recursive: Boolean = false): Unit

49

50

/** Add JAR file to be distributed to cluster */

51

def addJar(path: String): Unit

52

53

/** Set checkpoint directory for RDD checkpointing */

54

def setCheckpointDir(directory: String): Unit

55

56

/** Stop SparkContext and release resources */

57

def stop(): Unit

58

59

/** Get current status tracker */

60

def statusTracker: SparkStatusTracker

61

62

/** Default parallelism level */

63

def defaultParallelism: Int

64

65

/** Spark version */

66

def version: String

67

}

68

```

69

70

**Usage Examples:**

71

72

```scala

73

import org.apache.spark.{SparkContext, SparkConf}

74

75

// Basic setup

76

val conf = new SparkConf()

77

.setAppName("MyApp")

78

.setMaster("local[2]")

79

val sc = new SparkContext(conf)

80

81

// Create RDD from collection

82

val numbers = sc.parallelize(1 to 100)

83

84

// Read text file

85

val lines = sc.textFile("hdfs://data/input.txt")

86

87

// Add dependencies

88

sc.addFile("config.properties")

89

sc.addJar("lib/custom.jar")

90

91

// Cleanup

92

sc.stop()

93

```

94

95

### SparkConf

96

97

Configuration for Spark applications with key-value pairs for runtime settings.

98

99

```scala { .api }

100

/**

101

* Configuration for Spark applications

102

* @param loadDefaults whether to load default configurations

103

*/

104

class SparkConf(loadDefaults: Boolean = true) {

105

/** Set configuration property */

106

def set(key: String, value: String): SparkConf

107

108

/** Set application name */

109

def setAppName(name: String): SparkConf

110

111

/** Set master URL */

112

def setMaster(master: String): SparkConf

113

114

/** Set Spark home directory */

115

def setSparkHome(home: String): SparkConf

116

117

/** Set JAR files to distribute to cluster */

118

def setJars(jars: Seq[String]): SparkConf

119

120

/** Set an environment variable to be used when launching executors */

121

def setExecutorEnv(variable: String, value: String): SparkConf

122

123

/** Get configuration value */

124

def get(key: String): String

125

def get(key: String, defaultValue: String): String

126

127

/** Get all configuration as key-value pairs */

128

def getAll: Array[(String, String)]

129

130

/** Check if configuration contains key */

131

def contains(key: String): Boolean

132

133

/** Remove configuration property */

134

def remove(key: String): SparkConf

135

136

/** Clone configuration */

137

def clone(): SparkConf

138

}

139

```

140

141

**Usage Examples:**

142

143

```scala

144

val conf = new SparkConf()

145

.setAppName("Data Processing")

146

.setMaster("yarn")

147

.set("spark.executor.memory", "2g")

148

.set("spark.executor.cores", "4")

149

.set("spark.sql.adaptive.enabled", "true")

150

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

151

152

// Advanced configuration

153

conf.set("spark.hadoop.fs.s3a.access.key", accessKey)

154

.set("spark.hadoop.fs.s3a.secret.key", secretKey)

155

.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

156

```

157

158

### SparkStatusTracker

159

160

Low-level status API for monitoring Spark applications and job execution.

161

162

```scala { .api }

163

/**

164

* Low-level status tracking API

165

*/

166

class SparkStatusTracker {

167

/** Get IDs of all known jobs in the given job group */

168

def getJobIdsForGroup(jobGroup: String): Array[Int]

169

170

/** Get information for specific job */

171

def getJobInfo(jobId: Int): Option[SparkJobInfo]

172

173

/** Get information for specific stage */

174

def getStageInfo(stageId: Int): Option[SparkStageInfo]

175

176

/** Get active job IDs */

177

def getActiveJobIds(): Array[Int]

178

179

/** Get active stage IDs */

180

def getActiveStageIds(): Array[Int]

181

182

/** Get executor information */

183

def getExecutorInfos(): Array[SparkExecutorInfo]

184

}

185

186

case class SparkJobInfo(

187

jobId: Int,

188

stageIds: Array[Int],

189

status: JobExecutionStatus

190

)

191

192

case class SparkStageInfo(

193

stageId: Int,

194

currentAttemptId: Int,

195

name: String,

196

numTasks: Int,

197

numActiveTasks: Int,

198

numCompleteTasks: Int,

199

numFailedTasks: Int

200

)

201

```

202

203

### SparkFiles

204

205

Utility for resolving paths to files added through SparkContext.addFile().

206

207

```scala { .api }

208

/**

209

* Resolves paths to files added through SparkContext.addFile()

210

*/

211

object SparkFiles {

212

/** Get local path to file added with SparkContext.addFile() */

213

def get(filename: String): String

214

215

/** Get root directory containing files added with SparkContext.addFile() */

216

def getRootDirectory(): String

217

}

218

```

219

220

## Exception Handling

221

222

```scala { .api }

223

class SparkException(message: String, cause: Throwable = null)

224

extends Exception(message, cause)

225

226

class TaskNotSerializableException(taskName: String, cause: NotSerializableException)

227

extends SparkException(s"Task not serializable: $taskName", cause)

228

229

class SparkFileAlreadyExistsException(outputPath: String)

230

extends SparkException(s"Output path $outputPath already exists")

231

```

232

233

Common exceptions include serialization errors when tasks contain non-serializable closures, and file system errors when output paths already exist.