Python API for Apache Spark, providing distributed computing, data analysis, and machine learning capabilities
—
Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Structured data processing with DataFrames, SQL queries, and comprehensive built-in functions. This is the primary high-level interface for working with structured and semi-structured data in PySpark, providing optimized query execution through the Catalyst optimizer.
Unified entry point for DataFrame and Dataset functionality, replacing SQLContext in newer versions.
class SparkSession:
    """Unified entry point for DataFrame and Dataset functionality.

    Replaces SQLContext in newer Spark versions.
    """

    def __init__(self, sparkContext, jsparkSession=None):
        """Create a new SparkSession.

        Parameters:
        - sparkContext: Underlying SparkContext.
        - jsparkSession: Optional existing JVM SparkSession to wrap.
        """

    @classmethod
    def builder(cls):
        """Create a Builder for constructing a SparkSession.

        Returns:
            SparkSession.Builder
        """

    def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
        """Create a DataFrame from an RDD, list, pandas DataFrame, numpy array,
        or PyArrow Table.

        Parameters:
        - data: Input data (RDD, Iterable, pandas DataFrame, numpy array, PyArrow Table).
        - schema (StructType, str, list): Schema definition for the DataFrame.
        - samplingRatio (float): Sample ratio (0.0-1.0) for schema inference from an RDD.
        - verifySchema (bool): Verify that data conforms to the provided schema.

        Returns:
            DataFrame with the specified data and schema.
        """

    def sql(self, sqlQuery):
        """Return a DataFrame representing the result of the given query.

        Parameters:
        - sqlQuery (str): SQL query string.

        Returns:
            DataFrame with the query results.
        """

    def table(self, tableName):
        """Return the specified table as a DataFrame.

        Parameters:
        - tableName (str): Table name.

        Returns:
            DataFrame representing the table.
        """

    def range(self, start, end=None, step=1, numPartitions=None):
        """Create a DataFrame with a single pyspark.sql.types.LongType column
        named ``id``.

        Parameters:
        - start (int): Start value (inclusive).
        - end (int): End value (exclusive).
        - step (int): Step size.
        - numPartitions (int): Number of partitions.

        Returns:
            DataFrame with an ``id`` column.
        """

    def stop(self):
        """Stop the underlying SparkContext."""

    @property
    def read(self):
        """DataFrameReader for reading data into a DataFrame."""

    @property
    def readStream(self):
        """DataStreamReader for reading streaming data."""

    @property
    def catalog(self):
        """Interface to Spark's catalog of databases, tables and functions."""

    @property
    def udf(self):
        """UDFRegistration for registering user-defined functions."""

# Builder pattern for creating SparkSession instances with configuration.
class Builder:
    """Builder for constructing SparkSession instances with configuration."""

    def appName(self, name):
        """Set a name for the application.

        Parameters:
        - name (str): Application name.

        Returns:
            Builder
        """

    def master(self, master):
        """Set the Spark master URL.

        Parameters:
        - master (str): Master URL.

        Returns:
            Builder
        """

    def config(self, key=None, value=None, conf=None):
        """Set a single config option or a whole SparkConf.

        Parameters:
        - key (str): Configuration key.
        - value (str): Configuration value.
        - conf (SparkConf): SparkConf object.

        Returns:
            Builder
        """

    def enableHiveSupport(self):
        """Enable Hive support.

        Returns:
            Builder
        """

    def getOrCreate(self):
        """Get an existing SparkSession or create a new one.

        Returns:
            SparkSession
        """

