
# Python API (PySpark)

PySpark is the Python API for Apache Spark. It gives Python developers access to Spark's distributed computing engine through a Python interface to Spark's core functionality, including RDDs, SQL, Streaming and MLlib. GraphX has no Python API; it is available only from Scala and Java.

## Core Imports

```python { .api }
from pyspark import SparkContext, SparkConf
# SchemaRDD exists only in Spark 1.0 to 1.2; Spark 1.3+ replaces it with DataFrame
from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
from pyspark import SparkFiles, StorageLevel
```

## Basic Usage

### Creating a SparkContext

```python { .api }
from pyspark import SparkContext, SparkConf

# Option 1: configure through SparkConf (recommended)
conf = SparkConf() \
    .setAppName("My Python App") \
    .setMaster("local[*]") \
    .set("spark.executor.memory", "2g")

sc = SparkContext(conf=conf)
sc.stop()  # Only one SparkContext can be active at a time

# Option 2: pass the master URL and application name directly
sc = SparkContext("local[*]", "My Python App")

# Remember to stop the context when the application is done
sc.stop()
```

## Capabilities

### SparkContext

Main entry point for all Spark functionality.

```python { .api }
class SparkContext(object):
    def __init__(self, master=None, appName=None, sparkHome=None,
                 pyFiles=None, environment=None, batchSize=1024,
                 serializer=PickleSerializer(), conf=None, gateway=None):
        """
        Create a new SparkContext.

        Args:
            master: Cluster URL to connect to (e.g. local[4], spark://host:port)
            appName: A name for your job, to display on the cluster web UI
            sparkHome: Location where Spark is installed on cluster nodes
            pyFiles: Collection of .zip or .py files to send to the cluster
            environment: Dictionary of environment variables for worker nodes
            batchSize: Number of Python objects represented as a single Java object
            serializer: The serializer for RDDs
            conf: A SparkConf object setting Spark properties
            gateway: Use an existing gateway and JVM, otherwise create a new JVM
        """
```

#### RDD Creation Methods

**parallelize**: Distribute a local collection to form an RDD

```python { .api }
def parallelize(self, c, numSlices=None):
    """
    Distribute a local Python collection to form an RDD.

    Args:
        c: Collection to parallelize (list, tuple, etc.)
        numSlices: Number of partitions to create (optional)

    Returns:
        RDD containing the distributed data
    """
```

```python
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)  # Use the default parallelism
rdd_with_partitions = sc.parallelize(data, 4)  # Create 4 partitions
```
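
To picture how `numSlices` divides the data, here is a plain-Python model (not PySpark code; `slice_evenly` is a hypothetical helper). It assumes the even, contiguous split that Spark's `ParallelCollectionRDD` uses for indexed sequences; in a real job you can inspect the actual layout with `rdd.glom().collect()`.

```python
def slice_evenly(data, num_slices):
    """Model of splitting a local sequence into contiguous partitions."""
    if num_slices < 1:
        raise ValueError("num_slices must be positive")
    items = list(data)
    n = len(items)
    # Partition i covers positions [i * n // num_slices, (i + 1) * n // num_slices)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


print(slice_evenly([1, 2, 3, 4, 5], 2))  # [[1, 2], [3, 4, 5]]
print(slice_evenly([1, 2, 3, 4, 5], 4))  # [[1], [2], [3], [4, 5]]
```

With more slices than elements, some partitions come out empty, which is harmless but schedules tasks that do no work.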

**textFile**: Read text files as an RDD of strings

```python { .api }
def textFile(self, name, minPartitions=None):
    """
    Read a text file from HDFS, a local file system (available on all nodes),
    or any Hadoop-supported file system URI.

    Args:
        name: Path to the text file
        minPartitions: Minimum number of partitions (optional)

    Returns:
        RDD where each element is a line from the file
    """
```

```python
lines = sc.textFile("hdfs://namenode:port/path/to/file.txt")
lines_local = sc.textFile("file:///local/path/file.txt")
lines_with_partitions = sc.textFile("hdfs:///path/to/file.txt", 8)  # default namenode
```

**wholeTextFiles**: Read a directory of text files as key-value pairs

```python { .api }
def wholeTextFiles(self, path, minPartitions=None):
    """
    Read a directory of text files as (filename, content) pairs.

    Args:
        path: Directory path containing text files
        minPartitions: Minimum number of partitions (optional)

    Returns:
        RDD of (filename, content) tuples
    """
```
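
`textFile` and `wholeTextFiles` differ mainly in record shape: one record per line versus one record per file. The plain-Python model below (hypothetical `whole_text_files` and `text_file_lines` helpers over a local directory, not PySpark) shows both shapes. In Spark the key is the file's fully qualified path, for example with a `file:` or `hdfs:` scheme, rather than a bare local path.

```python
import os
import tempfile


def whole_text_files(directory):
    """Model of wholeTextFiles: one (path, full_content) pair per file."""
    pairs = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path):
            with open(path, encoding="utf-8") as fh:
                pairs.append((path, fh.read()))
    return pairs


def text_file_lines(directory):
    """Model of textFile on a directory: one record per line, file names lost."""
    return [line
            for _, content in whole_text_files(directory)
            for line in content.splitlines()]


with tempfile.TemporaryDirectory() as tmp:
    for name, text in [("a.txt", "x\ny\n"), ("b.txt", "z\n")]:
        with open(os.path.join(tmp, name), "w", encoding="utf-8") as fh:
            fh.write(text)
    print([(os.path.basename(p), c) for p, c in whole_text_files(tmp)])
    # [('a.txt', 'x\ny\n'), ('b.txt', 'z\n')]
    print(text_file_lines(tmp))  # ['x', 'y', 'z']
```

Because each file becomes a single record, `wholeTextFiles` suits directories of many small files better than a few very large ones.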

#### Shared Variables

**broadcast**: Create a broadcast variable for read-only data

```python { .api }
def broadcast(self, value):
    """
    Broadcast a read-only variable to all nodes.

    Args:
        value: Value to broadcast

    Returns:
        Broadcast object with a .value property
    """
```

```python
lookup_table = {"apple": 1, "banana": 2, "orange": 3}
broadcast_table = sc.broadcast(lookup_table)

data = sc.parallelize(["apple", "banana", "apple"])
# Tasks read the shared copy through .value instead of serializing the dict into every task
mapped = data.map(lambda fruit: broadcast_table.value.get(fruit, 0))
```

**accumulator**: Create an accumulator for aggregating information

```python { .api }
def accumulator(self, value, accum_param=None):
    """
    Create an accumulator with the given initial value.

    Args:
        value: Initial value
        accum_param: AccumulatorParam object (optional)

    Returns:
        Accumulator object
    """
```

```python
counter = sc.accumulator(0)

def keep_positive(x):
    # Count negative values as a side effect, keep only positive ones
    if x < 0:
        counter.add(1)
    return x > 0

data = sc.parallelize([1, 2, -1, 4, -5])
positive = data.filter(keep_positive)
positive.count()  # Trigger an action so the filter actually runs
print(f"Negative numbers: {counter.value}")
# Updates made inside transformations may be applied more than once if a task
# is re-executed or the RDD is recomputed by another action.
```

#### Job Control

**setJobGroup**: Assign a group ID to jobs

```python { .api }
def setJobGroup(self, groupId, description, interruptOnCancel=False):
    """Assign a group ID and description to all jobs started by this thread."""
```

**cancelJobGroup**: Cancel all jobs in a group

```python { .api }
def cancelJobGroup(self, groupId):
    """Cancel all active jobs associated with the given job group."""
```

**cancelAllJobs**: Cancel all scheduled or running jobs

```python { .api }
def cancelAllJobs(self):
    """Cancel all jobs that have been scheduled or are running."""
```

#### File Management

**addFile**: Add a file to be downloaded on every node

```python { .api }
def addFile(self, path, recursive=False):
    """
    Add a file to be downloaded with this Spark job on every node.

    Args:
        path: Path to the file (local or remote)
        recursive: Whether to recursively add files in directories
    """
```

**addPyFile**: Add a Python file to be distributed

```python { .api }
def addPyFile(self, path):
    """
    Add a .py or .zip file to be distributed with this Spark job.

    Args:
        path: Path to the Python file or zip archive
    """
```

### RDD Operations

The RDD class provides the fundamental distributed data abstraction.

```python { .api }
class RDD(object):
    """
    Resilient Distributed Dataset: an immutable, partitioned collection of
    elements that can be operated on in parallel.
    """
```
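
Transformations only record how to derive a new dataset; actions make Spark execute that recorded lineage. The toy, single-machine `LazyPipeline` class below is hypothetical and not part of PySpark; it makes the lazy/eager split observable by counting reads of the source.

```python
class LazyPipeline:
    """Toy model of RDD lineage: transformations are recorded, actions execute them."""

    def __init__(self, source, steps=()):
        self._source = source        # callable that produces the input data
        self._steps = tuple(steps)   # recorded ("map" | "filter", function) pairs

    # Transformations: return a new pipeline and run nothing
    def map(self, f):
        return LazyPipeline(self._source, self._steps + (("map", f),))

    def filter(self, f):
        return LazyPipeline(self._source, self._steps + (("filter", f),))

    # Actions: read the source and apply every recorded step
    def _run(self):
        items = iter(self._source())
        for kind, f in self._steps:
            items = map(f, items) if kind == "map" else filter(f, items)
        return items

    def collect(self):
        return list(self._run())

    def count(self):
        return sum(1 for _ in self._run())


reads = []


def source():
    reads.append("read")  # record each pass over the input
    return [1, 2, 3, 4, 5]


evens_squared = LazyPipeline(source).filter(lambda x: x % 2 == 0).map(lambda x: x * x)
print(len(reads))               # 0: building the pipeline read nothing
print(evens_squared.collect())  # [4, 16]
print(len(reads))               # 1: the action triggered the read
```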

#### Transformations (Lazy)

Transformations return a new RDD and are not computed until an action needs their result.

**map**: Apply a function to each element

```python { .api }
def map(self, f, preservesPartitioning=False):
    """
    Apply a function to each element of the RDD.

    Args:
        f: Function to apply to each element
        preservesPartitioning: Whether partitioning should be preserved

    Returns:
        New RDD with transformed elements
    """
```

```python
numbers = sc.parallelize([1, 2, 3, 4, 5])
squared = numbers.map(lambda x: x * x)
# Result: RDD containing [1, 4, 9, 16, 25]
```

**flatMap**: Apply a function and flatten the results

```python { .api }
def flatMap(self, f, preservesPartitioning=False):
    """
    Apply a function and flatten the results.

    Args:
        f: Function that returns an iterable for each element
        preservesPartitioning: Whether partitioning should be preserved

    Returns:
        New RDD with flattened results
    """
```

```python
lines = sc.parallelize(["hello world", "spark rdd"])
words = lines.flatMap(lambda line: line.split(" "))
# Result: RDD containing ["hello", "world", "spark", "rdd"]
```

**filter**: Keep elements matching a predicate

```python { .api }
def filter(self, f):
    """
    Filter elements using a predicate function.

    Args:
        f: Function that returns a boolean for each element

    Returns:
        New RDD containing only the elements for which f returns True
    """
```

**distinct**: Remove duplicate elements

```python { .api }
def distinct(self, numPartitions=None):
    """
    Remove duplicate elements from the RDD.

    Args:
        numPartitions: Number of partitions in result (optional)

    Returns:
        New RDD with duplicates removed
    """
```

**union**: Combine with another RDD

```python { .api }
def union(self, other):
    """
    Return the union of this RDD and another. Duplicates are kept;
    call distinct() on the result to remove them.

    Args:
        other: Another RDD of the same type

    Returns:
        New RDD containing elements from both RDDs
    """
```

**intersection**: Find common elements

```python { .api }
def intersection(self, other, numPartitions=None):
    """
    Return the intersection of this RDD and another. The output contains
    no duplicate elements, even if the inputs did. This method performs
    a shuffle internally.

    Args:
        other: Another RDD
        numPartitions: Number of partitions in result (optional)

    Returns:
        New RDD containing the common elements
    """
```
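
A common surprise is that `union` keeps duplicates while `intersection` drops them. The plain-Python models below are illustrative helpers, not PySpark code; Python sets stand in for Spark's unordered, deduplicated results.

```python
def union(left, right):
    """Model of RDD.union: plain concatenation, duplicates are kept."""
    return list(left) + list(right)


def intersection(left, right):
    """Model of RDD.intersection: elements found in both inputs, each once."""
    return set(left) & set(right)


def distinct(items):
    """Model of RDD.distinct: each element once (Spark guarantees no order)."""
    return set(items)


a = [1, 2, 2, 3]
b = [2, 3, 3, 4]
print(union(a, b))                    # [1, 2, 2, 3, 2, 3, 3, 4]
print(sorted(intersection(a, b)))     # [2, 3]
print(sorted(distinct(union(a, b))))  # [1, 2, 3, 4]
```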

**sortBy**: Sort elements using a key function

```python { .api }
def sortBy(self, keyfunc, ascending=True, numPartitions=None):
    """
    Sort the RDD by the given key function.

    Args:
        keyfunc: Function to compute the sort key for each element
        ascending: Whether to sort in ascending order
        numPartitions: Number of partitions in result (optional)

    Returns:
        New sorted RDD
    """
```
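
`sortBy` takes a key function much like Python's built-in `sorted`. The single-machine model below (a hypothetical `sort_by` helper) shows the ordering semantics only; Spark additionally range-partitions the data by key and sorts each partition, so calling `collect()` on the result returns a globally ordered list.

```python
def sort_by(items, keyfunc, ascending=True):
    """Single-machine model of RDD.sortBy: order elements by keyfunc(element)."""
    return sorted(items, key=keyfunc, reverse=not ascending)


scores = [("alice", 82), ("bob", 95), ("carol", 78)]
# PySpark equivalent: sc.parallelize(scores).sortBy(lambda pair: pair[1]).collect()
print(sort_by(scores, lambda pair: pair[1]))
# [('carol', 78), ('alice', 82), ('bob', 95)]
print(sort_by(scores, lambda pair: pair[1], ascending=False)[0])  # ('bob', 95)
```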

#### Actions (Eager)

Actions trigger computation of the RDD's lineage and either return a result to the driver or write it to storage.

**collect**: Return all elements as a list

```python { .api }
def collect(self):
    """
    Return all elements of the RDD as a list.
    WARNING: The entire result is loaded into driver memory; use only on small RDDs.

    Returns:
        List containing all RDD elements
    """
```

**count**: Count the number of elements

```python { .api }
def count(self):
    """
    Count the number of elements in the RDD.

    Returns:
        Number of elements as an integer
    """
```

**first**: Return the first element

```python { .api }
def first(self):
    """
    Return the first element of the RDD.

    Returns:
        First element

    Raises:
        ValueError: If the RDD is empty
    """
```

**take**: Return the first n elements

```python { .api }
def take(self, num):
    """
    Return the first num elements of the RDD.

    Args:
        num: Number of elements to return

    Returns:
        List of the first num elements
    """
```

**reduce**: Reduce elements using an associative and commutative function

```python { .api }
def reduce(self, f):
    """
    Reduce elements using an associative and commutative function.

    Args:
        f: Binary function that takes two parameters of the same type

    Returns:
        Single reduced value

    Raises:
        ValueError: If the RDD is empty
    """
```

```python
numbers = sc.parallelize([1, 2, 3, 4, 5])
sum_result = numbers.reduce(lambda a, b: a + b)
# Result: 15
```
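
Why the function must be associative and commutative: Spark reduces each partition independently and then combines the partial results, so the grouping of operations depends on how the data happens to be partitioned. The plain-Python model below (a hypothetical `partitioned_reduce` helper that assumes contiguous partitions) shows that addition gives the same answer for any partitioning while subtraction does not.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(data, f, num_partitions):
    """Model of RDD.reduce: reduce each partition, then reduce the partial results."""
    items = list(data)
    n = len(items)
    # Contiguous partitions, as a simplifying assumption
    partitions = [items[i * n // num_partitions:(i + 1) * n // num_partitions]
                  for i in range(num_partitions)]
    partials = [reduce(f, part) for part in partitions if part]  # skip empty partitions
    if not partials:
        raise ValueError("cannot reduce an empty dataset")
    return reduce(f, partials)


numbers = [1, 2, 3, 4, 5]
print(partitioned_reduce(numbers, add, 1), partitioned_reduce(numbers, add, 2))  # 15 15
print(partitioned_reduce(numbers, sub, 1), partitioned_reduce(numbers, sub, 2))  # -13 5
```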

**foreach**: Apply a function to each element (for side effects)

```python { .api }
def foreach(self, f):
    """
    Apply a function to each element for its side effects only.

    The function runs inside tasks on the executors, so on a cluster any
    output it prints goes to the executor logs, not the driver console.

    Args:
        f: Function to apply to each element
    """
```

#### Persistence Operations

**cache**: Cache an RDD in memory

```python { .api }
def cache(self):
    """
    Cache this RDD in memory using the default storage level.

    Returns:
        Same RDD for method chaining
    """
```

**persist**: Cache with a specific storage level

```python { .api }
def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
    """
    Cache the RDD with the specified storage level.

    Args:
        storageLevel: Storage level from the StorageLevel class

    Returns:
        Same RDD for method chaining
    """
```

```python
from pyspark import StorageLevel

rdd = sc.textFile("large-file.txt")
rdd.persist(StorageLevel.MEMORY_AND_DISK)  # Spill partitions to disk if memory is short

# cache() is equivalent to persist(StorageLevel.MEMORY_ONLY). Choose one per RDD:
# a storage level cannot be changed once assigned, so calling rdd.cache() here
# would raise an error.
```
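
Caching pays off when several actions reuse the same RDD. The toy `CachedDataset` class below is hypothetical, not PySpark; it counts how often the lineage is recomputed with and without `cache()`. It ignores eviction: in Spark, a `MEMORY_ONLY` partition that does not fit in memory, or is evicted later, is simply recomputed when it is next needed.

```python
class CachedDataset:
    """Toy model of cache(): the first action materializes, later actions reuse it."""

    def __init__(self, compute):
        self._compute = compute   # rebuilds the data from its lineage
        self._use_cache = False
        self._cached = None

    def cache(self):
        self._use_cache = True
        return self               # returns self for chaining, like RDD.cache()

    def _data(self):
        if not self._use_cache:
            return self._compute()            # uncached: recompute on every action
        if self._cached is None:
            self._cached = self._compute()    # first action fills the cache
        return self._cached

    def count(self):
        return len(self._data())

    def collect(self):
        return list(self._data())


computations = []


def filter_important():
    computations.append(1)  # one entry per full recomputation
    return [line for line in ["ok", "important a", "important b"] if "important" in line]


plain = CachedDataset(filter_important)
plain.count()
plain.collect()
print(len(computations))  # 2: each action recomputed the filter

computations.clear()
cached = CachedDataset(filter_important).cache()
cached.count()
cached.collect()
print(len(computations))  # 1: computed once, then served from the cache
```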

**unpersist**: Remove from cache

```python { .api }
def unpersist(self, blocking=False):
    """
    Remove this RDD from cache/storage.

    Args:
        blocking: Whether to block until removal is complete

    Returns:
        Same RDD for method chaining
    """
```

#### Key-Value Operations (PairRDD Functions)

The following operations expect an RDD whose elements are (key, value) tuples:

**reduceByKey**: Combine values by key

```python { .api }
def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
    """
    Combine values with the same key using an associative and commutative function.

    Args:
        func: Binary function to combine values
        numPartitions: Number of partitions in result (optional)
        partitionFunc: Partitioning function (optional)

    Returns:
        New RDD with combined values per key
    """
```

```python
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
sums = pairs.reduceByKey(lambda a, b: a + b)
# Result (key order may vary): [("a", 4), ("b", 2)]
```

**groupByKey**: Group values by key

```python { .api }
def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
    """
    Group values with the same key into iterables.

    Args:
        numPartitions: Number of partitions in result (optional)
        partitionFunc: Partitioning function (optional)

    Returns:
        New RDD with grouped values per key
    """
```
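
`groupByKey` followed by a per-key sum gives the same result as `reduceByKey`, but moves more data: `reduceByKey` combines values inside each partition before the shuffle, so at most one record per key leaves each partition. The plain-Python model below (hypothetical helpers; partitions are given explicitly) counts the records each approach would shuffle.

```python
from collections import defaultdict


def reduce_by_key(partitions, func):
    """Model of reduceByKey: combine within each partition, then merge after the shuffle."""
    shuffled = 0
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:                       # map-side combine
            local[key] = func(local[key], value) if key in local else value
        for key, value in local.items():              # one record per key is shuffled
            shuffled += 1
            merged[key] = func(merged[key], value) if key in merged else value
    return merged, shuffled


def group_by_key(partitions):
    """Model of groupByKey: every (key, value) record is shuffled."""
    shuffled = 0
    grouped = defaultdict(list)
    for part in partitions:
        for key, value in part:
            shuffled += 1
            grouped[key].append(value)
    return dict(grouped), shuffled


parts = [[("a", 1), ("a", 2), ("b", 1)], [("a", 3), ("b", 4), ("b", 5)]]
sums, n_reduce = reduce_by_key(parts, lambda x, y: x + y)
groups, n_group = group_by_key(parts)
print(sums, n_reduce)                                   # {'a': 6, 'b': 10} 4
print({k: sum(v) for k, v in groups.items()}, n_group)  # {'a': 6, 'b': 10} 6
```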

**mapValues**: Transform values, preserve keys

```python { .api }
def mapValues(self, f):
    """
    Apply a function to each value while keeping the keys
    (and the RDD's existing partitioning) unchanged.

    Args:
        f: Function to apply to each value

    Returns:
        New RDD with transformed values
    """
```

**join**: Inner join on keys

```python { .api }
def join(self, other, numPartitions=None):
    """
    Inner join with another pair RDD on keys.

    Args:
        other: Another pair RDD to join with
        numPartitions: Number of partitions in result (optional)

    Returns:
        New RDD of (key, (value_from_self, value_from_other)) pairs,
        one per matching pair of values
    """
```
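
The result of `join` has the shape `(key, (left_value, right_value))`, with one output record for every matching pair of values; keys missing from either side are dropped (PySpark's `leftOuterJoin`, `rightOuterJoin` and `fullOuterJoin` keep them). A plain-Python model, using a hypothetical `inner_join` helper rather than PySpark:

```python
from collections import defaultdict


def inner_join(left, right):
    """Model of RDD.join: (k, (v, w)) for every matching pair of values."""
    right_by_key = defaultdict(list)
    for key, w in right:
        right_by_key[key].append(w)
    return [(key, (v, w))
            for key, v in left
            for w in right_by_key.get(key, [])]


orders = [("u1", "book"), ("u1", "pen"), ("u2", "mug"), ("u3", "lamp")]
names = [("u1", "Ann"), ("u2", "Bo"), ("u4", "Cy")]
print(inner_join(orders, names))
# [('u1', ('book', 'Ann')), ('u1', ('pen', 'Ann')), ('u2', ('mug', 'Bo'))]
```

Spark does not guarantee the output order; the model simply follows the order of the left input.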

### SparkConf

Configuration class for Spark applications.

```python { .api }
class SparkConf(object):
    """Configuration for a Spark application."""

    def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
        """Create a SparkConf object."""

    def set(self, key, value):
        """Set a configuration property."""

    def setMaster(self, value):
        """Set the master URL."""

    def setAppName(self, value):
        """Set the application name."""

    def setSparkHome(self, value):
        """Set the path where Spark is installed on worker nodes."""

    def setExecutorEnv(self, key=None, value=None, pairs=None):
        """Set environment variables for executors."""

    def get(self, key, defaultValue=None):
        """Get a configuration value, or defaultValue if it is not set."""

    def getAll(self):
        """Get all configuration values as a list of (key, value) pairs."""
```

```python
from pyspark import SparkConf

conf = SparkConf() \
    .setAppName("My Application") \
    .setMaster("local[4]") \
    .set("spark.executor.memory", "4g") \
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# spark.serializer applies to JVM-side data; Python objects in RDDs are still pickled
```

### Broadcast Variables

Read-only variables distributed to all nodes.

```python { .api }
class Broadcast(object):
    """A broadcast variable created with SparkContext.broadcast()."""

    @property
    def value(self):
        """Get the broadcast value."""

    def unpersist(self, blocking=False):
        """Delete cached copies of this broadcast on the executors."""

    def destroy(self):
        """Destroy all data and metadata related to this broadcast."""
```

### Accumulators

Shared variables for aggregating information.

```python { .api }
class Accumulator(object):
    """Shared variable that tasks can only add to."""

    def add(self, term):
        """Add a term to this accumulator."""

    @property
    def value(self):
        """Get the accumulator's value (only valid on the driver)."""
```

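### StorageLevel

Constants for RDD persistence levels.

```python { .api }
class StorageLevel(object):
    """Storage levels for persisting RDDs."""

    def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1):
        """Flags controlling where partitions are stored and in what form."""


# Predefined levels: StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
# _SER levels hold serialized bytes, so deserialized=False. Values shown are Spark 1.x;
# since Spark 2.0 PySpark always stores data serialized, so MEMORY_ONLY and
# MEMORY_AND_DISK also use deserialized=False there.
StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False, 1)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, True, 1)
StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, True, 2)
StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, True, 1)
StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2)
StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
```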
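
Each level is just five flags. The small decoder below makes the positional arguments readable; `LevelFlags` and `describe` are hypothetical helpers, not PySpark, although PySpark's own `StorageLevel` objects expose the same attribute names (`useDisk`, `useMemory`, `useOffHeap`, `deserialized`, `replication`).

```python
from typing import NamedTuple


class LevelFlags(NamedTuple):
    """Mirror of the five StorageLevel constructor arguments (illustrative only)."""
    useDisk: bool
    useMemory: bool
    useOffHeap: bool
    deserialized: bool
    replication: int = 1


def describe(level):
    """Turn the five flags into a short human-readable summary."""
    places = [name for name, enabled in (("memory", level.useMemory),
                                         ("disk", level.useDisk),
                                         ("off-heap", level.useOffHeap))
              if enabled]
    layout = "deserialized" if level.deserialized else "serialized"
    return f"{' + '.join(places) or 'nowhere'}, {layout}, {level.replication}x replicated"


print(describe(LevelFlags(True, True, False, False, 2)))
# memory + disk, serialized, 2x replicated
print(describe(LevelFlags(True, False, False, False)))
# disk, serialized, 1x replicated
```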

### SparkFiles

Utility for accessing files distributed with a Spark job.

```python { .api }
class SparkFiles(object):
    """Access files distributed via SparkContext.addFile()."""

    @classmethod
    def get(cls, filename):
        """
        Get the path to a file added via SparkContext.addFile().

        Args:
            filename: Name of the file to locate

        Returns:
            Absolute path to the file on the current node
        """

    @classmethod
    def getRootDirectory(cls):
        """
        Get the root directory for files added via addFile().

        Returns:
            Path to the root directory containing distributed files
        """
```

## Usage Examples

### Word Count

```python
text_file = sc.textFile("hdfs://...")
counts = text_file \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
# split() with no argument ignores repeated whitespace instead of yielding empty words

counts.saveAsTextFile("hdfs://output")
```
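
For comparison, the same flatMap, map and reduceByKey stages can be written over an in-memory list in plain Python. The `word_count` helper below is a hypothetical single-machine sketch, useful for checking expected output on small inputs; splitting with `split()` rather than `split(" ")` avoids counting empty strings when words are separated by several spaces.

```python
from collections import Counter


def word_count(lines):
    """Plain-Python version of the flatMap -> map -> reduceByKey pipeline."""
    words = (word for line in lines for word in line.split())  # flatMap
    pairs = ((word, 1) for word in words)                       # map
    counts = Counter()
    for word, one in pairs:                                     # reduceByKey(add)
        counts[word] += one
    return dict(counts)


print(word_count(["to be or", "not  to be"]))
# {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```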

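### Log Analysis

`extract_host` below is a hypothetical parser; adapt it to your log format.

```python
def extract_host(line):
    # Hypothetical helper: assumes the host is the first whitespace-separated field
    return line.split()[0]

log_file = sc.textFile("access.log")
errors = log_file.filter(lambda line: "ERROR" in line)
error_counts = errors \
    .map(lambda line: (extract_host(line), 1)) \
    .reduceByKey(lambda a, b: a + b)

result = error_counts.collect()  # List of (host, error_count) pairs
```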

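### Using Broadcast Variables

`extract_user` below is a hypothetical parser; adapt it to your log format.

```python
def extract_user(log):
    # Hypothetical helper: assumes the user ID is the first whitespace-separated field
    fields = log.split()
    return fields[0] if fields else None

lookup_table = {"user1": "admin", "user2": "guest"}
broadcast_lookup = sc.broadcast(lookup_table)

user_logs = sc.textFile("user_activity.log")
enriched_logs = user_logs.map(lambda log: {
    "log": log,
    "role": broadcast_lookup.value.get(extract_user(log), "unknown")
})
```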

### Caching for Performance

```python
large_dataset = sc.textFile("huge_file.txt")
filtered_data = large_dataset.filter(lambda line: "important" in line)

# Cache the filtered data since we'll use it multiple times
filtered_data.cache()

# Multiple operations on cached data; the first action computes and caches it
count = filtered_data.count()
sample = filtered_data.sample(False, 0.1).collect()
unique_words = filtered_data.flatMap(lambda line: line.split()).distinct().count()

# Release the cached partitions once they are no longer needed
filtered_data.unpersist()
```

## Error Handling

Common exceptions and error patterns in PySpark:

**Py4JJavaError**: The most common error; raised when a call into the JVM throws a Java exception

```python
from py4j.protocol import Py4JJavaError

try:
    result = rdd.collect()
except Py4JJavaError as e:
    # e.java_exception holds the underlying Java exception object
    print(f"Java exception occurred: {e}")
```

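**SparkContext Errors**: Only one SparkContext per JVM. To reuse the active context instead of creating a new one, call `SparkContext.getOrCreate()` (Spark 1.4+).

```python
from pyspark import SparkContext

try:
    sc = SparkContext()
except ValueError as e:
    # Raised when another SparkContext is already running
    print(f"SparkContext already exists: {e}")
```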

**File Not Found**: When reading non-existent files

```python
try:
    rdd = sc.textFile("nonexistent_file.txt")  # Lazy: no error yet
    rdd.count()  # The error surfaces here, when an action runs
except Exception as e:
    print(f"File access error: {e}")
```

The Python API offers a Pythonic interface to Spark's distributed computing capabilities while delegating execution to the underlying Scala/Java implementation, which the Python driver talks to through Py4J.