Python API for Apache Spark, providing distributed computing, data analysis, and machine learning capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Real-time data processing with Structured Streaming and DStreams for continuous data ingestion, processing, and output to various sinks. Enables processing of live data streams with fault tolerance; Structured Streaming additionally supports end-to-end exactly-once semantics for supported sources and sinks.
Main entry point for streaming applications using discretized streams (DStreams).
class StreamingContext:
    """Main entry point for Spark Streaming functionality."""

    def __init__(self, sparkContext, batchDuration):
        """Create a StreamingContext.

        Args:
            sparkContext (SparkContext): Spark context to use.
            batchDuration: Duration of each micro-batch.
        """

    # NOTE(review): the default below requires StorageLevel to be imported
    # from pyspark for this class body to evaluate — confirm at file level.
    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
        """Create an input stream from a TCP socket.

        Args:
            hostname (str): Host to connect to.
            port (int): Port to connect to.
            storageLevel (StorageLevel): Storage level for received data.

        Returns:
            DStream of strings.
        """

    def textFileStream(self, directory):
        """Create an input stream that monitors a directory for new files.

        Args:
            directory (str): Directory to monitor.

        Returns:
            DStream of strings.
        """

    def queueStream(self, rdds, oneAtATime=True, default=None):
        """Create an input stream from a queue of RDDs.

        Args:
            rdds: Queue of RDDs.
            oneAtATime (bool): Process one RDD per batch when True.
            default: RDD to use when the queue is empty.

        Returns:
            DStream.
        """

    def start(self):
        """Start the streaming context."""

    def stop(self, stopSparkContext=True, stopGraceFully=False):
        """Stop the streaming context.

        Args:
            stopSparkContext (bool): Also stop the underlying SparkContext.
            stopGraceFully (bool): Wait for pending batches before stopping.
        """

    def awaitTermination(self, timeout=None):
        """Block until the streaming context terminates."""

    def checkpoint(self, directory):
        """Set the checkpoint directory.

        Args:
            directory (str): Checkpoint directory.
        """
class DStream:
    """Discretized stream representing a continuous stream of data."""

    def map(self, f):
        """Apply *f* to each element.

        Args:
            f: Function to apply.

        Returns:
            New DStream.
        """

    def filter(self, f):
        """Keep only elements for which *f* is true.

        Args:
            f: Predicate function.

        Returns:
            Filtered DStream.
        """

    def flatMap(self, f):
        """Apply *f* to each element and flatten the results.

        Args:
            f: Function returning an iterable.

        Returns:
            Flattened DStream.
        """

    def union(self, other):
        """Union this DStream with another.

        Args:
            other (DStream): The other DStream.

        Returns:
            Union DStream.
        """

    def reduce(self, f):
        """Reduce the elements of each batch with *f*.

        Args:
            f: Reduce function.

        Returns:
            DStream with reduced elements.
        """

    def reduceByKey(self, func, numPartitions=None):
        """Reduce values per key for a paired DStream.

        Args:
            func: Reduce function.
            numPartitions (int): Number of partitions.

        Returns:
            DStream with reduced values per key.
        """

    def groupByKey(self, numPartitions=None):
        """Group values per key for a paired DStream.

        Args:
            numPartitions (int): Number of partitions.

        Returns:
            DStream with grouped values per key.
        """

    def countByValue(self):
        """Count occurrences of each distinct element."""

    def foreachRDD(self, func):
        """Apply *func* to every RDD in this DStream.

        Args:
            func: Function applied to each RDD.
        """

    def saveAsTextFiles(self, prefix, suffix=None):
        """Save each batch as text files.

        Args:
            prefix (str): File name prefix.
            suffix (str): File name suffix.
        """

    def window(self, windowDuration, slideDuration=None):
        """Create a windowed view over this DStream.

        Args:
            windowDuration: Window duration.
            slideDuration: Slide duration.

        Returns:
            Windowed DStream.
        """

    def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
        """Reduce elements over a sliding window.

        Args:
            reduceFunc: Reduce function.
            invReduceFunc: Inverse reduce function (for incremental windows).
            windowDuration: Window duration.
            slideDuration: Slide duration.

        Returns:
            DStream with windowed reductions.
        """

    def transform(self, func):
        """Transform every RDD of this DStream with *func*.

        Args:
            func: RDD-to-RDD transformation.

        Returns:
            Transformed DStream.
        """

    def cache(self):
        """Persist this DStream in memory."""

    def checkpoint(self, interval):
        """Enable periodic checkpointing.

        Args:
            interval: Checkpoint interval.
        """
class StreamingListener:
    """Listener interface for streaming lifecycle events."""

    def onBatchCompleted(self, batchCompleted):
        """Called when processing of a batch completes."""

    def onBatchStarted(self, batchStarted):
        """Called when processing of a batch starts."""

    def onOutputOperationCompleted(self, outputOperationCompleted):
        """Called when an output operation completes."""

    def onOutputOperationStarted(self, outputOperationStarted):
        """Called when an output operation starts."""

    def onReceiverError(self, receiverError):
        """Called when a receiver reports an error."""

    def onReceiverStarted(self, receiverStarted):
        """Called when a receiver starts."""

    def onReceiverStopped(self, receiverStopped):
        """Called when a receiver stops."""