# Distributed collection of data organized into named columns.
class DataFrame:
    """Distributed collection of data organized into named columns."""

    def select(self, *cols):
        """Project a set of expressions and return a new DataFrame.

        Parameters:
        - cols: Column expressions or names.
        """

    def filter(self, condition):
        """Return a new DataFrame keeping rows that satisfy *condition*
        (a Column expression or SQL string)."""

    def where(self, condition):
        """Filter rows using the given condition (alias for :meth:`filter`)."""

    def groupBy(self, *cols):
        """Group the DataFrame by the specified columns.

        Returns:
            GroupedData for aggregation.
        """

    def agg(self, *exprs):
        """Aggregate on the entire DataFrame without groups.

        Parameters:
        - exprs: Aggregation expressions.
        """

    def orderBy(self, *cols, **kwargs):
        """Sort the DataFrame by the specified columns.

        Parameters:
        - cols: Column names or expressions.
        - ascending (bool): Sort in ascending order.
        """

    def sort(self, *cols, **kwargs):
        """Sort the DataFrame by the specified columns (alias for :meth:`orderBy`).

        Parameters:
        - cols: Column names or expressions.
        - ascending (bool): Sort in ascending order.
        """

    def join(self, other, on=None, how=None):
        """Join with another DataFrame.

        Parameters:
        - other (DataFrame): DataFrame to join with.
        - on: Join condition (column names or expression).
        - how (str): Join type ("inner", "outer", "left", "right", "semi", "anti").
        """

    def union(self, other):
        """Return a new DataFrame containing the union of rows with *other*."""

    def unionByName(self, other, allowMissingColumns=False):
        """Return a new DataFrame containing the union of rows, matched by
        column names.

        Parameters:
        - other (DataFrame): Another DataFrame.
        - allowMissingColumns (bool): Allow missing columns.
        """

    def intersect(self, other):
        """Return a new DataFrame containing rows present in both DataFrames."""

    def subtract(self, other):
        """Return a new DataFrame with rows in this DataFrame but not in *other*."""

    def distinct(self):
        """Return a new DataFrame with distinct rows."""

    def dropDuplicates(self, subset=None):
        """Return a new DataFrame with duplicate rows removed.

        Parameters:
        - subset (list): Column names to consider for duplicates.
        """

    def drop(self, *cols):
        """Return a new DataFrame with the specified columns dropped."""

    def withColumn(self, colName, col):
        """Return a new DataFrame by adding or replacing a column.

        Parameters:
        - colName (str): Column name.
        - col (Column): Column expression.
        """

    def withColumnRenamed(self, existing, new):
        """Return a new DataFrame with column *existing* renamed to *new*."""

    def show(self, n=20, truncate=True, vertical=False):
        """Print the first *n* rows to the console.

        Parameters:
        - n (int): Number of rows to show.
        - truncate (bool or int): Truncate strings (True/False or max width).
        - vertical (bool): Print rows vertically instead of horizontally.
        """

    def collect(self):
        """Return all records as a list of Row objects."""

    def take(self, num):
        """Return the first *num* rows as a list of Row objects."""

    def first(self):
        """Return the first row as a Row."""

    def head(self, n=None):
        """Return the first *n* rows, or the first Row when *n* is None."""

    def count(self):
        """Return the number of rows in this DataFrame."""

    def describe(self, *cols):
        """Compute basic statistics for numeric and string columns."""

    def summary(self, *statistics):
        """Compute the specified statistics for numeric and string columns."""

    def cache(self):
        """Persist this DataFrame with the default storage level and return it."""

    # NOTE: the original default was `StorageLevel.MEMORY_AND_DISK`, but
    # StorageLevel is not defined in this file, which would raise NameError
    # at class-definition time. `None` is used as a sentinel instead.
    def persist(self, storageLevel=None):
        """Persist this DataFrame with the given storage level.

        Parameters:
        - storageLevel (StorageLevel): Storage level; defaults to
          StorageLevel.MEMORY_AND_DISK when None.

        Returns:
            This DataFrame.
        """

    def unpersist(self, blocking=False):
        """Mark the DataFrame as non-persistent.

        Parameters:
        - blocking (bool): Whether to block until complete.

        Returns:
            This DataFrame.
        """

    def coalesce(self, numPartitions):
        """Return a new DataFrame reduced to *numPartitions* partitions."""

    def repartition(self, numPartitions, *cols):
        """Return a new DataFrame partitioned by the given expressions.

        Parameters:
        - numPartitions (int): Number of partitions.
        - cols: Partitioning expressions.
        """

    def createOrReplaceTempView(self, name):
        """Create or replace a local temporary view named *name*."""

    def createGlobalTempView(self, name):
        """Create a global temporary view named *name*."""

    @property
    def write(self):
        """DataFrameWriter for saving the content of this DataFrame."""

    @property
    def writeStream(self):
        """DataStreamWriter for saving the content of a streaming DataFrame."""

    @property
    def schema(self):
        """StructType describing the schema of this DataFrame."""

    @property
    def columns(self):
        """All column names as a list."""

    @property
    def dtypes(self):
        """All column names and their data types as a list of (name, type) tuples."""

