Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
—
PySpark is the Python API for Apache Spark that allows Python developers to harness the power of Spark's distributed computing capabilities. It provides a Python interface to Spark's core functionality including RDDs, SQL, Streaming, and MLlib. (GraphX has no Python API; graph processing from Python is typically done via the GraphFrames package instead.)
# Basic PySpark setup: import the core entry points and create a SparkContext.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row  # SchemaRDD/HiveContext are Spark 1.x-era APIs
from pyspark import SparkFiles, StorageLevel

# Using SparkConf (recommended)
conf = SparkConf() \
    .setAppName("My Python App") \
    .setMaster("local[*]") \
    .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

# Simple constructor (alternative form — only one SparkContext may be
# active at a time, so do not create a second one while `sc` is running):
# sc = SparkContext("local[*]", "My Python App")

# Remember to stop the context when done
sc.stop()

# Main entry point for all Spark functionality: SparkContext.
class SparkContext(object):
    """
    Main entry point for all Spark functionality.

    A SparkContext represents the connection to a Spark cluster and is used
    to create RDDs, broadcast variables, and accumulators on that cluster.
    """

    def __init__(self, master=None, appName=None, sparkHome=None,
                 pyFiles=None, environment=None, batchSize=1024,
                 serializer=PickleSerializer(), conf=None, gateway=None):
        """
        Create a new SparkContext.

        Args:
            master: Cluster URL to connect to (e.g. local[4], spark://host:port)
            appName: A name for your job, to display on cluster web UI
            sparkHome: Location where Spark is installed on cluster nodes
            pyFiles: Collection of .zip or .py files to send to cluster
            environment: Dictionary of environment variables for worker nodes
            batchSize: Number of Python objects represented as single Java object
            serializer: The serializer for RDDs
            conf: A SparkConf object setting Spark properties
            gateway: Use existing gateway and JVM, otherwise create new JVM
        """

    def parallelize(self, c, numSlices=None):
        """
        Distribute a local Python collection to form an RDD.

        Args:
            c: Collection to parallelize (list, tuple, etc.)
            numSlices: Number of partitions to create (optional)
        Returns:
            RDD containing the distributed data

        Example:
            data = [1, 2, 3, 4, 5]
            rdd = sc.parallelize(data)                     # Use default parallelism
            rdd_with_partitions = sc.parallelize(data, 4)  # Specify 4 partitions
        """

    def textFile(self, name, minPartitions=None):
        """
        Read a text file from HDFS or local filesystem.

        Args:
            name: Path to text file
            minPartitions: Minimum number of partitions (optional)
        Returns:
            RDD where each element is a line from the file

        Example:
            lines = sc.textFile("hdfs://namenode:port/path/to/file.txt")
            lines_local = sc.textFile("file:///local/path/file.txt")
            lines_with_partitions = sc.textFile("hdfs://path/to/file.txt", 8)
        """

    def wholeTextFiles(self, path, minPartitions=None):
        """
        Read directory of text files as (filename, content) pairs.

        Args:
            path: Directory path containing text files
            minPartitions: Minimum number of partitions (optional)
        Returns:
            RDD of (filename, content) tuples
        """

    def broadcast(self, value):
        """
        Broadcast a read-only variable to all nodes.

        Args:
            value: Value to broadcast
        Returns:
            Broadcast object with .value property

        Example:
            lookup_table = {"apple": 1, "banana": 2, "orange": 3}
            broadcast_table = sc.broadcast(lookup_table)
            data = sc.parallelize(["apple", "banana", "apple"])
            mapped = data.map(lambda fruit: broadcast_table.value.get(fruit, 0))
        """

    def accumulator(self, value, accum_param=None):
        """
        Create an accumulator with the given initial value.

        Args:
            value: Initial value
            accum_param: AccumulatorParam object (optional)
        Returns:
            Accumulator object

        Example:
            counter = sc.accumulator(0)
            data = sc.parallelize([1, 2, -1, 4, -5])
            negatives = data.filter(lambda x: x < 0)
            negatives.foreach(lambda x: counter.add(1))  # foreach is an action
            print(f"Negative numbers: {counter.value}")
        """

    def setJobGroup(self, groupId, description, interruptOnCancel=False):
        """Set job group for all jobs started by this thread."""

    def cancelJobGroup(self, groupId):
        """Cancel all jobs associated with a job group."""

    def cancelAllJobs(self):
        """Cancel all scheduled or running jobs."""

    def addFile(self, path, recursive=False):
        """
        Add a file to be downloaded with this Spark job on every node.

        Args:
            path: Path to file (local or remote)
            recursive: Whether to recursively add files in directories
        """

    def addPyFile(self, path):
        """
        Add a .py or .zip file to be distributed with this Spark job.

        Args:
            path: Path to Python file or zip archive
        """

