maven-apache-spark (describes maven/org.apache.spark/spark-parent)

- Description: Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
- Author: tessl
- Spec file: docs/python-api.md

# Python API (PySpark)

PySpark is the Python API for Apache Spark that allows Python developers to harness the power of Spark's distributed computing capabilities. It provides a Python interface to Spark's core functionality including RDDs, SQL, Streaming, MLlib and GraphX.

## Core Imports

```python { .api }
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
from pyspark import SparkFiles, StorageLevel
```

## Basic Usage

### Creating a SparkContext

```python { .api }
from pyspark import SparkContext, SparkConf

# Using SparkConf (recommended)
conf = SparkConf() \
    .setAppName("My Python App") \
    .setMaster("local[*]") \
    .set("spark.executor.memory", "2g")

sc = SparkContext(conf=conf)

# Simple constructor
sc = SparkContext("local[*]", "My Python App")

# Remember to stop the context
sc.stop()
```

## Capabilities

### SparkContext

Main entry point for all Spark functionality.

```python { .api }
class SparkContext(object):
    def __init__(self, master=None, appName=None, sparkHome=None,
                 pyFiles=None, environment=None, batchSize=1024,
                 serializer=PickleSerializer(), conf=None, gateway=None):
        """
        Create a new SparkContext.

        Args:
            master: Cluster URL to connect to (e.g. local[4], spark://host:port)
            appName: A name for your job, to display on the cluster web UI
            sparkHome: Location where Spark is installed on cluster nodes
            pyFiles: Collection of .zip or .py files to send to the cluster
            environment: Dictionary of environment variables for worker nodes
            batchSize: Number of Python objects represented as a single Java object
            serializer: The serializer for RDDs
            conf: A SparkConf object setting Spark properties
            gateway: Use an existing gateway and JVM, otherwise create a new JVM
        """
```

#### RDD Creation Methods

**parallelize**: Distribute a local collection to form an RDD
```python { .api }
def parallelize(self, c, numSlices=None):
    """
    Distribute a local Python collection to form an RDD.

    Args:
        c: Collection to parallelize (list, tuple, etc.)
        numSlices: Number of partitions to create (optional)

    Returns:
        RDD containing the distributed data
    """
```

```python
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)  # Use default parallelism
rdd_with_partitions = sc.parallelize(data, 4)  # Specify 4 partitions
```

**textFile**: Read text files as an RDD of strings
```python { .api }
def textFile(self, name, minPartitions=None):
    """
    Read a text file from HDFS or the local filesystem.

    Args:
        name: Path to text file
        minPartitions: Minimum number of partitions (optional)

    Returns:
        RDD where each element is a line from the file
    """
```

```python
lines = sc.textFile("hdfs://namenode:port/path/to/file.txt")
lines_local = sc.textFile("file:///local/path/file.txt")
lines_with_partitions = sc.textFile("hdfs://path/to/file.txt", 8)
```

**wholeTextFiles**: Read a directory of text files as key-value pairs
```python { .api }
def wholeTextFiles(self, path, minPartitions=None):
    """
    Read a directory of text files as (filename, content) pairs.

    Args:
        path: Directory path containing text files
        minPartitions: Minimum number of partitions (optional)

    Returns:
        RDD of (filename, content) tuples
    """
```

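A short usage sketch (the directory path is a placeholder); it relies only on `mapValues` and `take`, which are documented below:

```python
# Each element is a (filename, content) tuple
files = sc.wholeTextFiles("hdfs://namenode:port/path/to/dir")
file_sizes = files.mapValues(len)  # (filename, number of characters)
print(file_sizes.take(2))          # Peek at the first two files
```
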
#### Shared Variables

**broadcast**: Create a broadcast variable for read-only data
```python { .api }
def broadcast(self, value):
    """
    Broadcast a read-only variable to all nodes.

    Args:
        value: Value to broadcast

    Returns:
        Broadcast object with .value property
    """
```

```python
lookup_table = {"apple": 1, "banana": 2, "orange": 3}
broadcast_table = sc.broadcast(lookup_table)

data = sc.parallelize(["apple", "banana", "apple"])
mapped = data.map(lambda fruit: broadcast_table.value.get(fruit, 0))
```

**accumulator**: Create an accumulator for aggregating information
```python { .api }
def accumulator(self, value, accum_param=None):
    """
    Create an accumulator with the given initial value.

    Args:
        value: Initial value
        accum_param: AccumulatorParam object (optional)

    Returns:
        Accumulator object
    """
```

```python
counter = sc.accumulator(0)

data = sc.parallelize([1, 2, -1, 4, -5])

def keep_positive(x):
    if x < 0:
        counter.add(1)  # Side effect: count negative values
    return x > 0

positive = data.filter(keep_positive)
positive.count()  # Trigger an action; accumulator updates happen here
print(f"Negative numbers: {counter.value}")  # Read the value on the driver
```

#### Job Control

**setJobGroup**: Assign a group ID to jobs
```python { .api }
def setJobGroup(self, groupId, description, interruptOnCancel=False):
    """Set job group for all jobs started by this thread."""
```

**cancelJobGroup**: Cancel all jobs in a group
```python { .api }
def cancelJobGroup(self, groupId):
    """Cancel all jobs associated with a job group."""
```

**cancelAllJobs**: Cancel all scheduled or running jobs
```python { .api }
def cancelAllJobs(self):
    """Cancel all scheduled or running jobs."""
```

#### File Management

**addFile**: Add a file to be downloaded on every node
```python { .api }
def addFile(self, path, recursive=False):
    """
    Add a file to be downloaded with this Spark job on every node.

    Args:
        path: Path to file (local or remote)
        recursive: Whether to recursively add files in directories
    """
```

**addPyFile**: Add a Python file to be distributed
```python { .api }
def addPyFile(self, path):
    """
    Add a .py or .zip file to be distributed with this Spark job.

    Args:
        path: Path to Python file or zip archive
    """
```

### RDD Operations

The RDD class provides the fundamental distributed data abstraction.

```python { .api }
class RDD(object):
    """
    Resilient Distributed Dataset - an immutable distributed collection.
    """
```

#### Transformations (Lazy)

**map**: Apply a function to each element
```python { .api }
def map(self, f, preservesPartitioning=False):
    """
    Apply a function to each element of the RDD.

    Args:
        f: Function to apply to each element
        preservesPartitioning: Whether partitioning should be preserved

    Returns:
        New RDD with transformed elements
    """
```

```python
numbers = sc.parallelize([1, 2, 3, 4, 5])
squared = numbers.map(lambda x: x * x)
# Result: RDD containing [1, 4, 9, 16, 25]
```

**flatMap**: Apply a function and flatten the results
```python { .api }
def flatMap(self, f, preservesPartitioning=False):
    """
    Apply a function and flatten the results.

    Args:
        f: Function that returns an iterable for each element
        preservesPartitioning: Whether partitioning should be preserved

    Returns:
        New RDD with flattened results
    """
```

```python
lines = sc.parallelize(["hello world", "spark rdd"])
words = lines.flatMap(lambda line: line.split(" "))
# Result: RDD containing ["hello", "world", "spark", "rdd"]
```

**filter**: Keep elements matching a predicate
```python { .api }
def filter(self, f):
    """
    Filter elements using a predicate function.

    Args:
        f: Function that returns a boolean for each element

    Returns:
        New RDD containing only matching elements
    """
```

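Following the pattern of the map and flatMap examples above, a small illustrative filter call:

```python
numbers = sc.parallelize([1, 2, 3, 4, 5, 6])
evens = numbers.filter(lambda x: x % 2 == 0)
# Result: RDD containing [2, 4, 6]
```
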
**distinct**: Remove duplicate elements
```python { .api }
def distinct(self, numPartitions=None):
    """
    Remove duplicate elements from the RDD.

    Args:
        numPartitions: Number of partitions in result (optional)

    Returns:
        New RDD with duplicates removed
    """
```

**union**: Combine with another RDD
```python { .api }
def union(self, other):
    """
    Return the union of this RDD and another.

    Args:
        other: Another RDD of the same type

    Returns:
        New RDD containing elements from both RDDs
    """
```

**intersection**: Find common elements
```python { .api }
def intersection(self, other, numPartitions=None):
    """
    Return the intersection of this RDD and another.

    Args:
        other: Another RDD
        numPartitions: Number of partitions in result (optional)

    Returns:
        New RDD containing common elements
    """
```

**sortBy**: Sort elements using a key function
```python { .api }
def sortBy(self, keyfunc, ascending=True, numPartitions=None):
    """
    Sort the RDD by the given key function.

    Args:
        keyfunc: Function to compute the sort key for each element
        ascending: Whether to sort in ascending order
        numPartitions: Number of partitions in result (optional)

    Returns:
        New sorted RDD
    """
```

#### Actions (Eager)

**collect**: Return all elements as a list
```python { .api }
def collect(self):
    """
    Return all elements of the RDD as a list.
    WARNING: Ensure the result fits in driver memory.

    Returns:
        List containing all RDD elements
    """
```

**count**: Count the number of elements
```python { .api }
def count(self):
    """
    Count the number of elements in the RDD.

    Returns:
        Number of elements as an integer
    """
```

**first**: Return the first element
```python { .api }
def first(self):
    """
    Return the first element of the RDD.

    Returns:
        First element

    Raises:
        ValueError: If the RDD is empty
    """
```

**take**: Return the first n elements
```python { .api }
def take(self, num):
    """
    Return the first n elements of the RDD.

    Args:
        num: Number of elements to return

    Returns:
        List of the first n elements
    """
```

**reduce**: Reduce elements using an associative function
```python { .api }
def reduce(self, f):
    """
    Reduce elements using an associative and commutative function.

    Args:
        f: Binary function that takes two parameters of the same type

    Returns:
        Single reduced value
    """
```

```python
numbers = sc.parallelize([1, 2, 3, 4, 5])
sum_result = numbers.reduce(lambda a, b: a + b)
# Result: 15
```

**foreach**: Apply a function to each element (for side effects)
```python { .api }
def foreach(self, f):
    """
    Apply a function to each element for side effects only.

    Args:
        f: Function to apply to each element
    """
```

#### Persistence Operations

**cache**: Cache the RDD in memory
```python { .api }
def cache(self):
    """
    Cache this RDD in memory using the default storage level.

    Returns:
        Same RDD for method chaining
    """
```

**persist**: Cache with a specific storage level
```python { .api }
def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
    """
    Cache the RDD with the specified storage level.

    Args:
        storageLevel: Storage level from the StorageLevel class

    Returns:
        Same RDD for method chaining
    """
```

```python
from pyspark import StorageLevel

# An RDD's storage level can only be assigned once, so pick either persist() or cache()
rdd = sc.textFile("large-file.txt")
rdd.persist(StorageLevel.MEMORY_AND_DISK)

lines = sc.textFile("other-file.txt")
lines.cache()  # Equivalent to persist(StorageLevel.MEMORY_ONLY)
```

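Caching is lazy: nothing is stored until an action materializes the RDD. A minimal sketch continuing the snippet above:

```python
rdd.count()  # First action: reads the file and populates the cache
rdd.count()  # Subsequent actions reuse the cached partitions
```
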
**unpersist**: Remove from cache
```python { .api }
def unpersist(self, blocking=False):
    """
    Remove this RDD from cache/storage.

    Args:
        blocking: Whether to block until removal is complete

    Returns:
        Same RDD for method chaining
    """
```

#### Key-Value Operations (PairRDD Functions)

When an RDD contains tuples, additional operations are available:

**reduceByKey**: Combine values by key
```python { .api }
def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
    """
    Combine values with the same key using an associative function.

    Args:
        func: Binary function to combine values
        numPartitions: Number of partitions in result (optional)
        partitionFunc: Partitioning function (optional)

    Returns:
        New RDD with combined values per key
    """
```

```python
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
sums = pairs.reduceByKey(lambda a, b: a + b)
# Result: [("a", 4), ("b", 2)]
```

**groupByKey**: Group values by key
```python { .api }
def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
    """
    Group values with the same key into iterables.

    Args:
        numPartitions: Number of partitions in result (optional)
        partitionFunc: Partitioning function (optional)

    Returns:
        New RDD with grouped values per key
    """
```

**mapValues**: Transform values, preserve keys
```python { .api }
def mapValues(self, f):
    """
    Apply a function to values while preserving keys.

    Args:
        f: Function to apply to each value

    Returns:
        New RDD with transformed values
    """
```

**join**: Inner join on keys
```python { .api }
def join(self, other, numPartitions=None):
    """
    Inner join with another RDD on keys.

    Args:
        other: Another pair RDD to join with
        numPartitions: Number of partitions in result (optional)

    Returns:
        New RDD with joined key-value pairs
    """
```

### SparkConf

Configuration class for Spark applications.

```python { .api }
class SparkConf(object):
    """Configuration for a Spark application."""

    def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
        """Create a SparkConf object."""

    def set(self, key, value):
        """Set a configuration property."""

    def setMaster(self, value):
        """Set the master URL."""

    def setAppName(self, value):
        """Set the application name."""

    def setSparkHome(self, value):
        """Set the Spark installation directory."""

    def setExecutorEnv(self, key=None, value=None, pairs=None):
        """Set environment variables for executors."""

    def get(self, key, defaultValue=None):
        """Get a configuration value."""

    def getAll(self):
        """Get all configuration as a list of (key, value) pairs."""
```

```python
conf = SparkConf() \
    .setAppName("My Application") \
    .setMaster("local[4]") \
    .set("spark.executor.memory", "4g") \
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```

### Broadcast Variables

Read-only variables distributed to all nodes.

```python { .api }
class Broadcast(object):
    """A broadcast variable created with SparkContext.broadcast()."""

    @property
    def value(self):
        """Get the broadcasted value."""

    def unpersist(self, blocking=False):
        """Delete cached copies of this broadcast on executors."""

    def destroy(self):
        """Destroy all data and metadata related to this broadcast."""
```

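A minimal lifecycle sketch using the methods listed above (the values are placeholders):

```python
bc = sc.broadcast({"key": "value"})

rdd = sc.parallelize(["key", "missing"])
hits = rdd.filter(lambda k: k in bc.value).count()

bc.unpersist()  # Delete cached copies on the executors
bc.destroy()    # Destroy all data and metadata; bc cannot be used afterwards
```
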
### Accumulators

Shared variables for aggregating information.

```python { .api }
class Accumulator(object):
    """Shared variable that can only be added to."""

    def add(self, term):
        """Add a term to this accumulator."""

    @property
    def value(self):
        """Get the accumulator's value (only valid on the driver)."""
```

### StorageLevel

Constants for RDD persistence levels.

```python { .api }
class StorageLevel(object):
    """Storage levels for persisting RDDs."""

    DISK_ONLY = StorageLevel(True, False, False, False, 1)
    DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
    MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
    MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
    MEMORY_ONLY_SER = StorageLevel(False, True, False, True, 1)
    MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, True, 2)
    MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
    MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
    MEMORY_AND_DISK_SER = StorageLevel(True, True, False, True, 1)
    MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, True, 2)
```

### SparkFiles

Utility for accessing files distributed with a Spark job.

```python { .api }
class SparkFiles(object):
    """Access files distributed via SparkContext.addFile()."""

    @classmethod
    def get(cls, filename):
        """
        Get the path to a file added via SparkContext.addFile().

        Args:
            filename: Name of file to locate

        Returns:
            Absolute path to the file on the current node
        """

    @classmethod
    def getRootDirectory(cls):
        """
        Get the root directory for files added via addFile().

        Returns:
            Path to the root directory containing distributed files
        """
```

## Usage Examples

### Word Count
```python
text_file = sc.textFile("hdfs://...")
counts = text_file \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

counts.saveAsTextFile("hdfs://output")
```

### Log Analysis
```python
log_file = sc.textFile("access.log")
errors = log_file.filter(lambda line: "ERROR" in line)

# extract_host() is a user-defined helper that parses the host from a log line
error_counts = errors \
    .map(lambda line: (extract_host(line), 1)) \
    .reduceByKey(lambda a, b: a + b)

result = error_counts.collect()
```

### Using Broadcast Variables
```python
lookup_table = {"user1": "admin", "user2": "guest"}
broadcast_lookup = sc.broadcast(lookup_table)

# extract_user() is a user-defined helper that parses the user from a log line
user_logs = sc.textFile("user_activity.log")
enriched_logs = user_logs.map(lambda log: {
    "log": log,
    "role": broadcast_lookup.value.get(extract_user(log), "unknown")
})
```

### Caching for Performance
```python
large_dataset = sc.textFile("huge_file.txt")
filtered_data = large_dataset.filter(lambda line: "important" in line)

# Cache the filtered data since we'll use it multiple times
filtered_data.cache()

# Multiple operations on cached data
count = filtered_data.count()
sample = filtered_data.sample(False, 0.1).collect()
unique_words = filtered_data.flatMap(lambda line: line.split()).distinct().count()
```

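### Joining Key-Value Data

An illustrative sketch of the pair-RDD operations described earlier; the datasets are made up:

```python
users = sc.parallelize([(1, "alice"), (2, "bob")])
purchases = sc.parallelize([(1, 19.99), (1, 5.49), (2, 12.00)])

# join produces (key, (left_value, right_value)) pairs, e.g. (1, ("alice", 19.99))
joined = users.join(purchases)

# Total spend per user name
totals = joined \
    .map(lambda kv: (kv[1][0], kv[1][1])) \
    .reduceByKey(lambda a, b: a + b)

print(totals.collect())  # e.g. [("alice", 25.48), ("bob", 12.0)] (order may vary)
```
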
sc.textFile("nonexistent_file.txt")745rdd.count() # Error occurs on action, not creation746except Exception as e:747print(f"File access error: {e}")748```749750The Python API provides a Pythonic interface to Spark's distributed computing capabilities while maintaining compatibility with the underlying Scala/Java implementation.