PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python
npx @tessl/cli install tessl/pypi-pyspark-streaming@2.4.00
# PySpark Streaming

PySpark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python. Built on Apache Spark's core distributed computing capabilities, it provides comprehensive Python APIs for processing continuous streams of data using micro-batch processing with configurable batch intervals.

## Package Information

- **Package Name**: pyspark
- **Package Type**: Python Package (PyPI)
- **Language**: Python
- **Installation**: `pip install pyspark==2.4.8`

## Core Imports

For basic streaming functionality:

```python
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
```

For comprehensive streaming operations:

```python
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel
```

## Basic Usage

### Simple Word Count Example

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# Create Spark configuration and context
conf = SparkConf().setAppName("WordCount").setMaster("local[2]")
sc = SparkContext(conf=conf)

# Create StreamingContext with 1 second batch interval
ssc = StreamingContext(sc, 1)

# Create DStream from socket
lines = ssc.socketTextStream("localhost", 9999)

# Transform and count words
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda a, b: a + b)

# Print results
wordCounts.pprint()

# Start streaming computation
ssc.start()
ssc.awaitTermination()
```

### Text File Streaming Example

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# Create configuration and context
conf = SparkConf().setAppName("FileStreaming").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

# Monitor directory for new text files
lines = ssc.textFileStream("/data/streaming")

# Process lines
processed = lines.filter(lambda line: len(line) > 0) \
    .map(lambda line: line.upper())

# Output results
processed.pprint(10)

# Start and wait
ssc.start()
ssc.awaitTermination()
```

## Architecture

PySpark Streaming follows a micro-batch architecture:

- **StreamingContext**: Main entry point managing the streaming application lifecycle
- **DStreams**: Abstraction representing a continuous sequence of RDDs (discretized streams)
- **Input Sources**: File systems, socket connections, message queues, custom receivers
- **Transformations**: Map, filter, reduce, window, and join operations on streams
- **Output Operations**: Actions that write results to external systems
- **Checkpointing**: Fault recovery through metadata and data checkpointing
- **Batch Processing**: Configurable intervals for micro-batch execution
- **Python Integration**: Seamless integration with Python data processing libraries

## Capabilities

### Core Streaming Operations

Core functionality for creating and managing streaming contexts, with essential lifecycle operations.

**Key APIs:**
```python { .api }
class StreamingContext(sparkContext, batchDuration)
def start()
def stop(stopSparkContext=True, stopGraceFully=False)
def awaitTermination(timeout=None)
```

[Core Streaming Operations](./core-operations.md)

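A minimal lifecycle sketch, assuming a local master, a 5-second batch interval, and a socket source on localhost:9999 (all illustrative):

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("LifecycleDemo").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)  # 5-second batch interval

# Define DStreams and output operations before starting the context
stream = ssc.socketTextStream("localhost", 9999)
stream.pprint()

ssc.start()                       # begin receiving and processing data
ssc.awaitTermination(timeout=60)  # block for up to 60 seconds
ssc.stop(stopSparkContext=True, stopGraceFully=True)  # finish in-flight batches, then shut down
```
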
### Input Sources

Various methods for ingesting data streams from external sources including sockets, files, and queues.

**Key APIs:**
```python { .api }
def socketTextStream(hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2)
def textFileStream(directory)
def queueStream(rdds, oneAtATime=True, default=None)
def binaryRecordsStream(directory, recordLength)
```

[Input Sources](./input-sources.md)

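For local testing without an external source, `queueStream` can feed a DStream from a list of pre-built RDDs; a minimal sketch (names and values are illustrative):

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "QueueStreamDemo")
ssc = StreamingContext(sc, 1)

# Build a few RDDs up front; each batch interval consumes one (oneAtATime=True)
rdd_queue = [sc.parallelize(range(i * 10, (i + 1) * 10)) for i in range(3)]

numbers = ssc.queueStream(rdd_queue, oneAtATime=True)
numbers.map(lambda x: x * 2).pprint()

ssc.start()
ssc.awaitTermination(timeout=10)
ssc.stop()
```
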
### DStream Transformations

Comprehensive transformation operations for processing streaming data including mapping, filtering, windowing, and aggregations.

**Key APIs:**
```python { .api }
def map(f, preservesPartitioning=False)
def filter(f)
def flatMap(f, preservesPartitioning=False)
def reduceByKey(func, numPartitions=None)
def window(windowDuration, slideDuration=None)
```

[DStream Transformations](./transformations.md)

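A sketch of a windowed aggregation using `window` together with `reduceByKey`, assuming a 1-second batch interval; window and slide durations must be multiples of the batch interval:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "WindowDemo")
ssc = StreamingContext(sc, 1)  # 1-second batches

words = ssc.socketTextStream("localhost", 9999) \
    .flatMap(lambda line: line.split(" "))

# Count each word over the last 30 seconds, recomputed every 10 seconds
windowed_counts = words.map(lambda w: (w, 1)) \
    .window(30, 10) \
    .reduceByKey(lambda a, b: a + b)

windowed_counts.pprint()
ssc.start()
ssc.awaitTermination()
```
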
### Output Operations

Methods for writing processed stream data to external systems and triggering computations.

**Key APIs:**
```python { .api }
def foreachRDD(func)
def pprint(num=10)
def saveAsTextFiles(prefix, suffix=None)
```

[Output Operations](./output-operations.md)

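A sketch of pushing each completed batch to an external system with `foreachRDD`, building on the `wordCounts` DStream from the word count example above; `send_partition` is a hypothetical placeholder for a real per-partition writer:

```python
def send_partition(records):
    # Hypothetical sink: replace with a real per-partition connection/writer
    for record in records:
        print(record)

def save_batch(time, rdd):
    # Skip empty batches before touching the external system
    if not rdd.isEmpty():
        rdd.foreachPartition(send_partition)

# foreachRDD accepts either func(rdd) or func(time, rdd)
wordCounts.foreachRDD(save_batch)

# Alternatively, write each batch as text files named <prefix>-<TIME>[.<suffix>]
wordCounts.saveAsTextFiles("output/wordcounts", "txt")
```
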
### Stateful Operations

Advanced operations for maintaining state across streaming batches, including updateStateByKey.

**Key APIs:**
```python { .api }
def updateStateByKey(updateFunc)
def transform(func)
```

[Stateful Operations](./stateful-operations.md)

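A sketch of a running (cumulative) word count with `updateStateByKey`, building on the `ssc` and `pairs` DStream from the word count example above; stateful operations require a checkpoint directory, and the path shown is illustrative:

```python
ssc.checkpoint("/checkpoint/wordcount")  # required for stateful operations

def update_count(new_values, running_count):
    # new_values: counts for this key in the current batch
    # running_count: previous state (None the first time a key appears)
    return sum(new_values) + (running_count or 0)

running_counts = pairs.updateStateByKey(update_count)
running_counts.pprint()
```
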
### Python-Specific Features

Python-specific functionality and integration with the Python ecosystem.

**Key APIs:**
```python { .api }
def mapPartitions(f, preservesPartitioning=False)
def mapPartitionsWithIndex(f, preservesPartitioning=False)
def partitionBy(numPartitions, partitionFunc=portable_hash)
```

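A sketch of per-partition processing with `mapPartitions`, assuming an existing `sc` and `ssc`; the function receives an iterator over a partition's records and can amortize any per-partition setup (such as opening a connection) across all records in that partition:

```python
def summarize_partition(records):
    # Any per-partition setup would go here (e.g. opening a client connection)
    values = list(records)
    if values:
        yield (len(values), sum(values))

numbers = ssc.queueStream([sc.parallelize(range(100)) for _ in range(3)])
summaries = numbers.mapPartitions(summarize_partition)
summaries.pprint()
```
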
## Context Management

PySpark Streaming provides context management and lifecycle operations:

```python { .api }
def getOrCreate(checkpointPath, setupFunc)
def getActive()
def getActiveOrCreate(checkpointPath, setupFunc)
def remember(duration)
def checkpoint(directory)
```

Example context management:
```python
ssc = StreamingContext.getOrCreate("/checkpoint/path", create_context)
```

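A fuller sketch of the checkpoint-recovery pattern behind `getOrCreate`: the user-supplied setup function (here named `create_context`, an illustrative name, as in the one-liner above) is invoked only when no checkpoint data exists at the given path:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

CHECKPOINT_DIR = "/checkpoint/path"

def create_context():
    # Called only when no checkpoint exists; builds a fresh context
    sc = SparkContext("local[2]", "RecoverableApp")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint(CHECKPOINT_DIR)

    lines = ssc.socketTextStream("localhost", 9999)
    lines.pprint()
    return ssc

# Recover from the checkpoint if present, otherwise call create_context()
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()
```
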
## Key Data Types and Utilities

### Duration and Time

Durations are specified as numeric values (seconds) in Python:

```python
# Batch intervals in seconds
batch_interval = 1 # 1 second
window_duration = 60 # 60 seconds (1 minute)
slide_duration = 30 # 30 seconds
```

### Storage Levels

```python { .api }
from pyspark.storagelevel import StorageLevel

StorageLevel.MEMORY_ONLY
StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_2
StorageLevel.MEMORY_ONLY_SER
```

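A small sketch of overriding the default receiver storage level, assuming an existing `ssc`:

```python
from pyspark.storagelevel import StorageLevel

# Keep received data in memory only (single replica) instead of the
# default MEMORY_AND_DISK_2
lines = ssc.socketTextStream("localhost", 9999,
                             storageLevel=StorageLevel.MEMORY_ONLY)
```
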
## Configuration

Key configuration properties for PySpark Streaming (a usage sketch follows this list):

- `spark.streaming.checkpoint.directory` - Checkpoint directory path
- `spark.streaming.stopGracefullyOnShutdown` - Graceful shutdown (default: false)
- `spark.streaming.unpersist` - Auto-unpersist old RDDs (default: true)
- `spark.streaming.receiver.maxRate` - Max records per second per receiver
- `spark.python.worker.memory` - Memory per Python worker process

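A sketch of setting these properties through `SparkConf` before the contexts are created; the values shown are illustrative:

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setAppName("ConfiguredStream")
        .setMaster("local[2]")
        .set("spark.streaming.stopGracefullyOnShutdown", "true")
        .set("spark.streaming.receiver.maxRate", "1000")  # records/sec per receiver
        .set("spark.python.worker.memory", "1g"))

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
```
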
## Error Handling

Streaming applications handle failures through the mechanisms below (see the sketch after this list):

- **Automatic restart**: Failed tasks restart automatically
- **Checkpointing**: Metadata and data recovery
- **Write-ahead logs**: Reliable data storage for receivers
- **Driver recovery**: Restart from checkpoint on driver failure
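
A sketch that combines these mechanisms: the receiver write-ahead log is enabled via configuration and a checkpoint directory is set (path and values are illustrative); for driver recovery, the context construction would additionally be wrapped in `StreamingContext.getOrCreate` as shown under Context Management:

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setAppName("FaultTolerantStream")
        .setMaster("local[2]")
        # Persist received data to the checkpoint directory before processing
        .set("spark.streaming.receiver.writeAheadLog.enable", "true"))

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
ssc.checkpoint("/checkpoint/fault-tolerant")  # metadata and data checkpointing
```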