# Specialized function classes for handling missing data and statistical operations.
class DataFrameNaFunctions:
    """Missing-data helpers for DataFrames, reached through ``DataFrame.na``."""

    def drop(self, how="any", thresh=None, subset=None):
        """Remove rows containing null values.

        Parameters:
        - how (str): "any" drops rows with at least one null; "all" only
          drops rows where every value is null.
        - thresh (int): Minimum count of non-null values a row must have.
        - subset (list): Restrict the null check to these columns.

        Returns:
            DataFrame with the offending rows removed.
        """

    def fill(self, value, subset=None):
        """Substitute a value for nulls.

        Parameters:
        - value: Replacement for nulls (a dict gives per-column values).
        - subset (list): Restrict the fill to these columns.

        Returns:
            DataFrame with nulls filled in.
        """

    def replace(self, to_replace, value=None, subset=None):
        """Swap occurrences of given values for replacements.

        Parameters:
        - to_replace: Value(s) to be replaced.
        - value: Replacement value(s).
        - subset (list): Restrict the replacement to these columns.

        Returns:
            DataFrame with the values replaced.
        """
class DataFrameStatFunctions:
    """Statistical operations on DataFrames, accessed via ``DataFrame.stat``."""

    def approxQuantile(self, col, probabilities, relativeError):
        """Calculate approximate quantiles for numeric columns.

        Parameters:
        - col (str): Column name or list of column names.
        - probabilities (list): Quantile probabilities (0.0 to 1.0).
        - relativeError (float): Relative error tolerance.

        Returns:
            List of quantile values.
        """

    def corr(self, col1, col2, method="pearson"):
        """Calculate the correlation between two columns.

        Parameters:
        - col1 (str): First column name.
        - col2 (str): Second column name.
        - method (str): Correlation method ("pearson" or "spearman").

        Returns:
            Correlation coefficient as a float.
        """

    def cov(self, col1, col2):
        """Calculate the covariance between columns *col1* and *col2* as a float."""

    def crosstab(self, col1, col2):
        """Return a DataFrame with the cross-tabulation of *col1* and *col2*."""

    def freqItems(self, cols, support=None):
        """Find frequent items for the specified columns.

        Parameters:
        - cols (list): Column names.
        - support (float): Minimum support threshold.

        Returns:
            DataFrame with the frequent items.
        """

    def sampleBy(self, col, fractions, seed=None):
        """Stratified sampling by column values.

        Parameters:
        - col (str): Column to stratify by.
        - fractions (dict): Sampling fraction per stratum.
        - seed (int): Random seed.

        Returns:
            Sampled DataFrame.
        """

# Column expressions for DataFrame transformations.
class Column:
    """A column expression usable in DataFrame transformations."""

    def alias(self, *alias, **kwargs):
        """Return this column renamed with the given alias name(s)."""

    def cast(self, dataType):
        """Return this column converted to the given data type."""

    def contains(self, other):
        """Return a boolean Column: does this column contain *other*?"""

    def startswith(self, other):
        """Return a boolean Column: does this column start with string *other*?"""

    def endswith(self, other):
        """Return a boolean Column: does this column end with string *other*?"""

    def isNull(self):
        """Return a boolean Column that is true where this column is null."""

    def isNotNull(self):
        """Return a boolean Column that is true where this column is not null."""

    def isin(self, *cols):
        """Return a boolean Column: is this column's value in the given list?"""

    def between(self, lowerBound, upperBound):
        """Return a boolean Column: is this column between the two bounds?"""

    def when(self, condition, value):
        """Evaluate a list of conditions and return one of multiple possible
        result expressions.

        Parameters:
        - condition: Condition expression.
        - value: Value when the condition is true.

        Returns:
            Column with conditional logic.
        """

    def otherwise(self, value):
        """Default branch for a conditional expression built with :meth:`when`.

        Parameters:
        - value: Default value.

        Returns:
            Column with conditional logic.
        """

    def substr(self, startPos, length):
        """Return a substring Column starting at *startPos* with *length* chars."""

    def asc(self):
        """Return a sort expression based on ascending order of this column."""

    def desc(self):
        """Return a sort expression based on descending order of this column."""

