CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pyspark

Python API for Apache Spark, providing distributed computing, data analysis, and machine learning capabilities

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/streaming.md

Streaming

Real-time data processing with structured streaming and DStreams for continuous data ingestion, processing, and output to various sinks. Enables processing of live data streams with fault tolerance and exactly-once semantics.

Capabilities

Streaming Context

Main entry point for streaming applications using discretized streams (DStreams).

class StreamingContext:
    """Main entry point for Spark Streaming functionality.

    Drives micro-batch (DStream) processing on top of a SparkContext:
    input sources produce DStreams, which are processed once per batch
    interval after start() is called. All bodies here are API stubs.
    """
    
    def __init__(self, sparkContext, batchDuration):
        """
        Create StreamingContext.
        
        Parameters:
        - sparkContext (SparkContext): Spark context used to run the batch jobs
        - batchDuration: Batch duration for micro-batches
          (presumably in seconds, as in PySpark — confirm against upstream docs)
        """
    
    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
        """
        Create input stream from TCP socket.
        
        Parameters:
        - hostname (str): Hostname to connect to
        - port (int): Port to connect to
        - storageLevel (StorageLevel): Storage level for received data
        
        Returns:
        DStream of strings (presumably one element per received line — confirm)
        """
    
    def textFileStream(self, directory):
        """
        Create input stream that monitors directory for new files.
        
        NOTE(review): presumably only files added after the stream starts
        are picked up, per PySpark semantics — confirm.
        
        Parameters:
        - directory (str): Directory to monitor
        
        Returns:
        DStream of strings
        """
    
    def queueStream(self, rdds, oneAtATime=True, default=None):
        """
        Create input stream from queue of RDDs (mainly useful for testing).
        
        Parameters:
        - rdds: Queue of RDDs
        - oneAtATime (bool): Process one RDD per batch rather than draining the queue
        - default: Default RDD used when the queue is empty
        
        Returns:
        DStream
        """
    
    def start(self):
        """Start the streaming context; input sources begin receiving data."""
    
    def stop(self, stopSparkContext=True, stopGraceFully=False):
        """
        Stop the streaming context.
        
        Parameters:
        - stopSparkContext (bool): Whether to also stop the underlying SparkContext
        - stopGraceFully (bool): Whether to stop gracefully, letting queued
          batches finish (spelling matches the actual PySpark parameter name)
        """
    
    def awaitTermination(self, timeout=None):
        """
        Wait for the streaming context to terminate.
        
        Parameters:
        - timeout: Maximum time to wait; None blocks indefinitely
        """
    
    def checkpoint(self, directory):
        """
        Set checkpoint directory for fault-tolerant state recovery.
        
        Parameters:
        - directory (str): Checkpoint directory
        """

class DStream:
    """Discretized stream: a continuous sequence of RDDs, one per batch.

    Transformations (map, filter, window, ...) are lazy and return a new
    DStream; output operations (foreachRDD, saveAsTextFiles) trigger
    execution. All bodies here are API stubs.
    """
    
    def map(self, f):
        """
        Apply function to each element of the DStream.
        
        Parameters:
        - f: Function applied element-wise
        
        Returns:
        New DStream of the mapped values
        """
    
    def filter(self, f):
        """
        Filter elements of the DStream.
        
        Parameters:
        - f: Predicate; elements for which it is truthy are kept
        
        Returns:
        Filtered DStream
        """
    
    def flatMap(self, f):
        """
        Apply function to each element and flatten the results.
        
        Parameters:
        - f: Function returning an iterable per element
        
        Returns:
        Flattened DStream
        """
    
    def union(self, other):
        """
        Union with another DStream.
        
        Parameters:
        - other (DStream): Another DStream of the same slide duration
          (NOTE(review): same-slide-duration requirement assumed from
          PySpark — confirm)
        
        Returns:
        Union DStream containing elements of both streams
        """
    
    def reduce(self, f):
        """
        Reduce the elements of each batch using an associative function.
        
        Parameters:
        - f: Binary reduce function
        
        Returns:
        DStream of single-element RDDs holding the per-batch reduction
        """
    
    def reduceByKey(self, func, numPartitions=None):
        """
        Reduce by key for a paired (key, value) DStream.
        
        Parameters:
        - func: Binary reduce function applied to values of the same key
        - numPartitions (int): Number of partitions; None uses the default
        
        Returns:
        DStream of (key, reduced value) pairs
        """
    
    def groupByKey(self, numPartitions=None):
        """
        Group by key for a paired (key, value) DStream.
        
        Parameters:
        - numPartitions (int): Number of partitions; None uses the default
        
        Returns:
        DStream of (key, grouped values) pairs
        """
    
    def countByValue(self):
        """Count occurrences of each distinct element per batch.

        Returns a DStream of (element, count) pairs — presumably, per
        PySpark semantics; confirm.
        """
    
    def foreachRDD(self, func):
        """
        Output operation: apply function to each RDD in the DStream.
        
        Parameters:
        - func: Function receiving each batch's RDD (side effects only)
        """
    
    def saveAsTextFiles(self, prefix, suffix=None):
        """
        Output operation: save each batch as text files.
        
        Parameters:
        - prefix (str): File prefix
        - suffix (str): File suffix (NOTE(review): files are presumably
          named "prefix-TIME[.suffix]" — confirm)
        """
    
    def window(self, windowDuration, slideDuration=None):
        """
        Create windowed DStream spanning multiple batches.
        
        Parameters:
        - windowDuration: Window duration
        - slideDuration: Slide duration; presumably defaults to the batch
          interval when None — confirm
        
        Returns:
        Windowed DStream
        """
    
    def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
        """
        Reduce over a sliding window.
        
        Parameters:
        - reduceFunc: Binary reduce function
        - invReduceFunc: Inverse of reduceFunc, used to incrementally
          "subtract" data leaving the window (may be None in PySpark to
          disable incremental reduction — confirm)
        - windowDuration: Window duration
        - slideDuration: Slide duration
        
        Returns:
        DStream with one reduced value per window
        """
    
    def transform(self, func):
        """
        Transform each batch's RDD with an arbitrary RDD-to-RDD function.
        
        Parameters:
        - func: Transformation function from RDD to RDD
        
        Returns:
        Transformed DStream
        """
    
    def cache(self):
        """Persist the DStream's RDDs (default storage level)."""
    
    def checkpoint(self, interval):
        """
        Enable periodic checkpointing of this DStream's RDDs.
        
        Parameters:
        - interval: Checkpoint interval
        """

