PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python
PySpark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python. Built on Apache Spark's core distributed computing capabilities, it provides comprehensive Python APIs for processing continuous streams of data using micro-batch processing with configurable batch intervals.
pip install pyspark==2.4.8

For basic streaming functionality:
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf

For comprehensive streaming operations:
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
# Create Spark configuration and context
conf = SparkConf().setAppName("WordCount").setMaster("local[2]")
sc = SparkContext(conf=conf)
# Create StreamingContext with 1 second batch interval
ssc = StreamingContext(sc, 1)
# Create DStream from socket
lines = ssc.socketTextStream("localhost", 9999)
# Transform and count words
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda a, b: a + b)
# Print results
wordCounts.pprint()
# Start streaming computation
ssc.start()
ssc.awaitTermination()

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
# Create configuration and context
conf = SparkConf().setAppName("FileStreaming").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
# Monitor directory for new text files
lines = ssc.textFileStream("/data/streaming")
# Process lines
processed = lines.filter(lambda line: len(line) > 0) \
.map(lambda line: line.upper())
# Output results
processed.pprint(10)
# Start and wait
ssc.start()
ssc.awaitTermination()

PySpark Streaming follows a micro-batch architecture:
Core functionality for creating and managing streaming contexts, with essential lifecycle operations.
Key APIs:
class StreamingContext(sparkContext, batchDuration)
def start()
def stop(stopSparkContext=True, stopGraceFully=False)
def awaitTermination(timeout=None)

Various methods for ingesting data streams from external sources, including sockets, files, and queues.
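As a quick sketch, a queue-backed stream is a convenient way to exercise a streaming job without a live socket or file source. The `parse_record` helper and the "name,value" record format below are our own illustration, not part of the API:

```python
def parse_record(line):
    # Split a "name,value" text record into a (name, int) pair
    name, value = line.split(",", 1)
    return (name, int(value))

def build_queue_stream(ssc, sc, batches):
    # queueStream consumes one RDD per batch interval when oneAtATime=True,
    # which makes batch boundaries deterministic for testing.
    # ssc: StreamingContext, sc: SparkContext, batches: list of lists of lines
    rdd_queue = [sc.parallelize(batch) for batch in batches]
    return ssc.queueStream(rdd_queue, oneAtATime=True).map(parse_record)
```

Calling `build_queue_stream(ssc, sc, [["a,1", "b,2"], ["a,3"]])` would yield two micro-batches of parsed pairs.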
Key APIs:
def socketTextStream(hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2)
def textFileStream(directory)
def queueStream(rdds, oneAtATime=True, default=None)
def binaryRecordsStream(directory, recordLength)

Comprehensive transformation operations for processing streaming data, including mapping, filtering, windowing, and aggregations.
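A sketch combining these transformations into a windowed word count; the `tokenize` helper and the default durations are illustrative choices, not API requirements:

```python
def tokenize(line):
    # Lowercase and split a line into words, dropping empty tokens
    return [w.lower() for w in line.split() if w]

def windowed_word_count(lines, window_secs=60, slide_secs=30):
    # lines: a DStream of text lines (e.g., from socketTextStream).
    # window() re-groups the last `window_secs` of batches every
    # `slide_secs`; reduceByKey then aggregates within each window.
    words = lines.flatMap(tokenize)
    pairs = words.map(lambda w: (w, 1))
    return pairs.window(window_secs, slide_secs).reduceByKey(lambda a, b: a + b)
```

Window and slide durations must be multiples of the batch interval the StreamingContext was created with.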
Key APIs:
def map(f, preservesPartitioning=False)
def filter(f)
def flatMap(f, preservesPartitioning=False)
def reduceByKey(func, numPartitions=None)
def window(windowDuration, slideDuration=None)

Methods for writing processed stream data to external systems and triggering computations.
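A sketch of pushing each micro-batch to an external sink with foreachRDD; the `format_count` and `write_batch` helpers are our own, and the print sink stands in for a real external system:

```python
def format_count(pair):
    # Render a (word, count) pair as a tab-separated line
    word, count = pair
    return "%s\t%d" % (word, count)

def write_batch(time, rdd):
    # foreachRDD hands us the batch time and the batch's RDD. collect()
    # is fine for small demo batches; real sinks should write
    # per-partition with rdd.foreachPartition(...) instead.
    for line in rdd.map(format_count).collect():
        print(time, line)

# Usage on a DStream of (word, count) pairs:
# wordCounts.foreachRDD(write_batch)
# wordCounts.saveAsTextFiles("/data/out/wc")  # one output directory per batch
```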
Key APIs:
def foreachRDD(func)
def pprint(num=10)
def saveAsTextFiles(prefix, suffix=None)

Advanced operations for maintaining state across streaming batches, including updateStateByKey.
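A sketch of a running count across batches with updateStateByKey. The update function receives the new values for a key plus its previous state (None the first time a key appears), and checkpointing must be enabled for stateful transformations; the checkpoint path below is illustrative:

```python
def update_count(new_values, previous):
    # Add this batch's values for the key to its accumulated state
    return sum(new_values) + (previous or 0)

def running_word_count(ssc, pairs):
    # pairs: a DStream of (word, 1) pairs
    ssc.checkpoint("/checkpoint/path")  # required for stateful operations
    return pairs.updateStateByKey(update_count)
```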
Key APIs:
def updateStateByKey(updateFunc)
def transform(func)

Complete Java-friendly wrappers providing full feature parity with Scala APIs, using Java Function interfaces for Apache Spark Streaming.
class JavaStreamingContext {
public JavaStreamingContext(SparkConf conf, Duration batchDuration);
public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
}

Python-specific functionality and integration with the Python ecosystem.
Key APIs:
def mapPartitions(f, preservesPartitioning=False)
def mapPartitionsWithIndex(f, preservesPartitioning=False)
def partitionBy(numPartitions, partitionFunc=portable_hash)

PySpark Streaming provides context management and lifecycle operations:
def getOrCreate(checkpointPath, setupFunc)
def getActive()
def getActiveOrCreate(checkpointPath, setupFunc)
def remember(duration)
def checkpoint(directory)

Example context management:
ssc = StreamingContext.getOrCreate("/checkpoint/path", create_context)

Durations are specified as numeric values (seconds) in Python:
# Batch intervals in seconds
batch_interval = 1 # 1 second
window_duration = 60 # 60 seconds (1 minute)
slide_duration = 30 # 30 seconds

from pyspark.storagelevel import StorageLevel
StorageLevel.MEMORY_ONLY
StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_2
StorageLevel.MEMORY_ONLY_SER

Key configuration properties for PySpark Streaming:
spark.streaming.checkpoint.directory - Checkpoint directory path
spark.streaming.stopGracefullyOnShutdown - Graceful shutdown (default: false)
spark.streaming.unpersist - Auto-unpersist old RDDs (default: true)
spark.streaming.receiver.maxRate - Max records per second per receiver
spark.python.worker.memory - Memory per Python worker process

Streaming applications handle failures through checkpointing: metadata checkpointing lets a restarted driver recover the streaming computation, and data checkpointing truncates the lineage of stateful DStreams.
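A sketch of driver recovery with getOrCreate: on a clean start the setup function is called to build the context, while after a crash the context is rebuilt from checkpoint data instead. The paths, app name, and pipeline below are illustrative:

```python
def create_context():
    # Imported here so the function can be defined without a running Spark
    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext

    conf = SparkConf().setAppName("Recoverable").setMaster("local[2]")
    ssc = StreamingContext(SparkContext(conf=conf), 1)
    ssc.checkpoint("/checkpoint/path")  # enables checkpoint-based recovery
    lines = ssc.textFileStream("/data/streaming")
    lines.count().pprint()
    return ssc

# On a machine with Spark available:
# ssc = StreamingContext.getOrCreate("/checkpoint/path", create_context)
# ssc.start()
# ssc.awaitTermination()
```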
Install with Tessl CLI
npx tessl i tessl/pypi-pyspark-streaming