# Input Sources

Methods for ingesting data streams from various external sources including network sockets, file systems, in-memory queues, and custom receivers.

## Socket-based Input Streams

### Text Socket Streams

Create DStream from socket text data:

```scala { .api }
def socketTextStream(
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
```

Example socket text stream:

```scala
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.print()
```
### Custom Socket Streams

Create DStream with custom converter function:

```scala { .api }
def socketStream[T: ClassTag](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T]
```

Example with custom converter:

```scala
import java.io.InputStream
import scala.io.Source

import org.apache.spark.storage.StorageLevel

// Application-specific record type and parser (placeholders)
case class MyJsonObject(raw: String)
def parseJson(line: String): MyJsonObject = MyJsonObject(line)

def jsonConverter(inputStream: InputStream): Iterator[MyJsonObject] = {
  Source.fromInputStream(inputStream).getLines().map(parseJson)
}

val jsonStream = ssc.socketStream("localhost", 8080, jsonConverter, StorageLevel.MEMORY_ONLY)
```
### Raw Socket Streams

Create DStream from data that arrives as blocks already serialized with Spark's serializer, which can be stored directly without deserialization:

```scala { .api }
def rawSocketStream[T: ClassTag](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]
```
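
A minimal usage sketch, assuming the sender on port 9999 writes `String` records as blocks serialized with Spark's serializer:

```scala
// The remote sender must write blocks serialized with Spark's serializer;
// the element type parameter must match what was serialized.
val rawStream = ssc.rawSocketStream[String]("localhost", 9999)
rawStream.count().print()
```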
## File-based Input Streams

### Text File Streams

Monitor directory for new text files:

```scala { .api }
def textFileStream(directory: String): DStream[String]
```

Example text file monitoring:

```scala
val fileStream = ssc.textFileStream("/data/streaming-input")
val processedLines = fileStream.filter(_.nonEmpty).map(_.toUpperCase)
processedLines.print()
```
### Generic File Streams

Monitor directory with custom input format:

```scala { .api }
def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String
)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): InputDStream[(K, V)]
```

Overloaded versions:

```scala { .api }
def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean
)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): InputDStream[(K, V)]

def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean,
  conf: Configuration
)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): InputDStream[(K, V)]
```

Example with Hadoop input format:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val hadoopStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/data/input")
val textStream = hadoopStream.map(_._2.toString)
```
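
The filtered overload restricts which files are picked up (`filter`) and whether only files added after the stream starts are processed (`newFilesOnly`). A minimal sketch; the `.tmp` exclusion and the directory path are illustrative:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Skip in-progress files (illustrative predicate) and ignore files already present at start
val filteredStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/data/input",
  (path: Path) => !path.getName.endsWith(".tmp"),
  newFilesOnly = true
)
val filteredText = filteredStream.map(_._2.toString)
```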
### Binary Records Streams

Read fixed-length binary records:

```scala { .api }
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]
```

Example binary records:

```scala
// Application-specific record handler (placeholder)
def processRecord(bytes: Array[Byte]): Int = bytes.length

val binaryStream = ssc.binaryRecordsStream("/data/binary", 1024)
val processedRecords = binaryStream.map { bytes =>
  // Each record is exactly 1024 bytes
  processRecord(bytes)
}
```
## Queue-based Input Streams

### RDD Queue Streams

Create DStream from queue of RDDs:

```scala { .api }
def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true
): InputDStream[T]

def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean,
  defaultRDD: RDD[T]
): InputDStream[T]
```

Example queue stream:

```scala
import scala.collection.mutable.Queue

import org.apache.spark.rdd.RDD

val rddQueue = new Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)

// Add RDDs to the queue from another thread; synchronize because
// mutable.Queue is not thread-safe
new Thread {
  override def run(): Unit = {
    for (i <- 1 to 100) {
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.parallelize(1 to 10)
      }
      Thread.sleep(1000)
    }
  }
}.start()

queueStream.print()
```

**Note**: Queue streams do not support checkpointing and should not be used in production where fault tolerance is required.
## Custom Receiver Streams

### Receiver-based Streams

Create DStream with custom receiver:

```scala { .api }
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
```

Example custom receiver:

```scala
import java.util.concurrent.Executors

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  // transient + lazy so the receiver stays serializable when shipped to an executor
  @transient private lazy val executor = Executors.newSingleThreadExecutor()

  def onStart(): Unit = {
    executor.execute(new Runnable {
      def run(): Unit = {
        receive()
      }
    })
  }

  def onStop(): Unit = {
    executor.shutdown()
  }

  private def receive(): Unit = {
    try {
      while (!isStopped()) {
        // Simulate receiving data
        val data = generateData()
        store(data)
        Thread.sleep(100)
      }
    } catch {
      case e: Exception => restart("Error receiving data", e)
    }
  }

  private def generateData(): String = {
    // Custom data generation logic
    s"data-${System.currentTimeMillis()}"
  }
}

val customStream = ssc.receiverStream(new CustomReceiver())
customStream.print()
```
### Pluggable Input Streams

Alternative constructor for custom receivers:

```scala { .api }
class PluggableInputDStream[T: ClassTag](
  ssc: StreamingContext,
  receiver: Receiver[T]
) extends ReceiverInputDStream[T](ssc)
```
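
In typical use this class is not constructed directly; `receiverStream` (shown above) is the entry point that plugs a custom receiver into the streaming context. A minimal sketch reusing the `CustomReceiver` defined above:

```scala
import org.apache.spark.streaming.dstream.ReceiverInputDStream

// receiverStream returns a ReceiverInputDStream backed by the supplied receiver
val pluggedStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomReceiver())
pluggedStream.print()
```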
## Input Stream Properties

### Storage Levels

Common storage levels for input streams:

- `StorageLevel.MEMORY_ONLY` - Store in memory only
- `StorageLevel.MEMORY_AND_DISK` - Memory with disk fallback
- `StorageLevel.MEMORY_ONLY_SER` - Memory only, serialized
- `StorageLevel.MEMORY_AND_DISK_SER` - Memory and disk, serialized
- `StorageLevel.MEMORY_AND_DISK_SER_2` - Memory and disk, serialized, replicated to two nodes (the default for the receiver methods above)

Example with custom storage level:

```scala
val stream = ssc.socketTextStream(
  "localhost",
  9999,
  StorageLevel.MEMORY_ONLY_SER
)
```
### Input Stream Identification

All input streams have unique identifiers:

```scala { .api }
abstract class InputDStream[T] extends DStream[T] {
  val id: Int
  val name: String
  def start(): Unit
  def stop(): Unit
}
```

Access input stream properties (using a receiver-based stream, which is an `InputDStream`; `textFileStream` is declared to return a plain `DStream[String]`):

```scala
val socketStream = ssc.socketTextStream("localhost", 9999)
println(s"Stream ID: ${socketStream.id}")
println(s"Stream name: ${socketStream.name}")
```
## File Stream Configuration

### File Monitoring Behavior

File streams monitor directories with these characteristics:

- Only files in the monitored directory (not subdirectories) are processed
- Files are selected based on modification time, not creation time
- Files must appear in the directory atomically (e.g., via a rename/move operation) to be processed correctly
- File names should be consistent (lexicographically increasing) for best results

### File Processing Guarantees

- Each file is processed exactly once (assuming no failures)
- Files are processed in order of modification time
- Processing latency depends on the batch interval and the file discovery mechanism

Example atomic file writing pattern:

```scala
import java.io.File

// Write to a temporary dot-file first; files whose names start with "."
// are ignored by the default file-stream filter
val tempFile = new File("/data/streaming-input/.temp-file")
writeDataToFile(tempFile, data) // application-specific helper (placeholder)

// Atomically move the file to its final name in the monitored directory
val finalFile = new File("/data/streaming-input/data-file.txt")
tempFile.renameTo(finalFile)
```
## Input Stream Reliability

### Reliable Receivers

For fault-tolerant processing, enable the write-ahead log so that received data is also written to fault-tolerant storage in the checkpoint directory:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable write-ahead logs in the Spark configuration
val conf = new SparkConf()
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")

val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("checkpoint-directory")
```
### Unreliable Receivers

Socket streams and many custom receivers are unreliable receivers: they do not acknowledge data back to the source, so data that has been received but not yet replicated or written to the write-ahead log can be lost on failure. For production use cases requiring fault tolerance, prefer:

- Kafka integration (external library)
- File-based inputs backed by HDFS
- Custom receivers that store data reliably and acknowledge the source (see the sketch below)
- Message queue systems with acknowledgment
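
A reliable custom receiver stores records in batches and acknowledges the source only after Spark has stored them; the multi-record `store(ArrayBuffer)` call blocks until the block has been stored. A minimal sketch, where `AckingSource`, `poll`, and `ack` are hypothetical stand-ins for a source API that supports acknowledgment:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical source API that supports acknowledgment (illustrative only);
// Serializable because the receiver holding it is shipped to an executor
trait AckingSource extends Serializable {
  def poll(): ArrayBuffer[String]
  def ack(batch: ArrayBuffer[String]): Unit
}

class ReliableBatchReceiver(source: AckingSource)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    new Thread("reliable-batch-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // The receive loop exits once isStopped() returns true
  }

  private def receive(): Unit = {
    try {
      while (!isStopped()) {
        val batch = source.poll()
        if (batch.nonEmpty) {
          // store(ArrayBuffer) returns only after Spark has stored the whole block,
          // so acknowledging afterwards avoids losing un-stored data
          store(batch)
          source.ack(batch)
        }
      }
    } catch {
      case e: Exception => restart("Error receiving data", e)
    }
  }
}
```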