tessl/pypi-pyspark-streaming

PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python

PySpark Streaming

PySpark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python. Built on Apache Spark's core distributed computing capabilities, it provides comprehensive Python APIs for processing continuous streams of data using micro-batch processing with configurable batch intervals.

Package Information

  • Package Name: pyspark
  • Package Type: Python Package (PyPI)
  • Language: Python
  • Installation: pip install pyspark==2.4.8

Core Imports

For basic streaming functionality:

from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf

For comprehensive streaming operations:

from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel

Basic Usage

Simple Word Count Example

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# Create Spark configuration and context
conf = SparkConf().setAppName("WordCount").setMaster("local[2]")
sc = SparkContext(conf=conf)

# Create StreamingContext with 1 second batch interval
ssc = StreamingContext(sc, 1)

# Create DStream from socket
lines = ssc.socketTextStream("localhost", 9999)

# Transform and count words
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda a, b: a + b)

# Print results
wordCounts.pprint()

# Start streaming computation
ssc.start()
ssc.awaitTermination()

Text File Streaming Example

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# Create configuration and context
conf = SparkConf().setAppName("FileStreaming").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

# Monitor directory for new text files
lines = ssc.textFileStream("/data/streaming")

# Process lines
processed = lines.filter(lambda line: len(line) > 0) \
                .map(lambda line: line.upper())

# Output results
processed.pprint(10)

# Start and wait
ssc.start()
ssc.awaitTermination()

Architecture

PySpark Streaming follows a micro-batch architecture:

  • StreamingContext: Main entry point managing the streaming application lifecycle
  • DStreams: Abstraction representing a continuous sequence of RDDs (discretized streams)
  • Input Sources: File systems, socket connections, message queues, custom receivers
  • Transformations: Map, filter, reduce, window, join operations on streams
  • Output Operations: Actions that write results to external systems
  • Checkpointing: Fault recovery through metadata and data checkpointing
  • Batch Processing: Configurable intervals for micro-batch execution
  • Python Integration: Seamless integration with Python data processing libraries
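The micro-batch model above can be illustrated in plain Python, with no Spark required: each list below stands in for one batch's RDD, and `count_words` mirrors the flatMap → map → reduceByKey pipeline that a DStream re-runs on every batch. The sample data is illustrative.

```python
from collections import Counter

def count_words(batch_lines):
    """Mimic flatMap -> map -> reduceByKey for a single micro-batch."""
    words = [w for line in batch_lines for w in line.split(" ") if w]  # flatMap
    return dict(Counter(words))  # (word, 1) pairs reduced by key

# Two micro-batches, as a DStream would discretize a socket feed
batches = [["spark streams", "spark"], ["streams of data"]]
for batch in batches:
    print(count_words(batch))
```

In a real job the same three transformations are declared once on the DStream and Spark applies them to every batch's RDD as it arrives.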

Capabilities

Core Streaming Operations

Core functionality for creating and managing streaming contexts, with essential lifecycle operations.

Key APIs:

class StreamingContext(sparkContext, batchDuration)
def start()
def stop(stopSparkContext=True, stopGraceFully=False)
def awaitTermination(timeout=None)
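These lifecycle calls can be combined into a bounded-runtime pattern. The sketch below (app name, port, and the 60-second cap are illustrative) polls `awaitTerminationOrTimeout`, which PySpark provides alongside `awaitTermination`, and stops gracefully once a deadline passes:

```python
import time

MAX_RUNTIME_SECONDS = 60  # illustrative cap for this sketch

def reached_deadline(elapsed, limit=MAX_RUNTIME_SECONDS):
    """Decide whether the streaming job has run long enough to stop."""
    return elapsed >= limit

if __name__ == "__main__":
    # pyspark imports are kept inside the guard so the helper above
    # can be used without a Spark installation
    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext

    conf = SparkConf().setAppName("Lifecycle").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)
    ssc.socketTextStream("localhost", 9999).pprint()

    ssc.start()
    started = time.time()
    # awaitTerminationOrTimeout blocks for up to 5 seconds and returns
    # True once the context has stopped
    while not ssc.awaitTerminationOrTimeout(5):
        if reached_deadline(time.time() - started):
            # stop streaming gracefully but keep the SparkContext alive
            ssc.stop(stopSparkContext=False, stopGraceFully=True)
```

Polling with a timeout rather than blocking forever lets the driver enforce its own shutdown conditions.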

Input Sources

Various methods for ingesting data streams from external sources including sockets, files, and queues.

Key APIs:

def socketTextStream(hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2)
def textFileStream(directory)
def queueStream(rdds, oneAtATime=True, default=None)
def binaryRecordsStream(directory, recordLength)
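`queueStream` is the easiest way to exercise a pipeline without a live source. The sketch below (batch size and sample data are illustrative) turns canned records into a queue of RDDs, one fed per batch interval:

```python
def make_batches(records, batch_size):
    """Split canned records into fixed-size batches, one per micro-batch."""
    return [records[i:i + batch_size] for i in range(0, len(records), batch_size)]

if __name__ == "__main__":
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext("local[2]", "QueueStreamTest")
    ssc = StreamingContext(sc, 1)
    # queueStream feeds one RDD per batch interval when oneAtATime=True,
    # which makes transformations easy to test deterministically
    rdd_queue = [sc.parallelize(batch) for batch in make_batches(list(range(10)), 3)]
    ssc.queueStream(rdd_queue, oneAtATime=True).pprint()
    ssc.start()
    ssc.awaitTerminationOrTimeout(10)
    ssc.stop()
```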

DStream Transformations

Comprehensive transformation operations for processing streaming data including mapping, filtering, windowing, and aggregations.

Key APIs:

def map(f, preservesPartitioning=False)
def filter(f)
def flatMap(f, preservesPartitioning=False)
def reduceByKey(func, numPartitions=None)
def window(windowDuration, slideDuration=None)
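Windowed transformations require both the window and slide durations to be multiples of the batch interval. A sketch of a windowed word count (port, app name, and durations are illustrative), using `reduceByKeyAndWindow` from the DStream API:

```python
def valid_window(batch_interval, window_duration, slide_duration):
    """Spark requires window and slide durations to be multiples
    of the batch interval."""
    return (window_duration % batch_interval == 0
            and slide_duration % batch_interval == 0)

if __name__ == "__main__":
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    batch, window, slide = 10, 30, 10
    assert valid_window(batch, window, slide)

    sc = SparkContext("local[2]", "WindowedCounts")
    ssc = StreamingContext(sc, batch)
    pairs = (ssc.socketTextStream("localhost", 9999)
                .flatMap(lambda line: line.split(" "))
                .map(lambda word: (word, 1)))
    # count words over the last 30 seconds, recomputed every 10 seconds;
    # invFunc is None, so no checkpoint directory is required
    counts = pairs.reduceByKeyAndWindow(lambda a, b: a + b, None, window, slide)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
```

Passing an inverse function instead of `None` makes sliding aggregation incremental, at the cost of requiring checkpointing.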

Output Operations

Methods for writing processed stream data to external systems and triggering computations.

Key APIs:

def foreachRDD(func)
def pprint(num=10)
def saveAsTextFiles(prefix, suffix=None)
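`foreachRDD` is the general-purpose output hook. A common pattern, sketched below with illustrative names (the CSV formatting and `print` sink stand in for a real external system), pairs it with `foreachPartition` so per-connection setup happens once per partition on the executors rather than once per record:

```python
def format_record(word, count):
    """Serialize one result record for an external sink (illustrative CSV line)."""
    return "%s,%d" % (word, count)

def send_partition(records):
    # In a real job, open one connection per partition here and reuse it
    # for every record; printing stands in for the external system
    for word, count in records:
        print(format_record(word, count))

if __name__ == "__main__":
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext("local[2]", "ForeachRDDExample")
    ssc = StreamingContext(sc, 1)
    wordCounts = (ssc.socketTextStream("localhost", 9999)
                     .flatMap(lambda line: line.split(" "))
                     .map(lambda word: (word, 1))
                     .reduceByKey(lambda a, b: a + b))
    # foreachRDD runs on the driver; foreachPartition pushes the actual
    # writing out to the executors, one call per partition
    wordCounts.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))
    ssc.start()
    ssc.awaitTermination()
```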

Stateful Operations

Advanced operations for maintaining state across streaming batches, including updateStateByKey.

Key APIs:

def updateStateByKey(updateFunc)
def transform(func)
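The function passed to `updateStateByKey` receives this batch's values for a key plus the previous state (`None` the first time a key is seen), and returns the new state. A running word count sketch (checkpoint path, port, and app name are illustrative); note that `updateStateByKey` requires a checkpoint directory:

```python
def update_count(new_values, running_total):
    """Fold this batch's values into the running total per key.
    running_total is None the first time a key is seen."""
    return sum(new_values) + (running_total or 0)

if __name__ == "__main__":
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext("local[2]", "StatefulCounts")
    ssc = StreamingContext(sc, 1)
    # state updates require checkpointing (path is illustrative)
    ssc.checkpoint("/tmp/streaming-checkpoint")

    pairs = (ssc.socketTextStream("localhost", 9999)
                .flatMap(lambda line: line.split(" "))
                .map(lambda word: (word, 1)))
    totals = pairs.updateStateByKey(update_count)
    totals.pprint()
    ssc.start()
    ssc.awaitTermination()
```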

Java API

Java-friendly wrappers with full feature parity with the Scala APIs, using Java Function interfaces for Spark Streaming.

class JavaStreamingContext {
    public JavaStreamingContext(SparkConf conf, Duration batchDuration);
    public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
}

Python-Specific Features

Python-specific functionality and integration with the Python ecosystem.

Key APIs:

def mapPartitions(f, preservesPartitioning=False)
def mapPartitionsWithIndex(f, preservesPartitioning=False)
def partitionBy(numPartitions, partitionFunc=portable_hash)
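`mapPartitions` invokes its function once per partition with an iterator of records, which cuts per-record Python call overhead for aggregations. A sketch (port and app name are illustrative) that sums each partition of a stream of integers:

```python
def partition_sum(iterator):
    """Aggregate one partition in a single pass, yielding exactly one value."""
    yield sum(iterator)

if __name__ == "__main__":
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext("local[2]", "PartitionSums")
    ssc = StreamingContext(sc, 1)
    numbers = ssc.socketTextStream("localhost", 9999).map(int)
    # partition_sum runs once per partition instead of once per record
    sums = numbers.mapPartitions(partition_sum)
    sums.pprint()
    ssc.start()
    ssc.awaitTermination()
```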

Context Management

PySpark Streaming provides context management and lifecycle operations:

def getOrCreate(checkpointPath, setupFunc)
def getActive()
def getActiveOrCreate(checkpointPath, setupFunc)
def remember(duration)
def checkpoint(directory)

Example context management:

ssc = StreamingContext.getOrCreate("/checkpoint/path", create_context)
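The factory passed to `getOrCreate` might look like the sketch below (checkpoint path, app name, and socket source are hypothetical). `getOrCreate` rebuilds the context from checkpoint data when a checkpoint exists and only calls the factory on a cold start, so all stream setup must live inside it:

```python
CHECKPOINT_DIR = "/tmp/streaming-checkpoint"  # hypothetical path

def create_context():
    """Factory used only when no checkpoint exists yet."""
    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext

    conf = SparkConf().setAppName("Recoverable").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint(CHECKPOINT_DIR)  # enable metadata checkpointing
    ssc.socketTextStream("localhost", 9999).pprint()
    return ssc

if __name__ == "__main__":
    from pyspark.streaming import StreamingContext
    ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
    ssc.start()
    ssc.awaitTermination()
```

On restart after a driver failure, the same script recovers the previous DStream graph from `CHECKPOINT_DIR` instead of building a fresh one.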

Key Data Types and Utilities

Duration and Time

Durations are specified as numeric values (seconds) in Python:

# Batch intervals in seconds
batch_interval = 1  # 1 second
window_duration = 60  # 60 seconds (1 minute)
slide_duration = 30  # 30 seconds

Storage Levels

from pyspark.storagelevel import StorageLevel

StorageLevel.MEMORY_ONLY
StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_2
StorageLevel.DISK_ONLY

Data on the Python side is always stored serialized, so PySpark does not expose the separate _SER levels found in the Scala API.

Configuration

Key configuration properties for PySpark Streaming:

  • Checkpoint directory - set programmatically via ssc.checkpoint(directory) rather than through a configuration property
  • spark.streaming.stopGracefullyOnShutdown - Graceful shutdown (default: false)
  • spark.streaming.unpersist - Auto-unpersist old RDDs (default: true)
  • spark.streaming.receiver.maxRate - Max records per second per receiver
  • spark.python.worker.memory - Memory per Python worker process
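These properties are set on the `SparkConf` before the context is created. A sketch, with illustrative values (the rate limit and worker memory are examples, not recommendations):

```python
# properties from the list above; values are illustrative
STREAMING_SETTINGS = {
    "spark.streaming.stopGracefullyOnShutdown": "true",
    "spark.streaming.unpersist": "true",
    "spark.streaming.receiver.maxRate": "1000",
    "spark.python.worker.memory": "512m",
}

def apply_settings(conf, settings):
    """Copy key/value pairs onto a SparkConf (or any object with .set)."""
    for key, value in settings.items():
        conf.set(key, value)
    return conf

if __name__ == "__main__":
    from pyspark import SparkConf
    conf = apply_settings(SparkConf().setAppName("Configured"), STREAMING_SETTINGS)
```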

Error Handling

Streaming applications handle failures through:

  • Automatic restart: Failed tasks restart automatically
  • Checkpointing: Metadata and data recovery
  • Write-ahead logs: Reliable data storage for receivers
  • Driver recovery: Restart from checkpoint on driver failure

Install with Tessl CLI

npx tessl i tessl/pypi-pyspark-streaming
Describes
pkg:pypi/pyspark@2.4.x