# Data Streams

A DStream (Discretized Stream) is the core abstraction in Spark Streaming, representing a continuous stream of data. Internally, a DStream is represented as a sequence of RDDs (Resilient Distributed Datasets), where each RDD contains the data for a specific time interval.
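The examples throughout this document assume a running StreamingContext named `ssc`. A minimal setup sketch (the 2-second batch interval and socket source are illustrative choices, not requirements):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Build a StreamingContext with a 2-second batch interval (illustrative choice)
val conf = new SparkConf().setAppName("DStreamExamples").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))

// Each batch of lines read from the socket becomes one RDD in the DStream
val lines = ssc.socketTextStream("localhost", 9999)

ssc.start()             // begin receiving and processing data
ssc.awaitTermination()  // block until the streaming job is stopped
```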
## Capabilities

### Basic Transformations

Core transformation operations that modify each element or the structure of the DStream.
```scala { .api }
/**
 * Transform each element using a function
 * @param mapFunc - Function to apply to each element
 * @return New DStream with transformed elements
 */
def map[U: ClassTag](mapFunc: T => U): DStream[U]

/**
 * Transform each element to zero or more elements
 * @param flatMapFunc - Function returning a collection for each element
 * @return New DStream with flattened results
 */
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]

/**
 * Filter elements based on a predicate
 * @param filterFunc - Function returning true for elements to keep
 * @return New DStream with filtered elements
 */
def filter(filterFunc: T => Boolean): DStream[T]

/**
 * Coalesce the elements of each partition into a single array
 * @return DStream in which each partition of each RDD becomes one array
 */
def glom(): DStream[Array[T]]

/**
 * Transform each partition using a function
 * @param mapPartFunc - Function to transform an iterator of elements
 * @param preservePartitioning - Whether to preserve the partitioner
 * @return New DStream with transformed partitions
 */
def mapPartitions[U: ClassTag](
  mapPartFunc: Iterator[T] => Iterator[U],
  preservePartitioning: Boolean = false
): DStream[U]
```

**Usage Examples:**

```scala
val lines = ssc.socketTextStream("localhost", 9999)

// Map transformation
val lengths = lines.map(_.length)

// FlatMap transformation
val words = lines.flatMap(_.split(" "))

// Filter transformation
val longLines = lines.filter(_.length > 10)

// Combined transformations
val wordCounts = lines
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
```

### Advanced Transformations

More complex transformation operations, including custom per-RDD transformations.

```scala { .api }
/**
 * Transform each RDD using a custom function
 * @param transformFunc - Function to transform each RDD
 * @return New DStream with transformed RDDs
 */
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]

/**
 * Transform each RDD with access to time information
 * @param transformFunc - Function receiving each RDD and its batch time
 * @return New DStream with transformed RDDs
 */
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

/**
 * Transform two DStreams together
 * @param other - Another DStream to combine with
 * @param transformFunc - Function to transform both RDDs together
 * @return New DStream with combined transformation
 */
def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V]

/**
 * Transform two DStreams with time information
 * @param other - Another DStream to combine with
 * @param transformFunc - Function receiving both RDDs and the batch time
 * @return New DStream with combined transformation
 */
def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V]

/**
 * Repartition each RDD in the DStream to the specified number of partitions
 * @param numPartitions - Number of partitions for the output
 * @return Repartitioned DStream
 */
def repartition(numPartitions: Int): DStream[T]

/**
 * Union this DStream with another DStream of the same type
 * @param that - DStream to union with
 * @return Combined DStream containing data from both streams
 */
def union(that: DStream[T]): DStream[T]
```
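
A brief usage sketch of `transform`, `repartition`, and `union` (the socket sources are illustrative assumptions, and `ssc` is the StreamingContext from the setup above):

```scala
// Assumed inputs for illustration
val lines = ssc.socketTextStream("localhost", 9999)
val otherLines = ssc.socketTextStream("localhost", 9998)

// transform: apply arbitrary RDD operations to each batch,
// e.g. sort each batch's lines (not available as a direct DStream method)
val sorted = lines.transform(rdd => rdd.sortBy(identity))

// transform with time: tag each element with its batch time
val tagged = lines.transform((rdd, time) => rdd.map(line => (time.milliseconds, line)))

// repartition: change the parallelism of downstream operations
val rebalanced = lines.repartition(8)

// union: merge two streams of the same element type
val merged = lines.union(otherLines)
```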

### Persistence and Caching

Control how DStream data is stored and cached across operations.

```scala { .api }
/**
 * Persist the DStream's RDDs with the specified storage level
 * @param level - Storage level (MEMORY_ONLY, MEMORY_AND_DISK, etc.)
 * @return This DStream for method chaining
 */
def persist(level: StorageLevel): DStream[T]

/**
 * Cache the DStream's RDDs in memory
 * @return This DStream for method chaining
 */
def cache(): DStream[T]

/**
 * Enable periodic checkpointing of this DStream's RDDs
 * @param interval - Interval between checkpoints
 * @return This DStream for method chaining
 */
def checkpoint(interval: Duration): DStream[T]
```
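
A sketch of how these combine in practice. The checkpoint directory path is an illustrative assumption; note that DStream checkpointing also requires a checkpoint directory to be set on the context:

```scala
import org.apache.spark.storage.StorageLevel

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))

// Cache a stream that several downstream operations consume
words.cache()

// Or choose an explicit storage level
val persisted = lines.map(_.toUpperCase).persist(StorageLevel.MEMORY_AND_DISK)

// A checkpoint directory must be configured before checkpointing takes effect
ssc.checkpoint("hdfs:///tmp/checkpoints")  // illustrative path
words.checkpoint(Seconds(20))
```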

### Aggregation Operations

Operations that aggregate data within each batch.

```scala { .api }
/**
 * Reduce the elements of each RDD using a function
 * @param reduceFunc - Associative and commutative reduce function
 * @return DStream with one element per RDD (the reduced result)
 */
def reduce(reduceFunc: (T, T) => T): DStream[T]

/**
 * Count the elements in each RDD
 * @return DStream of Long values representing counts
 */
def count(): DStream[Long]

/**
 * Count occurrences of each unique element within each RDD
 * @param numPartitions - Number of partitions for the result
 * @return DStream of (element, count) pairs
 */
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)]
```
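
A short sketch of the per-batch aggregations (the socket word stream is an illustrative assumption):

```scala
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// Longest word in each batch (single-element RDD per batch)
val longest = words.reduce((a, b) => if (a.length >= b.length) a else b)

// Number of words in each batch
val counts = words.count()

// (word, occurrences) pairs within each batch
val histogram = words.countByValue()

longest.print()
```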

### Window Operations

Time-based windowing operations for analyzing data across multiple batches. Window and slide durations must be multiples of the source DStream's batch interval.

```scala { .api }
/**
 * Create a windowed DStream
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window (optional, defaults to the batch interval)
 * @return DStream containing data from the specified window
 */
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

/**
 * Count elements over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @return DStream of window counts
 */
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

/**
 * Count occurrences of each element over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for the result (optional)
 * @return DStream of (element, count) pairs over the window
 */
def countByValueAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]

/**
 * Reduce elements over a sliding window
 * @param reduceFunc - Associative and commutative reduce function
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @return DStream with reduced results over windows
 */
def reduceByWindow(
  reduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]

/**
 * Reduce elements over a sliding window, using an inverse function to
 * incrementally subtract data leaving the window (more efficient for
 * large windows; requires checkpointing to be enabled)
 * @param reduceFunc - Associative and commutative reduce function
 * @param invReduceFunc - Inverse of the reduce function
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @return DStream with reduced results over windows
 */
def reduceByWindow(
  reduceFunc: (T, T) => T,
  invReduceFunc: (T, T) => T,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]
```
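
A usage sketch contrasting the plain and inverse-function forms of `reduceByWindow` (the integer socket stream, durations, and checkpoint path are illustrative assumptions):

```scala
val nums = ssc.socketTextStream("localhost", 9999).map(_.trim.toInt)

// Sum over the last 30 seconds, recomputed from scratch every 10 seconds
val windowSum = nums.reduceByWindow(_ + _, Seconds(30), Seconds(10))

// Incremental form: add batches entering the window, subtract batches leaving it.
// More efficient for large windows, but requires checkpointing.
ssc.checkpoint("hdfs:///tmp/checkpoints")  // illustrative path
val incrementalSum = nums.reduceByWindow(_ + _, _ - _, Seconds(30), Seconds(10))
```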

### Output Operations

Actions that send data to external systems or trigger computation.

```scala { .api }
/**
 * Apply a function to each RDD in the DStream
 * @param foreachFunc - Function to apply to each RDD
 */
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit

/**
 * Apply a function to each RDD with time information
 * @param foreachFunc - Function receiving each RDD and its batch time
 */
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

/**
 * Print the first num elements of each RDD to the console
 * @param num - Number of elements to print (default 10)
 */
def print(num: Int = 10): Unit

/**
 * Save the DStream's contents as text files; at each batch interval the
 * output is written under the name "prefix-TIME_IN_MS[.suffix]"
 * @param prefix - Prefix for output file names
 * @param suffix - Suffix for output file names (optional)
 */
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit

/**
 * Save the DStream's contents as files of serialized objects, using the
 * same "prefix-TIME_IN_MS[.suffix]" naming scheme
 * @param prefix - Prefix for output file names
 * @param suffix - Suffix for output file names (optional)
 */
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
```

**Usage Examples:**

```scala
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))

// Window operations
val windowedWords = words.window(Seconds(30), Seconds(10))
val windowedCounts = words.countByWindow(Seconds(30), Seconds(10))

// Reduce over windows
val windowedWordCount = words
  .map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

// Output operations
words.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}

windowedWordCount.print()
windowedWordCount.saveAsTextFiles("output/wordcount")
```

### Utility Operations

Helper methods for accessing DStream data and metadata.

```scala { .api }
/**
 * Get the RDDs generated between two times
 * @param fromTime - Start time (inclusive)
 * @param toTime - End time (inclusive)
 * @return Sequence of RDDs in the time range
 */
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]

/**
 * Get the RDDs generated during a specific time interval
 * @param interval - Time interval to retrieve
 * @return Sequence of RDDs in the interval
 */
def slice(interval: Interval): Seq[RDD[T]]

/**
 * Generate or retrieve the RDD for a specific batch time
 * (primarily used internally by the streaming framework)
 * @param validTime - Time for which to compute the RDD
 * @return RDD containing data for the specified time, if any
 */
def compute(validTime: Time): Option[RDD[T]]
```
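
A sketch of `slice` called from inside `foreachRDD`. This assumes the batch interval divides 60 seconds evenly (slice boundaries must align with batch times) and that the stream remembers its RDDs long enough:

```scala
import org.apache.spark.streaming.Minutes

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
ssc.remember(Minutes(1))  // keep generated RDDs around long enough to slice
words.cache()

words.foreachRDD { (rdd, time) =>
  // All RDDs generated over the last minute, up to this batch
  val lastMinute = words.slice(time - Seconds(60), time)
  val total = lastMinute.map(_.count()).sum
  println(s"Words in the last minute: $total")
}
```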

## Key Properties

```scala { .api }
// DStream properties
def slideDuration: Duration                // Interval at which this DStream generates RDDs
def dependencies: List[DStream[_]]         // Parent DStreams this DStream depends on
def generatedRDDs: HashMap[Time, RDD[T]]   // Cache of generated RDDs, keyed by batch time
def rememberDuration: Duration             // How long generated RDDs are remembered
def storageLevel: StorageLevel             // Storage level used when persisting RDDs
def mustCheckpoint: Boolean                // Whether this DStream requires checkpointing
def checkpointDuration: Duration           // Checkpoint interval
```

**Advanced Usage:**

```scala
// Custom transformation with RDD operations
val processed = lines.transform { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.toDF("line")
  df.filter($"line".contains("ERROR"))
    .select($"line")
    .rdd
    .map(_.getString(0))
}

// Complex windowed operations (the inverse-function form requires checkpointing)
val complexWindow = words
  .map((_, 1))
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b,  // Add values entering the window
    (a: Int, b: Int) => a - b,  // Subtract values leaving the window
    Seconds(30),                // Window duration
    Seconds(10)                 // Slide duration
  )
  .filter(_._2 > 5)             // Drop low counts

// Combining multiple streams with transformWith
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)

val combined = stream1.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[String]) => {
  rdd1.intersection(rdd2)  // Find common elements
})
```