# The RDD class provides the fundamental distributed data abstraction.
class RDD(object):
    """
    Resilient Distributed Dataset - an immutable, partitioned collection of
    elements that can be operated on in parallel.
    """

    def map(self, f, preservesPartitioning=False):
        """
        Apply a function to each element of the RDD.

        Args:
            f: Function to apply to each element
            preservesPartitioning: Whether partitioning should be preserved
        Returns:
            New RDD with transformed elements

        Example:
            numbers = sc.parallelize([1, 2, 3, 4, 5])
            squared = numbers.map(lambda x: x * x)
            # Result: RDD containing [1, 4, 9, 16, 25]
        """

    def flatMap(self, f, preservesPartitioning=False):
        """
        Apply function and flatten the results.

        Args:
            f: Function that returns iterable for each element
            preservesPartitioning: Whether partitioning should be preserved
        Returns:
            New RDD with flattened results

        Example:
            lines = sc.parallelize(["hello world", "spark rdd"])
            words = lines.flatMap(lambda line: line.split(" "))
            # Result: RDD containing ["hello", "world", "spark", "rdd"]
        """

    def filter(self, f):
        """
        Filter elements using a predicate function.

        Args:
            f: Function that returns boolean for each element
        Returns:
            New RDD containing only matching elements
        """

    def distinct(self, numPartitions=None):
        """
        Remove duplicate elements from RDD.

        Args:
            numPartitions: Number of partitions in result (optional)
        Returns:
            New RDD with duplicates removed
        """

    def union(self, other):
        """
        Return union of this RDD and another.

        Args:
            other: Another RDD of the same type
        Returns:
            New RDD containing elements from both RDDs
        """

    def intersection(self, other, numPartitions=None):
        """
        Return intersection of this RDD and another.

        Args:
            other: Another RDD
            numPartitions: Number of partitions in result (optional)
        Returns:
            New RDD containing common elements
        """

    def sortBy(self, keyfunc, ascending=True, numPartitions=None):
        """
        Sort RDD by the given key function.

        Args:
            keyfunc: Function to compute sort key for each element
            ascending: Whether to sort in ascending order
            numPartitions: Number of partitions in result (optional)
        Returns:
            New sorted RDD
        """

    def collect(self):
        """
        Return all elements of the RDD as a list.

        WARNING: Ensure result fits in driver memory.

        Returns:
            List containing all RDD elements
        """

    def count(self):
        """
        Count the number of elements in the RDD.

        Returns:
            Number of elements as integer
        """

    def first(self):
        """
        Return the first element of the RDD.

        Returns:
            First element
        Raises:
            ValueError: If RDD is empty
        """

    def take(self, num):
        """
        Return first n elements of the RDD.

        Args:
            num: Number of elements to return
        Returns:
            List of first n elements
        """

    def reduce(self, f):
        """
        Reduce elements using associative and commutative function.

        Args:
            f: Binary function that takes two parameters of same type
        Returns:
            Single reduced value

        Example:
            numbers = sc.parallelize([1, 2, 3, 4, 5])
            sum_result = numbers.reduce(lambda a, b: a + b)
            # Result: 15
        """

    def foreach(self, f):
        """
        Apply function to each element for side effects only.

        Args:
            f: Function to apply to each element
        """

    def cache(self):
        """
        Cache this RDD in memory using default storage level.

        Returns:
            Same RDD for method chaining
        """

    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
        """
        Cache RDD with specified storage level.

        Args:
            storageLevel: Storage level from StorageLevel class
        Returns:
            Same RDD for method chaining

        Example:
            from pyspark import StorageLevel
            rdd = sc.textFile("large-file.txt")
            rdd.persist(StorageLevel.MEMORY_AND_DISK)
            rdd.cache()  # Equivalent to persist(StorageLevel.MEMORY_ONLY)
        """

    def unpersist(self, blocking=False):
        """
        Remove this RDD from cache/storage.

        Args:
            blocking: Whether to block until removal is complete
        Returns:
            Same RDD for method chaining
        """

    # When the RDD contains (key, value) tuples, the following additional
    # operations are available:

    def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
        """
        Combine values with same key using associative function.

        Args:
            func: Binary function to combine values
            numPartitions: Number of partitions in result (optional)
            partitionFunc: Partitioning function (optional)
        Returns:
            New RDD with combined values per key

        Example:
            pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
            sums = pairs.reduceByKey(lambda a, b: a + b)
            # Result: [("a", 4), ("b", 2)]
        """

    def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
        """
        Group values with same key into iterables.

        Args:
            numPartitions: Number of partitions in result (optional)
            partitionFunc: Partitioning function (optional)
        Returns:
            New RDD with grouped values per key
        """

    def mapValues(self, f):
        """
        Apply function to values while preserving keys.

        Args:
            f: Function to apply to each value
        Returns:
            New RDD with transformed values
        """

    def join(self, other, numPartitions=None):
        """
        Inner join with another RDD on keys.

        Args:
            other: Another pair RDD to join with
            numPartitions: Number of partitions in result (optional)
        Returns:
            New RDD with joined key-value pairs
        """

