Apache Spark Streaming extension for scalable, fault-tolerant stream processing of live data streams
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming_2-13@3.5.00
# Apache Spark Streaming

Apache Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant processing of live data streams. It uses a micro-batch architecture: incoming data is grouped into batches at a fixed interval, and each batch is processed as an RDD. Transformations are applied with exactly-once semantics. End-to-end exactly-once delivery also requires a reliable or replayable input source and an idempotent or transactional output operation.

**⚠️ Deprecation Notice**: Apache Spark Streaming is deprecated as of Spark 3.4.0. Users should migrate to Structured Streaming for new applications.
## Package Information

- **Package Name**: spark-streaming_2.13
- **Package Type**: maven
- **Language**: Scala (with Java API)
- **Version**: 3.5.6
- **Installation**: Add to `pom.xml` or `build.sbt`

Maven:

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.13</artifactId>
    <version>3.5.6</version>
</dependency>
```

SBT:

```scala
// %% appends the Scala binary version, resolving to spark-streaming_2.13 on Scala 2.13.x
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.6"
```
## Core Imports

```scala
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.scheduler._
```

For Java API:

```java
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.scheduler.*;
```
## Basic Usage

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming._

// Create a streaming context with a 2-second batch interval.
// "local[2]" gives one thread to the socket receiver and one to batch processing;
// with "local[1]" the receiver would occupy the only thread and no batches would run.
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))

// Create an input stream from a TCP socket (for example one opened with `nc -lk 9999`)
val lines = ssc.socketTextStream("localhost", 9999)

// Transform the stream: split lines into words and count them per batch
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Output operation: print the first ten elements of each batch's result
wordCounts.print()

// Start the computation and block until it is stopped or fails
ssc.start()
ssc.awaitTermination()
```
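The example above keeps no state between batches. Stateful and windowed operations need checkpointing, and a restarted driver can rebuild its DStream graph from checkpoint data with `StreamingContext.getOrCreate`. The following is a minimal sketch of that structure, not a production template: the object name `RecoverableWordCount`, the socket on `localhost:9999`, the hardcoded `local[2]` master, and the `/tmp` checkpoint path are illustrative assumptions, and a real deployment would checkpoint to fault-tolerant storage such as HDFS.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical application object; names and paths are illustrative.
object RecoverableWordCount {
  // Illustrative default; point this at reliable storage (e.g. HDFS) in production.
  val DefaultCheckpointDir = "/tmp/recoverable-word-count"

  // Every DStream must be defined inside this function so that the same graph
  // can be rebuilt from checkpoint data after a driver restart.
  def createContext(checkpointDir: String): StreamingContext = {
    val conf = new SparkConf().setAppName("RecoverableWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint(checkpointDir) // required by the inverse-reduce window below

    val pairs: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // 30-second window sliding every 10 seconds; both are multiples of the 2-second batch.
    // The inverse function subtracts counts that leave the window instead of recomputing it.
    val windowed = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,
      (a: Int, b: Int) => a - b,
      Seconds(30),
      Seconds(10))
    windowed.print()
    ssc
  }

  // Rebuilds the context from checkpoint data if any exists; otherwise calls createContext.
  def getOrCreateContext(checkpointDir: String): StreamingContext =
    StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))

  def main(args: Array[String]): Unit = {
    val ssc = getOrCreateContext(args.headOption.getOrElse(DefaultCheckpointDir))
    ssc.start()
    ssc.awaitTermination()
  }
}
```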
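## Architecture

Apache Spark Streaming processes live data streams as a series of micro-batches:

- **StreamingContext**: Main entry point; defines the batch interval and coordinates stream processing
- **DStream**: Fundamental abstraction representing a continuous sequence of RDDs, one per batch interval
- **Receivers**: Long-running tasks that ingest data from external sources such as sockets or custom `Receiver` implementations (the Kafka 0.10 integration instead uses a receiver-less direct stream, and Flume support was removed in Spark 3.0)
- **Transformations**: Operations that create new DStreams from existing ones
- **Output Operations**: Operations that push DStream data to external systems and trigger execution of the transformations
- **Checkpointing**: Fault-tolerance mechanism that saves metadata (for driver recovery) and generated RDDs (required by stateful operations) to reliable storage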
## Capabilities

### Core Streaming

Main streaming abstractions: `StreamingContext`, `DStream` operations, and the `Time`/`Duration` classes used for batch and window intervals.

```scala { .api }
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  def start(): Unit
  def stop(stopSparkContext: Boolean = true): Unit
  def awaitTermination(): Unit
}

abstract class DStream[T] {
  def map[U: ClassTag](mapFunc: T => U): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
}

case class Duration(milliseconds: Long)
object Seconds { def apply(seconds: Long): Duration }
```
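A minimal sketch of how these abstractions combine, assuming a `DStream[String]` of text lines such as the socket stream in Basic Usage. The hypothetical helpers `tokenize`, `countWords`, and `printTopWords` factor the word count into reusable functions, and the `(RDD, Time)` overload of `foreachRDD` gives each batch's output access to its batch time.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; names are illustrative.
object WordPipeline {
  // Pure helper: splits on runs of whitespace and drops empty tokens.
  def tokenize(line: String): Seq[String] =
    line.split("\\s+").filter(_.nonEmpty).toSeq

  // Transformations only build new DStreams; nothing runs until an output
  // operation is registered and the StreamingContext is started.
  def countWords(lines: DStream[String]): DStream[(String, Int)] =
    lines.flatMap(tokenize).map(word => (word, 1)).reduceByKey(_ + _)

  // Output operation: the closure runs on the driver once per batch, while the
  // RDD actions inside it (sortBy, take) execute on the cluster.
  def printTopWords(counts: DStream[(String, Int)], n: Int): Unit =
    counts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      val top = rdd.sortBy(_._2, ascending = false).take(n)
      println(s"Batch $time: ${top.mkString(", ")}")
    }
}
```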
[Core Streaming](./core-streaming.md)

### Input Sources

Built-in sources for reading from files, TCP sockets, and in-memory queues of RDDs (the latter mainly for testing). Connectors for systems such as Kafka ship as separate modules, for example `spark-streaming-kafka-0-10`.

```scala { .api }
class StreamingContext {
  def socketTextStream(hostname: String, port: Int): DStream[String]
  def textFileStream(directory: String): DStream[String]
  def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): DStream[(K, V)]
  def queueStream[T](queue: Queue[RDD[T]]): DStream[T]
}
```
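`queueStream` is convenient for exercising a pipeline with test data, because the driver can push RDDs into the queue while the context runs. The sketch below assumes local mode and uses illustrative names (`QueueStreamDemo`, `isEven`); it is a demonstration, not a complete test harness.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical demo object; names are illustrative.
object QueueStreamDemo {
  def isEven(x: Int): Boolean = x % 2 == 0

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("QueueStreamDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // queueStream dequeues one RDD per batch by default (oneAtATime = true).
    val rddQueue = mutable.Queue[RDD[Int]]()
    ssc.queueStream(rddQueue).filter(isEven).count().print()

    ssc.start()
    // Push a few RDDs from the driver; synchronize on the queue because the
    // streaming scheduler dequeues from another thread.
    for (i <- 1 to 3) {
      rddQueue.synchronized { rddQueue += ssc.sparkContext.makeRDD(1 to 10 * i) }
      Thread.sleep(1000)
    }
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
```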
[Input Sources](./input-sources.md)

### State Management

Stateful processing that maintains per-key state across batches through the `updateStateByKey` and `mapWithState` operations on key-value DStreams. Both require a checkpoint directory set with `ssc.checkpoint(...)`.

```scala { .api }
sealed abstract class State[S] {
  def exists(): Boolean
  def get(): S
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit
  def isTimingOut(): Boolean
}

sealed abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {
  def initialState(rdd: RDD[(KeyType, StateType)]): StateSpec[KeyType, ValueType, StateType, MappedType]
  def timeout(timeout: Duration): StateSpec[KeyType, ValueType, StateType, MappedType]
}

object StateSpec {
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}
```
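The sketch below contrasts the two stateful operations on a hypothetical `DStream[(String, Int)]` of `(word, 1)` pairs. `updateStateByKey` calls its update function for every key on every batch, while `mapWithState` only touches keys with new data (or keys whose state is timing out) and supports idle timeouts. The object and function names and the 10-minute timeout are illustrative assumptions, and the caller is assumed to have already set a checkpoint directory.

```scala
import org.apache.spark.streaming.{Minutes, State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; names and the timeout value are illustrative.
object RunningCounts {
  // updateStateByKey: invoked for every key with new values or existing state.
  // Returning None would remove the key from the state.
  def updateRunningCount(newValues: Seq[Int], running: Option[Int]): Option[Int] =
    Some(running.getOrElse(0) + newValues.sum)

  // mapWithState: invoked only for keys with new data or whose state is timing out.
  val trackCount: (String, Option[Int], State[Int]) => (String, Int) =
    (word, one, state) =>
      if (state.isTimingOut()) {
        (word, state.get()) // state is being removed; update() is not allowed here
      } else {
        val total = state.getOption().getOrElse(0) + one.getOrElse(0)
        state.update(total)
        (word, total)
      }

  // Both operations assume ssc.checkpoint(dir) has already been called.
  def wire(pairs: DStream[(String, Int)]): Unit = {
    pairs.updateStateByKey[Int](updateRunningCount _).print()

    val spec = StateSpec.function(trackCount).timeout(Minutes(10))
    val mapped = pairs.mapWithState(spec)
    mapped.print()                  // one record per key updated in this batch
    mapped.stateSnapshots().print() // full (key, state) table after each batch
  }
}
```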
[State Management](./state-management.md)

### Java API

Java-friendly wrappers (`JavaStreamingContext`, `JavaDStream`, `JavaPairDStream`) that expose the streaming functionality through Java types and the function interfaces in `org.apache.spark.api.java.function`.

```java { .api }
public class JavaStreamingContext {
    public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
    public void start();
    public void awaitTermination();
}

public class JavaDStream<T> {
    public <R> JavaDStream<R> map(Function<T, R> f);
    public JavaDStream<T> filter(Function<T, Boolean> f);
    public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
}
```
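[Java API](./java-api.md)

### Monitoring and Listeners

The `StreamingListener` interface receives events about the streaming lifecycle, batch submission and completion, receiver status, and output operations. Per-batch timing and record counts are exposed through `BatchInfo`. Listeners are registered with `StreamingContext.addStreamingListener`.

```scala { .api }
trait StreamingListener {
  // All callbacks have empty default implementations; override only the ones you need.
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
}

case class BatchInfo(
    batchTime: Time,
    streamIdToInputInfo: Map[Int, StreamInputInfo],
    submissionTime: Long,
    processingStartTime: Option[Long],
    processingEndTime: Option[Long],
    outputOperationInfos: Map[Int, OutputOperationInfo]
) {
  def schedulingDelay: Option[Long] // processingStartTime - submissionTime
  def processingDelay: Option[Long] // processingEndTime - processingStartTime
  def totalDelay: Option[Long]      // schedulingDelay + processingDelay
  def numRecords: Long              // records received across all input streams
}
```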
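A custom listener only needs to override the callbacks it cares about. This sketch (the class name `BatchDelayListener` is hypothetical) records the scheduling and processing delay of each completed batch, a common way to check whether processing keeps up with the batch interval. It would be registered with `ssc.addStreamingListener(new BatchDelayListener)`.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical listener; records (batch time ms, scheduling delay ms, processing delay ms).
class BatchDelayListener extends StreamingListener {
  private val delays = ArrayBuffer.empty[(Long, Long, Long)]

  // Callbacks arrive on Spark's listener-bus thread, so guard the shared buffer.
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    for {
      scheduling <- info.schedulingDelay
      processing <- info.processingDelay
    } delays.synchronized {
      delays += ((info.batchTime.milliseconds, scheduling, processing))
    }
  }

  // Immutable copy of everything recorded so far.
  def snapshot: Seq[(Long, Long, Long)] = delays.synchronized(delays.toList)
}
```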
[Monitoring and Listeners](./monitoring-listeners.md)

### Web UI

While a `StreamingContext` is running, the driver's Spark web UI (port 4040 by default) shows a **Streaming** tab with input rates, scheduling delays, processing times, batch status, and receiver information. The classes below implement that tab; they are internal to Spark rather than a public API.

```scala { .api }
class StreamingTab(parent: SparkUI)
class StreamingPage(parent: StreamingTab)
class StreamingJobProgressListener extends StreamingListener
```
[Web UI](./web-ui.md)

## Types

```scala { .api }
// Time and Duration
case class Time(milliseconds: Long) {
  def +(that: Duration): Time
  def -(that: Duration): Time
  def -(that: Time): Duration
}

case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def <(that: Duration): Boolean
  def <=(that: Duration): Boolean
}

case class Interval(beginTime: Time, endTime: Time)

// State Management Types
// Returned by mapWithState; stateSnapshots() emits the full (key, state) table after each batch
sealed abstract class MapWithStateDStream[KeyType, ValueType, StateType, MappedType]
    extends DStream[MappedType] {
  def stateSnapshots(): DStream[(KeyType, StateType)]
}

// Streaming Context States
// StreamingContext.getState() returns org.apache.spark.streaming.StreamingContextState,
// a Java enum with the constants INITIALIZED, ACTIVE, and STOPPED

// Input Stream Information
case class StreamInputInfo(
    inputStreamId: Int,
    numRecords: Long,
    metadata: Map[String, Any] = Map.empty
) {
  def metadataDescription: Option[String]
}
```
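Batch times are multiples of the batch interval, and window and slide durations must be multiples of it as well. This sketch exercises Spark's `Time`/`Duration` operators (`+`, `-`, `<=`) together with `Duration.isMultipleOf`; the `BatchClock` object and its methods are hypothetical helpers, not part of Spark.

```scala
import org.apache.spark.streaming.{Duration, Time}

// Hypothetical helper object; not part of the Spark API.
object BatchClock {
  // Batch times from start to end (inclusive), stepping by the batch interval.
  def batchTimes(start: Time, end: Time, interval: Duration): Seq[Time] = {
    require(interval.milliseconds > 0, "interval must be positive")
    Iterator.iterate(start)(_ + interval).takeWhile(_ <= end).toList
  }

  // A window or slide length is valid only if it is a multiple of the batch interval.
  def alignsWith(length: Duration, batchInterval: Duration): Boolean =
    length.isMultipleOf(batchInterval)
}
```

For example, `BatchClock.batchTimes(Time(0), Time(6000), Seconds(2))` yields the four batch times at 0, 2000, 4000, and 6000 ms.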