or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-launcher.mdcore-engine.mdgraph-processing.mdindex.mdmachine-learning.mdsql-dataframes.mdstream-processing.md

core-engine.mddocs/

# Core Engine APIs

The Spark Core engine provides the fundamental distributed computing capabilities through Resilient Distributed Datasets (RDDs) and the SparkContext. It is the foundation on which all other Spark components are built.

## SparkContext

The SparkContext is the main entry point for Spark functionality and represents the connection to a Spark cluster. Only one SparkContext may be active per JVM; call `stop()` on the active context before creating a new one.

```scala { .api }
class SparkContext(config: SparkConf) {
  // Core data creation
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
  // A single method with defaults: range(start, end), range(start, end, step) and
  // range(start, end, step, numSlices) are all valid calls. `end` is exclusive.
  // (A one-argument range(end) exists on SparkSession, not on SparkContext.)
  def range(start: Long, end: Long, step: Long = 1,
            numSlices: Int = defaultParallelism): RDD[Long]
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  // Each element is a (file path, whole file content) pair
  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]

  // Hadoop integration
  def hadoopFile[K, V](path: String, inputFormatClass: Class[_ <: InputFormat[K, V]],
                       keyClass: Class[K], valueClass: Class[V],
                       minPartitions: Int = defaultMinPartitions): RDD[(K, V)]
  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V],
                         minPartitions: Int = defaultMinPartitions): RDD[(K, V)]

  // Shared variables
  def broadcast[T: ClassTag](value: T): Broadcast[T]
  def longAccumulator(): LongAccumulator
  def longAccumulator(name: String): LongAccumulator
  def doubleAccumulator(): DoubleAccumulator
  def doubleAccumulator(name: String): DoubleAccumulator
  def collectionAccumulator[T](): CollectionAccumulator[T]
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]

  // Application management
  def stop(): Unit
  def setLogLevel(logLevel: String): Unit
  def setCheckpointDir(directory: String): Unit  // required before RDD.checkpoint()
  def addFile(path: String): Unit
  def addJar(path: String): Unit

  // Properties
  def master: String
  def appName: String
  def version: String
  def defaultParallelism: Int
  def defaultMinPartitions: Int
  def startTime: Long
}
```

### Usage Examples

```scala
import org.apache.spark.{SparkContext, SparkConf}

// Create SparkContext
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("local[*]")
val sc = new SparkContext(conf)

// Create RDD from collection
val data = sc.parallelize(Seq(1, 2, 3, 4, 5))

// Create RDD from range: start=1, end=1000000 (exclusive), step=2 -> 1, 3, ..., 999999
val numbers = sc.range(1, 1000000, 2)

// Read from files (lazy: nothing is read until an action runs on these RDDs)
val textRDD = sc.textFile("hdfs://path/to/file.txt")
val wholeFiles = sc.wholeTextFiles("hdfs://path/to/directory")
val binaryFiles = sc.binaryFiles("hdfs://path/to/binary/files")

// Create broadcast variable
val broadcastVar = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))

// Create accumulator
val counter = sc.longAccumulator("Counter")

sc.stop()
```

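## SparkConf

Configuration for a Spark application, held as key-value pairs. Set all values before passing the SparkConf to a SparkContext: Spark clones the configuration at that point and does not support changing it at runtime.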

```scala { .api }
// loadDefaults: whether to also load spark.* values from Java system properties
class SparkConf(loadDefaults: Boolean = true) {
  // Configuration methods (each returns this SparkConf, so calls can be chained)
  def set(key: String, value: String): SparkConf
  def setMaster(master: String): SparkConf
  def setAppName(name: String): SparkConf
  def setJars(jars: Seq[String]): SparkConf
  def setExecutorEnv(variable: String, value: String): SparkConf
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
  def setSparkHome(home: String): SparkConf

  // Retrieval methods
  def get(key: String): String                        // throws NoSuchElementException if unset
  def get(key: String, defaultValue: String): String  // defaultValue when unset
  def getOption(key: String): Option[String]          // None when unset
  def getAll: Array[(String, String)]
  def contains(key: String): Boolean
  def remove(key: String): SparkConf

  // Utility methods
  override def clone(): SparkConf
  def toDebugString: String
}
```

### Usage Examples

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MyApplication")
  .setMaster("yarn")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.sql.adaptive.enabled", "true")
  .setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk")

// Get configuration values
val appName = conf.get("spark.app.name")                      // "MyApplication"; get(key) throws if unset
val executorMemory = conf.getOption("spark.executor.memory")  // Some("2g")
val parallelism = conf.get("spark.default.parallelism", "8")  // "8" unless the key was set
```

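## RDD (Resilient Distributed Dataset)

The fundamental data structure in Spark: an immutable, partitioned collection of elements that can be operated on in parallel. Transformations (such as `map` and `filter`) are lazy and only describe a new RDD. Actions (such as `collect` and `count`) trigger the actual computation.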

```scala { .api }
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]) {

  // Transformations (lazy: each returns a new RDD and computes nothing yet)
  def map[U: ClassTag](f: T => U): RDD[U]
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  def filter(f: T => Boolean): RDD[T]
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],
                                 preservePartitioning: Boolean = false): RDD[U]
  def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],
                                          preservePartitioning: Boolean = false): RDD[U]

  // Set operations
  def distinct(numPartitions: Int = partitions.length): RDD[T]
  def union(other: RDD[T]): RDD[T]
  def intersection(other: RDD[T]): RDD[T]
  def subtract(other: RDD[T]): RDD[T]
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

  // Sampling and partitioning
  def sample(withReplacement: Boolean, fraction: Double,
             seed: Long = Utils.random.nextLong): RDD[T]
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
  def repartition(numPartitions: Int): RDD[T]  // always shuffles: coalesce(numPartitions, shuffle = true)
  def sortBy[K](f: T => K, ascending: Boolean = true,
                numPartitions: Int = this.partitions.length)
               (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

  // Utility transformations
  def pipe(command: String): RDD[String]
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  def zipWithIndex(): RDD[(T, Long)]
  def zipWithUniqueId(): RDD[(T, Long)]
  def glom(): RDD[Array[T]]
  def keyBy[K](f: T => K): RDD[(K, T)]

  // Actions (trigger computation)
  def collect(): Array[T]  // brings the entire RDD into driver memory
  def count(): Long
  def first(): T           // throws UnsupportedOperationException if the RDD is empty
  def take(num: Int): Array[T]
  def takeSample(withReplacement: Boolean, num: Int,
                 seed: Long = Utils.random.nextLong): Array[T]
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  def top(num: Int)(implicit ord: Ordering[T]): Array[T]

  // Aggregation operations
  def reduce(f: (T, T) => T): T  // throws UnsupportedOperationException if the RDD is empty
  def fold(zeroValue: T)(op: (T, T) => T): T
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  def treeReduce(f: (T, T) => T, depth: Int = 2): T
  def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U,
                                               depth: Int = 2): U

  // Statistical operations
  def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]  // no Ordering required
  def max()(implicit ord: Ordering[T]): T
  def min()(implicit ord: Ordering[T]): T

  // Side effects
  def foreach(f: T => Unit): Unit
  def foreachPartition(f: Iterator[T] => Unit): Unit

  // Persistence
  // An RDD's storage level can be assigned only once: calling persist with a different
  // level on an RDD that already has one throws UnsupportedOperationException.
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): this.type
  def cache(): this.type  // same as persist(StorageLevel.MEMORY_ONLY)
  def unpersist(blocking: Boolean = false): this.type
  // Requires SparkContext.setCheckpointDir; must be called before any job runs on this RDD
  def checkpoint(): Unit
  def localCheckpoint(): this.type

  // Storage and output
  def saveAsTextFile(path: String): Unit
  def saveAsObjectFile(path: String): Unit
  // saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None)
  // is provided for RDD[(K, V)] (with Writable-convertible K and V) by
  // SequenceFileRDDFunctions, not by RDD itself.

  // Metadata
  def getNumPartitions: Int
  def partitions: Array[Partition]
  def getStorageLevel: StorageLevel
  def setName(name: String): this.type
  def name: String
  def id: Int
  def context: SparkContext
  def sparkContext: SparkContext
}
```

### Pair RDD Operations

For RDDs of key-value pairs (`RDD[(K, V)]`), additional operations become available through an implicit conversion to `PairRDDFunctions`:

```scala { .api }
// Available on RDD[(K, V)] through the implicit conversion RDD.rddToPairRDDFunctions
class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) {
  // Aggregation by key
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  def groupByKey(): RDD[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
                      mergeCombiners: (C, C) => C): RDD[(K, C)]
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

  // Joins
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]

  // Sorting and partitioning
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]
  // The two methods below are provided by OrderedRDDFunctions and require an implicit Ordering[K]
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

  // Actions (results are returned to the driver)
  def countByKey(): Map[K, Long]
  def collectAsMap(): Map[K, V]  // loads all pairs into driver memory; one value kept per key
  def lookup(key: K): Seq[V]

  // Output operations
  def saveAsHadoopFile(path: String,
                       keyClass: Class[_], valueClass: Class[_],
                       outputFormatClass: Class[_ <: OutputFormat[_, _]]): Unit
}
```

### Usage Examples

```scala
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("RDD Example").setMaster("local[*]"))

// Basic transformations (lazy: nothing runs until an action is called)
val numbers = sc.parallelize(1 to 10)
val squared = numbers.map(x => x * x)
val evens = numbers.filter(_ % 2 == 0)
val words = sc.textFile("file.txt").flatMap(_.split(" "))

// Pair RDD operations
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
val grouped = pairs.groupByKey()
val reduced = pairs.reduceByKey(_ + _)
val joined = pairs.join(sc.parallelize(Seq(("a", "apple"), ("b", "banana"))))

// Actions
val result = squared.collect()  // Array(1, 4, 9, ..., 100)
val count = words.count()
val wordCounts = words.map((_, 1)).reduceByKey(_ + _).collectAsMap()

// Persistence: an RDD's storage level can be assigned only once, so cache() and
// persist(...) are applied to different RDDs here. Calling
// numbers.persist(StorageLevel.MEMORY_AND_DISK) after numbers.cache() would throw
// UnsupportedOperationException.
val cachedRDD = numbers.cache()
val persistedRDD = evens.persist(StorageLevel.MEMORY_AND_DISK)

sc.stop()
```

## Broadcast Variables

Broadcast variables efficiently distribute a large, read-only value to all nodes. Each executor receives and caches the value once, instead of a copy being shipped with every task.

```scala { .api }
abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
  // The broadcast value; readable on the driver and inside tasks
  def value: T
  // Delete cached copies on the executors; the value is re-sent if it is used again
  def unpersist(): Unit
  def unpersist(blocking: Boolean): Unit
  // Remove all data and metadata; the variable cannot be used after this call
  def destroy(): Unit
  override def toString: String
}
```

### Usage Examples

```scala
val broadcastMap = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))

val rdd = sc.parallelize(Seq("key1", "key2", "key3"))
// map is lazy, so run an action before destroying the broadcast:
// any job that reads broadcastMap after destroy() fails.
val result = rdd.map(key => broadcastMap.value.getOrElse(key, "unknown")).collect()
// result: Array("value1", "value2", "unknown")

// Clean up when done
broadcastMap.destroy()
```

## Accumulators

Accumulators are shared variables that tasks running on executors can only add to. Only the driver can read their value.

```scala { .api }
abstract class AccumulatorV2[IN, OUT] extends Serializable {
  def isZero: Boolean                             // true if this accumulator holds its zero value
  def copy(): AccumulatorV2[IN, OUT]
  def reset(): Unit                               // reset to the zero value
  def add(v: IN): Unit                            // called from tasks on executors
  def merge(other: AccumulatorV2[IN, OUT]): Unit  // combines per-task partial results
  def value: OUT                                  // read on the driver
}

class LongAccumulator extends AccumulatorV2[java.lang.Long, java.lang.Long] {
  def add(v: Long): Unit
  def add(v: java.lang.Long): Unit
  def count: Long             // number of values added
  def sum: Long
  def avg: Double             // sum / count
  def value: java.lang.Long   // same as sum
}

class DoubleAccumulator extends AccumulatorV2[java.lang.Double, java.lang.Double] {
  def add(v: Double): Unit
  def add(v: java.lang.Double): Unit
  def count: Long             // number of values added
  def sum: Double
  def avg: Double             // sum / count
  def value: java.lang.Double // same as sum
}

class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
  def add(v: T): Unit
  def value: java.util.List[T]
}
```

### Usage Examples

```scala
val counter = sc.longAccumulator("Counter")
val doubleAcc = sc.doubleAccumulator("Double Accumulator")
val listAcc = sc.collectionAccumulator[String]("List Accumulator")

val rdd = sc.parallelize(1 to 100)
// Update accumulators inside an action (foreach): Spark applies each task's updates
// exactly once for actions, whereas updates made inside transformations may be
// applied more than once if a task or stage is re-executed.
rdd.foreach { x =>
  counter.add(1)
  doubleAcc.add(x.toDouble)
  if (x % 10 == 0) listAcc.add(s"Multiple of 10: $x")
}

// Read the results on the driver
println(s"Count: ${counter.value}")            // 100
println(s"Sum: ${doubleAcc.sum}")              // 5050.0
println(s"Collected items: ${listAcc.value}")  // 10 items; order may vary between runs
```