tessl/pypi-pyspark-streaming

PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python

PySpark Streaming

PySpark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python. Built on Apache Spark's core engine, it exposes the DStream API in Python: incoming data is divided into micro-batches at a configurable batch interval, and each batch is processed as an RDD.

Package Information

  • Package Name: pyspark
  • Package Type: Python Package (PyPI)
  • Language: Python
  • Installation: pip install pyspark==2.4.8

Core Imports

For basic streaming functionality:

from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf

For comprehensive streaming operations:

from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel

Basic Usage

Simple Word Count Example

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# Create Spark configuration and context
conf = SparkConf().setAppName("WordCount").setMaster("local[2]")
sc = SparkContext(conf=conf)

# Create StreamingContext with 1 second batch interval
ssc = StreamingContext(sc, 1)

# Create DStream from socket
lines = ssc.socketTextStream("localhost", 9999)

# Transform and count words
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda a, b: a + b)

# Print results
wordCounts.pprint()

# Start streaming computation
ssc.start()
ssc.awaitTermination()

Text File Streaming Example

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# Create configuration and context
conf = SparkConf().setAppName("FileStreaming").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

# Monitor directory for new text files
lines = ssc.textFileStream("/data/streaming")

# Process lines
processed = lines.filter(lambda line: len(line) > 0) \
                .map(lambda line: line.upper())

# Output results
processed.pprint(10)

# Start and wait
ssc.start()
ssc.awaitTermination()

Architecture

PySpark Streaming follows a micro-batch architecture:

  • StreamingContext: Main entry point managing the streaming application lifecycle
  • DStreams: Abstraction representing a continuous sequence of RDDs (discretized streams), one RDD per batch interval
  • Input Sources: File systems, socket connections, message queues, custom receivers
  • Transformations: Map, filter, reduce, window, join operations on streams
  • Output Operations: Actions that write results to external systems
  • Checkpointing: Fault recovery through metadata and data checkpointing
  • Batch Processing: Configurable intervals for micro-batch execution
  • Python Integration: Transformations run user-supplied Python functions in Python worker processes, so they can call ordinary Python libraries
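
The micro-batch idea above can be illustrated without Spark at all. The following is only a conceptual sketch in plain Python: the `discretize` helper and the sample events are hypothetical, and Spark itself builds each batch as an RDD from received data rather than from a list of timestamps.

```python
def discretize(events, batch_interval):
    """Group (timestamp_seconds, record) pairs into consecutive batches.

    Conceptual illustration only; this is not how Spark is implemented.
    """
    if not events:
        return []
    last_index = max(int(t // batch_interval) for t, _ in events)
    # One slot per batch interval, including intervals with no data
    batches = [[] for _ in range(last_index + 1)]
    for timestamp, record in events:
        batches[int(timestamp // batch_interval)].append(record)
    return batches


# Hypothetical events: arrival time in seconds and a payload
events = [(0.2, "a"), (0.9, "b"), (1.1, "c"), (3.5, "d")]
micro_batches = discretize(events, batch_interval=1)
```

With a 1-second interval the third batch is empty, which mirrors how a DStream still produces a batch for an interval in which nothing arrived.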

Capabilities

Core Streaming Operations

Core functionality for creating and managing streaming contexts, with essential lifecycle operations.

Key APIs:

class StreamingContext(sparkContext, batchDuration)
def start()
def stop(stopSparkContext=True, stopGraceFully=False)
def awaitTermination(timeout=None)
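
As a rough sketch of how these lifecycle calls fit together, the helper below starts a context, waits for a bounded time, and then stops it gracefully. The function name `run_for` is hypothetical, and `ssc` is assumed to be a StreamingContext that already has at least one output operation registered.

```python
def run_for(ssc, seconds):
    """Start the streaming job, wait up to `seconds`, then shut it down.

    `ssc` is assumed to be a configured StreamingContext.
    """
    ssc.start()
    try:
        # Blocks until the context stops or the timeout (in seconds) elapses
        ssc.awaitTermination(timeout=seconds)
    finally:
        # Finish processing received data, then stop the SparkContext too
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
```

A bounded run like this is mostly useful in tests and demos; a long-running job would normally call `awaitTermination()` without a timeout.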

Input Sources

Various methods for ingesting data streams from external sources including sockets, files, and queues.

Key APIs:

def socketTextStream(hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2)
def textFileStream(directory)
def queueStream(rdds, oneAtATime=True, default=None)
def binaryRecordsStream(directory, recordLength)
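
For local experiments, `queueStream` can feed a fixed set of in-memory batches through a pipeline. The sketch below assumes an existing StreamingContext; the helper name `queue_stream_from_batches` is made up for illustration.

```python
def queue_stream_from_batches(ssc, batches):
    """Turn a list of Python lists into a DStream, one list per batch.

    `ssc` is assumed to be a StreamingContext; each inner list becomes
    one RDD, consumed one per batch interval (oneAtATime=True).
    """
    rdds = [ssc.sparkContext.parallelize(batch) for batch in batches]
    return ssc.queueStream(rdds, oneAtATime=True)
```

Usage might look like `stream = queue_stream_from_batches(ssc, [[1, 2, 3], [4, 5]])` followed by `stream.pprint()`.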

DStream Transformations

Comprehensive transformation operations for processing streaming data including mapping, filtering, windowing, and aggregations.

Key APIs:

def map(f, preservesPartitioning=False)
def filter(f)
def flatMap(f, preservesPartitioning=False)
def reduceByKey(func, numPartitions=None)
def window(windowDuration, slideDuration=None)
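
To show how these transformations chain, here is a minimal sketch of a sliding-window word count. It assumes `lines` is a DStream of text lines (for example from `socketTextStream`); the function name and the default durations are illustrative choices, not part of the library.

```python
from operator import add


def windowed_word_counts(lines, window_duration=60, slide_duration=30):
    """Count words over a sliding window of a DStream of text lines."""
    words = lines.flatMap(lambda line: line.split())
    pairs = words.map(lambda word: (word, 1))
    # window() regroups the batches from the last `window_duration` seconds
    # and is re-evaluated every `slide_duration` seconds
    windowed = pairs.window(window_duration, slide_duration)
    return windowed.reduceByKey(add)
```

Both durations are in seconds and should be multiples of the batch interval of the source stream.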

Output Operations

Methods for writing processed stream data to external systems and triggering computations.

Key APIs:

def foreachRDD(func)
def pprint(num=10)
def saveAsTextFiles(prefix, suffix=None)
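
`foreachRDD` is the general-purpose output operation: the function passed to it runs on the driver once per batch and receives that batch's RDD. The sketch below uses the two-argument form `(time, rdd)`; the function name and the printed format are hypothetical.

```python
def summarize_batch(time, rdd):
    """Print a short summary of one batch; intended for foreachRDD."""
    if rdd.isEmpty():
        # Nothing arrived in this interval
        return None
    count = rdd.count()
    sample = rdd.take(3)
    print("batch %s: %d records, sample=%s" % (time, count, sample))
    # Spark ignores the return value; it is returned only to ease testing
    return count
```

It would be registered with something like `wordCounts.foreachRDD(summarize_batch)`.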

Stateful Operations

Operations for maintaining per-key state across batches with updateStateByKey (which requires a checkpoint directory), plus transform for applying an arbitrary RDD-to-RDD function to each batch.

Key APIs:

def updateStateByKey(updateFunc, numPartitions=None, initialRDD=None)
def transform(func)
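
The update function passed to `updateStateByKey` receives the new values for a key in the current batch and the previous state for that key. A minimal sketch of a running count follows; the function name is hypothetical.

```python
def update_running_count(new_values, running_count):
    """Combine this batch's values for a key with its previous state.

    `new_values` is the list of values seen for the key in this batch;
    `running_count` is None the first time the key appears.
    """
    return sum(new_values) + (running_count or 0)
```

Assuming `pairs` is a DStream of `(word, 1)` tuples and a checkpoint directory has been set with `ssc.checkpoint(...)`, usage would look like `totals = pairs.updateStateByKey(update_running_count)`.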

Java API

Java-friendly wrappers for Apache Spark Streaming that mirror the Scala APIs using Java Function interfaces.

class JavaStreamingContext {
    public JavaStreamingContext(SparkConf conf, Duration batchDuration);
    public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
}

Python-Specific Features

Partition-level operations that run user-supplied Python functions, which is where most integration with the wider Python ecosystem happens.

Key APIs:

def mapPartitions(f, preservesPartitioning=False)
def mapPartitionsWithIndex(f, preservesPartitioning=False)
def partitionBy(numPartitions, partitionFunc=portable_hash)
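
`mapPartitions` calls the supplied function once per partition with an iterator of records, which makes it a natural place for per-partition setup. The sketch below assumes the stream carries JSON text lines; `parse_json_partition` is a hypothetical helper name.

```python
import json


def parse_json_partition(lines):
    """Parse one partition of JSON lines, skipping malformed records."""
    decoder = json.JSONDecoder()  # created once per partition, not per record
    for line in lines:
        try:
            yield decoder.decode(line)
        except ValueError:
            # Malformed input: drop the record and keep going
            continue
```

It could be applied as `records = lines.mapPartitions(parse_json_partition)`. Silently dropping bad records is a design choice for this sketch; a real job might count or log them instead.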

Context Management

PySpark Streaming provides context management and lifecycle operations:

def getOrCreate(checkpointPath, setupFunc)
def getActive()
def getActiveOrCreate(checkpointPath, setupFunc)
def remember(duration)
def checkpoint(directory)

Example context management:

ssc = StreamingContext.getOrCreate("/checkpoint/path", create_context)
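
The `create_context` argument above is a setup function that is only called when no checkpoint data exists at the given path. A sketch of what it might look like follows; the application name, batch interval, and socket source are illustrative assumptions.

```python
CHECKPOINT_DIR = "/checkpoint/path"


def create_context():
    """Build a fresh StreamingContext; used only if no checkpoint exists."""
    # Imported here so the function can be defined without Spark installed
    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext

    conf = SparkConf().setAppName("RecoverableApp").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 5)  # 5-second batches, an arbitrary choice
    ssc.checkpoint(CHECKPOINT_DIR)

    # The whole DStream graph is defined inside the setup function
    lines = ssc.socketTextStream("localhost", 9999)
    lines.count().pprint()
    return ssc


def main():
    from pyspark.streaming import StreamingContext

    # Recover from the checkpoint if present, otherwise call create_context
    ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
    ssc.start()
    ssc.awaitTermination()
```

Defining the stream graph inside the setup function matters because, on recovery, the graph is restored from the checkpoint and the setup function is not run.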

Key Data Types and Utilities

Duration and Time

Durations are specified as numeric values (seconds) in Python:

# Batch intervals in seconds
batch_interval = 1  # 1 second
window_duration = 60  # 60 seconds (1 minute)
slide_duration = 30  # 30 seconds
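
Window and slide durations need to be multiples of the batch interval. The helper below is a small sketch for checking this up front, assuming whole-second durations; the name `batches_in_window` is hypothetical and not part of PySpark.

```python
def batches_in_window(batch_interval, window_duration, slide_duration):
    """Validate durations (whole seconds) and return batches per window."""
    checks = (("window", window_duration), ("slide", slide_duration))
    for name, value in checks:
        if value % batch_interval != 0:
            raise ValueError(
                "%s duration %s is not a multiple of batch interval %s"
                % (name, value, batch_interval)
            )
    return window_duration // batch_interval


# Using the values listed above: 1-second batches, 60-second window
batches_per_window = batches_in_window(
    batch_interval=1, window_duration=60, slide_duration=30
)
```

With these values each window covers 60 batches and is recomputed every 30 batches.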

Storage Levels

from pyspark.storagelevel import StorageLevel

StorageLevel.MEMORY_ONLY
StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_2
StorageLevel.MEMORY_ONLY_SER

Configuration

Key configuration properties for PySpark Streaming:

  • spark.streaming.checkpoint.directory - Checkpoint directory path
  • spark.streaming.stopGracefullyOnShutdown - Graceful shutdown (default: false)
  • spark.streaming.unpersist - Auto-unpersist old RDDs (default: true)
  • spark.streaming.receiver.maxRate - Max records per second per receiver
  • spark.python.worker.memory - Memory per Python worker process
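
These properties are set on the `SparkConf` before the contexts are created. The sketch below collects a few of them in a dictionary; the specific values and the `apply_settings` helper are illustrative assumptions, not recommended defaults.

```python
STREAMING_SETTINGS = {
    "spark.streaming.stopGracefullyOnShutdown": "true",
    "spark.streaming.unpersist": "true",
    "spark.streaming.receiver.maxRate": "1000",  # illustrative value
    "spark.python.worker.memory": "512m",        # illustrative value
}


def apply_settings(conf, settings=STREAMING_SETTINGS):
    """Copy key/value settings onto a SparkConf-like object and return it."""
    for key, value in settings.items():
        conf.set(key, value)
    return conf
```

Usage might look like `conf = apply_settings(SparkConf().setAppName("WordCount"))`, after which `conf` is passed to `SparkContext(conf=conf)`.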

Error Handling

Streaming applications handle failures through:

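  • Automatic retry: Failed tasks are rescheduled and retried automatically
  • Checkpointing: Metadata and data recovery
  • Write-ahead logs: Received data is written to fault-tolerant storage before processing so it can be replayed after a failure (enabled with spark.streaming.receiver.writeAheadLog.enable)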
  • Driver recovery: Restart from checkpoint on driver failure

Install with Tessl CLI

npx tessl i tessl/pypi-pyspark-streaming@2.4.3
