0
# Core Streaming Operations
1
2
Core streaming functionality that provides the essential components for building streaming applications, including StreamingContext creation, DStream transformations, and output operations.
3
4
## Capabilities
5
6
### StreamingContext
7
8
The main entry point for Spark Streaming functionality. Creates and manages streaming computations.
9
10
```scala { .api }
11
/**
12
* Main entry point for Spark Streaming functionality
13
*/
14
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
15
/** Start the streaming computation */
16
def start(): Unit
17
18
/** Stop the streaming computation (uses spark.streaming.stopSparkContextByDefault config) */
19
def stop(): Unit
20
21
/** Stop the streaming computation with option to stop SparkContext */
22
def stop(stopSparkContext: Boolean): Unit
23
24
/** Stop with graceful shutdown and optional SparkContext stop */
25
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
26
27
/** Wait for the streaming context to terminate */
28
def awaitTermination(): Unit
29
30
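/** Wait for termination or until the timeout (in milliseconds) elapses; returns true if the context stopped, false if the timeout elapsed first */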
31
def awaitTerminationOrTimeout(timeout: Long): Boolean
32
33
/** Get current state of the streaming context */
34
def getState(): StreamingContextState
35
36
/** Set checkpoint directory for fault tolerance */
37
def checkpoint(directory: String): Unit
38
39
/** Set how long DStreams should remember their RDDs */
40
def remember(duration: Duration): Unit
41
42
/** Get the underlying SparkContext */
43
def sparkContext: SparkContext
44
45
/** Add a streaming listener for event notifications */
46
def addStreamingListener(streamingListener: StreamingListener): Unit
47
48
/** Remove a streaming listener */
49
def removeStreamingListener(streamingListener: StreamingListener): Unit
50
}
51
```
52
53
**Alternative Constructors:**
54
55
```scala { .api }
56
/** Create from SparkConf */
57
def this(conf: SparkConf, batchDuration: Duration)
58
59
/** Create with master and app name */
60
def this(master: String, appName: String, batchDuration: Duration)
61
62
/** Create with master, app name, batch duration, and Spark home */
63
def this(master: String, appName: String, batchDuration: Duration, sparkHome: String)
64
65
/** Create with master, app name, batch duration, Spark home, and JAR files */
66
def this(master: String, appName: String, batchDuration: Duration, sparkHome: String, jars: Seq[String])
67
68
/** Create with master, app name, batch duration, Spark home, JARs, and environment */
69
def this(master: String, appName: String, batchDuration: Duration, sparkHome: String, jars: Seq[String], environment: Map[String, String])
70
71
/** Restore from checkpoint */
72
def this(path: String)
73
74
/** Restore from checkpoint with Hadoop configuration */
75
def this(path: String, hadoopConf: Configuration)
76
77
/** Restore from checkpoint with existing SparkContext */
78
def this(path: String, sparkContext: SparkContext)
79
```
80
81
**Companion Object Methods:**
82
83
```scala { .api }
84
object StreamingContext {
85
/** Get currently active StreamingContext */
86
def getActive(): Option[StreamingContext]
87
88
/** Get active context or create new one */
89
def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext
90
91
/** Create from checkpoint or use creating function */
92
def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext): StreamingContext
93
94
/** Create from checkpoint with Hadoop configuration or use creating function */
95
def getOrCreate(checkpointPath: String, hadoopConf: Configuration, creatingFunc: () => StreamingContext): StreamingContext
96
97
/** Create from checkpoint with Hadoop configuration or use creating function; createOnError controls whether a new context is created when the checkpoint data cannot be read */
98
def getOrCreate(checkpointPath: String, hadoopConf: Configuration, creatingFunc: () => StreamingContext, createOnError: Boolean): StreamingContext
99
}
100
```
101
102
**Usage Examples:**
103
104
```scala
105
import org.apache.spark._
106
import org.apache.spark.streaming._
107
108
// Create with SparkContext and batch duration
109
val conf = new SparkConf().setAppName("MyStreamingApp")
110
val sc = new SparkContext(conf)
111
val ssc = new StreamingContext(sc, Seconds(1))
112
113
// Create directly from SparkConf
114
val ssc2 = new StreamingContext(conf, Seconds(2))
115
116
// Create with checkpoint recovery
117
val ssc3 = StreamingContext.getOrCreate("/path/to/checkpoint", () => {
118
val conf = new SparkConf().setAppName("RecoverableApp")
119
new StreamingContext(conf, Seconds(1))
120
})
121
122
// Configure and start
123
ssc.checkpoint("/path/to/checkpoint")
124
ssc.start()
125
ssc.awaitTermination()
126
```
127
128
### DStream[T]
129
130
A Discretized Stream (DStream) represents a continuous sequence of RDDs and is the fundamental abstraction in Spark Streaming.
131
132
```scala { .api }
133
/**
134
* Discretized Stream - represents a continuous sequence of RDDs
135
* @tparam T the type of elements in the stream
136
*/
137
abstract class DStream[T] {
138
/** The StreamingContext associated with this DStream */
139
def context: StreamingContext
140
141
/** The slide duration of this DStream */
142
def slideDuration: Duration
143
144
/** List of parent DStreams on which this DStream depends */
145
def dependencies: List[DStream[_]]
146
147
/** Persist RDDs in this DStream with the default storage level */
148
def persist(): DStream[T]
149
150
/** Persist RDDs with specific storage level */
151
def persist(level: StorageLevel): DStream[T]
152
153
/** Cache RDDs in memory */
154
def cache(): DStream[T]
155
156
/** Enable periodic checkpointing */
157
def checkpoint(interval: Duration): DStream[T]
158
}
159
```
160
161
**Basic Transformations:**
162
163
```scala { .api }
164
abstract class DStream[T] {
165
/** Transform each element using the provided function */
166
def map[U](mapFunc: T => U): DStream[U]
167
168
/** Transform each element and flatten results */
169
def flatMap[U](flatMapFunc: T => TraversableOnce[U]): DStream[U]
170
171
/** Filter elements based on predicate */
172
def filter(filterFunc: T => Boolean): DStream[T]
173
174
/** Transform each partition independently */
175
def mapPartitions[U](mapPartFunc: Iterator[T] => Iterator[U]): DStream[U]
176
177
/** Group elements of each partition into an array */
178
def glom(): DStream[Array[T]]
179
180
/** Repartition RDDs in the DStream */
181
def repartition(numPartitions: Int): DStream[T]
182
183
/** Union with another DStream */
184
def union(that: DStream[T]): DStream[T]
185
}
186
```
187
188
**Reduction Operations:**
189
190
```scala { .api }
191
abstract class DStream[T] {
192
/** Reduce the elements of each RDD to a single element using an associative and commutative function */
193
def reduce(reduceFunc: (T, T) => T): DStream[T]
194
195
/** Count number of elements in each RDD */
196
def count(): DStream[Long]
197
198
/** Count occurrences of each unique value */
199
def countByValue(): DStream[(T, Long)]
200
}
201
```
202
203
**Output Operations:**
204
205
```scala { .api }
206
abstract class DStream[T] {
207
/** Print first 10 elements of each RDD */
208
def print(): Unit
209
210
/** Print first num elements of each RDD */
211
def print(num: Int): Unit
212
213
/** Apply function to each RDD in the DStream */
214
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
215
216
/** Apply function to each RDD with timestamp */
217
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
218
219
/** Save as object files with prefix and suffix */
220
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
221
222
/** Save as text files with prefix and suffix */
223
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
224
}
225
```
226
227
**Advanced Transformations:**
228
229
```scala { .api }
230
abstract class DStream[T] {
231
/** Transform using arbitrary RDD operation */
232
def transform[U](transformFunc: RDD[T] => RDD[U]): DStream[U]
233
234
/** Transform with access to timestamp */
235
def transform[U](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
236
237
/** Transform with another DStream */
238
def transformWith[U, V](other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V]
239
240
/** Transform with another DStream and timestamp access */
241
def transformWith[U, V](other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]): DStream[V]
242
}
243
```
244
245
**Utility Methods:**
246
247
```scala { .api }
248
abstract class DStream[T] {
249
/** Retrieve RDDs in specified time interval */
250
def slice(interval: Interval): Seq[RDD[T]]
251
252
/** Retrieve RDDs between specified times */
253
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]
254
}
255
```
256
257
**Usage Examples:**
258
259
```scala
260
import org.apache.spark.streaming._
261
262
// Basic transformations
263
val lines: DStream[String] = ssc.textFileStream("/path/to/files")
264
val words = lines.flatMap(_.split(" "))
265
val filtered = words.filter(_.length > 3)
266
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
267
268
// Persistence and checkpointing
269
val important = lines.filter(_.contains("ERROR")).cache()
270
important.checkpoint(Seconds(30))
271
272
// Custom transformations
273
val processed = lines.transform { rdd =>
274
rdd.mapPartitions(partition => {
275
// Custom partition-level processing
276
partition.map(_.toUpperCase)
277
})
278
}
279
280
// Output operations
281
wordCounts.print(20)
282
wordCounts.foreachRDD { (rdd, time) =>
283
println(s"Batch time: $time")
284
rdd.take(10).foreach(println)
285
}
286
```
287
288
### PairDStreamFunctions[K, V]
289
290
Enhanced functionality for DStreams of key-value pairs, providing operations like grouping, joins, and aggregations.
291
292
```scala { .api }
293
/**
294
* Extra functionality available on DStreams of (key, value) pairs
295
* Available as implicit conversion from DStream[(K, V)]
296
*/
297
class PairDStreamFunctions[K, V](self: DStream[(K, V)]) {
298
/** Group values by key */
299
def groupByKey(): DStream[(K, Iterable[V])]
300
301
/** Group by key with specific number of partitions */
302
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
303
304
/** Group by key using custom partitioner */
305
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
306
307
/** Reduce values by key */
308
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]
309
310
/** Reduce by key with specific partitions */
311
def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)]
312
313
/** Reduce by key with custom partitioner */
314
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
315
316
/** Combine values by key with custom combiners */
317
def combineByKey[C](
318
createCombiner: V => C,
319
mergeValue: (C, V) => C,
320
mergeCombiner: (C, C) => C
321
): DStream[(K, C)]
322
323
/** Transform values while preserving keys */
324
def mapValues[U](mapValuesFunc: V => U): DStream[(K, U)]
325
326
/** Flat map values while preserving keys */
327
def flatMapValues[U](flatMapValuesFunc: V => TraversableOnce[U]): DStream[(K, U)]
328
}
329
```
330
331
**Join Operations:**
332
333
```scala { .api }
334
class PairDStreamFunctions[K, V] {
335
/** Inner join with another pair DStream */
336
def join[W](other: DStream[(K, W)]): DStream[(K, (V, W))]
337
338
/** Left outer join */
339
def leftOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
340
341
/** Right outer join */
342
def rightOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
343
344
/** Full outer join */
345
def fullOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
346
347
/** Cogroup with another pair DStream */
348
def cogroup[W](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
349
}
350
```
351
352
**Usage Examples:**
353
354
```scala
355
val pairs: DStream[(String, Int)] = words.map(word => (word, 1))
356
357
// Basic operations
358
val wordCounts = pairs.reduceByKey(_ + _)
359
val upperWords = pairs.mapValues(_.toString.toUpperCase)
360
val grouped = pairs.groupByKey()
361
362
// Joins
363
val pairs2: DStream[(String, Double)] = ??? // placeholder: supply another pair DStream here
364
val joined = pairs.join(pairs2) // DStream[(String, (Int, Double))]
365
val leftJoined = pairs.leftOuterJoin(pairs2)
366
367
// Custom partitioning
368
val customPartitioner = new HashPartitioner(4)
369
val partitioned = pairs.reduceByKey(_ + _, customPartitioner)
370
```