# Window functions for analytical operations over groups of rows.
class Window:
    """Helpers for building window specifications over DataFrames."""

    # Frame-boundary markers used with rowsBetween / rangeBetween.
    unboundedPreceding: int  # unbounded preceding frame boundary
    unboundedFollowing: int  # unbounded following frame boundary
    currentRow: int          # current-row frame boundary

    @staticmethod
    def partitionBy(*cols):
        """Build a WindowSpec partitioned by the given columns/expressions.

        Returns:
            WindowSpec with the partitioning defined.
        """

    @staticmethod
    def orderBy(*cols):
        """Build a WindowSpec ordered by the given columns/expressions.

        Returns:
            WindowSpec with the ordering defined.
        """

    @staticmethod
    def rowsBetween(start, end):
        """Build a WindowSpec with a row-based frame.

        Parameters:
        - start (int): Start boundary (inclusive).
        - end (int): End boundary (inclusive).

        Returns:
            WindowSpec with the frame boundaries defined.
        """

    @staticmethod
    def rangeBetween(start, end):
        """Build a WindowSpec with a range-based frame.

        Parameters:
        - start: Start boundary value.
        - end: End boundary value.

        Returns:
            WindowSpec with the range frame boundaries defined.
        """
class WindowSpec:
    """Window specification defining partitioning, ordering, and frame boundaries."""

    # NOTE: the original definitions omitted `self`, unlike every other
    # instance method in this file; restored for consistency.
    def partitionBy(self, *cols):
        """Define the partitioning columns for this window.

        Parameters:
        - cols: Column names or expressions.

        Returns:
            WindowSpec with updated partitioning.
        """

    def orderBy(self, *cols):
        """Define the ordering columns for this window.

        Parameters:
        - cols: Column names or expressions with optional sort direction.

        Returns:
            WindowSpec with updated ordering.
        """

    def rowsBetween(self, start, end):
        """Define row-based frame boundaries for this window.

        Parameters:
        - start (int): Start row offset.
        - end (int): End row offset.

        Returns:
            WindowSpec with the frame boundaries defined.
        """

    def rangeBetween(self, start, end):
        """Define range-based frame boundaries for this window.

        Parameters:
        - start: Start range value.
        - end: End range value.

        Returns:
            WindowSpec with the range frame boundaries defined.
        """

