0
# Input Sources and Data Ingestion
1
2
Spark Streaming can ingest data from a variety of input sources, including files, sockets, queues, and custom receivers. It provides both receiver-based and direct stream approaches for fault-tolerant data consumption.
3
4
## Capabilities
5
6
### File-based Input Streams
7
8
Monitor directories for new files and process them as streaming data.
9
10
```scala { .api }
11
class StreamingContext {
12
/** Monitor directory for new text files */
13
def textFileStream(directory: String): DStream[String]
14
15
/** Monitor directory for files using Hadoop InputFormat */
16
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): InputDStream[(K, V)]
17
18
/** Monitor directory with custom key/value/format and filtering function */
19
def fileStream[K, V, F <: NewInputFormat[K, V]](
20
directory: String,
21
filter: Path => Boolean,
22
newFilesOnly: Boolean
23
): InputDStream[(K, V)]
24
25
/** Monitor directory with additional configuration */
26
def fileStream[K, V, F <: NewInputFormat[K, V]](
27
directory: String,
28
filter: Path => Boolean,
29
newFilesOnly: Boolean,
30
conf: Configuration
31
): InputDStream[(K, V)]
32
33
/** Monitor directory for binary records with fixed length */
34
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]
35
}
36
```
37
38
**Usage Examples:**
39
40
```scala
41
import org.apache.spark.streaming._
42
43
// Monitor directory for text files
44
val lines = ssc.textFileStream("/path/to/text/files")
45
46
// Monitor for JSON files using Hadoop's new-API TextInputFormat (org.apache.hadoop.mapreduce.lib.input)
47
val jsonFiles = ssc.fileStream[LongWritable, Text, TextInputFormat]("/path/to/json")
48
.map(_._2.toString)
49
50
// Monitor with custom filter for CSV files only
51
val csvFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
52
"/path/to/files",
53
(path: Path) => path.getName.endsWith(".csv"),
54
newFilesOnly = true
55
)
56
57
// Binary data with fixed record length
58
val binaryData = ssc.binaryRecordsStream("/path/to/binary", recordLength = 1024)
59
```
60
61
### Socket-based Input Streams
62
63
Receive streaming data from TCP socket connections.
64
65
```scala { .api }
66
class StreamingContext {
67
/** Receive text data from TCP socket */
68
def socketTextStream(hostname: String, port: Int): ReceiverInputDStream[String]
69
70
/** Receive text with custom storage level */
71
def socketTextStream(
72
hostname: String,
73
port: Int,
74
storageLevel: StorageLevel
75
): ReceiverInputDStream[String]
76
77
/** Receive data with custom converter function */
78
def socketStream[T](
79
hostname: String,
80
port: Int,
81
converter: (InputStream) => Iterator[T],
82
storageLevel: StorageLevel
83
): ReceiverInputDStream[T]
84
85
/** Receive raw bytes from socket */
86
def rawSocketStream[T](
87
hostname: String,
88
port: Int,
89
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
90
): ReceiverInputDStream[T]
91
}
92
```
93
94
**Usage Examples:**
95
96
```scala
97
// Basic text socket stream
98
val socketLines = ssc.socketTextStream("localhost", 9999)
99
100
// Socket stream with persistence configuration
101
val persistentSocket = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_ONLY)
102
103
// Custom data format from socket
104
val customSocket = ssc.socketStream[MyData](
  "localhost",
  8888,
  inputStream => {
    // Custom deserialization logic.
    // NOTE: TCP does not preserve message boundaries; a real protocol needs
    // explicit framing (length prefix, delimiter, ...) instead of "one read = one record".
    val buffer = new Array[Byte](1024)
    Iterator
      .continually(inputStream.read(buffer))
      // read() returns -1 once the peer closes the connection; stop there
      .takeWhile(_ != -1)
      // copy exactly the bytes just read so stale data from a previous read
      // (and the shared buffer itself) never leaks into a record
      .map(bytesRead => MyData.deserialize(java.util.Arrays.copyOf(buffer, bytesRead)))
  },
  StorageLevel.MEMORY_ONLY
)
117
```
118
119
### Queue-based Input Streams
120
121
Create DStreams from a queue of RDDs, useful for testing and controlled data injection.
122
123
```scala { .api }
124
class StreamingContext {
125
/** Create DStream from queue of RDDs */
126
def queueStream[T](queue: Queue[RDD[T]]): InputDStream[T]
127
128
/** Create with oneAtATime flag: consume one queued RDD per batch interval (true) or all queued RDDs at once (false) */
129
def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean): InputDStream[T]
130
131
/** Create with default RDD for empty batches */
132
def queueStream[T](
133
queue: Queue[RDD[T]],
134
oneAtATime: Boolean,
135
defaultRDD: RDD[T]
136
): InputDStream[T]
137
}
138
```
139
140
**Usage Examples:**
141
142
```scala
143
import scala.collection.mutable.Queue
144
145
// Create queue and RDDs
146
val rddQueue = new Queue[RDD[Int]]()
147
val rdd1 = ssc.sparkContext.parallelize(1 to 100)
148
val rdd2 = ssc.sparkContext.parallelize(101 to 200)
149
150
rddQueue.enqueue(rdd1, rdd2)
151
152
// Create queue stream
153
val queueStream = ssc.queueStream(rddQueue)
154
155
// Process one RDD at a time
156
val orderedStream = ssc.queueStream(rddQueue, oneAtATime = true)
157
158
// With default RDD for empty periods
159
val defaultRDD = ssc.sparkContext.parallelize(Seq(0))
160
val streamWithDefault = ssc.queueStream(rddQueue, oneAtATime = false, defaultRDD)
161
```
162
163
### Custom Receiver Input Streams
164
165
Create DStreams using custom Receiver implementations for specialized data sources.
166
167
```scala { .api }
168
class StreamingContext {
169
/** Create DStream from custom Receiver */
170
def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
171
}
172
173
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {
174
/** Called when the receiver is started; must set up resources (threads, sockets) and return without blocking */
175
def onStart(): Unit
176
177
/** Called when receiver is stopped */
178
def onStop(): Unit
179
180
/** Store single data item */
181
def store(dataItem: T): Unit
182
183
/** Store collection of data items */
184
def store(dataBuffer: ArrayBuffer[T]): Unit
185
186
/** Store iterator of data items */
187
def store(dataIterator: Iterator[T]): Unit
188
189
/** Store raw bytes */
190
def store(bytes: ByteBuffer): Unit
191
192
/** Store with metadata */
193
def store(bytes: ByteBuffer, metadata: Any): Unit
194
195
/** Restart receiver with message */
196
def restart(message: String): Unit
197
198
/** Restart with message and error */
199
def restart(message: String, error: Throwable): Unit
200
201
/** Stop receiver with message */
202
def stop(message: String): Unit
203
204
/** Stop with message and error */
205
def stop(message: String, error: Throwable): Unit
206
207
/** Report error */
208
def reportError(message: String, throwable: Throwable): Unit
209
210
/** Check if receiver is started */
211
def isStarted(): Boolean
212
213
/** Check if receiver is stopped */
214
def isStopped(): Boolean
215
216
/** Preferred execution location */
217
def preferredLocation: Option[String]
218
219
/** Associated stream ID */
220
def streamId: Int
221
}
222
```
223
224
**Usage Examples:**
225
226
```scala
227
import java.net._
228
import java.io._
229
230
// Custom receiver for UDP data
231
/**
 * Receiver that binds a UDP socket on `port` and stores the payload of every
 * datagram, decoded as UTF-8 text, as one record.
 *
 * Datagrams larger than the 1024-byte buffer are truncated.
 *
 * @param port local UDP port to listen on
 */
class UDPReceiver(port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  // Assigned on the receive thread but closed from the thread that calls
  // onStop(), so the reference has to be visible across threads.
  @volatile private var socket: DatagramSocket = _

  /** Starts the receive loop on its own thread and returns immediately. */
  def onStart(): Unit = {
    new Thread("UDP Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /** Closes the socket; a `receive` call blocked on it then fails and the loop ends. */
  def onStop(): Unit = {
    val current = socket
    if (current != null) {
      current.close()
      socket = null
    }
  }

  /** Receives datagrams until the receiver is stopped; asks for a restart on failure. */
  private def receive(): Unit = {
    try {
      // Work on a local reference: onStop() may null out the field at any time.
      val udp = new DatagramSocket(port)
      socket = udp
      try {
        val buffer = new Array[Byte](1024)

        while (!isStopped()) {
          val packet = new DatagramPacket(buffer, buffer.length)
          udp.receive(packet)
          // Decode with an explicit charset instead of the platform default.
          val data = new String(
            packet.getData,
            packet.getOffset,
            packet.getLength,
            java.nio.charset.StandardCharsets.UTF_8
          )
          store(data)
        }
      } finally {
        // close() is idempotent; this releases the port even when onStop()
        // ran before the field was assigned.
        udp.close()
      }
    } catch {
      case e: Exception =>
        // Closing the socket in onStop() makes receive() throw; that is a
        // normal shutdown, so only report failures while still running.
        if (!isStopped()) restart("Error receiving data", e)
    }
  }
}
264
265
// Use custom receiver
266
val udpStream = ssc.receiverStream(new UDPReceiver(9999))
267
268
// Custom receiver with batching
269
/**
 * Skeleton receiver that collects items and stores them in batches of
 * `batchSize`, flushing whatever is left when the receiver stops.
 *
 * @param batchSize number of buffered items that triggers a store
 * @tparam T type of the received items
 */
class BatchedReceiver[T](batchSize: Int) extends Receiver[T](StorageLevel.MEMORY_ONLY) {
  import scala.collection.mutable.ArrayBuffer

  // Guarded by buffer.synchronized: addData runs on receiving threads while
  // onStop is invoked from a different thread.
  private val buffer = new ArrayBuffer[T]()

  def onStart(): Unit = {
    // Implementation
  }

  /** Flush remaining data. */
  def onStop(): Unit = flush()

  /** Buffers one item and stores the batch once `batchSize` items are pending. */
  private def addData(item: T): Unit = {
    val batchReady = buffer.synchronized {
      buffer += item
      buffer.size >= batchSize
    }
    if (batchReady) flush()
  }

  /** Stores the pending items as one batch; does nothing when the buffer is empty. */
  private def flush(): Unit = {
    // Take a snapshot and reset under the lock; store outside it so a slow
    // store() call never blocks producers.
    val batch = buffer.synchronized {
      val snapshot = buffer.clone()
      buffer.clear()
      snapshot
    }
    // store(ArrayBuffer[T]) is the multi-item overload; there is no Array[T] variant.
    if (batch.nonEmpty) store(batch)
  }
}
292
```
293
294
### Union Operations
295
296
Combine multiple input streams with the same element type into a single DStream.
297
298
```scala { .api }
299
class StreamingContext {
300
/** Union multiple DStreams */
301
def union[T](streams: Seq[DStream[T]]): DStream[T]
302
}
303
```
304
305
**Usage Examples:**
306
307
```scala
308
// Multiple input sources
val fileStream = ssc.textFileStream("/path/to/files")
val socketStream = ssc.socketTextStream("localhost", 9999)
// rddQueue holds RDD[Int]; union requires a single element type,
// so convert the queued values to String to match the other streams
val queueStream = ssc.queueStream(rddQueue).map(_.toString)

// Union all streams (all three are streams of String)
val combinedStream = ssc.union(Seq(fileStream, socketStream, queueStream))

// Process combined data
combinedStream.foreachRDD { rdd =>
  println(s"Combined batch size: ${rdd.count()}")
}
320
```
321
322
### Input Stream Base Classes
323
324
Base abstractions for implementing custom input streams.
325
326
```scala { .api }
327
/**
328
* Base class for input streams that receive data into Spark Streaming
329
*/
330
abstract class InputDStream[T](ssc: StreamingContext) extends DStream[T](ssc) {
331
/** Start the stream */
332
def start(): Unit
333
334
/** Stop the stream */
335
def stop(): Unit
336
}
337
338
/**
339
* Input streams that use receivers to receive data
340
*/
341
abstract class ReceiverInputDStream[T](ssc: StreamingContext) extends InputDStream[T](ssc) {
342
/** Get receiver for this input stream */
343
def getReceiver(): Receiver[T]
344
}
345
346
/**
347
* File-based input stream implementation
348
*/
349
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
350
ssc: StreamingContext,
351
directory: String,
352
filter: Path => Boolean,
353
newFilesOnly: Boolean,
354
conf: Option[Configuration]
355
) extends InputDStream[(K, V)](ssc)
356
357
/**
358
* Socket-based input stream implementation
359
*/
360
class SocketInputDStream[T](
361
ssc: StreamingContext,
362
host: String,
363
port: Int,
364
converter: (InputStream) => Iterator[T],
365
storageLevel: StorageLevel
366
) extends ReceiverInputDStream[T](ssc)
367
368
/**
369
* Queue-based input stream implementation
370
*/
371
class QueueInputDStream[T](
372
ssc: StreamingContext,
373
queue: Queue[RDD[T]],
374
oneAtATime: Boolean,
375
defaultRDD: Option[RDD[T]]
376
) extends InputDStream[T](ssc)
377
```