class StreamingListener:
    """Listener for streaming lifecycle events.

    Subclass and override the callbacks of interest; each receives an
    event-info object describing the batch, output operation, or
    receiver. Default implementations are no-ops.
    """
    
    def onBatchCompleted(self, batchCompleted):
        """Called when batch processing completes."""
    
    def onBatchStarted(self, batchStarted):
        """Called when batch processing starts."""
    
    def onOutputOperationCompleted(self, outputOperationCompleted):
        """Called when an output operation completes."""
    
    def onOutputOperationStarted(self, outputOperationStarted):
        """Called when an output operation starts."""
    
    def onReceiverError(self, receiverError):
        """Called when a receiver reports an error."""
    
    def onReceiverStarted(self, receiverStarted):
        """Called when a receiver starts."""
    
    def onReceiverStopped(self, receiverStopped):
        """Called when a receiver stops."""

Structured Streaming

High-level streaming API built on DataFrames for continuous processing.

class DataStreamReader:
    """Builder-style interface for reading streaming data into DataFrames.

    Configuration methods (format, option, options, schema) return the
    reader itself so calls can be chained; load()/csv()/json()/parquet()/
    text() produce the streaming DataFrame. All bodies here are API stubs.
    """
    
    def format(self, source):
        """
        Specify data source format.
        
        Parameters:
        - source (str): Data source format (e.g. "csv", "json", "kafka")
        
        Returns:
        DataStreamReader (self, for chaining)
        """
    
    def option(self, key, value):
        """
        Add a single input option.
        
        Parameters:
        - key (str): Option key
        - value: Option value
        
        Returns:
        DataStreamReader (self, for chaining)
        """
    
    def options(self, **options):
        """
        Add multiple input options at once.
        
        Parameters:
        - options: Options as keyword arguments
        
        Returns:
        DataStreamReader (self, for chaining)
        """
    
    def schema(self, schema):
        """
        Specify the input schema.
        
        Parameters:
        - schema: Schema definition (StructType or DDL string — presumably,
          per PySpark; confirm)
        
        Returns:
        DataStreamReader (self, for chaining)
        """
    
    def load(self, path=None, format=None, schema=None, **options):
        """
        Load streaming data using the accumulated configuration.
        
        Parameters:
        - path (str): Input path
        - format (str): Data format (overrides format())
        - schema: Input schema (overrides schema())
        - options: Additional options
        
        Returns:
        Streaming DataFrame
        """
    
    def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxFilesPerTrigger=None, latestFirst=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
        """Read CSV files as a streaming DataFrame; None leaves each option at its source default."""
    
    def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None, allowUnquotedControlChars=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, maxFilesPerTrigger=None, latestFirst=None):
        """Read JSON files as a streaming DataFrame; None leaves each option at its source default."""
    
    def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, maxFilesPerTrigger=None, latestFirst=None):
        """Read Parquet files as a streaming DataFrame; None leaves each option at its source default."""
    
    def text(self, path, wholetext=False, lineSep=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, maxFilesPerTrigger=None, latestFirst=None):
        """Read text files as a streaming DataFrame (one row per line unless wholetext)."""