# Interfaces for reading and writing data to various formats and sources.
class DataFrameReader:
    """Interface for loading a DataFrame from external storage systems."""

    def format(self, source):
        """Specify the input data source format (str) and return this reader."""

    def option(self, key, value):
        """Add a single input option for the underlying data source.

        Parameters:
        - key (str): Option key.
        - value: Option value.

        Returns:
            DataFrameReader
        """

    def options(self, **options):
        """Add input options for the underlying data source as keyword args."""

    def schema(self, schema):
        """Specify the input schema and return this reader."""

    def load(self, path=None, format=None, schema=None, **options):
        """Load data from a data source.

        Parameters:
        - path (str): File path.
        - format (str): Data source format.
        - schema: Schema definition.
        - options: Additional options.

        Returns:
            DataFrame
        """

    def csv(self, path, schema=None, sep=None, encoding=None, quote=None,
            escape=None, comment=None, header=None, inferSchema=None,
            ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
            nullValue=None, nanValue=None, positiveInf=None, negativeInf=None,
            dateFormat=None, timestampFormat=None, maxColumns=None,
            maxCharsPerColumn=None, maxMalformedLogPerPartition=None,
            mode=None, columnNameOfCorruptRecord=None, multiLine=None,
            charToEscapeQuoteEscaping=None, samplingRatio=None,
            enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
            pathGlobFilter=None, recursiveFileLookup=None,
            modifiedBefore=None, modifiedAfter=None, unescapedQuoteHandling=None):
        """Load a CSV file and return the results as a DataFrame.

        Parameters:
        - path (str): CSV file path.
        - schema: Schema for the CSV file.
        - sep (str): Column separator.
        - encoding (str): Character encoding.
        - quote (str): Quote character.
        - escape (str): Escape character.
        - comment (str): Comment character.
        - header (bool): Whether the first line is a header.
        - inferSchema (bool): Automatically infer column types.
        - ignoreLeadingWhiteSpace (bool): Ignore leading whitespace.
        - ignoreTrailingWhiteSpace (bool): Ignore trailing whitespace.
        - nullValue (str): String representation of a null value.
        - nanValue (str): String representation of a NaN value.
        - positiveInf (str): String representation of positive infinity.
        - negativeInf (str): String representation of negative infinity.
        - dateFormat (str): Date format string.
        - timestampFormat (str): Timestamp format string.
        - maxColumns (int): Maximum number of columns.
        - maxCharsPerColumn (int): Maximum characters per column.
        - maxMalformedLogPerPartition (int): Max malformed records to log per partition.
        - mode (str): Parse mode ("PERMISSIVE", "DROPMALFORMED", "FAILFAST").
        - columnNameOfCorruptRecord (str): Column name for corrupt records.
        - multiLine (bool): Parse multi-line records.
        - charToEscapeQuoteEscaping (str): Character to escape quote escaping.
        - samplingRatio (float): Sampling ratio for schema inference.
        - enforceSchema (bool): Enforce the specified or inferred schema.
        - emptyValue (str): String representation of an empty value.
        - locale (str): Locale for parsing.
        - lineSep (str): Line separator.
        - pathGlobFilter (str): Path glob filter.
        - recursiveFileLookup (bool): Recursive file lookup.
        - modifiedBefore (str): Only files modified before this timestamp.
        - modifiedAfter (str): Only files modified after this timestamp.
        - unescapedQuoteHandling (str): How to handle unescaped quotes.

        Returns:
            DataFrame
        """

    def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
             allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
             mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
             timestampFormat=None, multiLine=None, allowUnquotedControlChars=None,
             lineSep=None, samplingRatio=None, dropFieldIfAllNull=None,
             encoding=None, locale=None, pathGlobFilter=None, recursiveFileLookup=None,
             modifiedBefore=None, modifiedAfter=None):
        """Load JSON files and return the results as a DataFrame.

        Parameters:
        - path (str): JSON file path.
        - schema: Schema for the JSON data.
        - primitivesAsString (bool): Infer primitive types as strings.
        - prefersDecimal (bool): Prefer decimal type for numbers.
        - allowComments (bool): Allow Java/C++ style comments.
        - allowUnquotedFieldNames (bool): Allow unquoted field names.
        - allowSingleQuotes (bool): Allow single quotes.
        - allowNumericLeadingZero (bool): Allow leading zeros in numbers.
        - allowBackslashEscapingAnyCharacter (bool): Allow backslash escaping.
        - mode (str): Parse mode.
        - columnNameOfCorruptRecord (str): Column name for corrupt records.
        - dateFormat (str): Date format string.
        - timestampFormat (str): Timestamp format string.
        - multiLine (bool): Parse multi-line JSON records.
        - allowUnquotedControlChars (bool): Allow unquoted control characters.
        - lineSep (str): Line separator.
        - samplingRatio (float): Sampling ratio for schema inference.
        - dropFieldIfAllNull (bool): Drop fields with all null values.
        - encoding (str): Character encoding.
        - locale (str): Locale for parsing.
        - pathGlobFilter (str): Path glob filter.
        - recursiveFileLookup (bool): Recursive file lookup.
        - modifiedBefore (str): Only files modified before this timestamp.
        - modifiedAfter (str): Only files modified after this timestamp.

        Returns:
            DataFrame
        """

    def parquet(self, *paths, **options):
        """Load Parquet files from *paths* and return the results as a DataFrame."""

    def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
             recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
        """Load text files and return a DataFrame with a single string column.

        Parameters:
        - paths: Text file paths.
        - wholetext (bool): Read each file as a single record.
        - lineSep (str): Line separator.
        - pathGlobFilter (str): Path glob filter.
        - recursiveFileLookup (bool): Recursive file lookup.
        - modifiedBefore (str): Only files modified before this timestamp.
        - modifiedAfter (str): Only files modified after this timestamp.

        Returns:
            DataFrame
        """

    def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
            modifiedBefore=None, modifiedAfter=None):
        """Load ORC files and return the results as a DataFrame.

        Parameters:
        - path (str): ORC file path.
        - mergeSchema (bool): Merge schemas from multiple files.
        - pathGlobFilter (str): Path glob filter.
        - recursiveFileLookup (bool): Recursive file lookup.
        - modifiedBefore (str): Only files modified before this timestamp.
        - modifiedAfter (str): Only files modified after this timestamp.

        Returns:
            DataFrame
        """

    def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None,
             numPartitions=None, predicates=None, properties=None):
        """Construct a DataFrame representing the given database table.

        Parameters:
        - url (str): JDBC database URL.
        - table (str): Table name.
        - column (str): Column name for partitioning.
        - lowerBound: Lower bound for the partitioning column.
        - upperBound: Upper bound for the partitioning column.
        - numPartitions (int): Number of partitions.
        - predicates (list): List of expressions for partitioning.
        - properties (dict): JDBC connection properties.

        Returns:
            DataFrame
        """

