
tessl/maven-apache-spark

Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R

Python API (PySpark)

PySpark is the Python API for Apache Spark. It gives Python developers access to Spark's distributed computing engine, including RDDs, Spark SQL, Spark Streaming, and MLlib.

Core Imports

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row  
from pyspark import SparkFiles, StorageLevel

Basic Usage

Creating a SparkContext

from pyspark import SparkContext, SparkConf

# Using SparkConf (recommended)
conf = SparkConf() \
    .setAppName("My Python App") \
    .setMaster("local[*]") \
    .set("spark.executor.memory", "2g")

sc = SparkContext(conf=conf)

# Simple constructor
sc = SparkContext("local[*]", "My Python App")

# Remember to stop the context
sc.stop()

Capabilities

SparkContext

Main entry point for all Spark functionality.

class SparkContext(object):
    def __init__(self, master=None, appName=None, sparkHome=None, 
                 pyFiles=None, environment=None, batchSize=1024, 
                 serializer=PickleSerializer(), conf=None, gateway=None):
        """
        Create a new SparkContext.
        
        Args:
            master: Cluster URL to connect to (e.g. local[4], spark://host:port)
            appName: A name for your job, to display on cluster web UI
            sparkHome: Location where Spark is installed on cluster nodes
            pyFiles: Collection of .zip or .py files to send to cluster
            environment: Dictionary of environment variables for worker nodes
            batchSize: Number of Python objects represented as single Java object
            serializer: The serializer for RDDs
            conf: A SparkConf object setting Spark properties
            gateway: Use existing gateway and JVM, otherwise create new JVM
        """

RDD Creation Methods

parallelize: Distribute a local collection to form an RDD

def parallelize(self, c, numSlices=None):
    """
    Distribute a local Python collection to form an RDD.
    
    Args:
        c: Collection to parallelize (list, tuple, etc.)
        numSlices: Number of partitions to create (optional)
        
    Returns:
        RDD containing the distributed data
    """
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)                    # Use default parallelism
rdd_with_partitions = sc.parallelize(data, 4)  # Specify 4 partitions

textFile: Read text files as RDD of strings

def textFile(self, name, minPartitions=None):
    """
    Read a text file from HDFS or local filesystem.
    
    Args:
        name: Path to text file
        minPartitions: Minimum number of partitions (optional)
        
    Returns:
        RDD where each element is a line from the file
    """
lines = sc.textFile("hdfs://namenode:port/path/to/file.txt")
lines_local = sc.textFile("file:///local/path/file.txt")
lines_with_partitions = sc.textFile("hdfs://path/to/file.txt", 8)

wholeTextFiles: Read directory of text files as key-value pairs

def wholeTextFiles(self, path, minPartitions=None):
    """
    Read directory of text files as (filename, content) pairs.
    
    Args:
        path: Directory path containing text files
        minPartitions: Minimum number of partitions (optional)
        
    Returns:
        RDD of (filename, content) tuples
    """

Shared Variables

broadcast: Create a broadcast variable for read-only data

def broadcast(self, value):
    """
    Broadcast a read-only variable to all nodes.
    
    Args:
        value: Value to broadcast
        
    Returns:
        Broadcast object with .value property
    """
lookup_table = {"apple": 1, "banana": 2, "orange": 3}
broadcast_table = sc.broadcast(lookup_table)

data = sc.parallelize(["apple", "banana", "apple"])
mapped = data.map(lambda fruit: broadcast_table.value.get(fruit, 0))

accumulator: Create an accumulator for aggregating information

def accumulator(self, value, accum_param=None):
    """
    Create an accumulator with the given initial value.
    
    Args:
        value: Initial value
        accum_param: AccumulatorParam object (optional)
        
    Returns:
        Accumulator object
    """
counter = sc.accumulator(0)

data = sc.parallelize([1, 2, -1, 4, -5])

def count_if_negative(x):
    # Runs on the executors; only the driver can read counter.value
    if x < 0:
        counter.add(1)

data.foreach(count_if_negative)  # foreach is an action, so the job runs here
print(f"Negative numbers: {counter.value}")
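
The `accum_param` argument lets an accumulator hold values other than plain numbers. The sketch below shows the two methods such a parameter object supplies, `zero` and `addInPlace`, for a hypothetical element-wise vector sum. It is written as a plain class so that it runs without a cluster; in a Spark program the class would subclass `pyspark.AccumulatorParam` and be passed as the second argument to `sc.accumulator`. The class name and sample updates are illustrative assumptions.

```python
class VectorSumParam:
    """Hypothetical accumulator parameter: element-wise sum of equal-length lists."""

    def zero(self, value):
        # Identity element with the same length as the initial value.
        return [0.0] * len(value)

    def addInPlace(self, v1, v2):
        # Merge v2 into v1 and return the merged result.
        for i, x in enumerate(v2):
            v1[i] += x
        return v1


# Local walk-through of how updates would be merged.
param = VectorSumParam()
total = param.zero([0.0, 0.0])
for update in ([1.0, 2.0], [3.0, 4.0]):
    total = param.addInPlace(total, update)
print(total)
```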

Job Control

setJobGroup: Assign group ID to jobs

def setJobGroup(self, groupId, description, interruptOnCancel=False):
    """Set job group for all jobs started by this thread."""

cancelJobGroup: Cancel all jobs in a group

def cancelJobGroup(self, groupId):
    """Cancel all jobs associated with a job group."""

cancelAllJobs: Cancel all scheduled or running jobs

def cancelAllJobs(self):
    """Cancel all scheduled or running jobs."""

File Management

addFile: Add a file to be downloaded on every node

def addFile(self, path, recursive=False):
    """
    Add a file to be downloaded with this Spark job on every node.
    
    Args:
        path: Path to file (local or remote)
        recursive: Whether to recursively add files in directories
    """

addPyFile: Add a Python file to be distributed

def addPyFile(self, path):
    """
    Add a .py or .zip file to be distributed with this Spark job.
    
    Args:
        path: Path to Python file or zip archive
    """

RDD Operations

The RDD class provides the fundamental distributed data abstraction.

class RDD(object):
    """
    Resilient Distributed Dataset - immutable distributed collection.
    """

Transformations (Lazy)

map: Apply function to each element

def map(self, f, preservesPartitioning=False):
    """
    Apply a function to each element of the RDD.
    
    Args:
        f: Function to apply to each element
        preservesPartitioning: Whether partitioning should be preserved
        
    Returns:
        New RDD with transformed elements
    """
numbers = sc.parallelize([1, 2, 3, 4, 5])
squared = numbers.map(lambda x: x * x)
# Result: RDD containing [1, 4, 9, 16, 25]

flatMap: Apply function and flatten results

def flatMap(self, f, preservesPartitioning=False):
    """
    Apply function and flatten the results.
    
    Args:
        f: Function that returns iterable for each element
        preservesPartitioning: Whether partitioning should be preserved
        
    Returns:
        New RDD with flattened results
    """
lines = sc.parallelize(["hello world", "spark rdd"])
words = lines.flatMap(lambda line: line.split(" "))
# Result: RDD containing ["hello", "world", "spark", "rdd"]

filter: Keep elements matching predicate

def filter(self, f):
    """
    Filter elements using a predicate function.
    
    Args:
        f: Function that returns boolean for each element
        
    Returns:
        New RDD containing only matching elements
    """

distinct: Remove duplicate elements

def distinct(self, numPartitions=None):
    """
    Remove duplicate elements from RDD.
    
    Args:
        numPartitions: Number of partitions in result (optional)
        
    Returns:
        New RDD with duplicates removed
    """

union: Combine with another RDD

def union(self, other):
    """
    Return union of this RDD and another.
    
    Args:
        other: Another RDD of the same type
        
    Returns:
        New RDD containing elements from both RDDs
    """

intersection: Find common elements

def intersection(self, other, numPartitions=None):
    """
    Return intersection of this RDD and another.
    
    Args:
        other: Another RDD
        numPartitions: Number of partitions in result (optional)
        
    Returns:
        New RDD containing common elements
    """

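One point the signatures above do not show: `union` keeps duplicates, while `distinct` and `intersection` return each element once. The pure-Python sketch below models those results on small lists. It does not use Spark, the `model_*` helper names are hypothetical, and a real RDD does not guarantee element order.

```python
def model_union(a, b):
    # Like RDD.union: concatenates both inputs, duplicates included.
    return list(a) + list(b)


def model_distinct(a):
    # Like RDD.distinct: each element appears once.
    return list(set(a))


def model_intersection(a, b):
    # Like RDD.intersection: common elements, each appearing once.
    return list(set(a) & set(b))


left = [1, 2, 2, 3]
right = [2, 3, 4]

union_result = model_union(left, right)
distinct_result = sorted(model_distinct(union_result))
common = sorted(model_intersection(left, right))
```
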
sortBy: Sort elements using key function

def sortBy(self, keyfunc, ascending=True, numPartitions=None):
    """
    Sort RDD by the given key function.
    
    Args:
        keyfunc: Function to compute sort key for each element
        ascending: Whether to sort in ascending order
        numPartitions: Number of partitions in result (optional)
        
    Returns:
        New sorted RDD
    """

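`keyfunc` plays the same role as the `key` argument of Python's built-in `sorted`, so a key function can be checked locally before it is passed to `sortBy`. A minimal sketch with made-up records and a hypothetical `by_score` function:

```python
records = [("alice", 82), ("bob", 95), ("carol", 67)]


def by_score(record):
    # Sort key: the numeric score in position 1 of each tuple.
    return record[1]


# Local preview of the ordering the key function produces.
ascending = sorted(records, key=by_score)
descending = sorted(records, key=by_score, reverse=True)

# With a live SparkContext `sc`, the same function would be used as:
#   sc.parallelize(records).sortBy(by_score, ascending=False)
```
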
Actions (Eager)

collect: Return all elements as list

def collect(self):
    """
    Return all elements of the RDD as a list.
    WARNING: Ensure result fits in driver memory.
    
    Returns:
        List containing all RDD elements
    """

count: Count number of elements

def count(self):
    """
    Count the number of elements in the RDD.
    
    Returns:
        Number of elements as integer
    """

first: Return first element

def first(self):
    """
    Return the first element of the RDD.
    
    Returns:
        First element
        
    Raises:
        ValueError: If RDD is empty
    """

take: Return first n elements

def take(self, num):
    """
    Return first n elements of the RDD.
    
    Args:
        num: Number of elements to return
        
    Returns:
        List of first n elements
    """

reduce: Reduce elements using a commutative and associative function

def reduce(self, f):
    """
    Reduce elements using associative and commutative function.
    
    Args:
        f: Binary function that takes two parameters of same type
        
    Returns:
        Single reduced value
    """
numbers = sc.parallelize([1, 2, 3, 4, 5])
sum_result = numbers.reduce(lambda a, b: a + b)
# Result: 15
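
Because each partition is reduced separately and the partial results are then combined, a function that is not associative and commutative can give different answers for different partitionings. The pure-Python sketch below models that two-stage reduction with `functools.reduce`. The helper `model_reduce` and the partition layouts are hypothetical, and this is not Spark's actual implementation.

```python
from functools import reduce


def model_reduce(partitions, f):
    # Stage 1: reduce each non-empty partition on its own.
    partials = [reduce(f, part) for part in partitions if part]
    # Stage 2: combine the per-partition results.
    return reduce(f, partials)


def add(a, b):
    return a + b


def sub(a, b):
    return a - b


# The same five numbers, split across two partitions in two different ways.
layout_a = [[1, 2, 3], [4, 5]]
layout_b = [[1, 2], [3, 4, 5]]

sum_a = model_reduce(layout_a, add)
sum_b = model_reduce(layout_b, add)
diff_a = model_reduce(layout_a, sub)
diff_b = model_reduce(layout_b, sub)

print(sum_a == sum_b)    # addition does not depend on the layout
print(diff_a == diff_b)  # subtraction does
```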

foreach: Apply function to each element (for side effects)

def foreach(self, f):
    """
    Apply function to each element for side effects only.
    
    Args:
        f: Function to apply to each element
    """

Persistence Operations

cache: Cache RDD in memory

def cache(self):
    """
    Cache this RDD in memory using default storage level.
    
    Returns:
        Same RDD for method chaining
    """

persist: Cache with specific storage level

def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
    """
    Cache RDD with specified storage level.
    
    Args:
        storageLevel: Storage level from StorageLevel class
        
    Returns:
        Same RDD for method chaining
    """
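from pyspark import StorageLevel

rdd = sc.textFile("large-file.txt")
rdd.persist(StorageLevel.MEMORY_AND_DISK)

# rdd.cache() is shorthand for persist() with the default storage level.
# Use one or the other: an RDD's storage level cannot be changed once set,
# so call unpersist() before assigning a different level.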

unpersist: Remove from cache

def unpersist(self, blocking=False):
    """
    Remove this RDD from cache/storage.
    
    Args:
        blocking: Whether to block until removal is complete
        
    Returns:
        Same RDD for method chaining
    """

Key-Value Operations (PairRDD Functions)

When an RDD contains (key, value) tuples, additional operations are available:

reduceByKey: Combine values by key

def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
    """
    Merge the values for each key using an associative and commutative function.
    
    Args:
        func: Binary function to combine values
        numPartitions: Number of partitions in result (optional)
        partitionFunc: Partitioning function (optional)
        
    Returns:
        New RDD with combined values per key
    """
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
sums = pairs.reduceByKey(lambda a, b: a + b)
# Result: [("a", 4), ("b", 2)]
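
`reduceByKey` merges the values for each key within every partition before the shuffle and then merges the partial results, which is why `func` should be associative and commutative. The following is a pure-Python model of that behavior, not Spark's implementation; `model_reduce_by_key` and the partition layout are hypothetical.

```python
def model_reduce_by_key(partitions, func):
    def combine(pairs):
        # Merge values that share a key using func.
        merged = {}
        for key, value in pairs:
            merged[key] = func(merged[key], value) if key in merged else value
        return merged

    # Map-side combine: one dict of partial results per partition.
    partials = [combine(part) for part in partitions]
    # Reduce side: merge the partial results across partitions.
    all_pairs = [item for partial in partials for item in partial.items()]
    return combine(all_pairs)


# The pairs from the example above, split across two partitions.
partitions = [[("a", 1), ("b", 2)], [("a", 3)]]
sums = model_reduce_by_key(partitions, lambda a, b: a + b)
print(sorted(sums.items()))
```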

groupByKey: Group values by key

def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
    """
    Group values with same key into iterables.
    
    Args:
        numPartitions: Number of partitions in result (optional)
        partitionFunc: Partitioning function (optional)
        
    Returns:
        New RDD with grouped values per key
    """

mapValues: Transform values, preserve keys

def mapValues(self, f):
    """
    Apply function to values while preserving keys.
    
    Args:
        f: Function to apply to each value
        
    Returns:
        New RDD with transformed values
    """

join: Inner join on keys

def join(self, other, numPartitions=None):
    """
    Inner join with another RDD on keys.
    
    Args:
        other: Another pair RDD to join with
        numPartitions: Number of partitions in result (optional)
        
    Returns:
        New RDD with joined key-value pairs
    """

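Each element of the joined RDD has the form `(key, (left_value, right_value))`, with one element per matching pair of values; keys present on only one side are dropped. The pure-Python sketch below models that result shape without Spark. `model_join` is a hypothetical helper and the data is made up.

```python
from collections import defaultdict


def model_join(left, right):
    # Index the right-hand pairs by key.
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Emit one (key, (left_value, right_value)) per matching combination.
    return [
        (key, (lv, rv))
        for key, lv in left
        for rv in right_by_key.get(key, [])
    ]


ages = [("alice", 30), ("bob", 25), ("carol", 41)]
cities = [("alice", "Paris"), ("bob", "Oslo"), ("bob", "Rome")]

joined = model_join(ages, cities)
for row in joined:
    print(row)
```
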
SparkConf

Configuration class for Spark applications.

class SparkConf(object):
    """Configuration for a Spark application."""
    
    def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
        """Create SparkConf object."""
    
    def set(self, key, value):
        """Set configuration property."""
    
    def setMaster(self, value):
        """Set master URL."""
    
    def setAppName(self, value):
        """Set application name."""
    
    def setSparkHome(self, value):
        """Set Spark installation directory."""
    
    def setExecutorEnv(self, key=None, value=None, pairs=None):
        """Set environment variables for executors."""
    
    def get(self, key, defaultValue=None):
        """Get configuration value."""
    
    def getAll(self):
        """Get all configuration as list of (key, value) pairs."""
conf = SparkConf() \
    .setAppName("My Application") \
    .setMaster("local[4]") \
    .set("spark.executor.memory", "4g") \
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Broadcast Variables

Read-only variables distributed to all nodes.

class Broadcast(object):
    """A broadcast variable created with SparkContext.broadcast()."""
    
    @property
    def value(self):
        """Get the broadcasted value."""
    
    def unpersist(self, blocking=False):
        """Delete cached copies of this broadcast on executors."""
    
    def destroy(self):
        """Destroy all data and metadata related to this broadcast."""

Accumulators

Shared variables for aggregating information.

class Accumulator(object):
    """Shared variable that can only be added to."""
    
    def add(self, term):
        """Add a term to this accumulator."""
    
    @property  
    def value(self):
        """Get the accumulator's value (only valid on driver)."""

StorageLevel

Constants for RDD persistence levels.

class StorageLevel(object):
    """Storage levels for persisting RDDs."""
    
    # Arguments: StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
    # The *_SER levels store serialized data, so their deserialized flag is False.
    DISK_ONLY = StorageLevel(True, False, False, False, 1)
    DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
    MEMORY_ONLY = StorageLevel(False, True, False, True, 1)
    MEMORY_ONLY_2 = StorageLevel(False, True, False, True, 2)
    MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
    MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
    MEMORY_AND_DISK = StorageLevel(True, True, False, True, 1)
    MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2)
    MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
    MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)

SparkFiles

Utility for accessing files distributed with Spark job.

class SparkFiles(object):
    """Access files distributed via SparkContext.addFile()."""
    
    @classmethod
    def get(cls, filename):
        """
        Get path to file added via SparkContext.addFile().
        
        Args:
            filename: Name of file to locate
            
        Returns:
            Absolute path to file on current node
        """
    
    @classmethod
    def getRootDirectory(cls):
        """
        Get root directory for files added via addFile().
        
        Returns:
            Path to root directory containing distributed files
        """

Usage Examples

Word Count

text_file = sc.textFile("hdfs://...")
counts = text_file \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
    
counts.saveAsTextFile("hdfs://output")
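
For a small sample, the expected output of this pipeline can be checked locally. The sketch below mirrors the three steps with `collections.Counter`; the sample lines are made up and no Spark cluster is involved.

```python
from collections import Counter

sample_lines = ["to be or not to be", "to do is to be"]

# flatMap step: split every line into words and flatten the result.
words = [word for line in sample_lines for word in line.split(" ")]

# map + reduceByKey steps: count the occurrences of each word.
local_counts = Counter(words)

for word, count in sorted(local_counts.items()):
    print(word, count)
```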

Log Analysis

log_file = sc.textFile("access.log")
errors = log_file.filter(lambda line: "ERROR" in line)
error_counts = errors \
    .map(lambda line: (extract_host(line), 1)) \
    .reduceByKey(lambda a, b: a + b)
    
result = error_counts.collect()
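
`extract_host` is not part of PySpark and is not defined in this document. One possible implementation is sketched below, assuming that each log line begins with the host name followed by whitespace; the sample line is made up.

```python
def extract_host(line):
    """Return the first whitespace-delimited field, or "unknown" for blank lines."""
    fields = line.split()
    return fields[0] if fields else "unknown"


sample = '192.0.2.10 - - [10/Oct/2023:13:55:36 +0000] "GET /index.html" 500 ERROR'
host = extract_host(sample)
print(host)
```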

Using Broadcast Variables

lookup_table = {"user1": "admin", "user2": "guest"}
broadcast_lookup = sc.broadcast(lookup_table)

user_logs = sc.textFile("user_activity.log")
enriched_logs = user_logs.map(lambda log: {
    "log": log,
    "role": broadcast_lookup.value.get(extract_user(log), "unknown")
})
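
`extract_user` is a user-supplied helper, not a PySpark function. A minimal sketch, assuming a hypothetical log format in which every line contains a `user=<name>` field:

```python
import re

# Hypothetical format: "... user=<name> ..." somewhere in the line.
USER_FIELD = re.compile(r"\buser=(\w+)")


def extract_user(log):
    """Return the value of the user= field, or None when it is absent."""
    match = USER_FIELD.search(log)
    return match.group(1) if match else None


lookup_table = {"user1": "admin", "user2": "guest"}
line = "2023-10-10 13:55:36 user=user1 action=login"

# Same lookup as in the map() call above, applied to one line locally.
role = lookup_table.get(extract_user(line), "unknown")
print(role)
```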

Caching for Performance

large_dataset = sc.textFile("huge_file.txt")
filtered_data = large_dataset.filter(lambda line: "important" in line)

# Cache the filtered data since we'll use it multiple times
filtered_data.cache()

# Multiple operations on cached data
count = filtered_data.count()
sample = filtered_data.sample(False, 0.1).collect()
unique_words = filtered_data.flatMap(lambda line: line.split()).distinct().count()

Error Handling

Common exceptions and error patterns in PySpark:

Py4JJavaError: The most common error; it wraps an exception raised on the JVM side

from py4j.protocol import Py4JJavaError

try:
    result = rdd.collect()
except Py4JJavaError as e:
    print(f"Java exception occurred: {e}")

SparkContext Errors: Only one SparkContext can be active at a time; creating a second one raises ValueError

try:
    sc = SparkContext()
except ValueError as e:
    print(f"SparkContext already exists: {e}")

File Not Found: When reading non-existent files

try:
    rdd = sc.textFile("nonexistent_file.txt")
    rdd.count()  # Error occurs on action, not creation
except Exception as e:
    print(f"File access error: {e}")

The Python API provides a Pythonic interface to Spark's distributed computing capabilities while maintaining compatibility with the underlying Scala/Java implementation.

Install with Tessl CLI

npx tessl i tessl/maven-apache-spark
