or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core.md · deployment.md · graphx.md · index.md · ml.md · sql.md · streaming.md

docs/core.md

0

# Core Data Processing

1

2

Core distributed data processing using Resilient Distributed Datasets (RDDs) and the fundamental Spark execution engine. RDDs provide fault-tolerant, parallel data structures with transformations and actions for large-scale data processing.

3

4

## Capabilities

5

6

### SparkContext

7

8

Main entry point for Spark functionality, representing the connection to a Spark cluster. Used to create RDDs, broadcast variables, and accumulators.

9

10

```scala { .api }

11

/**

12

* Main entry point for Spark functionality

13

* @param config Spark configuration object

14

*/

15

class SparkContext(config: SparkConf) {

16

/** Create RDD from a collection */

17

def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

18

/** Create RDD from text file(s) */

19

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

20

/** Create RDD from whole text files */

21

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

22

/** Create RDD from binary files */

23

def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]

24

/** Create RDD from Hadoop file */

25

def hadoopRDD[K, V](

26

conf: JobConf,

27

inputFormatClass: Class[_ <: InputFormat[K, V]],

28

keyClass: Class[K],

29

valueClass: Class[V],

30

minPartitions: Int = defaultMinPartitions

31

): RDD[(K, V)]

32

/** Create broadcast variable */

33

def broadcast[T: ClassTag](value: T): Broadcast[T]

34

/** Create accumulator */

35

def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T]

36

/** Set job group for tracking */

37

def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false): Unit

38

/** Stop SparkContext */

39

def stop(): Unit

40

}

41

```

42

43

**Usage Examples:**

44

45

```scala

46

import org.apache.spark.{SparkContext, SparkConf}

47

48

val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")

49

val sc = new SparkContext(conf)

50

51

// Create RDD from collection

52

val numbersRDD = sc.parallelize(1 to 1000, 4)

53

54

// Create RDD from text file

55

val textRDD = sc.textFile("hdfs://path/to/file.txt")

56

57

// Create broadcast variable

58

val broadcastVar = sc.broadcast(Map("key1" -> "value1", "key2" -> "value2"))

59

60

sc.stop()

61

```

62

63

**Python API:**

64

65

```python { .api }

66

class SparkContext:

67

"""

68

Main entry point for Spark functionality in Python

69

"""

70

def __init__(self, conf: SparkConf = None, master: str = None, appName: str = None)

71

def parallelize(self, c: Iterable, numSlices: int = None) -> RDD

72

def textFile(self, name: str, minPartitions: int = None, use_unicode: bool = True) -> RDD

73

def wholeTextFiles(self, path: str, minPartitions: int = None, use_unicode: bool = True) -> RDD

74

def binaryFiles(self, path: str, minPartitions: int = None) -> RDD

75

def broadcast(self, value: Any) -> Broadcast

76

def accumulator(self, value: Any, accum_param: AccumulatorParam = None) -> Accumulator

77

def setJobGroup(self, groupId: str, description: str, interruptOnCancel: bool = False) -> None

78

def stop(self) -> None

79

```

80

81

**Python Usage Examples:**

82

83

```python

84

from pyspark import SparkContext, SparkConf

85

86

conf = SparkConf().setAppName("MyApp").setMaster("local[*]")

87

sc = SparkContext(conf=conf)

88

89

# Create RDD from collection

90

numbers_rdd = sc.parallelize(range(1, 1001), 4)

91

92

# Create RDD from text file

93

text_rdd = sc.textFile("hdfs://path/to/file.txt")

94

95

# Create broadcast variable

96

broadcast_var = sc.broadcast({"key1": "value1", "key2": "value2"})

97

98

sc.stop()

99

```

100

101

### RDD[T]

102

103

Resilient Distributed Dataset - the fundamental data structure of Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.

104

105

```scala { .api }

106

/**

107

* Resilient Distributed Dataset - core abstraction for distributed collections

108

*/

109

abstract class RDD[T: ClassTag] {

110

/** Transform each element using a function */

111

def map[U: ClassTag](f: T => U): RDD[U]

112

/** Transform each element to zero or more elements */

113

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

114

/** Keep only elements matching predicate */

115

def filter(f: T => Boolean): RDD[T]

116

/** Remove duplicates */

117

def distinct(numPartitions: Int = partitions.length): RDD[T]

118

/** Random sample of elements */

119

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

120

/** Union with another RDD */

121

def union(other: RDD[T]): RDD[T]

122

/** Intersection with another RDD */

123

def intersection(other: RDD[T]): RDD[T]

124

/** Cartesian product with another RDD */

125

def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

126

/** Group elements by key function */

127

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

128

/** Sort elements */

129

def sortBy[K](f: T => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

130

131

/** Collect all elements to driver */

132

def collect(): Array[T]

133

/** Take first n elements */

134

def take(num: Int): Array[T]

135

/** Get first element */

136

def first(): T

137

/** Count number of elements */

138

def count(): Long

139

/** Reduce elements using function */

140

def reduce(f: (T, T) => T): T

141

/** Fold elements with zero value */

142

def fold(zeroValue: T)(op: (T, T) => T): T

143

/** Aggregate with different result type */

144

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

145

146

/** Cache RDD in memory */

147

def cache(): RDD[T]

148

/** Persist with storage level */

149

def persist(newLevel: StorageLevel): RDD[T]

150

/** Remove from cache */

151

def unpersist(blocking: Boolean = false): RDD[T]

152

}

153

```

154

155

**Usage Examples:**

156

157

```scala

158

val data = sc.parallelize(1 to 100, 4)

159

160

// Transformations (lazy)

161

val doubled = data.map(_ * 2)

162

val filtered = doubled.filter(_ > 50)

163

val unique = filtered.distinct()

164

165

// Actions (trigger computation)

166

val result = unique.collect()

167

val count = unique.count()

168

val first10 = unique.take(10)

169

170

// Persist for reuse

171

unique.cache()

172

173

// Complex operations

174

val grouped = data.groupBy(_ % 10) // Group by remainder

175

val aggregated = data.aggregate(0)(_ + _, _ + _) // Sum all elements

176

```

177

178

**Python RDD API:**

179

180

```python { .api }

181

class RDD:

182

"""

183

Resilient Distributed Dataset - Python implementation

184

"""

185

def map(self, f: Callable) -> RDD

186

def flatMap(self, f: Callable) -> RDD

187

def filter(self, f: Callable) -> RDD

188

def distinct(self, numPartitions: int = None) -> RDD

189

def sample(self, withReplacement: bool, fraction: float, seed: int = None) -> RDD

190

def union(self, other: RDD) -> RDD

191

def intersection(self, other: RDD) -> RDD

192

def cartesian(self, other: RDD) -> RDD

193

def groupBy(self, f: Callable, numPartitions: int = None) -> RDD

194

def sortBy(self, keyfunc: Callable, ascending: bool = True, numPartitions: int = None) -> RDD

195

196

# Actions

197

def collect(self) -> List

198

def take(self, num: int) -> List

199

def first(self) -> Any

200

def count(self) -> int

201

def reduce(self, f: Callable) -> Any

202

def fold(self, zeroValue: Any, op: Callable) -> Any

203

def aggregate(self, zeroValue: Any, seqOp: Callable, combOp: Callable) -> Any

204

def foreach(self, f: Callable) -> None

205

def foreachPartition(self, f: Callable) -> None

206

207

# Persistence

208

def cache(self) -> RDD

209

def persist(self, storageLevel: StorageLevel = None) -> RDD

210

def unpersist(self, blocking: bool = False) -> RDD

211

```

212

213

**Python Usage Examples:**

214

215

```python

216

data = sc.parallelize(range(1, 101), 4)

217

218

# Transformations (lazy)

219

doubled = data.map(lambda x: x * 2)

220

filtered = doubled.filter(lambda x: x > 50)

221

unique = filtered.distinct()

222

223

# Actions (trigger computation)

224

result = unique.collect()

225

count = unique.count()

226

first_10 = unique.take(10)

227

228

# Persist for reuse

229

unique.cache()

230

231

# Complex operations

232

grouped = data.groupBy(lambda x: x % 10) # Group by remainder

233

aggregated = data.aggregate(0, lambda acc, x: acc + x, lambda acc1, acc2: acc1 + acc2) # Sum

234

```

235

236

### PairRDDFunctions

237

238

Additional operations available on RDDs of key-value pairs through implicit conversions.

239

240

```scala { .api }

241

/**

242

* Additional operations for RDDs of key-value pairs

243

*/

244

implicit class PairRDDFunctions[K, V](self: RDD[(K, V)]) {

245

/** Get keys only */

246

def keys: RDD[K]

247

/** Get values only */

248

def values: RDD[V]

249

/** Transform values keeping keys */

250

def mapValues[U](f: V => U): RDD[(K, U)]

251

/** Group values by key */

252

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

253

/** Reduce values by key */

254

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

255

/** Aggregate values by key */

256

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

257

/** Sort by keys */

258

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

259

/** Inner join with another pair RDD */

260

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

261

/** Left outer join */

262

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

263

/** Right outer join */

264

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]

265

/** Full outer join */

266

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]

267

/** Cogroup with another pair RDD */

268

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

269

/** Save as text file */

270

def saveAsTextFile(path: String): Unit

271

/** Save as Hadoop sequence file */

272

def saveAsSequenceFile(path: String): Unit

273

}

274

```

275

276

**Usage Examples:**

277

278

```scala

279

val pairs = sc.parallelize(Seq(("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 1)))

280

281

// Basic pair operations

282

val keys = pairs.keys.collect() // Array("apple", "banana", "apple", "cherry")

283

val values = pairs.values.collect() // Array(1, 2, 3, 1)

284

285

// Group and reduce

286

val grouped = pairs.groupByKey().collect()

287

val sums = pairs.reduceByKey(_ + _).collect() // Array(("apple", 4), ("banana", 2), ("cherry", 1))

288

289

// Joins

290

val other = sc.parallelize(Seq(("apple", "red"), ("banana", "yellow")))

291

val joined = pairs.join(other).collect()

292

```

293

294

**Python Pair RDD Operations:**

295

296

In Python, RDDs of key-value pairs (tuples) automatically have these operations available:

297

298

```python { .api }

299

# Pair RDD operations available on RDD of tuples

300

class RDD: # When containing (key, value) tuples

301

def keys(self) -> RDD

302

def values(self) -> RDD

303

def mapValues(self, f: Callable) -> RDD

304

def groupByKey(self, numPartitions: int = None) -> RDD

305

def reduceByKey(self, func: Callable, numPartitions: int = None) -> RDD

306

def aggregateByKey(self, zeroValue: Any, seqFunc: Callable, combFunc: Callable, numPartitions: int = None) -> RDD

307

def sortByKey(self, ascending: bool = True, numPartitions: int = None, keyfunc: Callable = None) -> RDD

308

def join(self, other: RDD, numPartitions: int = None) -> RDD

309

def leftOuterJoin(self, other: RDD, numPartitions: int = None) -> RDD

310

def rightOuterJoin(self, other: RDD, numPartitions: int = None) -> RDD

311

def fullOuterJoin(self, other: RDD, numPartitions: int = None) -> RDD

312

def cogroup(self, other: RDD, numPartitions: int = None) -> RDD

313

def saveAsTextFile(self, path: str) -> None

314

```

315

316

**Python Usage Examples:**

317

318

```python

319

pairs = sc.parallelize([("apple", 1), ("banana", 2), ("apple", 3), ("cherry", 1)])

320

321

# Basic pair operations

322

keys = pairs.keys().collect() # ['apple', 'banana', 'apple', 'cherry']

323

values = pairs.values().collect() # [1, 2, 3, 1]

324

325

# Group and reduce

326

grouped = pairs.groupByKey().collect()

327

sums = pairs.reduceByKey(lambda a, b: a + b).collect() # [('apple', 4), ('banana', 2), ('cherry', 1)]

328

329

# Joins

330

other = sc.parallelize([("apple", "red"), ("banana", "yellow")])

331

joined = pairs.join(other).collect()

332

```

333

334

### Configuration

335

336

Configuration management for Spark applications.

337

338

```scala { .api }

339

/**

340

* Configuration for Spark applications

341

* @param loadDefaults whether to load default configurations

342

*/

343

class SparkConf(loadDefaults: Boolean = true) {

344

/** Set configuration property */

345

def set(key: String, value: String): SparkConf

346

/** Set master URL */

347

def setMaster(master: String): SparkConf

348

/** Set application name */

349

def setAppName(name: String): SparkConf

350

/** Set JAR files */

351

def setJars(jars: Seq[String]): SparkConf

352

/** Set executor environment variable */

353

def setExecutorEnv(variable: String, value: String): SparkConf

354

/** Set Spark home directory */

355

def setSparkHome(home: String): SparkConf

356

/** Set all properties from iterable */

357

def setAll(settings: Iterable[(String, String)]): SparkConf

358

/** Get configuration value */

359

def get(key: String): String

360

/** Get configuration value with default */

361

def get(key: String, defaultValue: String): String

362

/** Get boolean configuration value */

363

def getBoolean(key: String, defaultValue: Boolean): Boolean

364

}

365

```

366

367

### Shared Variables

368

369

Variables that can be shared across cluster nodes efficiently.

370

371

```scala { .api }

372

/**

373

* Broadcast variable for efficient data sharing

374

*/

375

abstract class Broadcast[T: ClassTag] {

376

/** Access broadcast value */

377

def value: T

378

/** Remove from executors but keep in driver */

379

def unpersist(): Unit

380

/** Destroy completely */

381

def destroy(): Unit

382

}

383

384

/**

385

* Accumulator for collecting information from executors

386

*/

387

class Accumulator[T] {

388

/** Get current value (driver only) */

389

def value: T

390

/** Add to accumulator */

391

def +=(term: T): Unit

392

/** Add to accumulator (alternative syntax) */

393

def add(term: T): Unit

394

}

395

```

396

397

### Storage Levels

398

399

Storage levels for RDD persistence.

400

401

```scala { .api }

402

/**

403

* Storage levels for RDD caching and persistence

404

*/

405

object StorageLevel {

406

val NONE: StorageLevel

407

val DISK_ONLY: StorageLevel

408

val DISK_ONLY_2: StorageLevel

409

val MEMORY_ONLY: StorageLevel

410

val MEMORY_ONLY_2: StorageLevel

411

val MEMORY_ONLY_SER: StorageLevel

412

val MEMORY_ONLY_SER_2: StorageLevel

413

val MEMORY_AND_DISK: StorageLevel

414

val MEMORY_AND_DISK_2: StorageLevel

415

val MEMORY_AND_DISK_SER: StorageLevel

416

val MEMORY_AND_DISK_SER_2: StorageLevel

417

val OFF_HEAP: StorageLevel

418

}

419

```

420

421

**Usage Examples:**

422

423

```scala

424

import org.apache.spark.storage.StorageLevel

425

426

val rdd = sc.parallelize(1 to 1000000)

427

428

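// Different persistence levels — these are alternatives, pick ONE per RDD: calling persist() with a different level on an already-persisted RDD throws UnsupportedOperationException (call unpersist() first)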

429

rdd.persist(StorageLevel.MEMORY_ONLY)

430

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

431

rdd.persist(StorageLevel.DISK_ONLY_2) // 2x replication

432

433

// Shorthand for memory-only

434

rdd.cache()

435

```

436

437

## Error Handling

438

439

Common exceptions in core Spark operations:

440

441

- `SparkException` - General Spark runtime exceptions

442

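- `TaskFailedException` - Task execution failures (note: no class of this name exists in the Apache Spark API; a task that fails after exhausting its retries surfaces on the driver as a `SparkException` wrapping the original cause)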

443

- `TaskKilledException` - Task cancellation

444

- `SparkOutOfMemoryError` - Memory-related errors