# Components for real-time data processing with streaming DataFrames.
class DataStreamReader:
    """Reader for streaming sources, reached through ``SparkSession.readStream``."""

    def format(self, source):
        """Select the source format ("kafka", "socket", "rate", ...).

        Returns:
            DataStreamReader with the format selected.
        """

    def option(self, key, value):
        """Attach one input option (*key* → *value*) to the streaming source.

        Returns:
            DataStreamReader with the option applied.
        """

    def schema(self, schema):
        """Declare the input schema (StructType) for the streaming data.

        Returns:
            DataStreamReader with the schema declared.
        """

    def load(self, path=None):
        """Materialize a streaming DataFrame from the configured source.

        Parameters:
        - path (str): Optional source path.

        Returns:
            Streaming DataFrame.
        """
class DataStreamWriter:
    """Writer for streaming sinks, reached through ``DataFrame.writeStream``."""

    def format(self, source):
        """Select the sink format ("console", "memory", "kafka", ...).

        Returns:
            DataStreamWriter with the format selected.
        """

    def outputMode(self, outputMode):
        """Choose how results are emitted: "append", "complete", or "update".

        Returns:
            DataStreamWriter with the output mode applied.
        """

    def trigger(self, **kwargs):
        """Configure when the streaming query fires
        (processingTime, once, continuous).

        Returns:
            DataStreamWriter with the trigger configured.
        """

    def start(self, path=None):
        """Launch the streaming query.

        Parameters:
        - path (str): Optional output path.

        Returns:
            StreamingQuery handle for the running query.
        """
class StreamingQuery:
    """Handle to a running streaming query execution."""

    @property
    def id(self):
        """Unique identifier of this query."""

    @property
    def isActive(self):
        """Whether this query is currently running."""

    def awaitTermination(self, timeout=None):
        """Block until the query terminates.

        Parameters:
        - timeout (int): Maximum seconds to wait.

        Returns:
            True when the query terminated, False on timeout.
        """

    def stop(self):
        """Stop this streaming query."""
class StreamingQueryManager:
    """Manager for streaming queries, accessed via ``SparkSession.streams``."""

    @property
    def active(self):
        """List of currently active streaming queries."""

    def get(self, id):
        """Get a streaming query by ID.

        Parameters:
        - id (str): Query ID.

        Returns:
            StreamingQuery, or None if not found.
        """

    def awaitAnyTermination(self, timeout=None):
        """Wait for any streaming query to terminate.

        Parameters:
        - timeout (int): Maximum time to wait (seconds).

        Returns:
            True if any query terminated, False on timeout.
        """

# Comprehensive built-in functions for data processing and transformation.
# --- Column creation and references ---

def col(col_name):
    """Build a Column that references the column named *col_name* (str)."""

def column(col_name):
    """Alias of ``col``: build a Column reference from *col_name* (str)."""

def lit(col_value):
    """Wrap the literal value *col_value* in a Column."""

def expr(str_expr):
    """Parse the SQL expression string *str_expr* into a Column."""
# --- Mathematical functions ---
# NOTE: abs/pow/round intentionally shadow Python builtins, matching the
# public pyspark.sql.functions API.

def abs(col):
    """Absolute value of a column."""

def acos(col):
    """Arc cosine of a column."""

def asin(col):
    """Arc sine of a column."""

def atan(col):
    """Arc tangent of a column."""

def cos(col):
    """Cosine of a column."""

def sin(col):
    """Sine of a column."""

def tan(col):
    """Tangent of a column."""

def sqrt(col):
    """Square root of a column."""

def exp(col):
    """Exponential of a column."""

def log(arg1, arg2=None):
    """Natural logarithm, or logarithm in the base given by the first argument."""

def pow(col1, col2):
    """*col1* raised to the power *col2*."""

def round(col, scale=0):
    """Value rounded to *scale* decimal places."""

def ceil(col):
    """Ceiling of a column."""

def floor(col):
    """Floor of a column."""
# --- String functions ---

def upper(col):
    """String column converted to uppercase."""

def lower(col):
    """String column converted to lowercase."""

def length(col):
    """Length of a string column."""

def trim(col):
    """String column with spaces stripped from both ends."""

def ltrim(col):
    """String column with spaces stripped from the left end."""