# Configuration class for Spark applications.
class SparkConf(object):
    """
    Configuration for a Spark application.

    Holds Spark properties as key-value pairs. Setter methods are designed
    to be chained:

        conf = SparkConf() \
            .setAppName("My Application") \
            .setMaster("local[4]") \
            .set("spark.executor.memory", "4g") \
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    """

    def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
        """Create SparkConf object."""

    def set(self, key, value):
        """Set configuration property."""

    def setMaster(self, value):
        """Set master URL."""

    def setAppName(self, value):
        """Set application name."""

    def setSparkHome(self, value):
        """Set Spark installation directory."""

    def setExecutorEnv(self, key=None, value=None, pairs=None):
        """Set environment variables for executors."""

    def get(self, key, defaultValue=None):
        """Get configuration value."""

    def getAll(self):
        """Get all configuration as list of (key, value) pairs."""

# Read-only variables distributed to all nodes.
class Broadcast(object):
    """
    A read-only variable broadcast to all worker nodes.

    Created with SparkContext.broadcast(); the data is accessed through
    the .value property.
    """

    @property
    def value(self):
        """Get the broadcasted value."""

    def unpersist(self, blocking=False):
        """
        Delete cached copies of this broadcast on executors.

        Args:
            blocking: Whether to block until unpersisting is complete
        """

    def destroy(self):
        """Destroy all data and metadata related to this broadcast."""

# Shared variables for aggregating information.
class Accumulator(object):
    """
    Shared variable that worker tasks can only add to.

    Only the driver program may read the accumulated value.
    """

    def add(self, term):
        """Add a term to this accumulator."""

    @property
    def value(self):
        """Get the accumulator's value (only valid on driver)."""

# Constants for RDD persistence levels.
class StorageLevel(object):
    """
    Flags controlling how an RDD is persisted.

    The five constructor flags mirror the predefined-level calls below.
    NOTE(review): flag names follow the PySpark StorageLevel API
    (useDisk, useMemory, useOffHeap, deserialized, replication) — confirm
    the flag order matches these constants.
    """

    def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1):
        self.useDisk = useDisk
        self.useMemory = useMemory
        self.useOffHeap = useOffHeap
        self.deserialized = deserialized
        self.replication = replication


# The predefined levels must be attached after the class body: referencing
# StorageLevel inside its own class body would raise NameError.
StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False, 1)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False, True, 1)
StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, True, 2)
StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, True, 1)
StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, True, 2)

# Utility for accessing files distributed with Spark job.
class SparkFiles(object):
    """Access files distributed via SparkContext.addFile()."""

    @classmethod
    def get(cls, filename):
        """
        Get path to file added via SparkContext.addFile().

        Args:
            filename: Name of file to locate
        Returns:
            Absolute path to file on current node
        """

    @classmethod
    def getRootDirectory(cls):
        """
        Get root directory for files added via addFile().

        Returns:
            Path to root directory containing distributed files
        """


# --- Usage examples (require a live SparkContext `sc`) ---

# Word count:
#   text_file = sc.textFile("hdfs://...")
#   counts = text_file \
#       .flatMap(lambda line: line.split(" ")) \
#       .map(lambda word: (word, 1)) \
#       .reduceByKey(lambda a, b: a + b)
#   counts.saveAsTextFile("hdfs://output")

# Log analysis (extract_host is an application-defined helper):
#   log_file = sc.textFile("access.log")
#   errors = log_file.filter(lambda line: "ERROR" in line)
#   error_counts = errors \
#       .map(lambda line: (extract_host(line), 1)) \
#       .reduceByKey(lambda a, b: a + b)
#   result = error_counts.collect()

# Broadcast join — enrich records with a small lookup table
# (extract_user is an application-defined helper):
#   lookup_table = {"user1": "admin", "user2": "guest"}
#   broadcast_lookup = sc.broadcast(lookup_table)
#   user_logs = sc.textFile("user_activity.log")
#   enriched_logs = user_logs.map(lambda log: {
#       "log": log,
#       "role": broadcast_lookup.value.get(extract_user(log), "unknown")
#   })

# Caching reused data:
#   large_dataset = sc.textFile("huge_file.txt")
#   filtered_data = large_dataset.filter(lambda line: "important" in line)
#   # Cache the filtered data since we'll use it multiple times
#   filtered_data.cache()
#   # Multiple operations on cached data
#   count = filtered_data.count()
#   sample = filtered_data.sample(False, 0.1).collect()
#   unique_words = filtered_data.flatMap(lambda line: line.split()).distinct().count()

# Common exceptions and error patterns in PySpark:
# Py4JJavaError: most common error, indicates an exception thrown on the
# JVM side (requires `from py4j.protocol import Py4JJavaError`):
#   try:
#       result = rdd.collect()
#   except Py4JJavaError as e:
#       print(f"Java exception occurred: {e}")

# SparkContext errors: only one SparkContext per JVM:
#   try:
#       sc = SparkContext()
#   except ValueError as e:
#       print("SparkContext already exists")

# File not found: reading a non-existent file fails lazily —
# the error surfaces on the first action, not at RDD creation:
#   try:
#       rdd = sc.textFile("nonexistent_file.txt")
#       rdd.count()  # Error occurs on action, not creation
#   except Exception as e:
#       print(f"File access error: {e}")

# The Python API provides a Pythonic interface to Spark's distributed
# computing capabilities while maintaining compatibility with the underlying
# Scala/Java implementation.
Install with Tessl CLI
npx tessl i tessl/maven-apache-spark