class DataStreamWriter:
    """Builder-style interface for writing streaming DataFrames to a sink.

    Configuration methods return the writer itself so calls can be
    chained; start()/foreach()/foreachBatch() launch the query and
    return a StreamingQuery handle. All bodies here are API stubs.
    """
    
    def outputMode(self, outputMode):
        """
        Specify how result rows are emitted to the sink.
        
        Parameters:
        - outputMode (str): Output mode ('append', 'complete', 'update')
        
        Returns:
        DataStreamWriter (self, for chaining)
        """
    
    def format(self, source):
        """
        Specify output sink format.
        
        Parameters:
        - source (str): Output format (e.g. "parquet", "kafka", "console")
        
        Returns:
        DataStreamWriter (self, for chaining)
        """
    
    def option(self, key, value):
        """
        Add a single output option.
        
        Parameters:
        - key (str): Option key
        - value: Option value
        
        Returns:
        DataStreamWriter (self, for chaining)
        """
    
    def options(self, **options):
        """
        Add multiple output options at once.
        
        Parameters:
        - options: Options as keyword arguments
        
        Returns:
        DataStreamWriter (self, for chaining)
        """
    
    def partitionBy(self, *cols):
        """
        Partition the output by the given columns.
        
        Parameters:
        - cols: Partition column names
        
        Returns:
        DataStreamWriter (self, for chaining)
        """
    
    def queryName(self, queryName):
        """
        Specify a name for the query (visible in monitoring UIs).
        
        Parameters:
        - queryName (str): Query name
        
        Returns:
        DataStreamWriter (self, for chaining)
        """
    
    def trigger(self, processingTime=None, once=None, continuous=None, availableNow=None):
        """
        Set the trigger controlling when batches are processed.
        
        NOTE(review): the four parameters are presumably mutually
        exclusive, per PySpark — confirm.
        
        Parameters:
        - processingTime (str): Processing time interval (e.g. '5 seconds')
        - once (bool): Process one batch and stop
        - continuous (str): Continuous processing checkpoint interval
        - availableNow (bool): Process all available data and stop
        
        Returns:
        DataStreamWriter (self, for chaining)
        """
    
    def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None, **options):
        """
        Start the streaming query.
        
        Keyword arguments override the corresponding builder settings.
        
        Parameters:
        - path (str): Output path
        - format (str): Output format
        - outputMode (str): Output mode
        - partitionBy: Partition columns
        - queryName (str): Query name
        - options: Additional options
        
        Returns:
        StreamingQuery handle for the running query
        """
    
    def foreach(self, f):
        """
        Write output by applying a function (or ForeachWriter-like object)
        to each row.
        
        Parameters:
        - f: Function applied per row
        
        Returns:
        StreamingQuery
        """
    
    def foreachBatch(self, func):
        """
        Write output by applying a function to each micro-batch DataFrame.
        
        Parameters:
        - func: Function receiving (batch DataFrame, batch id) — presumably,
          per PySpark; confirm
        
        Returns:
        StreamingQuery
        """

class StreamingQuery:
    """Handle to a running (or stopped) streaming query.

    Obtained from DataStreamWriter.start(); used to monitor, block on,
    and stop the query. All bodies here are API stubs.
    """
    
    @property
    def id(self):
        """Unique identifier of the query (persists across restarts)."""
    
    @property
    def name(self):
        """User-specified name of the query, if any."""
    
    @property
    def isActive(self):
        """Whether the query is currently active (bool)."""
    
    def start(self):
        """Start the query.

        NOTE(review): real PySpark StreamingQuery exposes no start();
        queries are started via DataStreamWriter.start(). Confirm whether
        this stub belongs here.
        """
    
    def stop(self):
        """Stop the execution of the query."""
    
    def awaitTermination(self, timeout=None):
        """
        Block until the query terminates (or the timeout elapses).
        
        Parameters:
        - timeout (int): Timeout in seconds; None blocks indefinitely
        """
    
    def processAllAvailable(self):
        """Block until all currently available source data is processed."""
    
    def lastProgress(self):
        """Progress information of the last trigger.

        NOTE(review): in PySpark this is a property, not a method — confirm.
        """
    
    def recentProgress(self):
        """Progress information of recent triggers.

        NOTE(review): in PySpark this is a property, not a method — confirm.
        """
    
    def status(self):
        """Current status of the query.

        NOTE(review): in PySpark this is a property, not a method — confirm.
        """
    
    def exception(self):
        """The StreamingQueryException that stopped the query, if any."""

Types

class StreamingQueryException(Exception):
    """Raised when a streaming query fails during execution."""

class StreamingQueryStatus:
    """Represents the current status of a streaming query."""

class StreamingQueryProgress:
    """Represents progress information for a streaming query trigger."""

Install with Tessl CLI

npx tessl i tessl/pypi-pyspark

docs

core-context-rdds.md

index.md

legacy-mllib.md

machine-learning.md

pandas-api.md

resource-management.md

sql-dataframes.md

streaming.md

tile.json