def rtrim(col):
    """String column with spaces stripped from the right end."""

def concat(*cols):
    """Concatenation of several string columns."""

def concat_ws(sep, *cols):
    """Concatenation of several string columns joined by *sep*."""

def substring(str, pos, len):
    """Substring of *str* starting at *pos* with the given *len*."""

def split(str, pattern, limit=-1):
    """*str* split around matches of *pattern*, up to *limit* pieces."""

def regexp_extract(str, pattern, idx):
    """Group *idx* of the match of *pattern* within *str*."""

def regexp_replace(str, pattern, replacement):
    """*str* with every match of *pattern* replaced by *replacement*."""
# --- Date and time functions ---

def current_date():
    """Current date as a date column."""

def current_timestamp():
    """Current timestamp as a timestamp column."""

def date_add(start, days):
    """Date that is *days* days after *start*."""

def date_sub(start, days):
    """Date that is *days* days before *start*."""

def datediff(end, start):
    """Number of days between *start* and *end*."""

def year(col):
    """Year extracted from a date/timestamp column."""

def month(col):
    """Month extracted from a date/timestamp column."""

def dayofmonth(col):
    """Day of month extracted from a date/timestamp column."""

def hour(col):
    """Hour extracted from a timestamp column."""

def minute(col):
    """Minute extracted from a timestamp column."""

def second(col):
    """Second extracted from a timestamp column."""

def date_format(date, format):
    """Date/timestamp rendered as a string using *format*."""

def to_date(col, format=None):
    """String column converted to a date column."""

def to_timestamp(col, format=None):
    """String column converted to a timestamp column."""
# --- Array functions ---

def array(*cols):
    """New array column built from the given columns."""

def array_contains(col, value):
    """True where the array column contains *value*."""

def size(col):
    """Number of elements in an array or map column."""

def sort_array(col, asc=True):
    """Input array sorted ascending (or descending when *asc* is False)."""

def reverse(col):
    """Reversed string or array."""

def slice(x, start, length):
    """Slice of array *x* beginning at *start* with the given *length*."""

def array_join(col, delimiter, null_replacement=None):
    """Array elements concatenated with *delimiter*."""

def explode(col):
    """One output row per element of the array column."""

def posexplode(col):
    """One output row per element of the array column, with its position."""
# --- Map functions ---

def create_map(*cols):
    """Map column built from alternating key/value columns."""

def map_keys(col):
    """Keys of a map column, as an array."""

def map_values(col):
    """Values of a map column, as an array."""

def map_from_arrays(col1, col2):
    """Map column built from a key array (*col1*) and a value array (*col2*)."""
# --- Aggregate functions ---
# NOTE: sum/max/min intentionally shadow Python builtins, matching the
# public pyspark.sql.functions API.

def count(col):
    """Number of items in a group, nulls included."""

def countDistinct(col, *cols):
    """Number of distinct items in a group."""

def sum(col):
    """Sum of the values in a group."""

def avg(col):
    """Average of the values in a group."""

def mean(col):
    """Average of the values in a group (same as ``avg``)."""

def max(col):
    """Largest value in a group."""

def min(col):
    """Smallest value in a group."""

def first(col, ignorenulls=False):
    """First value in a group."""

def last(col, ignorenulls=False):
    """Last value in a group."""

def stddev(col):
    """Sample standard deviation of the values in a group."""

def stddev_pop(col):
    """Population standard deviation of the values in a group."""

def variance(col):
    """Sample variance of the values in a group."""

def var_pop(col):
    """Population variance of the values in a group."""

def collect_list(col):
    """List of the values in a group, duplicates kept."""

def collect_set(col):
    """Set of the values in a group, duplicates removed."""
# Window functions
def row_number():
    """Return a sequential number starting at 1 within a window partition."""

def rank():
    """Return the rank of rows within a window partition (leaves gaps after ties)."""

def dense_rank():
    """Return the dense rank of rows within a window partition (no gaps after ties)."""

def percent_rank():
    """Return the relative rank of rows within a window partition."""

def cume_dist():
    """Return the cumulative distribution of values within a window partition."""

def lag(col, offset=1, default=None):
    """Return the value that is offset rows before the current row, or default when out of range."""

def lead(col, offset=1, default=None):
    """Return the value that is offset rows after the current row, or default when out of range."""

def ntile(n):
    """Return the ntile group id (1-indexed) within a window partition."""
