# PySpark Streaming

PySpark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python. Built on Apache Spark's core distributed computing capabilities, it provides Python APIs for processing continuous streams of data as micro-batches with configurable batch intervals.

## Package Information

- **Package Name**: pyspark
- **Package Type**: Python Package (PyPI)
- **Language**: Python
- **Installation**: `pip install pyspark==2.4.8`

## Core Imports

For basic streaming functionality:
```python
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
```

For comprehensive streaming operations:
```python
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel
```

## Basic Usage

### Simple Word Count Example

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# Create Spark configuration and context
conf = SparkConf().setAppName("WordCount").setMaster("local[2]")
sc = SparkContext(conf=conf)

# Create StreamingContext with 1 second batch interval
ssc = StreamingContext(sc, 1)

# Create DStream from socket
lines = ssc.socketTextStream("localhost", 9999)

# Transform and count words
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda a, b: a + b)

# Print results
wordCounts.pprint()

# Start streaming computation
ssc.start()
ssc.awaitTermination()
```

### Text File Streaming Example

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# Create configuration and context
conf = SparkConf().setAppName("FileStreaming").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

# Monitor directory for new text files
lines = ssc.textFileStream("/data/streaming")

# Process lines
processed = lines.filter(lambda line: len(line) > 0) \
    .map(lambda line: line.upper())

# Output results
processed.pprint(10)

# Start and wait
ssc.start()
ssc.awaitTermination()
```

## Architecture

PySpark Streaming follows a micro-batch architecture built from the following pieces (a sketch tying them together follows the list):

- **StreamingContext**: Main entry point managing the streaming application lifecycle
- **DStreams**: Abstraction representing a continuous sequence of RDDs (discretized streams)
- **Input Sources**: File systems, socket connections, message queues, custom receivers
- **Transformations**: Map, filter, reduce, window, and join operations on streams
- **Output Operations**: Actions that write results to external systems
- **Checkpointing**: Fault recovery through metadata and data checkpointing
- **Batch Processing**: Configurable intervals for micro-batch execution
- **Python Integration**: Seamless integration with Python data processing libraries
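As an illustrative sketch (not taken from the package documentation) of how these pieces fit together, the following example combines a socket input source, a windowed transformation, an output operation, and checkpointing; the host, port, and checkpoint path are placeholders.

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("ArchitectureSketch").setMaster("local[2]")
sc = SparkContext(conf=conf)

# StreamingContext: lifecycle entry point, 5-second micro-batches
ssc = StreamingContext(sc, 5)

# Checkpointing: required here because the windowed reduce uses an inverse function
ssc.checkpoint("/tmp/streaming-checkpoint")

# Input source: socket DStream (placeholder host and port)
lines = ssc.socketTextStream("localhost", 9999)

# Transformations: windowed word counts over the last 30 seconds, sliding every 10
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKeyAndWindow(lambda a, b: a + b,  # add newly arrived batches
                                     lambda a, b: a - b,  # subtract batches leaving the window
                                     30, 10))

# Output operation: print each window's results
counts.pprint()

ssc.start()
ssc.awaitTermination()
```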

## Capabilities

### Core Streaming Operations

Core functionality for creating and managing streaming contexts, with essential lifecycle operations.

**Key APIs:**
```python { .api }
class StreamingContext(sparkContext, batchDuration)
def start()
def stop(stopSparkContext=True, stopGraceFully=False)
def awaitTermination(timeout=None)
```

[Core Streaming Operations](./core-operations.md)
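A minimal lifecycle sketch using these APIs; the queue-based source and the 10-second timeout are arbitrary choices to keep the example self-contained.

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("LifecycleSketch").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# Trivial in-memory source so the example needs no external service
rdd_queue = [sc.parallelize(range(10)) for _ in range(5)]
ssc.queueStream(rdd_queue).pprint()

ssc.start()
ssc.awaitTermination(timeout=10)  # block for up to 10 seconds
ssc.stop(stopSparkContext=True, stopGraceFully=True)
```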

### Input Sources

Various methods for ingesting data streams from external sources including sockets, files, and queues.

**Key APIs:**
```python { .api }
def socketTextStream(hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2)
def textFileStream(directory)
def queueStream(rdds, oneAtATime=True, default=None)
def binaryRecordsStream(directory, recordLength)
```

[Input Sources](./input-sources.md)
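A hedged sketch creating each kind of source on one context; the host, port, and directory are placeholders, and the queue source exists mainly for local testing.

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.storagelevel import StorageLevel

conf = SparkConf().setAppName("InputSourcesSketch").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

# Socket source with an explicit storage level
socket_lines = ssc.socketTextStream("localhost", 9999,
                                    storageLevel=StorageLevel.MEMORY_AND_DISK)

# File source: picks up new files that appear in the directory
file_lines = ssc.textFileStream("/data/incoming")

# Queue source: feeds one RDD per batch by default (oneAtATime=True)
queue_lines = ssc.queueStream([sc.parallelize(["a", "b", "c"])])

# Each stream needs an output operation to be computed
socket_lines.pprint()
file_lines.pprint()
queue_lines.pprint()

ssc.start()
ssc.awaitTermination()
```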

### DStream Transformations

Comprehensive transformation operations for processing streaming data including mapping, filtering, windowing, and aggregations.

**Key APIs:**
```python { .api }
def map(f, preservesPartitioning=False)
def filter(f)
def flatMap(f, preservesPartitioning=False)
def reduceByKey(func, numPartitions=None)
def window(windowDuration, slideDuration=None)
```

[DStream Transformations](./transformations.md)
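A hedged sketch chaining these transformations; the socket source and the window and slide durations are placeholders.

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("TransformSketch").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)  # 2-second batches

events = ssc.socketTextStream("localhost", 9999)

# flatMap / filter / map
words = (events.flatMap(lambda line: line.split(" "))
               .filter(lambda w: w)                 # drop empty tokens
               .map(lambda w: (w.lower(), 1)))

# window: recompute over the last 20 seconds of data, every 10 seconds
windowed_counts = words.window(20, 10).reduceByKey(lambda a, b: a + b)
windowed_counts.pprint()

ssc.start()
ssc.awaitTermination()
```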

### Output Operations

Methods for writing processed stream data to external systems and triggering computations.

**Key APIs:**
```python { .api }
def foreachRDD(func)
def pprint(num=10)
def saveAsTextFiles(prefix, suffix=None)
```

[Output Operations](./output-operations.md)
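A hedged sketch of these output operations; `send_batch` is a hypothetical sink that just prints on the driver, and the output path is a placeholder.

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("OutputSketch").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

counts = (ssc.socketTextStream("localhost", 9999)
             .flatMap(lambda line: line.split(" "))
             .map(lambda w: (w, 1))
             .reduceByKey(lambda a, b: a + b))

def send_batch(time, rdd):
    # Runs on the driver once per batch; collect() is only safe for small results
    for word, count in rdd.collect():
        print(time, word, count)

# Hand each micro-batch to ordinary RDD code
counts.foreachRDD(send_batch)

# Persist each batch as text files named <prefix>-<batch time>.<suffix>
counts.saveAsTextFiles("/tmp/wordcounts", "txt")

ssc.start()
ssc.awaitTermination()
```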

### Stateful Operations

Advanced operations for maintaining state across streaming batches, including updateStateByKey.

**Key APIs:**
```python { .api }
def updateStateByKey(updateFunc)
def transform(func)
```

[Stateful Operations](./stateful-operations.md)
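A minimal `updateStateByKey` sketch that keeps running word counts across batches; the checkpoint path and socket source are placeholders, and checkpointing must be enabled for stateful operations.

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("StatefulSketch").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
ssc.checkpoint("/tmp/stateful-checkpoint")  # required for updateStateByKey

pairs = (ssc.socketTextStream("localhost", 9999)
            .flatMap(lambda line: line.split(" "))
            .map(lambda w: (w, 1)))

def update_count(new_values, running_count):
    # new_values: this batch's counts for a key; running_count: previous state or None
    return sum(new_values) + (running_count or 0)

running_counts = pairs.updateStateByKey(update_count)
running_counts.pprint()

ssc.start()
ssc.awaitTermination()
```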

### Python-Specific Features

Python-specific functionality and integration with the Python ecosystem.

**Key APIs:**
```python { .api }
def mapPartitions(f, preservesPartitioning=False)
def mapPartitionsWithIndex(f, preservesPartitioning=False)
def partitionBy(numPartitions, partitionFunc=portable_hash)
```
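A hedged sketch of `mapPartitions` and `partitionBy`; `parse_partition` is a hypothetical helper that does its setup once per partition rather than once per record.

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("PartitionSketch").setMaster("local[2]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream("localhost", 9999)

def parse_partition(iterator):
    import re
    token_re = re.compile(r"\w+")  # per-partition setup, done once per partition
    for line in iterator:
        for token in token_re.findall(line):
            yield (token.lower(), 1)

# One Python function call per partition instead of one per record
tokens = lines.mapPartitions(parse_partition)

# Fix the partitioning of the pair DStream, then aggregate
counts = tokens.partitionBy(4).reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
```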

## Context Management

PySpark Streaming provides context management and lifecycle operations:

```python { .api }
def getOrCreate(checkpointPath, setupFunc)
def getActive()
def getActiveOrCreate(checkpointPath, setupFunc)
def remember(duration)
def checkpoint(directory)
```

Example context management:
```python
ssc = StreamingContext.getOrCreate("/checkpoint/path", create_context)
```
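The `create_context` function above is not defined in this document; a hedged sketch of what such a setup function might look like (the paths and source are placeholders):

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

CHECKPOINT_DIR = "/checkpoint/path"  # placeholder

def create_context():
    # Called only when no checkpoint exists; builds the full streaming graph
    conf = SparkConf().setAppName("CheckpointedApp").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint(CHECKPOINT_DIR)
    ssc.socketTextStream("localhost", 9999).pprint()
    return ssc

# Recover from the checkpoint if one exists, otherwise call create_context()
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()
```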

## Key Data Types and Utilities

### Duration and Time

Durations are specified as plain numeric values in seconds:

```python
# Batch intervals in seconds
batch_interval = 1    # 1 second
window_duration = 60  # 60 seconds (1 minute)
slide_duration = 30   # 30 seconds
```

### Storage Levels

```python { .api }
from pyspark.storagelevel import StorageLevel

StorageLevel.MEMORY_ONLY
StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_2
StorageLevel.MEMORY_ONLY_2
```

## Configuration

Key configuration properties for PySpark Streaming (a usage sketch follows the list):

- `spark.streaming.stopGracefullyOnShutdown` - Graceful shutdown (default: false)
- `spark.streaming.unpersist` - Auto-unpersist old RDDs (default: true)
- `spark.streaming.receiver.maxRate` - Max records per second per receiver
- `spark.python.worker.memory` - Memory per Python worker process
- Checkpoint directory - set programmatically with `ssc.checkpoint(directory)` rather than through a configuration property
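A hedged sketch of setting these properties through `SparkConf` before the contexts are created; the values are illustrative only.

```python
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setAppName("ConfiguredStreaming")
        .setMaster("local[2]")
        .set("spark.streaming.stopGracefullyOnShutdown", "true")
        .set("spark.streaming.unpersist", "true")
        .set("spark.streaming.receiver.maxRate", "1000")  # records/sec per receiver
        .set("spark.python.worker.memory", "512m"))

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

# The checkpoint directory is set on the context itself (placeholder path)
ssc.checkpoint("/tmp/app-checkpoint")
```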

## Error Handling

Streaming applications handle failures through the following mechanisms (see the sketch after this list):

- **Automatic restart**: Failed tasks are restarted automatically
- **Checkpointing**: Metadata and data recovery
- **Write-ahead logs**: Reliable data storage for receivers
- **Driver recovery**: Restart from checkpoint on driver failure

- **Driver recovery**: Restart from checkpoint on driver failure