0
# Core Streaming
1
2
Core Spark Streaming abstractions: `StreamingContext`, which coordinates stream processing; `DStream` operations for transforming data; and the time-management classes used for batch and window intervals.
3
4
## Capabilities
5
6
### StreamingContext
7
8
Main entry point for Spark Streaming functionality, responsible for creating input streams, managing the streaming lifecycle, and coordinating batch processing.
9
10
```scala { .api }
11
/**
 * Main entry point for Spark Streaming functionality.
 *
 * A StreamingContext creates input DStreams, owns the graph of DStream operations, and
 * drives micro-batch execution once started. Only one StreamingContext can be active in a
 * JVM at a time, and a stopped context cannot be restarted.
 *
 * @param sparkContext  existing Spark context used to run the per-batch jobs
 * @param batchDuration time interval at which incoming data is divided into batches
 */
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {

  /**
   * Create a StreamingContext together with a new SparkContext built from `conf`.
   * Fails if another SparkContext is already active in this JVM.
   */
  def this(conf: SparkConf, batchDuration: Duration)

  /**
   * Create a StreamingContext (and a new SparkContext) from a master URL and app name.
   *
   * @param master      cluster URL, e.g. `local[4]` or `spark://host:7077`
   * @param appName     application name shown in the cluster web UI
   * @param sparkHome   Spark installation directory on the cluster nodes (may be null)
   * @param jars        JARs to ship to the cluster
   * @param environment environment variables to set on worker nodes
   */
  def this(
      master: String,
      appName: String,
      batchDuration: Duration,
      sparkHome: String = null,
      jars: Seq[String] = Nil,
      environment: Map[String, String] = Map()
  )

  /**
   * Recreate a StreamingContext from checkpoint data.
   *
   * @param path       checkpoint directory written by a previous run
   * @param hadoopConf Hadoop configuration used to read the checkpoint files
   */
  def this(path: String, hadoopConf: Configuration)

  /** Recreate a StreamingContext from checkpoint data using the default Hadoop configuration. */
  def this(path: String)

  /** Recreate a StreamingContext from checkpoint data, reusing an existing SparkContext. */
  def this(path: String, sparkContext: SparkContext)

  // ---- Lifecycle ----

  /**
   * Start executing the registered streams.
   *
   * At least one output operation (e.g. `print`, `foreachRDD`, `saveAsTextFiles`) must be
   * registered first. Once started, no new streaming computations can be added.
   */
  def start(): Unit

  /**
   * Stop the execution of the streams without waiting for received data to be processed.
   *
   * @param stopSparkContext whether to also stop the underlying SparkContext. In Spark the
   *                         default comes from `spark.streaming.stopSparkContextByDefault`,
   *                         which is true unless configured otherwise.
   */
  def stop(stopSparkContext: Boolean = true): Unit

  /**
   * Stop the execution of the streams.
   *
   * @param stopSparkContext whether to also stop the underlying SparkContext
   * @param stopGracefully   if true, wait until all received data has been processed
   */
  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit

  /** Block until the context is stopped; rethrows any error reported during execution. */
  def awaitTermination(): Unit

  /**
   * Block until the context is stopped or the timeout elapses.
   *
   * @param timeout maximum time to wait, in milliseconds
   * @return true if the context stopped, false if the timeout elapsed first
   *         (errors reported during execution are rethrown)
   */
  def awaitTerminationOrTimeout(timeout: Long): Boolean

  // ---- Configuration ----

  /**
   * Set a fault-tolerant directory (e.g. on HDFS) for checkpoint data.
   *
   * Checkpointing is required by stateful operations such as `updateStateByKey` and
   * `mapWithState`, and for recovering the driver from checkpoint data.
   */
  def checkpoint(directory: String): Unit

  /**
   * Make every DStream in this context keep the RDDs it generated during the last
   * `duration`. This is useful when those RDDs are queried outside the streaming computation.
   */
  def remember(duration: Duration): Unit

  // ---- State and properties ----

  /** Current lifecycle state: INITIALIZED, ACTIVE or STOPPED. */
  def getState(): StreamingContextState

  /** The SparkContext that runs this context's batch jobs. */
  def sparkContext: SparkContext

  // ---- Input stream creation (detailed in the Input Sources doc) ----

  /**
   * Receive UTF-8, newline-delimited text from a TCP socket.
   *
   * The receiver permanently occupies one core, so a local master needs at least `local[2]`.
   */
  def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String]

  /** Monitor a Hadoop-compatible directory for new files and read them as lines of text. */
  def textFileStream(directory: String): DStream[String]

  /**
   * Create a stream from a queue of RDDs; mainly useful for testing.
   *
   * @param oneAtATime if true, consume one queued RDD per batch; otherwise consume all of them
   */
  def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]

  // ---- Listeners ----

  /** Register a listener that receives streaming batch and receiver events. */
  def addStreamingListener(listener: StreamingListener): Unit

  /** Unregister a previously added streaming listener. */
  def removeStreamingListener(listener: StreamingListener): Unit
}
76
```
77
78
**Usage Examples:**
79
80
```scala
81
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._

// Option 1: wrap an existing SparkContext.
// A socket receiver permanently occupies one core, so a local master needs at least two
// threads ("local[2]"). Omit setMaster when launching through spark-submit.
val sc = new SparkContext(new SparkConf().setAppName("StreamingApp").setMaster("local[2]"))
val ssc = new StreamingContext(sc, Seconds(2))

// Option 2: let StreamingContext create its own SparkContext from a SparkConf.
// Use this INSTEAD of option 1, not in addition to it: only one SparkContext can be active
// per JVM, so creating both in the same program fails.
// val ssc = new StreamingContext(
//   new SparkConf().setAppName("StreamingApp").setMaster("local[2]"),
//   Seconds(1)
// )

// Configure checkpointing for fault tolerance (required by stateful operations)
ssc.checkpoint("hdfs://namenode:9000/checkpoints")

// Create input streams and transformations
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))

// Register at least one output operation; start() fails if none is registered
words.print()

// Start processing and block until the context is stopped
ssc.start()
ssc.awaitTermination()
104
```
105
106
### DStream Operations
107
108
Core abstraction representing a continuous sequence of RDDs. It offers transformations, which lazily define new DStreams, and output operations, which register work to run on every batch.
109
110
```scala { .api }
111
/**
 * Discretized Stream (DStream): a continuous sequence of RDDs of type `T`, one generated
 * every `slideDuration`.
 *
 * Transformations lazily return new DStreams. Nothing executes until at least one output
 * operation has been registered and the owning StreamingContext has been started.
 */
abstract class DStream[T] {

  // ---- Core properties ----

  /**
   * Interval after which this DStream generates a new RDD: the batch interval for input
   * streams, or the slide interval for windowed streams.
   */
  def slideDuration: Duration

  /** Parent DStreams from which this DStream is computed. */
  def dependencies: List[DStream[_]]

  /** The StreamingContext this DStream belongs to. */
  def context: StreamingContext

  // ---- Transformations (lazy; each returns a new DStream) ----

  /** Apply `mapFunc` to every element. */
  def map[U](mapFunc: T => U): DStream[U]

  /** Apply `flatMapFunc` to every element and flatten the results (zero or more per element). */
  def flatMap[U](flatMapFunc: T => Iterable[U]): DStream[U]

  /** Keep only the elements that satisfy `filterFunc`. */
  def filter(filterFunc: T => Boolean): DStream[T]

  /** Coalesce the elements of each RDD partition into an array (one array per partition). */
  def glom(): DStream[Array[T]]

  /** Change the number of partitions of every generated RDD. */
  def repartition(numPartitions: Int): DStream[T]

  /** Union of this DStream and `that`; both must have the same slide duration. */
  def union(that: DStream[T]): DStream[T]

  /** Number of elements in each batch; every generated RDD holds a single Long. */
  def count(): DStream[Long]

  /**
   * Count the occurrences of each distinct element in each batch.
   *
   * @param numPartitions number of partitions of the result (default: Spark's default parallelism)
   */
  def countByValue(numPartitions: Int = context.sparkContext.defaultParallelism): DStream[(T, Long)]

  /** Persist generated RDDs at the DStream default level, `StorageLevel.MEMORY_ONLY_SER`. */
  def cache(): DStream[T]

  /** Persist generated RDDs at the given storage level. */
  def persist(level: StorageLevel): DStream[T]

  // ---- Window operations ----
  // windowDuration and slideDuration must both be multiples of this DStream's slideDuration.

  /**
   * A DStream whose RDDs contain every element seen during the last `windowDuration`,
   * produced once every `slideDuration`.
   */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

  /** Reduce all elements of a sliding window with an associative, commutative `reduceFunc`. */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[T]

  /**
   * Number of elements in a sliding window. Spark computes this incrementally with an
   * inverse reduce, so checkpointing must be enabled.
   */
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

  // Stateful operations (updateStateByKey, mapWithState) are only available on
  // DStream[(K, V)] through PairDStreamFunctions.

  // ---- Output operations ----
  // Registering an output operation is what makes a stream execute.
  // StreamingContext.start() fails if no output operation has been registered.

  /** Print the first 10 elements of every batch on the driver. */
  def print(): Unit

  /** Print the first `num` elements of every batch on the driver. */
  def print(num: Int): Unit

  /**
   * Run `func` on every generated RDD. `func` itself runs on the driver, so use RDD
   * operations inside it to do distributed work.
   */
  def foreachRDD(func: RDD[T] => Unit): Unit

  /** Same as `foreachRDD(func)`, but also passes the batch time. */
  def foreachRDD(func: (RDD[T], Time) => Unit): Unit

  /** Save every batch as text files named `prefix-TIME_IN_MS[.suffix]`. */
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit

  /** Save every batch as SequenceFiles of serialized objects named `prefix-TIME_IN_MS[.suffix]`. */
  def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit

  // saveAsHadoopFiles needs key and value types (OutputFormat[K, V]), so it is defined on
  // PairDStreamFunctions for DStream[(K, V)] rather than here.
}
191
```
192
193
**Usage Examples:**
194
195
```scala
196
// All transformations and output operations must be registered before ssc.start();
// nothing can be added to a context after it has started.

// Basic transformations
val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split(" "))
val filtered: DStream[String] = words.filter(_.length > 3)

// Window operations (window and slide lengths must be multiples of the batch interval)
val windowedWords = words.window(Seconds(10), Seconds(2))
val wordCounts = words.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(10), Seconds(2))

// Output operations: a stream is only computed if it reaches an output operation
wordCounts.print()
words.print(20) // Print first 20 elements each batch
words.foreachRDD { rdd =>
  // This closure runs on the driver; isEmpty() and count() each launch a Spark job
  if (!rdd.isEmpty()) {
    println(s"Batch has ${rdd.count()} elements")
  }
}

// Save each batch as files named hdfs://namenode/output-<batch time in ms>.txt
words.saveAsTextFiles("hdfs://namenode/output", "txt")
216
```
217
218
### PairDStreamFunctions
219
220
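Operations available only on DStreams of key-value pairs (through an implicit conversion). They provide key-based aggregation, joins, windowed aggregation, and stateful processing; the stateful operations require checkpointing.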
221
222
```scala { .api }
223
/**
 * Additional operations on DStreams of (K, V) pairs.
 *
 * Available through an implicit conversion whenever a DStream's element type is a tuple.
 * Stateful operations (`updateStateByKey`, `mapWithState`) require a checkpoint directory
 * set with `StreamingContext.checkpoint`.
 */
class PairDStreamFunctions[K, V](self: DStream[(K, V)]) {

  // ---- Key-based aggregations (within each batch) ----

  /** Group the values of each key within each batch. */
  def groupByKey(): DStream[(K, Iterable[V])]

  /** Merge the values of each key within each batch using an associative, commutative `func`. */
  def reduceByKey(func: (V, V) => V): DStream[(K, V)]

  /**
   * Generic per-key aggregation within each batch.
   *
   * @param createCombiner turn the first value seen for a key into a combiner
   * @param mergeValue     add another value into an existing combiner
   * @param mergeCombiner  merge two combiners built on different partitions
   * @param partitioner    partitioner for the resulting RDDs (required; there is no 3-argument form)
   * @param mapSideCombine whether to combine values before the shuffle
   */
  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true
  ): DStream[(K, C)]

  /** Transform each value while keeping its key. */
  def mapValues[U](mapValuesFunc: V => U): DStream[(K, U)]

  // Pair DStreams have no countByKey. To count records per key, use
  // `self.mapValues(_ => 1L).reduceByKey(_ + _)` or `self.map(_._1).countByValue()`.

  // ---- Joins with another pair DStream (batch by batch, matched on key) ----

  /** Inner join: only keys present in both streams. */
  def join[W](other: DStream[(K, W)]): DStream[(K, (V, W))]

  /** Left outer join: every key of this stream, with `None` where `other` lacks the key. */
  def leftOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]

  /** Right outer join: every key of `other`, with `None` where this stream lacks the key. */
  def rightOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]

  /** Full outer join: keys of either stream, with `None` on the side that lacks the key. */
  def fullOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]

  // ---- Windowed aggregations ----
  // windowDuration and slideDuration must be multiples of this DStream's slideDuration.

  /** Group the values of each key over a sliding window. */
  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]

  /** Reduce the values of each key over a sliding window, recomputing the whole window each slide. */
  def reduceByKeyAndWindow(
      func: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(K, V)]

  /**
   * Incremental windowed reduce. Values entering the window are added with `reduceFunc`,
   * and values leaving it are removed with `invReduceFunc` (the inverse of `reduceFunc`).
   * Requires checkpointing.
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration
  ): DStream[(K, V)]

  // ---- Stateful operations (require StreamingContext.checkpoint) ----

  /**
   * Maintain per-key state across batches.
   *
   * `updateFunc` receives the key's new values for the batch and its previous state. It is
   * called for every key that has state, even in batches with no new values for that key.
   * Returning `None` removes the key's state.
   */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)]

  /** `updateStateByKey` with an explicit partitioner for the state RDDs. */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner
  ): DStream[(K, S)]

  /** `updateStateByKey` with the given number of partitions for the state RDDs. */
  def updateStateByKey[S](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      numPartitions: Int
  ): DStream[(K, S)]

  /** Maintain per-key state using the mapping function and settings in `spec` (see `StateSpec.function`). */
  def mapWithState[StateType, MappedType](
      spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType]

  // ---- Output ----

  /**
   * Save every batch as Hadoop files named `prefix-TIME_IN_MS[.suffix]`, using the
   * (old-API) Hadoop OutputFormat `F`.
   */
  def saveAsHadoopFiles[F <: OutputFormat[K, V]](
      prefix: String,
      suffix: String
  )(implicit fm: ClassTag[F]): Unit
}
293
```
294
295
### Time and Duration Classes
296
297
Time management classes for specifying batch intervals, window durations, and timeout periods.
298
299
```scala { .api }
300
/**
 * A length of time, in milliseconds, used for batch intervals and window lengths/slides.
 * Usually created with `Milliseconds(...)`, `Seconds(...)` or `Minutes(...)`.
 */
case class Duration(milliseconds: Long) {

  // ---- Arithmetic ----

  /** Sum of the two durations. */
  def +(other: Duration): Duration

  /** Difference of the two durations. */
  def -(other: Duration): Duration

  /** This duration repeated `times` times. */
  def *(times: Int): Duration

  /**
   * Ratio of the two durations as a Double, e.g. `Seconds(10) / Seconds(4) == 2.5`.
   * There is no Duration-by-Int division.
   */
  def /(other: Duration): Double

  // ---- Comparison ----

  def <(other: Duration): Boolean
  def <=(other: Duration): Boolean
  def >(other: Duration): Boolean
  def >=(other: Duration): Boolean

  // ---- Utilities ----

  /** True if this duration is an exact multiple of `other` (e.g. a window vs. the batch interval). */
  def isMultipleOf(other: Duration): Boolean

  /** The shorter of the two durations. */
  def min(other: Duration): Duration

  /** The longer of the two durations. */
  def max(other: Duration): Duration

  /** True for a zero-length duration. */
  def isZero: Boolean

  /** Human-readable rendering with an appropriate unit. */
  def prettyPrint: String
}
323
324
/**
 * An absolute instant expressed in milliseconds, e.g. `Time(System.currentTimeMillis())`.
 * Adding or subtracting a Duration gives another Time; subtracting two Times gives the
 * Duration between them.
 */
case class Time(milliseconds: Long) {

  /** This instant moved later by `duration`. */
  def +(duration: Duration): Time

  /** This instant moved earlier by `duration`. */
  def -(duration: Duration): Time

  /** Elapsed duration from `other` to this instant. */
  def -(other: Time): Duration

  /** This instant rounded down to a multiple of `duration`. */
  def floor(duration: Duration): Time

  /** True when this instant is an exact multiple of `duration`. */
  def isMultipleOf(duration: Duration): Boolean

  /** Instants `stepSize` apart from this time up to, but excluding, `endTime`. */
  def until(endTime: Time, stepSize: Duration): Seq[Time]

  /** Instants `stepSize` apart from this time up to and including `endTime`. */
  def to(endTime: Time, stepSize: Duration): Seq[Time]
}
339
340
/** Builds a Duration from a count of milliseconds: `Milliseconds(500)`. */
object Milliseconds {
  def apply(milliseconds: Long): Duration
}

/** Builds a Duration from a count of seconds: `Seconds(2)` is 2000 ms. */
object Seconds {
  def apply(seconds: Long): Duration
}

/** Builds a Duration from a count of minutes: `Minutes(1)` is 60000 ms. */
object Minutes {
  def apply(minutes: Long): Duration
}
354
355
/**
 * Factory methods intended for Java callers, equivalent to the `Milliseconds`, `Seconds`
 * and `Minutes` objects (e.g. `Durations.seconds(2)` from Java).
 */
object Durations {

  /** Duration of `milliseconds` milliseconds. */
  def milliseconds(milliseconds: Long): Duration

  /** Duration of `seconds` seconds. */
  def seconds(seconds: Long): Duration

  /** Duration of `minutes` minutes. */
  def minutes(minutes: Long): Duration
}
363
364
/**
 * A span of time between `beginTime` and `endTime`.
 *
 * NOTE(review): in Apache Spark, `Interval` appears to be declared `private[streaming]` rather
 * than as a public case class. Before relying on it from application code, confirm its
 * visibility and that `contains` exists in the targeted Spark version.
 */
case class Interval(beginTime: Time, endTime: Time) {

  /** Whether `time` falls within this interval. */
  def contains(time: Time): Boolean

  /** Length of the span, computed as `endTime - beginTime`. */
  def duration: Duration = endTime - beginTime
}
371
```
372
373
**Usage Examples:**
374
375
```scala
376
// Creating durations
val batchInterval = Seconds(2)
val windowSize = Minutes(10)
val slideInterval = Seconds(30)

// Time arithmetic: Time +/- Duration gives a Time; Time - Time gives a Duration
val startTime: Long = System.currentTimeMillis()
val fiveMinutesLater: Time = Time(System.currentTimeMillis()) + Minutes(5)
val elapsed: Duration = Time(System.currentTimeMillis()) - Time(startTime)

// Using durations in streaming operations.
// Assumes `conf: SparkConf` is defined and `dstream` is a DStream[Int] created from `ssc`.
// Window and slide lengths must be multiples of the batch interval (2 seconds here).
val ssc = new StreamingContext(conf, batchInterval)
val windowed = dstream.window(windowSize, slideInterval)
val reduced = dstream.reduceByWindow(_ + _, Minutes(2), Seconds(10))
389
```