# High-level streaming API built on DataFrames for continuous processing.
class DataStreamReader:
    """Interface for reading streaming data into DataFrames."""

    def format(self, source):
        """Specify the data source format.

        Args:
            source (str): Data source format.

        Returns:
            DataStreamReader.
        """

    def option(self, key, value):
        """Add a single input option.

        Args:
            key (str): Option key.
            value: Option value.

        Returns:
            DataStreamReader.
        """

    def options(self, **options):
        """Add multiple input options.

        Args:
            options: Keyword options.

        Returns:
            DataStreamReader.
        """

    def schema(self, schema):
        """Specify the input schema.

        Args:
            schema: Schema definition.

        Returns:
            DataStreamReader.
        """

    def load(self, path=None, format=None, schema=None, **options):
        """Load streaming data.

        Args:
            path (str): Input path.
            format (str): Data format.
            schema: Input schema.
            options: Additional options.

        Returns:
            Streaming DataFrame.
        """

    def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxFilesPerTrigger=None, latestFirst=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
        """Read CSV files as a streaming DataFrame."""

    def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None, allowUnquotedControlChars=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, maxFilesPerTrigger=None, latestFirst=None):
        """Read JSON files as a streaming DataFrame."""

    def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, maxFilesPerTrigger=None, latestFirst=None):
        """Read Parquet files as a streaming DataFrame."""

    def text(self, path, wholetext=False, lineSep=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, maxFilesPerTrigger=None, latestFirst=None):
        """Read text files as a streaming DataFrame."""
class DataStreamWriter:
    """Interface for writing streaming DataFrames."""

    def outputMode(self, outputMode):
        """Specify the output mode.

        Args:
            outputMode (str): One of 'append', 'complete', 'update'.

        Returns:
            DataStreamWriter.
        """

    def format(self, source):
        """Specify the output format.

        Args:
            source (str): Output format.

        Returns:
            DataStreamWriter.
        """

    def option(self, key, value):
        """Add a single output option.

        Args:
            key (str): Option key.
            value: Option value.

        Returns:
            DataStreamWriter.
        """

    def options(self, **options):
        """Add multiple output options.

        Args:
            options: Keyword options.

        Returns:
            DataStreamWriter.
        """

    def partitionBy(self, *cols):
        """Partition the output by the given columns.

        Args:
            cols: Partition columns.

        Returns:
            DataStreamWriter.
        """

    def queryName(self, queryName):
        """Assign a name to the query.

        Args:
            queryName (str): Query name.

        Returns:
            DataStreamWriter.
        """

    def trigger(self, processingTime=None, once=None, continuous=None, availableNow=None):
        """Set the trigger for stream processing.

        Args:
            processingTime (str): Micro-batch interval.
            once (bool): Process a single batch and stop.
            continuous (str): Continuous processing checkpoint interval.
            availableNow (bool): Process all available data and stop.

        Returns:
            DataStreamWriter.
        """

    def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None, **options):
        """Start the streaming query.

        Args:
            path (str): Output path.
            format (str): Output format.
            outputMode (str): Output mode.
            partitionBy: Partition columns.
            queryName (str): Query name.
            options: Additional options.

        Returns:
            StreamingQuery.
        """

    def foreach(self, f):
        """Apply *f* to each row of the output.

        Args:
            f: Function applied per row.

        Returns:
            StreamingQuery.
        """

    def foreachBatch(self, func):
        """Apply *func* to each micro-batch of the output.

        Args:
            func: Function applied per batch.

        Returns:
            StreamingQuery.
        """
class StreamingQuery:
    """Handle for a running streaming query."""

    @property
    def id(self):
        """Unique identifier of the query."""

    @property
    def name(self):
        """Name of the query."""

    @property
    def isActive(self):
        """Whether the query is currently active."""

    def start(self):
        """Start the query."""

    def stop(self):
        """Stop execution of the query."""

    def awaitTermination(self, timeout=None):
        """Block until the query terminates.

        Args:
            timeout (int): Timeout in seconds.
        """

    def processAllAvailable(self):
        """Block until all currently available data has been processed."""

    def lastProgress(self):
        """Progress information of the most recent trigger."""

    def recentProgress(self):
        """Progress information of recent triggers."""

    def status(self):
        """Current status of the query."""

    def exception(self):
        """Exception that caused the query to stop, if any."""


class StreamingQueryException(Exception):
    """Exception thrown by a streaming query."""
    pass
class StreamingQueryStatus:
    """Status of a streaming query."""

    pass
class StreamingQueryProgress:
    """Progress information of a streaming query."""

    pass

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-pyspark