# Input Sources

Input sources in Spark Streaming provide mechanisms for ingesting data from external systems. These sources create InputDStreams that continuously receive data and convert it into a stream of RDDs for processing.

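The pattern is the same for every source: create a StreamingContext, attach one or more input streams, define transformations, then start the context. A minimal end-to-end sketch (the app name, host, port, and batch interval are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// local[2]: one core for the receiver, one for processing
val conf = new SparkConf().setAppName("InputSourcesExample").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999) // one input source
lines.count().print()                               // a simple per-batch action

ssc.start()            // begin receiving data
ssc.awaitTermination() // block until the job is stopped
```
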
## Capabilities

### Socket-Based Input Streams

Input streams that connect to TCP sockets to receive data.

```scala { .api }
/**
 * Create text input stream from TCP socket
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param storageLevel - Storage level for received data (default: MEMORY_AND_DISK_SER_2)
 * @returns ReceiverInputDStream of strings
 */
def socketTextStream(
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]

/**
 * Create binary input stream from TCP socket with custom converter
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param converter - Function to convert InputStream to Iterator[T]
 * @param storageLevel - Storage level for received data
 * @returns ReceiverInputDStream of converted type T
 */
def socketStream[T: ClassTag](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T]

/**
 * Create raw TCP socket stream that receives data as serialized blocks
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param storageLevel - Storage level for received data (default: MEMORY_AND_DISK_SER_2)
 * @returns ReceiverInputDStream of deserialized values of type T
 */
def rawSocketStream[T: ClassTag](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]
```

**Usage Examples:**

```scala
// Basic text socket stream
val lines = ssc.socketTextStream("localhost", 9999)

// Custom converter for JSON parsing (parseJson is a user-supplied
// function of type String => T)
val jsonStream = ssc.socketStream(
  "data-server", 8080,
  (inputStream: InputStream) => {
    scala.io.Source.fromInputStream(inputStream)
      .getLines()
      .map(parseJson)
  },
  StorageLevel.MEMORY_ONLY
)

// Raw socket for binary protocols
val binaryStream = ssc.rawSocketStream[Array[Byte]]("binary-server", 7777)
```

### File System Input Streams

Input streams that monitor file systems for new files and process them as they arrive.

```scala { .api }
/**
 * Create input stream from text files in a directory
 * @param directory - Directory path to monitor for new files
 * @returns DStream of strings (file contents line by line)
 */
def textFileStream(directory: String): DStream[String]

/**
 * Create input stream from binary files with fixed record length
 * @param directory - Directory path to monitor for new files
 * @param recordLength - Length of each record in bytes
 * @returns DStream of byte arrays
 */
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

/**
 * Create generic file input stream using Hadoop InputFormat
 * @param directory - Directory path to monitor for new files
 * @returns InputDStream of key-value pairs based on input format F
 */
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
  directory: String
): InputDStream[(K, V)]

/**
 * Create file input stream with a path filter and new-files-only control
 * @param directory - Directory path to monitor
 * @param filter - Function to decide which paths to process
 * @param newFilesOnly - Whether to process only new files
 * @returns InputDStream of key-value pairs
 */
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean
): InputDStream[(K, V)]
```

**Usage Examples:**

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{SequenceFileInputFormat, TextInputFormat}

// Monitor directory for text files
val logFiles = ssc.textFileStream("/var/log/app")

// Process binary data files
val binaryData = ssc.binaryRecordsStream("/data/binary", 1024)

// Process Hadoop sequence files
val sequenceFiles = ssc.fileStream[Text, Text, SequenceFileInputFormat[Text, Text]]("/data/sequence")

// Custom file processing with a path filter
val csvFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/data/csv",
  (path: Path) => path.getName.endsWith(".csv"),
  newFilesOnly = true
)
```

### Queue-Based Input Streams

Input streams created from queues of RDDs, useful for testing and programmatic data injection.

```scala { .api }
/**
 * Create input stream from queue of RDDs
 * @param queue - Queue containing RDDs to process
 * @param oneAtATime - Whether to process one RDD per batch (default: true)
 * @param defaultRDD - Default RDD when queue is empty (optional)
 * @returns InputDStream of queue elements
 */
def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true,
  defaultRDD: RDD[T] = null
): InputDStream[T]
```

**Usage Examples:**

```scala
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

// Create queue and add RDDs
val rddQueue = Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)

// Add data to queue (typically done in another thread; the queue is not
// thread-safe, so synchronize access)
for (i <- 1 to 10) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.parallelize(1 to 100, 2)
  }
}

// Process one RDD at a time
val singleRDDStream = ssc.queueStream(rddQueue, oneAtATime = true)
```

### Custom Receiver Input Streams

Input streams using custom receiver implementations for specialized data sources.

```scala { .api }
/**
 * Create input stream from custom receiver
 * @param receiver - Custom receiver implementation extending Receiver[T]
 * @returns ReceiverInputDStream from the receiver
 */
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]

/**
 * Combine multiple input streams (for example, several receiver streams)
 * @param streams - Sequence of DStreams to union
 * @returns Single DStream containing data from all input streams
 */
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]
```

**Custom Receiver Implementation:**

```scala
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {
  /**
   * Start the receiver - implement data receiving logic
   */
  def onStart(): Unit

  /**
   * Stop the receiver - implement cleanup logic
   */
  def onStop(): Unit

  /**
   * Store received data
   * @param data - Single data item to store
   */
  def store(data: T): Unit

  /**
   * Store multiple data items
   * @param data - Iterator of data items to store
   */
  def store(data: Iterator[T]): Unit

  /**
   * Report error to driver
   * @param message - Error message
   * @param throwable - Exception that occurred
   */
  def reportError(message: String, throwable: Throwable): Unit

  /**
   * Check if receiver is stopped
   * @returns true if receiver has been stopped
   */
  def isStopped(): Boolean
}
```

**Usage Examples:**

```scala
// Custom HTTP polling receiver (a sketch: the endpoint URL is
// illustrative, and scala.io.Source stands in for a real HTTP client)
class HttpReceiver(url: String, storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) {

  def onStart(): Unit = {
    // Poll the HTTP endpoint from a background thread until stopped
    new Thread("Http Receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          val response = scala.io.Source.fromURL(url).mkString
          store(response)
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to close; the polling thread exits once isStopped() is true
  }
}

// Use custom receiver
val httpStream = ssc.receiverStream(
  new HttpReceiver("http://api.example.com/data", StorageLevel.MEMORY_ONLY))
```

### Union Operations for Input Streams

Combine multiple input streams into a single stream.

```scala { .api }
/**
 * Union multiple DStreams of the same type
 * @param streams - Sequence of DStreams to union
 * @returns Single DStream containing data from all input streams
 */
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]
```

**Usage Examples:**

```scala
// Combine multiple socket streams
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val stream3 = ssc.socketTextStream("host3", 9999)

val combinedStream = ssc.union(Seq(stream1, stream2, stream3))

// Combine streams from different kinds of sources (same element type)
val fileStream = ssc.textFileStream("/data/logs")
val socketStream = ssc.socketTextStream("localhost", 9999)
val allLogs = ssc.union(Seq(fileStream, socketStream))
```

## Input Stream Types and Properties

### ReceiverInputDStream

Base class for receiver-based input streams that actively receive data.

```scala { .api }
abstract class ReceiverInputDStream[T: ClassTag](ssc: StreamingContext)
  extends InputDStream[T](ssc) {

  /**
   * Get the receiver for this input stream
   * @returns Receiver instance used by this stream
   */
  def getReceiver(): Receiver[T]
}
```

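Each ReceiverInputDStream is backed by a single long-running receiver task, so the application needs one core per receiver in addition to the cores that process the data. A small illustration of the type relationship (host and port are placeholders):

```scala
// socketTextStream returns a ReceiverInputDStream, not just a DStream;
// its receiver occupies one executor core for the lifetime of the job,
// which is why local masters need at least "local[2]"
val stream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
```
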
### InputDStream

Base class for all input streams.

```scala { .api }
abstract class InputDStream[T: ClassTag](ssc: StreamingContext)
  extends DStream[T](ssc) {

  /**
   * Start the input stream (called automatically by StreamingContext)
   */
  def start(): Unit

  /**
   * Stop the input stream (called automatically by StreamingContext)
   */
  def stop(): Unit
}
```

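A receiver-less input stream can be built by subclassing InputDStream directly and overriding `compute`, which Spark calls once per batch interval (inherited from DStream). A minimal sketch, loosely modeled on Spark's own ConstantInputDStream; the class name here is illustrative:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import scala.reflect.ClassTag

// Emits the same RDD in every batch; handy as a test source
class ConstantStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
  extends InputDStream[T](ssc_) {

  override def start(): Unit = {} // no external resource to open
  override def stop(): Unit = {}  // nothing to clean up

  // Called once per batch interval; returns the RDD for that batch
  override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
}
```
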
### Storage Levels

Constants for controlling how received data is stored and replicated.

```scala { .api }
import org.apache.spark.storage.StorageLevel

// Common storage levels for input streams
StorageLevel.MEMORY_ONLY            // Store in memory only
StorageLevel.MEMORY_ONLY_2          // Store in memory, replicated 2x
StorageLevel.MEMORY_AND_DISK        // Store in memory, spill to disk
StorageLevel.MEMORY_AND_DISK_2      // Store in memory and disk, replicated 2x
StorageLevel.MEMORY_AND_DISK_SER    // Store serialized in memory, spill to disk
StorageLevel.MEMORY_AND_DISK_SER_2  // Store serialized, replicated 2x (default)
```

**Choosing Storage Levels:**

```scala
// High performance, risk of data loss
val fastStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_ONLY)

// Fault tolerant, slower
val reliableStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

// Memory efficient
val compactStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_AND_DISK_SER)
```

## Advanced Input Stream Configuration

### File Stream Monitoring Behavior

File streams have specific behavior for monitoring directories:

- **New Files Only**: By default, only files created after the stream started are processed
- **File Atomicity**: Files should be moved into the directory atomically (rename operation); see the sketch after this list
- **File Formats**: Support for text files, binary files, and Hadoop InputFormats
- **Nested Directories**: Recursive monitoring of subdirectories is not supported

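The atomic-move requirement exists because the monitor picks up a file as soon as it appears; a partially written file would be read incomplete. A minimal sketch of the write-then-rename pattern (paths are illustrative):

```scala
import java.nio.file.{Files, Paths, StandardCopyOption}

// Write the file fully in a staging directory outside the monitored path...
val staging = Paths.get("/data/staging/part-0001.txt")
val target  = Paths.get("/var/log/app/part-0001.txt")

// ...then move it into place in one atomic operation, so the stream
// never observes a half-written file
Files.move(staging, target, StandardCopyOption.ATOMIC_MOVE)
```
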
### Receiver Reliability

Receivers can be reliable or unreliable:

- **Reliable Receivers**: Acknowledge data receipt and can replay data on failure
- **Unreliable Receivers**: Do not acknowledge receipt, data may be lost on failure

```scala
// Reliable receiver pattern: store a batch of records, then acknowledge
// the source only after store() returns (the Iterator variant of store()
// blocks until the data has been saved inside Spark)
class ReliableReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart(): Unit = {
    // Connect to the source; then, for each batch of records:
    //   store(records.iterator)  // blocks until Spark owns the data
    //   source.acknowledge()     // safe to ack only after store() returns
  }
  def onStop(): Unit = {
    // Disconnect from the source
  }
}

// Unreliable receiver (simpler but less fault-tolerant)
class SimpleReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart(): Unit = {
    // Call store(record) for each record as it arrives; data buffered but
    // not yet stored may be lost if the receiver fails
  }
  def onStop(): Unit = {}
}
```

### Backpressure and Rate Limiting

Control the rate of data ingestion to prevent overwhelming the system:

```scala
// Enable backpressure (automatically adjusts the receiving rate)
val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "1000")

// Set a maximum rate for receivers (records per second)
val conf2 = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "1000")
```

**Configuration Examples:**

```scala
// Production configuration for reliable file processing
val conf = new SparkConf()
  .setAppName("LogProcessor")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs:///checkpoint")

val logStream = ssc.textFileStream("/var/log/app")
  .filter(_.contains("ERROR"))
  .cache()

logStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"hdfs:///processed/${System.currentTimeMillis()}")
  }
}
```