# Conditional functions
def when(condition, value):
    """Evaluate a list of conditions and return one of multiple possible result expressions."""

def coalesce(*cols):
    """Return the first non-null value among the given columns."""

def greatest(*cols):
    """Return the greatest value among the given columns."""

def least(*cols):
    """Return the least value among the given columns."""

def isnull(col):
    """Return true if the column is null."""

def isnan(col):
    """Return true if the column is NaN."""
# JSON functions
def from_json(col, schema, options=None):
    """Parse a column containing a JSON string into the given schema."""

def to_json(col, options=None):
    """Convert a column containing a struct to a JSON string."""

def get_json_object(col, path):
    """Extract a JSON object from a JSON string at the given path."""

def json_tuple(col, *fields):
    """Return a tuple of JSON object values based on the given fields."""
# Type conversion functions
def cast(col, dataType):
    """Convert the column to a different data type.

    NOTE(review): in PySpark proper, cast is a Column method
    (Column.cast), not a top-level function — confirm intended placement.
    """

# Null handling functions
def isNull(col):
    """Return true if the column is null.

    NOTE(review): isNull/isNotNull are Column methods in PySpark — confirm.
    """

def isNotNull(col):
    """Return true if the column is not null."""

def dropna(how='any', thresh=None, subset=None):
    """Return a new DataFrame omitting rows with null values.

    NOTE(review): dropna/fillna are DataFrame methods in PySpark — confirm.
    """

def fillna(value, subset=None):
    """Replace null values with the specified value."""
def replace(to_replace, value=None, subset=None):
    """Replace values matching keys in to_replace with corresponding values.

    NOTE(review): replace is a DataFrame method in PySpark — confirm placement.
    """

# The original source fused the end of replace's docstring with the
# class Row header on one line (a syntax error); they are split here.
class Row:
    """A row of data in a DataFrame."""

    def __init__(self, **kwargs):
        """Create a row with named arguments."""

    def asDict(self, recursive=False):
        """
        Return as a dict.

        Parameters:
        - recursive (bool): Turn nested rows to dict recursively

        Returns:
        Dict representation of the row
        """
class GroupedData:
    """A set of methods for aggregations on a DataFrame."""

    def agg(self, *exprs):
        """Compute aggregates and return the result as a DataFrame."""

    def count(self):
        """Count the number of rows for each group."""

    def mean(self, *cols):
        """Compute the average value for each numeric column for each group."""

    def avg(self, *cols):
        """Compute the average value for each numeric column for each group (alias of mean)."""

    def max(self, *cols):
        """Compute the max value for each numeric column for each group."""

    def min(self, *cols):
        """Compute the min value for each numeric column for each group."""

    def sum(self, *cols):
        """Compute the sum for each numeric column for each group."""
from pyspark.sql.types import *
class DataType:
    """Base class for Spark SQL data types."""

class NullType(DataType):
    """Null data type."""

class StringType(DataType):
    """String data type."""

class BinaryType(DataType):
    """Binary data type."""

class BooleanType(DataType):
    """Boolean data type."""

class DateType(DataType):
    """Date data type."""

class TimestampType(DataType):
    """Timestamp data type."""

class DecimalType(DataType):
    """Decimal data type with precision (total digits) and scale (digits after the point)."""
    def __init__(self, precision=10, scale=0): ...

class DoubleType(DataType):
    """Double precision floating point data type."""

class FloatType(DataType):
    """Single precision floating point data type."""

class ByteType(DataType):
    """Byte integer data type."""

class IntegerType(DataType):
    """32-bit integer data type."""

class LongType(DataType):
    """64-bit integer data type."""

class ShortType(DataType):
    """16-bit integer data type."""

class ArrayType(DataType):
    """Array data type."""
    def __init__(self, elementType, containsNull=True): ...

class MapType(DataType):
    """Map data type."""
    def __init__(self, keyType, valueType, valueContainsNull=True): ...

class StructField:
    """A field in StructType."""
    def __init__(self, name, dataType, nullable=True, metadata=None): ...

class StructType(DataType):
    """Struct data type representing a row."""
    def __init__(self, fields=None): ...
    # Original line fused trailing web-page residue ("Install with Tessl
    # CLI") onto this def; the residue is removed here.
    def add(self, field, data_type=None, nullable=True, metadata=None): ...
# Install with Tessl CLI: npx tessl i tessl/pypi-pyspark