# Streaming Context

The StreamingContext is the main entry point for Apache Spark Streaming functionality. It coordinates the streaming application, manages job execution, and provides methods for creating input streams and configuring the streaming environment.

## Capabilities

### StreamingContext Creation

Create a StreamingContext with various configuration options.

```scala { .api }
/**
 * Create StreamingContext from SparkContext
 * @param sparkContext - Existing SparkContext instance
 * @param batchDuration - Time interval at which streaming data will be divided into batches
 */
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)

/**
 * Create StreamingContext from SparkConf
 * @param conf - Spark configuration
 * @param batchDuration - Time interval at which streaming data will be divided into batches
 */
class StreamingContext(conf: SparkConf, batchDuration: Duration)

/**
 * Create StreamingContext with master and app name
 * @param master - Cluster URL to connect to
 * @param appName - Name for your application
 * @param batchDuration - Time interval at which streaming data will be divided into batches
 * @param sparkHome - Spark home directory (optional)
 * @param jars - JAR files to send to the cluster (optional)
 * @param environment - Environment variables (optional)
 */
class StreamingContext(
    master: String,
    appName: String,
    batchDuration: Duration,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map()
)
```

**Usage Examples:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}

// From SparkConf
val conf = new SparkConf().setAppName("MyStreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Direct creation
val ssc2 = new StreamingContext("local[2]", "MyApp", Seconds(5))
```

### Lifecycle Management

Control the streaming application lifecycle with start, stop, and termination methods.

```scala { .api }
/**
 * Start the streaming context and begin processing
 */
def start(): Unit

/**
 * Stop the streaming context
 * @param stopSparkContext - Whether to stop the underlying SparkContext
 * @param stopGracefully - Whether to stop gracefully by waiting for all received data to be processed
 */
def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit

/**
 * Wait for the execution to stop
 */
def awaitTermination(): Unit

/**
 * Wait for the execution to stop, or until the timeout elapses
 * @param timeout - Maximum time to wait in milliseconds
 * @returns true if the context stopped within the timeout, false if the timeout elapsed first
 */
def awaitTerminationOrTimeout(timeout: Long): Boolean

/**
 * Get the current state of the streaming context
 * @returns StreamingContextState (INITIALIZED, ACTIVE, or STOPPED)
 */
def getState(): StreamingContextState
```
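
A minimal lifecycle sketch, assuming a socket server is listening on localhost:9999 (the host, port, and timeout values are illustrative): create and start the context, poll for termination with a timeout, and stop gracefully while keeping the SparkContext alive.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

val conf = new SparkConf().setAppName("LifecycleDemo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.socketTextStream("localhost", 9999).print()

// The context stays INITIALIZED until start() is called
assert(ssc.getState() == StreamingContextState.INITIALIZED)
ssc.start()
assert(ssc.getState() == StreamingContextState.ACTIVE)

// Wait up to 30 seconds; if still running, stop gracefully and
// keep the underlying SparkContext alive for reuse
if (!ssc.awaitTerminationOrTimeout(30000)) {
  ssc.stop(stopSparkContext = false, stopGracefully = true)
}
```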

### Configuration Methods

Configure checkpointing, data retention, and streaming behavior.

```scala { .api }
/**
 * Set checkpoint directory for fault tolerance
 * @param directory - HDFS-compatible directory path for checkpoints
 */
def checkpoint(directory: String): Unit

/**
 * Set how long each DStream remembers the RDDs it generates
 * @param duration - Minimum duration for which generated RDDs are retained
 */
def remember(duration: Duration): Unit

/**
 * Add streaming listener for monitoring
 * @param streamingListener - Listener to receive streaming events
 */
def addStreamingListener(streamingListener: StreamingListener): Unit
```
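
A minimal sketch of these calls, assuming an existing `ssc` (the checkpoint path and retention window are illustrative): set a checkpoint directory, extend RDD retention, and register a listener that logs batch processing times.

```scala
import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Checkpoint data goes to an HDFS-compatible path (illustrative path)
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")

// Keep generated RDDs around for at least 5 minutes
ssc.remember(Minutes(5))

// Log how long each completed batch took to process
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val delayMs = batchCompleted.batchInfo.processingDelay.getOrElse(-1L)
    println(s"Batch ${batchCompleted.batchInfo.batchTime} processed in $delayMs ms")
  }
})
```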

### Socket Input Streams

Create input streams from TCP sockets.

```scala { .api }
/**
 * Create text input stream from TCP socket (UTF-8 encoded, newline-delimited lines)
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param storageLevel - Storage level for received data
 * @returns ReceiverInputDStream of strings
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]

/**
 * Create input stream from TCP socket with a custom converter
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param converter - Function to convert the raw InputStream into an Iterator[T]
 * @param storageLevel - Storage level for received data
 * @returns ReceiverInputDStream of the converted type
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
): ReceiverInputDStream[T]

/**
 * Create raw TCP socket stream where data is received as serialized blocks
 * that can be pushed into the block manager without deserializing them
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param storageLevel - Storage level for received data
 * @returns ReceiverInputDStream of the received objects
 */
def rawSocketStream[T: ClassTag](
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]
```
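
A sketch of the converter-based variant, assuming an existing `ssc` and a server on localhost:9999: the hypothetical `linesFromStream` helper splits the raw byte stream into UTF-8 lines, roughly what `socketTextStream` does internally.

```scala
import java.io.InputStream
import scala.io.Source
import org.apache.spark.storage.StorageLevel

// Hypothetical converter: read the socket as UTF-8 text, one element per line
def linesFromStream(in: InputStream): Iterator[String] =
  Source.fromInputStream(in, "UTF-8").getLines()

val custom = ssc.socketStream[String](
  "localhost",
  9999,
  linesFromStream,
  StorageLevel.MEMORY_AND_DISK_SER_2
)
custom.print()
```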

### File Input Streams

Create input streams that monitor file systems for new files.

```scala { .api }
/**
 * Create input stream from text files written to a directory
 * @param directory - Directory path to monitor
 * @returns DStream of strings (file contents, line by line)
 */
def textFileStream(directory: String): DStream[String]

/**
 * Create input stream from binary files with fixed-length records
 * @param directory - Directory path to monitor
 * @param recordLength - Length of each record in bytes
 * @returns DStream of byte arrays
 */
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

/**
 * Create generic file input stream using a Hadoop InputFormat
 * @param directory - Directory path to monitor
 * @returns InputDStream of key-value pairs based on the input format
 */
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
    directory: String
): InputDStream[(K, V)]
```
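
For example (directory path illustrative, `ssc` assumed), `textFileStream` is essentially the generic `fileStream` parameterized with Hadoop's `TextInputFormat`:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Monitor a directory for newly written text files
val lines = ssc.textFileStream("hdfs:///data/incoming")

// Roughly equivalent stream via the generic fileStream:
// keys are byte offsets into the file, values are lines of text
val kvLines = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs:///data/incoming")
val textLines = kvLines.map { case (_, line) => line.toString }
```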

### Queue and Custom Input Streams

Create input streams from RDD queues or custom receivers.

```scala { .api }
/**
 * Create input stream from a queue of RDDs
 * @param queue - Queue containing RDDs to process
 * @param oneAtATime - Whether to process only one RDD per batch interval
 * @param defaultRDD - Default RDD to process when the queue is empty
 * @returns InputDStream of queue elements
 */
def queueStream[T: ClassTag](
    queue: Queue[RDD[T]],
    oneAtATime: Boolean = true,
    defaultRDD: RDD[T] = null
): InputDStream[T]

/**
 * Create input stream from a custom receiver
 * @param receiver - Custom receiver implementation
 * @returns ReceiverInputDStream fed by the receiver
 */
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
```
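
A common testing pattern (sketch, assuming an existing `ssc`): push pre-built RDDs onto a mutable queue and let `queueStream` consume one per batch interval.

```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD

val rddQueue = new mutable.Queue[RDD[Int]]()
val queued = ssc.queueStream(rddQueue)
queued.map(_ * 2).print()

ssc.start()
// Push a new RDD into the queue every second; each becomes one batch
for (_ <- 1 to 5) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.makeRDD(1 to 100, numSlices = 4)
  }
  Thread.sleep(1000)
}
```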

### Stream Combination Operations

Combine multiple streams into unified streams.

```scala { .api }
/**
 * Union multiple DStreams of the same type
 * @param streams - Sequence of DStreams to union
 * @returns Single DStream containing data from all input streams
 */
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

/**
 * Transform multiple DStreams using a custom function
 * @param dstreams - Sequence of input DStreams
 * @param transformFunc - Function to transform RDDs from all streams
 * @returns DStream with transformed data
 */
def transform[T: ClassTag](
    dstreams: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T]
```

**Usage Examples:**

```scala
// Basic lifecycle
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs:///checkpoint")

val lines = ssc.socketTextStream("localhost", 9999)
lines.print()

ssc.start()
ssc.awaitTermination()

// Combining streams
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val combined = ssc.union(Seq(stream1, stream2))

// Custom transformation across multiple streams
val transformed = ssc.transform(Seq(stream1, stream2), (rdds, time) => {
  val rdd1 = rdds(0).asInstanceOf[RDD[String]]
  val rdd2 = rdds(1).asInstanceOf[RDD[String]]
  rdd1.union(rdd2).filter(_.nonEmpty)
})
```

## Context Factory Methods

Factory methods for creating a StreamingContext from checkpoint data or retrieving the active one.

```scala { .api }
object StreamingContext {
  /**
   * Get a StreamingContext from checkpoint data, or create a new one
   * @param path - Path to the checkpoint directory
   * @param creatingFunc - Function to create a new StreamingContext when no checkpoint exists
   * @param hadoopConf - Hadoop configuration used to read the checkpoint (optional)
   * @param createOnError - Whether to create a new context if reading the checkpoint fails
   * @returns StreamingContext restored from checkpoint, or a newly created one
   */
  def getOrCreate(
      path: String,
      creatingFunc: () => StreamingContext,
      hadoopConf: Configuration = new Configuration(),
      createOnError: Boolean = false
  ): StreamingContext

  /**
   * Get the currently active StreamingContext, if any
   * @returns Some(context) if a StreamingContext is active, None otherwise
   */
  def getActive(): Option[StreamingContext]
}
```
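
The usual recovery pattern (a sketch; the checkpoint path and socket source are illustrative) defines a creation function and hands it to `getOrCreate`, so a restarted driver restores from the checkpoint instead of rebuilding from scratch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/my-app"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("RecoverableApp")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  ssc.socketTextStream("localhost", 9999).count().print()
  ssc
}

// Restores from checkpointDir if it exists, otherwise calls createContext()
val context = StreamingContext.getOrCreate(checkpointDir, createContext _)
context.start()
context.awaitTermination()
```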

## Types

```scala { .api }
// Context state enumeration (exposed as a Java enum, org.apache.spark.streaming.StreamingContextState)
object StreamingContextState extends Enumeration {
  type StreamingContextState = Value
  val INITIALIZED, ACTIVE, STOPPED = Value
}
```