# Core Streaming Operations

Core functionality for creating and managing Spark Streaming contexts, controlling the application lifecycle, and performing basic DStream operations.

## StreamingContext Creation

### Primary Constructors

Create a StreamingContext from an existing SparkContext:

```scala { .api }
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
```

Create a StreamingContext from a Spark configuration:

```scala { .api }
class StreamingContext(conf: SparkConf, batchDuration: Duration)
```

Create a StreamingContext from a master URL and app name:

```scala { .api }
class StreamingContext(
  master: String,
  appName: String,
  batchDuration: Duration,
  sparkHome: String = null,
  jars: Seq[String] = Nil,
  environment: Map[String, String] = Map()
)
```
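
The `SparkConf` constructor is the most common entry point. A minimal sketch (the app name and `local[2]` master are placeholders; a receiver-based stream needs at least two local threads):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder app name and master; use at least 2 local threads so that
// one can run a receiver while the others process data
val conf = new SparkConf().setAppName("MyStreamingApp").setMaster("local[2]")

// Only one StreamingContext can be active per JVM; batches every 5 seconds
val ssc = new StreamingContext(conf, Seconds(5))
```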

### Checkpoint Recovery

Create a StreamingContext from a checkpoint:

```scala { .api }
class StreamingContext(path: String)
class StreamingContext(path: String, hadoopConf: Configuration)
class StreamingContext(path: String, sparkContext: SparkContext)
```

Example checkpoint recovery:

```scala
val checkpointDir = "hdfs://checkpoint-dir"

def createStreamingContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("MyApp")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // Define your streams and transformations here
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
```

## Lifecycle Management

### Starting and Stopping

Start the streaming computation:

```scala { .api }
def start(): Unit
```

Stop the streaming computation:

```scala { .api }
def stop(stopSparkContext: Boolean = true): Unit
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
```

Wait for termination (timeout in milliseconds):

```scala { .api }
def awaitTermination(): Unit
def awaitTerminationOrTimeout(timeout: Long): Boolean
```

Example lifecycle management:

```scala
val ssc = new StreamingContext(conf, Seconds(1))

// Define your streams and transformations
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wordCounts.print()

// Start the streaming context
ssc.start()

// Add a shutdown hook for graceful termination
sys.ShutdownHookThread {
  println("Gracefully stopping Spark Streaming Application")
  ssc.stop(stopSparkContext = true, stopGracefully = true)
  println("Application stopped")
}

// Wait for termination
ssc.awaitTermination()
```
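
Where the application should check an external condition instead of blocking forever, `awaitTerminationOrTimeout` can be polled in a loop. A sketch, assuming a hypothetical `shouldShutdown()` flag supplied by your own code:

```scala
// awaitTerminationOrTimeout returns true once the context has terminated,
// false if the timeout (in milliseconds) elapsed first
var terminated = false
while (!terminated && !shouldShutdown()) { // shouldShutdown() is a placeholder
  terminated = ssc.awaitTerminationOrTimeout(10000)
}
if (!terminated) {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}
```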

### State Management

Get current context state:

```scala { .api }
def getState(): StreamingContextState
```

StreamingContextState values:

- `INITIALIZED` - Context created but not started
- `ACTIVE` - Context started and running
- `STOPPED` - Context stopped
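
A minimal sketch of using the state to make startup idempotent, so a shared code path never calls `start()` twice:

```scala
import org.apache.spark.streaming.StreamingContextState

// start() throws if the context is already ACTIVE or STOPPED
if (ssc.getState() == StreamingContextState.INITIALIZED) {
  ssc.start()
}
```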

Access the underlying SparkContext:

```scala { .api }
def sparkContext: SparkContext
```

## Configuration and Checkpointing

### Checkpoint Configuration

Set the checkpoint directory:

```scala { .api }
def checkpoint(directory: String): Unit
```

Set the remember duration for DStreams:

```scala { .api }
def remember(duration: Duration): Unit
```

Example checkpoint setup:

```scala
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs://namenode:9000/checkpoint")
ssc.remember(Minutes(2)) // Remember the last 2 minutes of data
```

## Static Context Management

### Singleton Context Management

Get the currently active StreamingContext:

```scala { .api }
def getActive(): Option[StreamingContext]
```

Get the active StreamingContext, or create a new one:

```scala { .api }
def getActiveOrCreate(creatingFunc: () => StreamingContext): StreamingContext
```
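
A sketch of sharing one context across call sites with `getActiveOrCreate` (the factory and app name are placeholders):

```scala
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("SharedStreamingApp") // placeholder
  new StreamingContext(conf, Seconds(2))
}

// Returns the currently active context if one is running,
// otherwise invokes the factory to create a new one
val ssc = StreamingContext.getActiveOrCreate(createContext _)
```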

Get or create from a checkpoint:

```scala { .api }
def getOrCreate(
  checkpointPath: String,
  creatingFunc: () => StreamingContext,
  hadoopConf: Configuration = SparkHadoopUtil.get.conf,
  createOnError: Boolean = false
): StreamingContext
```

Example singleton management:

```scala
object StreamingApp {
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("MyStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("checkpoint-dir")
    // Define streaming logic
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Creates a new context or recovers one from the checkpoint
    val ssc = StreamingContext.getOrCreate("checkpoint-dir", createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

## DStream Union and Transform

### Union Operations

Union multiple DStreams:

```scala { .api }
def union[T](streams: Seq[DStream[T]]): DStream[T]
```

Example union:

```scala
val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)
val stream3 = ssc.textFileStream("/data/input")

val unionStream = ssc.union(Seq(stream1, stream2, stream3))
```

### Transform Operations

Transform multiple DStreams together:

```scala { .api }
def transform[T](
  dstreams: Seq[DStream[_]],
  transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T]
```

Example multi-stream transform:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)

// Both arguments go in a single parameter list
val combinedStream = ssc.transform(
  Seq(stream1, stream2),
  (rdds: Seq[RDD[_]], time: Time) => {
    val rdd1 = rdds(0).asInstanceOf[RDD[String]]
    val rdd2 = rdds(1).asInstanceOf[RDD[String]]

    // Custom transformation logic
    rdd1.union(rdd2).filter(_.length > 5)
  }
)
```

## Event Listeners

### Adding and Removing Listeners

Add a streaming listener:

```scala { .api }
def addStreamingListener(streamingListener: StreamingListener): Unit
```

Remove a streaming listener:

```scala { .api }
def removeStreamingListener(streamingListener: StreamingListener): Unit
```

Example custom listener:

```scala
class MyStreamingListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    // processingDelay is an Option[Long] in milliseconds
    println(s"Batch ${batchInfo.batchTime} completed in ${batchInfo.processingDelay.getOrElse(-1L)}ms")
  }

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    println(s"Receiver error: ${receiverError.receiverInfo.lastErrorMessage}")
  }
}

ssc.addStreamingListener(new MyStreamingListener())
```

## Utility Methods

### JAR Management

Get the JAR file containing a class:

```scala { .api }
def jarOfClass(cls: Class[_]): Option[String]
```

Example usage:

```scala
val jarPath = StreamingContext.jarOfClass(classOf[MyCustomClass])
```
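
If the lookup succeeds, the path can be handed to the Spark configuration; a sketch, reusing the hypothetical `MyCustomClass` from above:

```scala
val conf = new SparkConf().setAppName("MyApp")
// jarOfClass returns Option[String], so ship the JAR only when it was found
StreamingContext.jarOfClass(classOf[MyCustomClass]).foreach { jar =>
  conf.setJars(Seq(jar))
}
```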

### Context Access

Access the underlying SparkContext:

```scala
val sc = ssc.sparkContext
val appName = sc.appName
val master = sc.master
```

## Duration Helpers

Create duration objects:

```scala { .api }
object Milliseconds {
  def apply(milliseconds: Long): Duration
}

object Seconds {
  def apply(seconds: Long): Duration
}

object Minutes {
  def apply(minutes: Long): Duration
}
```

Common duration patterns:

```scala
val batchInterval = Seconds(5)       // 5-second batches
val windowSize = Minutes(10)         // 10-minute windows
val slideInterval = Seconds(30)      // 30-second slides
val checkpointInterval = Minutes(2)  // Checkpoint every 2 minutes
```
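
Window and slide durations must be integer multiples of the batch interval; `Duration` supports arithmetic and an `isMultipleOf` check that can validate this up front:

```scala
require(windowSize.isMultipleOf(batchInterval),
  "window duration must be a multiple of the batch interval")
require(slideInterval.isMultipleOf(batchInterval),
  "slide duration must be a multiple of the batch interval")

// Durations also support arithmetic and comparisons
val total = windowSize + slideInterval   // 10 minutes 30 seconds
val isLonger = windowSize > slideInterval
```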