# Apache Spark Streaming

Apache Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing library built on Apache Spark Core. It processes live data streams from sources such as Kafka, Flume, Kinesis, or TCP sockets; the Kafka, Flume, and Kinesis connectors ship as separate artifacts. It provides high-level APIs in Scala, Java, and Python for building streaming applications with functional transformations such as map, reduce, join, and window operations on DStreams (discretized streams).

## Package Information

- **Package Name**: `org.apache.spark:spark-streaming_2.11` (the `_2.11` suffix is the Scala binary version)
- **Package Type**: Maven
- **Language**: Scala (with Java API)
- **Installation**: Add the dependency to your Maven `pom.xml` or SBT `build.sbt`

**Maven:**
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.8</version>
</dependency>
```

**SBT:**
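```scala
// build.sbt: %% appends the Scala binary version, e.g. spark-streaming_2.11 for scalaVersion 2.11.x
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.8"
```
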
## Core Imports

**Scala:**
```scala
import org.apache.spark.streaming.{StreamingContext, Duration, Seconds, Minutes}
import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
```

**Java:**
```java
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
```

## Basic Usage

**Scala Example:**
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}

object StreamingExample {
  def main(args: Array[String]): Unit = {
    // Create streaming context with a 1-second batch interval.
    // local[2]: the socket receiver occupies one thread, so local runs need at least two
    // (drop setMaster when submitting with spark-submit --master).
    val conf = new SparkConf().setAppName("StreamingExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create input stream from TCP socket (e.g. fed by `nc -lk 9999`)
    val lines = ssc.socketTextStream("localhost", 9999)

    // Transform and process data
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

    // Output results
    wordCounts.print()

    // Start streaming
    ssc.start()
    ssc.awaitTermination()
  }
}
```

**Java Example:**
```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.Durations;
import java.util.Arrays;
import scala.Tuple2;

public class StreamingExample {
    // awaitTermination() throws InterruptedException, so main must declare it
    public static void main(String[] args) throws InterruptedException {
        // Create streaming context; local[2] leaves one thread for processing while the
        // receiver occupies the other (drop setMaster when using spark-submit --master)
        SparkConf conf = new SparkConf().setAppName("StreamingExample").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Create input stream from TCP socket
        JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Transform and process data
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
            .reduceByKey((i1, i2) -> i1 + i2);

        // Output results
        wordCounts.print();

        // Start streaming
        jssc.start();
        jssc.awaitTermination();
    }
}
```

## Architecture

Apache Spark Streaming is built around several key components:

- **StreamingContext**: Main entry point that coordinates the streaming application and manages its execution
- **DStream (Discretized Stream)**: Core abstraction that represents a continuous stream as a sequence of RDDs, one per batch interval (micro-batch)
- **Input Sources**: Data sources such as sockets, files, Kafka, and Flume for ingesting streaming data
- **Transformations**: Functional operations such as map, filter, and reduce that derive new DStreams from existing ones
- **Output Operations**: Actions that send processed data to external systems or storage and trigger the execution of each batch
- **Checkpointing**: Fault-tolerance mechanism that periodically saves metadata and state to reliable storage (e.g., HDFS)
- **Receivers**: Long-running tasks on executors that collect data from streaming sources and store it in Spark for processing
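
The following minimal sketch makes the DStream model concrete for a local run. It feeds two pre-built RDDs through `queueStream`, which by default dequeues one RDD per batch interval and is mainly a testing source. It then applies a transformation and uses the `(RDD, Time)` form of `foreachRDD` to print each batch alongside its batch time. `MicroBatchDemo`, `upperWords`, and the five-second run time are illustrative choices, not part of Spark.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object MicroBatchDemo {
  // Hypothetical per-record transformation: split a line into upper-cased words.
  def upperWords(line: String): Seq[String] =
    line.split(" ").filter(_.nonEmpty).map(_.toUpperCase).toSeq

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MicroBatchDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Input source: a queue of RDDs; by default one RDD is dequeued per batch interval.
    val queue = mutable.Queue[RDD[String]](
      ssc.sparkContext.parallelize(Seq("hello spark")),
      ssc.sparkContext.parallelize(Seq("hello streaming")))
    val lines = ssc.queueStream(queue)

    // Transformation: defines a new DStream, i.e. one derived RDD per batch.
    val words = lines.flatMap(line => upperWords(line))

    // Output operation: called once per batch with that batch's RDD and time.
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      println(s"Batch $time: ${rdd.collect().mkString(", ")}")
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // let a few batches run
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

`queueStream` cannot be checkpointed, so this pattern suits tests and demos rather than recoverable production jobs.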

## Capabilities

### Streaming Context Management

Entry point for creating and managing Spark Streaming applications. Handles initialization, configuration, and lifecycle management.

```scala { .api }
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
class StreamingContext(conf: SparkConf, batchDuration: Duration)

// Lifecycle methods
def start(): Unit
def stop(stopSparkContext: Boolean = true): Unit
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit  // graceful: finish received data first
def awaitTermination(): Unit
def awaitTerminationOrTimeout(timeout: Long): Boolean  // timeout in ms; true if the context stopped

// Configuration
def checkpoint(directory: String): Unit
def remember(duration: Duration): Unit
```
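
A common way to combine `checkpoint` with the lifecycle methods is `StreamingContext.getOrCreate`. It rebuilds the context from checkpoint data after a driver restart and otherwise calls a factory function. The sketch below rests on several assumptions: a socket source on port 9999, hypothetical checkpoint and marker-file paths, and a marker-file convention for requesting shutdown. Only the Spark calls themselves (`getOrCreate`, `awaitTerminationOrTimeout`, and the two-argument `stop`) come from the API.

```scala
import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableApp {
  // Hypothetical locations; use a fault-tolerant file system (e.g. HDFS) for real checkpoints.
  val checkpointDir = "/tmp/recoverable-app-checkpoint"
  val stopMarker = "/tmp/recoverable-app.stop"

  // Hypothetical shutdown signal: an operator creates the marker file to request a stop.
  def shutdownRequested(markerPath: String): Boolean = new File(markerPath).exists()

  // Only called when no checkpoint exists; the whole DStream graph must be defined here.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RecoverableApp").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)
    ssc.socketTextStream("localhost", 9999).count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restore the context from checkpoint data if present, otherwise build a new one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()

    var stopped = false
    while (!stopped) {
      // Returns true if the context stopped (or throws if it failed) within 10 seconds.
      stopped = ssc.awaitTerminationOrTimeout(10000)
      if (!stopped && shutdownRequested(stopMarker)) {
        // Graceful stop: finish processing data that has already been received.
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        stopped = true
      }
    }
  }
}
```

All DStream definitions live inside `createContext`. On recovery, Spark restores them from the checkpoint instead of calling the function again.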

[Streaming Context](./streaming-context.md)

### Data Stream Operations

Core functionality for creating, transforming, and processing continuous data streams using functional operations. Transformations are lazy and define new DStreams; output operations such as `foreachRDD` and `print` trigger the processing of each batch.

```scala { .api }
abstract class DStream[T] {
  // Transformations
  def map[U](mapFunc: T => U): DStream[U]
  def flatMap[U](flatMapFunc: T => TraversableOnce[U]): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def transform[U](transformFunc: RDD[T] => RDD[U]): DStream[U]
  def union(that: DStream[T]): DStream[T]

  // Window operations
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
  def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]

  // Output operations
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
  def print(num: Int = 10): Unit
}
```
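
The sketch below combines `union`, `filter`, and `transform` under a few assumptions. Log lines are taken to follow a hypothetical `LEVEL message` format. Two `queueStream` sources stand in for real inputs, and a static RDD lists the levels to drop. `transform` is the hook for arbitrary per-batch RDD logic; here it applies `subtractByKey` against the static RDD, an RDD operation with no counterpart in the DStream API listed above.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamOpsDemo {
  // Hypothetical record format "LEVEL message": the level is the text before the first space.
  def levelOf(line: String): String = line.takeWhile(_ != ' ')

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DStreamOpsDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val sc = ssc.sparkContext

    // Two in-memory sources standing in for real inputs (one RDD each).
    val appLogs = ssc.queueStream(
      mutable.Queue[RDD[String]](sc.parallelize(Seq("ERROR disk full", "INFO started", ""))))
    val sysLogs = ssc.queueStream(
      mutable.Queue[RDD[String]](sc.parallelize(Seq("WARN high load", "DEBUG tick"))))

    // Static lookup RDD of levels to drop, reused by every batch.
    val ignoredLevels = sc.parallelize(Seq(("DEBUG", true)))

    val interesting = appLogs
      .union(sysLogs)     // both streams must have the same element type
      .filter(_.nonEmpty) // drop blank lines
      .transform { (rdd: RDD[String]) =>
        // Arbitrary per-batch RDD logic: key by level and remove ignored levels.
        rdd.map(line => (levelOf(line), line)).subtractByKey(ignoredLevels).values
      }

    interesting.print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```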

[Data Streams](./data-streams.md)

### Input Sources

Mechanisms for ingesting data from sockets, files, in-memory queues of RDDs (mainly for testing), and custom receivers. Connectors for external systems such as Kafka and Kinesis are provided by separate modules.

```scala { .api }
// Socket streams
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
def rawSocketStream[T](hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]

// File streams
def textFileStream(directory: String): DStream[String]
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): InputDStream[(K, V)]

// Queue and custom streams
def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
```
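
`receiverStream` accepts any subclass of `org.apache.spark.streaming.receiver.Receiver`. The following sketch uses a toy source, a hypothetical `TickReceiver` that emits a numbered string at a fixed interval, to show the contract. `onStart` must return quickly after starting its own thread. That thread calls `store` for each record and polls `isStopped()`, and `onStop` releases any resources.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that emits "tick-0", "tick-1", ... every intervalMs milliseconds.
class TickReceiver(intervalMs: Long) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // onStart must not block: start a thread that produces records and return.
  override def onStart(): Unit = {
    new Thread("tick-receiver") {
      override def run(): Unit = {
        var n = 0L
        while (!isStopped()) {
          store(TickReceiver.format(n)) // hand one record to Spark
          n += 1
          Thread.sleep(intervalMs)
        }
      }
    }.start()
  }

  // Nothing to release: the thread exits once isStopped() returns true.
  override def onStop(): Unit = {}
}

object TickReceiver {
  def format(n: Long): String = s"tick-$n"

  def main(args: Array[String]): Unit = {
    // The receiver permanently occupies one core, so local mode needs at least two.
    val conf = new SparkConf().setAppName("TickReceiverDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.receiverStream(new TickReceiver(intervalMs = 200)).count().print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

A real receiver would typically open its connection in `onStart` and call `restart(...)` when the connection fails.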

[Input Sources](./input-sources.md)

### Key-Value Operations

Specialized operations for key-value pair streams (`DStream[(K, V)]`), including aggregations, joins, and state management. They are available on any pair DStream through an implicit conversion to `PairDStreamFunctions`.

```scala { .api }
class PairDStreamFunctions[K, V] {
  // Aggregations
  def groupByKey(): DStream[(K, Iterable[V])]
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true): DStream[(K, C)]

  // Window aggregations
  def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]

  // State management
  def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]

  // Joins
  def join[W](other: DStream[(K, W)]): DStream[(K, (V, W))]
  def leftOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
}
```
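
As an example of state management with `updateStateByKey`, the sketch below keeps a running count per page. It assumes one page name per line on a local socket and a hypothetical checkpoint path. Stateful operations fail at `start()` if no checkpoint directory is set. The update function is a plain method, so it can be tested without Spark.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RunningPageCounts {
  // updateStateByKey callback: add this batch's counts to the previous total.
  // Returning None instead would remove the key from the state.
  def updateCount(newCounts: Seq[Int], runningTotal: Option[Int]): Option[Int] =
    Some(runningTotal.getOrElse(0) + newCounts.sum)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RunningPageCounts").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Stateful operations require checkpointing (hypothetical local path).
    ssc.checkpoint("/tmp/running-page-counts-checkpoint")

    // Assumed input: one page name per line, e.g. from `nc -lk 9999`.
    val pages = ssc.socketTextStream("localhost", 9999).map(page => (page.trim, 1))

    val perBatch = pages.reduceByKey(_ + _)               // counts within one batch
    val totals = perBatch.updateStateByKey(updateCount _) // counts across all batches

    totals.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```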

[Key-Value Operations](./key-value-ops.md)

### Window Operations

Time-based windowing operations that aggregate data across multiple batch intervals. Both the window duration and the slide duration must be multiples of the batch interval.

```scala { .api }
// Basic windowing
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]  // requires checkpointing

// Windowed reductions
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
// Incremental variant: invReduceFunc removes data that leaves the window; requires checkpointing
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
```
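
The sketch below shows `countByWindow` and `reduceByWindow` with a 30-second window that slides every 10 seconds over 5-second batches. It assumes a local socket source and a hypothetical checkpoint path. Checkpointing is enabled because `countByWindow` reduces incrementally with an inverse function. `isValidWindow` is an illustrative helper that checks the multiple-of-batch-interval rule with `Duration.isMultipleOf`.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object WindowedCounts {
  // Hypothetical helper: window and slide must both be multiples of the batch interval.
  def isValidWindow(batch: Duration, window: Duration, slide: Duration): Boolean =
    window.isMultipleOf(batch) && slide.isMultipleOf(batch)

  def main(args: Array[String]): Unit = {
    val batch = Seconds(5)
    val window = Seconds(30)
    val slide = Seconds(10)
    require(isValidWindow(batch, window, slide), "window and slide must be multiples of the batch interval")

    val conf = new SparkConf().setAppName("WindowedCounts").setMaster("local[2]")
    val ssc = new StreamingContext(conf, batch)
    // countByWindow reduces incrementally with an inverse function, which needs checkpointing.
    ssc.checkpoint("/tmp/windowed-counts-checkpoint") // hypothetical path

    val lines = ssc.socketTextStream("localhost", 9999)

    // Every 10 s: number of lines received during the last 30 s.
    lines.countByWindow(window, slide).print()

    // Every 10 s: longest line received during the last 30 s (no inverse function needed).
    lines.reduceByWindow((a: String, b: String) => if (a.length >= b.length) a else b, window, slide).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```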
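
[Window Operations](./window-ops.md)

### State Management

Stateful processing capabilities for maintaining state across streaming batches with fault tolerance. Stateful operations such as `updateStateByKey` and `mapWithState` require a checkpoint directory set with `StreamingContext.checkpoint`.
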
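```scala { .api }
// StateSpec for mapWithState
object StateSpec {
  // Emits exactly one mapped record per input record
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]

  // Also receives the batch time and may emit nothing (None)
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (Time, KeyType, Option[ValueType], State[StateType]) => Option[MappedType]
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}

abstract class State[S] {
  def exists(): Boolean
  def get(): S
  def update(newState: S): Unit
  def remove(): Unit
  def isTimingOut(): Boolean
}
```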
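
The following `mapWithState` sketch tracks clicks per user. It assumes one user id per line on a local socket and a hypothetical checkpoint path. It uses the three-argument `StateSpec.function` form, so the mapping function returns one record per input, and `StateSpec.timeout` to expire idle users. When a key times out, Spark calls the function once more with `state.isTimingOut()` true and no value, and updating the state in that call is not allowed. `SessionCounts`, `trackUser`, and `nextCount` are illustrative names.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec, StreamingContext}

object SessionCounts {
  // Pure helper: new running total from the stored total and this record's value.
  def nextCount(stored: Option[Int], value: Option[Int]): Int =
    stored.getOrElse(0) + value.getOrElse(0)

  // Mapping function for the three-argument StateSpec.function: emits (user, running total).
  val trackUser = (user: String, clicks: Option[Int], state: State[Int]) => {
    if (state.isTimingOut()) {
      // Final call for an idle key: value is None and the state may not be updated.
      (user, state.get())
    } else {
      val total = nextCount(state.getOption(), clicks)
      state.update(total)
      (user, total)
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SessionCounts").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/session-counts-checkpoint") // hypothetical path; mapWithState needs it

    // Assumed input: one user id per line on a local socket.
    val clicks = ssc.socketTextStream("localhost", 9999).map(user => (user.trim, 1))

    // Remove a user's state after 30 minutes without new records.
    val spec = StateSpec.function(trackUser).timeout(Minutes(30))
    clicks.mapWithState(spec).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```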

[State Management](./state-management.md)

### Java API

Java-friendly wrappers that expose the same functionality through Java functional interfaces (such as `Function`, `FlatMapFunction`, and `VoidFunction`) and Java-compatible types.

```java { .api }
public class JavaStreamingContext {
    public JavaStreamingContext(SparkConf conf, Duration batchDuration);
    public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port);
    public void start();
    public void awaitTermination() throws InterruptedException;
}

public class JavaDStream<T> {
    public <R> JavaDStream<R> map(Function<T, R> f);
    public JavaDStream<T> filter(Function<T, Boolean> f);
    public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f);
    public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
}
```

[Java API](./java-api.md)

### Event Monitoring

Listener system for monitoring streaming application performance, batch processing, and receiver status. Listeners are registered with `StreamingContext.addStreamingListener`.

```scala { .api }
// org.apache.spark.streaming.scheduler.StreamingListener
// Every callback has a default no-op implementation; override only the ones you need.
trait StreamingListener {
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit
}
```
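
The following sketch shows a custom listener registered with `StreamingContext.addStreamingListener`. Its assumed goal is to flag batches whose processing time exceeds the batch interval, a sign that the application is falling behind. `BatchDelayListener` and `isFallingBehind` are hypothetical names. The listener trait and event classes live in `org.apache.spark.streaming.scheduler`, and the per-batch timings come from the event's `BatchInfo`.

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical listener that counts and logs batches slower than the batch interval.
class BatchDelayListener(batchIntervalMs: Long) extends StreamingListener {
  val slowBatches = new AtomicLong(0)

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    if (BatchDelayListener.isFallingBehind(info.processingDelay, batchIntervalMs)) {
      slowBatches.incrementAndGet()
      println(s"Batch ${info.batchTime} took ${info.processingDelay.getOrElse(-1L)} ms " +
        s"(scheduling delay ${info.schedulingDelay.getOrElse(-1L)} ms)")
    }
  }
}

object BatchDelayListener {
  // Processing slower than the interval means a backlog of batches is building up.
  def isFallingBehind(processingDelayMs: Option[Long], batchIntervalMs: Long): Boolean =
    processingDelayMs.exists(_ > batchIntervalMs)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ListenerDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.addStreamingListener(new BatchDelayListener(batchIntervalMs = 1000))
    ssc.socketTextStream("localhost", 9999).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```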

[Event Monitoring](./event-monitoring.md)

## Types

```scala { .api }
// Time-related types
case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def /(that: Duration): Double
}

case class Time(milliseconds: Long) {
  def +(that: Duration): Time
  def -(that: Duration): Time
  def -(that: Time): Duration
}

case class Interval(beginTime: Time, endTime: Time) {
  def duration(): Duration
}

// Helper objects
object Seconds {
  def apply(seconds: Long): Duration
}

object Minutes {
  def apply(minutes: Long): Duration
}

object Milliseconds {
  def apply(milliseconds: Long): Duration
}
```
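
The following is a quick illustration of the arithmetic these types support, using the `Duration` and `Time` operators listed above and the `Seconds`, `Minutes`, and `Milliseconds` helpers. The values are arbitrary and the object name `DurationMath` is illustrative.

```scala
import org.apache.spark.streaming.{Duration, Milliseconds, Minutes, Seconds, Time}

object DurationMath {
  val batch: Duration = Seconds(5)
  val window: Duration = Minutes(1)

  // Duration / Duration yields a Double ratio, e.g. batches per window.
  val batchesPerWindow: Double = window / batch

  // Durations add and scale like numbers.
  val slide: Duration = batch * 2
  val extended: Duration = Seconds(1) + Milliseconds(500)

  // Time + Duration gives a Time; Time - Time gives a Duration.
  val start: Time = Time(10000)
  val end: Time = start + window
  val elapsed: Duration = end - start

  def main(args: Array[String]): Unit =
    println(s"$batchesPerWindow batches per window, slide = $slide, elapsed = $elapsed")
}
```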