# Apache Spark Streaming

Apache Spark Streaming is a scalable, fault-tolerant stream processing system built on Apache Spark for processing live data streams. Its core abstraction, the DStream (discretized stream), represents a continuous sequence of RDDs. It provides fault tolerance through lineage-based recovery and integrates with Spark's batch processing capabilities.

**⚠️ Deprecation Notice:** Spark Streaming (DStreams) is deprecated as of Apache Spark 3.4.0. Users are strongly encouraged to migrate to [Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html), which provides a more modern streaming API with better performance, late data handling, and exactly-once processing guarantees.

## Package Information

- **Package Name**: org.apache.spark:spark-streaming_2.12
- **Package Type**: Maven
- **Language**: Scala
- **Installation**: Add to `pom.xml`:
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
```

## Core Imports

```scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
```

## Basic Usage

```scala
import org.apache.spark._
import org.apache.spark.streaming._

// Create StreamingContext with 2 second batch interval
val conf = new SparkConf().setAppName("StreamingApp")
val ssc = new StreamingContext(conf, Seconds(2))

// Create DStream from text files
val lines = ssc.textFileStream("/path/to/directory")

// Transform the data
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Output the results
wordCounts.print()

// Start the streaming context
ssc.start()
ssc.awaitTermination()
```

## Architecture

Spark Streaming is built around several key components:

- **StreamingContext**: Main entry point for creating DStreams from various input sources
- **DStream**: Discretized stream representing a continuous sequence of RDDs
- **Micro-batching**: Divides live streams into small batches processed by the Spark engine
- **Fault Tolerance**: Lineage-based recovery and checkpointing for stateful operations
- **Write-Ahead Log (WAL)**: Optional reliability mechanism for receiver-based sources
- **Receivers**: Background tasks running on worker nodes to receive external data
- **Rate Controller**: Backpressure mechanism to prevent overwhelming the system
- **Block Manager**: Manages storage of received data blocks across cluster nodes
- **Integration**: Seamless integration with Spark SQL, MLlib, and GraphX
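
The optional reliability components in the list above are enabled through configuration. The following is a minimal sketch, assuming a receiver-based source and a caller-supplied checkpoint directory. `spark.streaming.receiver.writeAheadLog.enable` and `spark.streaming.backpressure.enabled` are standard Spark configuration keys; the object name, method names, and application name are illustrative only.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReliabilityConfigSketch {
  // Builds a SparkConf with the optional reliability features switched on.
  // The master URL is expected to be supplied by spark-submit.
  def reliableConf(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      // Persist received blocks to a write-ahead log for receiver-based sources
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      // Let the rate controller adapt the ingestion rate to processing speed
      .set("spark.streaming.backpressure.enabled", "true")

  // Creates a context whose checkpoint directory also holds the write-ahead log.
  def createContext(checkpointDir: String): StreamingContext = {
    val ssc = new StreamingContext(reliableConf("ReliableStreamingApp"), Seconds(2))
    ssc.checkpoint(checkpointDir)
    ssc
  }
}
```

The write-ahead log is stored under the checkpoint directory, so the directory should live on a fault-tolerant file system such as HDFS in a real deployment.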

## Capabilities

### Core Streaming Operations

Primary streaming functionality including StreamingContext creation, DStream transformations, and output operations. Essential for all streaming applications.

```scala { .api }
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  def start(): Unit
  def stop(): Unit
  def awaitTermination(): Unit
  def checkpoint(directory: String): Unit
}

abstract class DStream[T] {
  def map[U](mapFunc: T => U): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def print(): Unit
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
}
```

[Core Streaming Operations](./core-streaming.md)
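
The operations listed above can be combined as in the following sketch. It assumes a local run (`local[2]`) and uses an in-memory `queueStream` as a stand-in for a live source so that no external system is needed; the object name and the sample data are illustrative only.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CoreOpsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CoreOpsSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // An in-memory queue of RDDs stands in for a live source
    val queue = new mutable.Queue[RDD[Int]]()
    queue += ssc.sparkContext.makeRDD(1 to 10)

    // Transformations are lazy and applied to every batch
    val scaledEvens = ssc.queueStream(queue).filter(_ % 2 == 0).map(_ * 10)

    // Output operation: runs on the driver once per batch
    scaledEvens.foreachRDD { rdd =>
      if (!rdd.isEmpty()) println(rdd.collect().mkString(", "))
    }

    ssc.start()
    // Run for a few batches, then shut down the context and SparkContext
    ssc.awaitTerminationOrTimeout(3000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

At least one output operation such as `print()` or `foreachRDD()` must be registered before `start()` is called; otherwise the context has nothing to execute.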

### Input Sources and Data Ingestion

Comprehensive data ingestion capabilities from files, sockets, queues, and custom sources. Supports both receiver-based and direct stream approaches.

```scala { .api }
class StreamingContext {
  def textFileStream(directory: String): DStream[String]
  def socketTextStream(hostname: String, port: Int): ReceiverInputDStream[String]
  def queueStream[T](queue: Queue[RDD[T]]): InputDStream[T]
  def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
}

abstract class Receiver[T](storageLevel: StorageLevel) {
  def onStart(): Unit
  def onStop(): Unit
  def store(dataItem: T): Unit
}
```

[Input Sources and Data Ingestion](./input-sources.md)
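
A custom source is written by extending `Receiver` and passing an instance to `receiverStream`. The sketch below assumes a line-oriented TCP text server; the class name `LineReceiver` and its `host` and `port` parameters are hypothetical, and the restart messages are illustrative.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // onStart must not block, so reading happens on a separate thread
    new Thread("Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to clean up: the reading thread checks isStopped() and exits
  }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // hand each record to Spark for block storage
        line = reader.readLine()
      }
      // The server closed the connection: ask Spark to restart the receiver
      restart("Connection closed, trying to reconnect")
    } catch {
      case t: Throwable => restart(s"Error receiving data from $host:$port", t)
    } finally {
      if (socket != null) socket.close()
    }
  }
}
```

Given a `StreamingContext` named `ssc`, the receiver would be attached with `ssc.receiverStream(new LineReceiver("localhost", 9999))`, where the host and port are placeholders.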

### Advanced Transformations and Windowing

Windowed operations, stateful transformations, and advanced data processing patterns for complex streaming analytics.

```scala { .api }
class DStream[T] {
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
  def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
}

class PairDStreamFunctions[K, V] {
  def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]
}
```

[Advanced Transformations and Windowing](./advanced-transformations.md)
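
Windowing and per-key state can be illustrated with a word count. This is a sketch under stated assumptions: a text server is listening on `localhost:9999` (for example one started with `nc -lk 9999`), the checkpoint path is hypothetical, and the object and helper names are illustrative.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowStateSketch {
  // Adds this batch's counts for a key to its running total
  def updateTotal(newCounts: Seq[Int], total: Option[Int]): Option[Int] =
    Some(total.getOrElse(0) + newCounts.sum)

  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the receiver, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowStateSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Stateful operations need a checkpoint directory (hypothetical path)
    ssc.checkpoint("/tmp/window-state-checkpoint")

    // Assumed source: a text server on localhost:9999
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))

    // Counts over the last 4 seconds, recomputed every 2 seconds;
    // both durations are multiples of the 1 second batch interval
    val windowedCounts = pairs.window(Seconds(4), Seconds(2)).reduceByKey(_ + _)

    // Running total per word since the application started
    val totals = pairs.updateStateByKey[Int](updateTotal _)

    windowedCounts.print()
    totals.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Returning `None` from the update function removes a key from the state, which is one way to keep the state from growing without bound.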

### Java API Integration

Java-friendly wrappers providing seamless integration for Java applications with lambda expressions and Java collections support.

```scala { .api }
class JavaStreamingContext(sparkContext: JavaSparkContext, batchDuration: Duration) {
  def socketTextStream(hostname: String, port: Int): JavaReceiverInputDStream[String]
  def textFileStream(directory: String): JavaDStream[String]
}

class JavaDStream[T] {
  def map[U](f: JFunction[T, U]): JavaDStream[U]
  def filter(f: JFunction[T, Boolean]): JavaDStream[T]
}
```

[Java API Integration](./java-api.md)

## Core Types

```scala { .api }
case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def isMultipleOf(that: Duration): Boolean
}

object Seconds {
  def apply(seconds: Long): Duration
}

object Minutes {
  def apply(minutes: Long): Duration
}

case class Time(milliseconds: Long) {
  def +(that: Duration): Time
  def -(that: Time): Duration
  def floor(that: Duration): Time
}

case class Interval(beginTime: Time, endTime: Time) {
  def duration(): Duration
}

sealed trait StreamingContextState
object StreamingContextState {
  case object INITIALIZED extends StreamingContextState
  case object ACTIVE extends StreamingContextState
  case object STOPPED extends StreamingContextState
}

sealed abstract class StorageLevel extends Serializable {
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}

object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}

trait StreamingListener {
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {}
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {}
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {}
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {}
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {}
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {}
  def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {}
  def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {}
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {}
}
```
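
The time types compose arithmetically, and a listener observes batch progress. The following sketch shows both; the object name `DurationSketch`, the class name `DelayLogger`, and the sample values are illustrative only.

```scala
import org.apache.spark.streaming.{Duration, Minutes, Seconds, Time}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

object DurationSketch {
  val batch: Duration = Seconds(2)
  val window: Duration = Minutes(1)

  // Window and slide lengths must be whole multiples of the batch interval
  val windowIsValid: Boolean = window.isMultipleOf(batch)

  // Scaling a duration: five batches of two seconds each
  val slide: Duration = batch * 5

  // Aligning a timestamp (in milliseconds) down to a batch boundary
  val aligned: Time = Time(7500).floor(batch)

  // Subtracting two Time values yields a Duration
  val elapsed: Duration = (Time(7500) + batch) - Time(7500)
}

// Logs the record count and processing delay of every completed batch
class DelayLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val delay = info.processingDelay.map(ms => s"$ms ms").getOrElse("unknown")
    println(s"batch ${info.batchTime}: ${info.numRecords} records, processing delay $delay")
  }
}
```

Given a `StreamingContext` named `ssc`, the listener would be registered with `ssc.addStreamingListener(new DelayLogger)` before calling `start()`.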

## Error Handling

Spark Streaming applications can handle errors through:

- **Receiver Error Handling**: `Receiver.reportError()` for custom receiver failures
- **DStream Failure Recovery**: Automatic lineage-based RDD recovery
- **Checkpoint Recovery**: `StreamingContext.getOrCreate()` for driver failure recovery
- **Custom Error Processing**: Using `DStream.foreachRDD()` with try-catch blocks
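
Checkpoint recovery and per-batch error handling from the list above can be sketched together. The checkpoint path, the socket source on `localhost:9999`, and the object name are assumptions made for illustration; the master URL is expected to come from `spark-submit`.

```scala
import scala.util.control.NonFatal
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverySketch {
  val checkpointDir = "/tmp/recovery-checkpoint" // hypothetical path

  // Called only when no checkpoint exists; all DStream setup must happen here
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RecoverySketch")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint(checkpointDir)

    // Assumed source: a text server on localhost:9999
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { rdd =>
      try {
        println(s"records in batch: ${rdd.count()}")
      } catch {
        // Log and continue so that one failed batch does not stop the application
        case NonFatal(e) => println(s"batch failed: ${e.getMessage}")
      }
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Rebuild the context from the checkpoint after a driver restart,
    // or create a fresh one on the first run
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Defining the DStream graph inside the creating function matters: when a checkpoint is found, the function is skipped and the graph is restored from the checkpoint instead.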

Common exceptions include:
- `IllegalArgumentException`: Invalid configuration parameters
- `SparkException`: General Spark runtime errors
- `StreamingQueryException`: Thrown by Structured Streaming queries (`org.apache.spark.sql.streaming`), not by the DStream API; relevant only after migrating to Structured Streaming