# Core Spark Context and RDDs

Low-level distributed computing functionality providing the foundational building blocks for Spark applications. This includes SparkContext for cluster coordination, RDDs for distributed data processing, broadcast variables for efficient data sharing, and accumulators for distributed counters and sums.

## Capabilities

### Spark Context

Main entry point for Spark functionality that coordinates the Spark application and manages cluster resources.

```python { .api }
class SparkContext:
    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 environment=None, batchSize=0, serializer=CPickleSerializer(),
                 conf=None, gateway=None, jsc=None, profiler_cls=BasicProfiler):
        """
        Create a new SparkContext.

        Parameters:
        - master (str): Cluster URL to connect to (e.g. "local", "local[4]", or "spark://master:7077")
        - appName (str): Name of the application
        - sparkHome (str): Spark installation directory on cluster nodes
        - pyFiles (list): Python files to send to the cluster
        - environment (dict): Environment variables to set on worker nodes
        - batchSize (int): Number of Python objects represented as a single Java object
        - serializer: Serializer for RDDs
        - conf (SparkConf): SparkConf object with configuration
        - profiler_cls: Profiler class to use for profiling
        """

    def parallelize(self, c, numSlices=None):
        """
        Distribute a local Python collection to form an RDD.

        Parameters:
        - c: Collection to distribute (list, tuple, etc.)
        - numSlices (int): Number of partitions to create

        Returns:
        RDD containing the elements of the collection
        """

    def textFile(self, name, minPartitions=None, use_unicode=True):
        """
        Read a text file from HDFS, the local filesystem, or any Hadoop-supported filesystem.

        Parameters:
        - name (str): Path to the text file
        - minPartitions (int): Minimum number of partitions
        - use_unicode (bool): Whether to convert to unicode

        Returns:
        RDD of strings
        """

    def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
        """
        Read text files from a directory, returning each file as a (filename, content) pair.

        Parameters:
        - path (str): Directory path
        - minPartitions (int): Minimum number of partitions
        - use_unicode (bool): Whether to convert to unicode

        Returns:
        RDD of (filename, content) pairs
        """

    def broadcast(self, value):
        """
        Broadcast a read-only variable to the cluster.

        Parameters:
        - value: Value to broadcast

        Returns:
        Broadcast variable
        """

    def accumulator(self, value, accum_param=None):
        """
        Create an accumulator with the given initial value.

        Parameters:
        - value: Initial value
        - accum_param: AccumulatorParam object

        Returns:
        Accumulator
        """

    def stop(self):
        """Shut down the SparkContext."""

    def setCheckpointDir(self, dirName):
        """
        Set the directory under which RDDs are going to be checkpointed.

        Parameters:
        - dirName (str): Checkpoint directory path
        """

    def setLogLevel(self, logLevel):
        """
        Control the global logging level.

        Parameters:
        - logLevel (str): Log level ("ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN")
        """
```
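
For orientation, here is a minimal sketch of the typical `SparkContext` lifecycle: configure and create the context, build RDDs with `parallelize` and `textFile`, share a small read-only set with `broadcast`, and shut the context down. It assumes a local PySpark installation and a hypothetical input file `data/words.txt`.

```python
from pyspark import SparkConf, SparkContext

# Minimal local configuration; "local[2]" runs Spark with two worker threads.
conf = SparkConf().setAppName("context-demo").setMaster("local[2]")
sc = SparkContext(conf=conf)

# Distribute a local collection as an RDD with four partitions.
numbers = sc.parallelize(range(10), numSlices=4)
print(numbers.getNumPartitions())  # 4

# Read a text file into an RDD of lines (hypothetical path).
lines = sc.textFile("data/words.txt")

# Broadcast a small read-only lookup structure to every executor.
stop_words = sc.broadcast({"a", "an", "the"})
kept = lines.flatMap(lambda line: line.split()) \
            .filter(lambda w: w not in stop_words.value)
print(kept.count())

sc.stop()  # release cluster resources when finished
```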

### Resilient Distributed Datasets (RDDs)

Fundamental distributed data abstraction that represents an immutable, partitioned collection of elements.

```python { .api }
class RDD:
    def map(self, f):
        """
        Return a new RDD by applying a function to each element.

        Parameters:
        - f: Function to apply to each element

        Returns:
        New RDD with transformed elements
        """

    def filter(self, f):
        """
        Return a new RDD containing only elements that satisfy a predicate.

        Parameters:
        - f: Predicate function

        Returns:
        Filtered RDD
        """

    def flatMap(self, f):
        """
        Return a new RDD by first applying a function and then flattening the results.

        Parameters:
        - f: Function that returns a sequence

        Returns:
        Flattened RDD
        """

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition.

        Parameters:
        - f: Function to apply to each partition iterator
        - preservesPartitioning (bool): Whether partitioning is preserved

        Returns:
        New RDD
        """

    def reduce(self, f):
        """
        Reduce the elements of the RDD using the specified commutative and associative binary operator.

        Parameters:
        - f: Binary function for reduction

        Returns:
        Single reduced value
        """

    def fold(self, zeroValue, op):
        """
        Aggregate the elements using a given associative function and a neutral "zero value".

        Parameters:
        - zeroValue: Neutral zero value
        - op: Associative function

        Returns:
        Aggregated result
        """

    def aggregate(self, zeroValue, seqOp, combOp):
        """
        Aggregate elements using given combine functions and a neutral "zero value".

        Parameters:
        - zeroValue: Neutral zero value
        - seqOp: Function to combine elements within partitions
        - combOp: Function to combine results from partitions

        Returns:
        Aggregated result
        """

    def collect(self):
        """
        Return all elements of the RDD as a list.

        Returns:
        List containing all RDD elements
        """

    def take(self, num):
        """
        Take the first num elements of the RDD.

        Parameters:
        - num (int): Number of elements to take

        Returns:
        List of first num elements
        """

    def first(self):
        """
        Return the first element of the RDD.

        Returns:
        First element
        """

    def count(self):
        """
        Return the number of elements in the RDD.

        Returns:
        Number of elements
        """

    def distinct(self, numPartitions=None):
        """
        Return a new RDD containing distinct elements.

        Parameters:
        - numPartitions (int): Number of partitions in result

        Returns:
        RDD with distinct elements
        """

    def union(self, other):
        """
        Return the union of this RDD and another one.

        Parameters:
        - other (RDD): Another RDD

        Returns:
        Union RDD
        """

    def intersection(self, other):
        """
        Return the intersection of this RDD and another one.

        Parameters:
        - other (RDD): Another RDD

        Returns:
        Intersection RDD
        """

    def groupBy(self, f, numPartitions=None):
        """
        Group RDD elements by a key function.

        Parameters:
        - f: Key function
        - numPartitions (int): Number of partitions

        Returns:
        RDD of grouped elements
        """

    def sortBy(self, keyfunc, ascending=True, numPartitions=None):
        """
        Sort the RDD by a key function.

        Parameters:
        - keyfunc: Function to compute key for sorting
        - ascending (bool): Sort in ascending order
        - numPartitions (int): Number of partitions

        Returns:
        Sorted RDD
        """

    def cache(self):
        """
        Persist this RDD with the default storage level (MEMORY_ONLY).

        Returns:
        This RDD
        """

    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
        """
        Persist this RDD with the specified storage level.

        Parameters:
        - storageLevel (StorageLevel): Storage level

        Returns:
        This RDD
        """

    def checkpoint(self):
        """Mark this RDD for checkpointing."""

    def getNumPartitions(self):
        """
        Return the number of partitions of this RDD.

        Returns:
        Number of partitions
        """

    def coalesce(self, numPartitions, shuffle=False):
        """
        Return a new RDD with a reduced number of partitions.

        Parameters:
        - numPartitions (int): Target number of partitions
        - shuffle (bool): Whether to shuffle data

        Returns:
        Coalesced RDD
        """

    def repartition(self, numPartitions):
        """
        Return a new RDD with exactly numPartitions partitions.

        Parameters:
        - numPartitions (int): Number of partitions

        Returns:
        Repartitioned RDD
        """
```
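
As a rough illustration of how these transformations and actions compose (assuming an active `SparkContext` named `sc`), the sketch below chains lazy transformations and then triggers computation with actions:

```python
rdd = sc.parallelize([3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5])

# Transformations are lazy: nothing executes until an action is called.
evens = rdd.filter(lambda x: x % 2 == 0)            # keep even numbers
squares = evens.map(lambda x: x * x)                # square each element
result = squares.distinct().sortBy(lambda x: x)

result.cache()                                      # reuse across the actions below

print(result.collect())                             # [4, 16, 36]
print(result.count())                               # 3
print(result.reduce(lambda a, b: a + b))            # 56

# fold and aggregate take an explicit zero value.
total = rdd.fold(0, lambda a, b: a + b)             # 44
count_and_sum = rdd.aggregate(
    (0, 0),
    lambda acc, x: (acc[0] + 1, acc[1] + x),        # combine within a partition
    lambda a, b: (a[0] + b[0], a[1] + b[1]),        # combine across partitions
)                                                   # (11, 44)
```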

### Paired RDD Operations

Operations available on RDDs of key-value pairs.

```python { .api }
class RDD:
    def groupByKey(self, numPartitions=None):
        """
        Group values for each key in the RDD into a single sequence.

        Parameters:
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, iterable of values) pairs
        """

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge values for each key using an associative reduce function.

        Parameters:
        - func: Associative reduce function
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, reduced value) pairs
        """

    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
        """
        Aggregate values for each key using given combine functions.

        Parameters:
        - zeroValue: Initial value for each key
        - seqFunc: Function to combine values within partitions
        - combFunc: Function to combine results across partitions
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, aggregated value) pairs
        """

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sort RDD by keys.

        Parameters:
        - ascending (bool): Sort in ascending order
        - numPartitions (int): Number of partitions
        - keyfunc: Function to compute sort key

        Returns:
        Sorted RDD
        """

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys.

        Parameters:
        - other (RDD): Another RDD to join with
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, (value1, value2)) pairs
        """

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of this RDD and another one.

        Parameters:
        - other (RDD): Another RDD to join with
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, (value1, Optional[value2])) pairs
        """

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of this RDD and another one.

        Parameters:
        - other (RDD): Another RDD to join with
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, (Optional[value1], value2)) pairs
        """

    def fullOuterJoin(self, other, numPartitions=None):
        """
        Perform a full outer join of this RDD and another one.

        Parameters:
        - other (RDD): Another RDD to join with
        - numPartitions (int): Number of partitions

        Returns:
        RDD of (key, (Optional[value1], Optional[value2])) pairs
        """
```
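
A short sketch of the key-value operations, again assuming an existing `SparkContext` `sc`: the classic word count, followed by joins between two small pair RDDs.

```python
words = sc.parallelize(["spark", "rdd", "spark", "join", "rdd", "spark"])

# Word count: map each word to (word, 1), then merge counts per key.
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
print(sorted(counts.collect()))          # [('join', 1), ('rdd', 2), ('spark', 3)]

# Joins match elements that share a key.
ages = sc.parallelize([("alice", 34), ("bob", 41)])
cities = sc.parallelize([("alice", "Oslo"), ("carol", "Lima")])

print(sorted(ages.join(cities).collect()))
# [('alice', (34, 'Oslo'))]

print(sorted(ages.leftOuterJoin(cities).collect()))
# [('alice', (34, 'Oslo')), ('bob', (41, None))]

print(sorted(ages.fullOuterJoin(cities).collect()))
# [('alice', (34, 'Oslo')), ('bob', (41, None)), ('carol', (None, 'Lima'))]
```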

### Broadcast Variables

Read-only variables cached on each machine for efficient data sharing.

```python { .api }
class Broadcast:
    @property
    def value(self):
        """
        Return the broadcasted value.

        Returns:
        The broadcasted value
        """

    def destroy(self):
        """Destroy all data and metadata related to this broadcast variable."""

    def unpersist(self, blocking=False):
        """
        Delete cached copies of this broadcast on the executors.

        Parameters:
        - blocking (bool): Whether to block until unpersisting is complete
        """
```
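
A minimal sketch of broadcasting a lookup table so each executor caches one copy instead of receiving it with every task (assumes an active `sc`); note that `value` is read as an attribute inside tasks:

```python
country_names = {"NO": "Norway", "PE": "Peru", "JP": "Japan"}
bc = sc.broadcast(country_names)          # one copy cached per executor

codes = sc.parallelize(["NO", "JP", "XX", "PE"])
resolved = codes.map(lambda c: bc.value.get(c, c))   # unknown codes pass through
print(resolved.collect())                 # ['Norway', 'Japan', 'XX', 'Peru']

bc.unpersist()   # drop executor copies; re-broadcast happens lazily if reused
bc.destroy()     # permanently release the variable once it is no longer needed
```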

### Accumulators

Shared variables that can be accumulated across tasks.

```python { .api }
class Accumulator:
    def add(self, term):
        """
        Add a term to this accumulator.

        Parameters:
        - term: Value to add
        """

    @property
    def value(self):
        """
        Get the accumulator's value.

        Returns:
        Current accumulator value
        """

class AccumulatorParam:
    def zero(self, value):
        """
        Provide a "zero value" for the accumulator type.

        Parameters:
        - value: Sample value

        Returns:
        Zero value
        """

    def addInPlace(self, value1, value2):
        """
        Add two values of the accumulator's data type.

        Parameters:
        - value1: First value
        - value2: Second value

        Returns:
        Sum of the values
        """
```
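
The sketch below counts malformed records with a numeric accumulator and sums vectors with a custom `AccumulatorParam` (assumes an active `sc`). Tasks may only add to an accumulator; its value is read back on the driver after an action has run.

```python
from pyspark import AccumulatorParam

bad_records = sc.accumulator(0)    # int/float accumulators need no AccumulatorParam

def parse(line):
    try:
        return int(line)
    except ValueError:
        bad_records.add(1)         # tasks can add but never read
        return 0

parsed = sc.parallelize(["1", "2", "oops", "4"]).map(parse)
parsed.collect()                   # run an action so the tasks actually execute
print(bad_records.value)           # 1, read on the driver

# Custom accumulator type: element-wise vector sums.
class VectorParam(AccumulatorParam):
    def zero(self, value):
        return [0.0] * len(value)

    def addInPlace(self, v1, v2):
        return [a + b for a, b in zip(v1, v2)]

vec_sum = sc.accumulator([0.0, 0.0], VectorParam())
sc.parallelize([[1.0, 2.0], [3.0, 4.0]]).foreach(lambda v: vec_sum.add(v))
print(vec_sum.value)               # [4.0, 6.0]
```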

### Spark Configuration

Configuration settings for Spark applications.

```python { .api }
class SparkConf:
    def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
        """
        Create a new Spark configuration.

        Parameters:
        - loadDefaults (bool): Whether to load default values
        """

    def setAppName(self, value):
        """
        Set application name.

        Parameters:
        - value (str): Application name

        Returns:
        This SparkConf object
        """

    def setMaster(self, value):
        """
        Set master URL.

        Parameters:
        - value (str): Master URL

        Returns:
        This SparkConf object
        """

    def set(self, key, value):
        """
        Set a configuration property.

        Parameters:
        - key (str): Configuration key
        - value (str): Configuration value

        Returns:
        This SparkConf object
        """

    def get(self, key, defaultValue=None):
        """
        Get a configuration value.

        Parameters:
        - key (str): Configuration key
        - defaultValue (str): Default value if key not found

        Returns:
        Configuration value
        """

    def setSparkHome(self, value):
        """
        Set Spark installation path.

        Parameters:
        - value (str): Spark home directory

        Returns:
        This SparkConf object
        """

    def setExecutorEnv(self, key=None, value=None, pairs=None):
        """
        Set environment variables for executor processes.

        Parameters:
        - key (str): Environment variable name
        - value (str): Environment variable value
        - pairs (list): List of (key, value) pairs

        Returns:
        This SparkConf object
        """
```
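
A small sketch of building a configuration and handing it to a `SparkContext`. The setters return the same `SparkConf`, so calls chain; the memory setting and environment variable below are purely illustrative.

```python
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("conf-demo")
        .setMaster("local[*]")
        .set("spark.executor.memory", "1g")          # example property
        .setExecutorEnv("DATA_DIR", "/tmp/data"))    # hypothetical env variable

print(conf.get("spark.app.name"))                    # "conf-demo"
print(conf.get("spark.missing.key", "fallback"))     # default for an unset key

sc = SparkContext(conf=conf)
```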

## Types

```python { .api }
class StorageLevel:
    """Storage levels for RDD persistence."""
    DISK_ONLY: StorageLevel
    DISK_ONLY_2: StorageLevel
    MEMORY_ONLY: StorageLevel
    MEMORY_ONLY_2: StorageLevel
    MEMORY_ONLY_SER: StorageLevel
    MEMORY_ONLY_SER_2: StorageLevel
    MEMORY_AND_DISK: StorageLevel
    MEMORY_AND_DISK_2: StorageLevel
    MEMORY_AND_DISK_SER: StorageLevel
    MEMORY_AND_DISK_SER_2: StorageLevel
    OFF_HEAP: StorageLevel

class TaskContext:
    """Information about the task currently being executed."""

    def attemptNumber(self):
        """How many times this task has been attempted."""

    def partitionId(self):
        """The ID of the RDD partition that is computed by this task."""

    def stageId(self):
        """The ID of the stage that this task belongs to."""

    def taskAttemptId(self):
        """An ID that is unique to this task attempt."""

class StatusTracker:
    """Low-level status reporting APIs for monitoring job and stage progress."""

    def getJobIdsForGroup(self, jobGroup):
        """Return a list of all known jobs in a particular job group."""

    def getActiveStageIds(self):
        """Return a list of the ids of all active stages."""

    def getExecutorInfos(self):
        """Return information about all known executors."""
```
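
A brief sketch of how these types appear in practice (assumes an active `sc`): persist with an explicit `StorageLevel`, inspect the current partition via `TaskContext.get()` inside a task, and poll progress through the context's status tracker.

```python
from pyspark import StorageLevel, TaskContext

rdd = sc.parallelize(range(100), 4).persist(StorageLevel.MEMORY_AND_DISK)

def tag_with_partition(iterator):
    # TaskContext.get() is only meaningful inside a running task.
    pid = TaskContext.get().partitionId()
    return ((pid, x) for x in iterator)

print(rdd.mapPartitions(tag_with_partition).take(3))   # e.g. [(0, 0), (0, 1), (0, 2)]

tracker = sc.statusTracker()            # StatusTracker instance
print(tracker.getActiveStageIds())      # usually [] once the job above has finished
```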