# Input Sources

Data ingestion capabilities for reading from various external sources including files, sockets, message queues, and custom receivers.

## Capabilities

### Socket Streams

Create DStreams from TCP socket connections for real-time text or binary data ingestion.

```scala { .api }
/**
 * Create a DStream from a TCP socket text stream
 * @param hostname - Host to connect to
 * @param port - Port number
 * @param storageLevel - Storage level for received data
 * @return DStream of strings from socket
 */
def socketTextStream(
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String]

/**
 * Create a DStream from a TCP socket with a custom converter
 * @param hostname - Host to connect to
 * @param port - Port number
 * @param converter - Function to convert bytes to objects
 * @param storageLevel - Storage level for received data
 */
def socketStream[T](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): DStream[T]

/**
 * Create a DStream of raw bytes from a TCP socket
 */
def rawSocketStream[T](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T]
```

**Usage Examples:**

```scala
// Basic text stream from socket
val lines = ssc.socketTextStream("localhost", 9999)

// Custom object stream with converter
val events = ssc.socketStream(
  "event-server", 8080,
  (inputStream: InputStream) => {
    val reader = new BufferedReader(new InputStreamReader(inputStream))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
      .map(line => parseEvent(line))
  },
  StorageLevel.MEMORY_AND_DISK
)
```
### File Streams

Monitor file systems and create DStreams from new files appearing in directories.

```scala { .api }
/**
 * Monitor a directory for new text files
 * @param directory - Directory to monitor
 * @return DStream of file contents as strings
 */
def textFileStream(directory: String): DStream[String]

/**
 * Monitor a directory for files of a specific format
 * @param directory - Directory to monitor
 * @param filter - Optional file filter function
 * @param newFilesOnly - Process only new files vs all files
 */
def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String,
  filter: Path => Boolean = acceptAllFiles,
  newFilesOnly: Boolean = true
): DStream[(K, V)]

/**
 * Read binary records from files
 * @param directory - Directory to monitor
 * @param recordLength - Length of each binary record
 */
def binaryRecordsStream(
  directory: String,
  recordLength: Int
): DStream[Array[Byte]]
```

**Usage Examples:**

```scala
// Monitor directory for text files
val logs = ssc.textFileStream("hdfs://namenode/logs")

// Monitor for JSON files with custom processing
val jsonFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs://namenode/json-data",
  (path: Path) => path.getName.endsWith(".json"),
  newFilesOnly = true
)

// Process binary log files
val binaryLogs = ssc.binaryRecordsStream("hdfs://namenode/binary-logs", 1024)
```
### Queue Streams

Create DStreams from queues of RDDs, primarily used for testing and debugging streaming applications.

```scala { .api }
/**
 * Create a DStream from a queue of RDDs
 * @param queue - Queue containing RDDs to process
 * @param oneAtATime - Process one RDD per batch vs all available
 * @param defaultRDD - Default RDD when the queue is empty
 */
def queueStream[T](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true,
  defaultRDD: RDD[T] = null
): DStream[T]
```

**Usage Examples:**

```scala
import scala.collection.mutable.Queue

// Create test data queue
val rddQueue = new Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)

// Add test RDDs to queue
for (i <- 1 to 10) {
  rddQueue += ssc.sparkContext.parallelize(Seq(i, i + 1, i + 2))
}

// Process the queue stream
queueStream.map(_ * 2).print()
```
### Receiver-based Streams

Framework for creating custom receivers to ingest data from external sources.

```scala { .api }
/**
 * Create an input stream using a custom receiver
 * @param receiver - Custom receiver implementation
 */
def receiverStream[T](receiver: Receiver[T]): DStream[T]

/**
 * Abstract base class for custom receivers
 */
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {

  /** Called when the receiver is started */
  def onStart(): Unit

  /** Called when the receiver is stopped */
  def onStop(): Unit

  /** Store a single data item */
  def store(dataItem: T): Unit

  /** Store multiple data items */
  def store(dataBuffer: ArrayBuffer[T]): Unit

  /** Store data from an iterator */
  def store(dataIterator: Iterator[T]): Unit

  /** Store raw bytes */
  def store(bytes: ByteBuffer): Unit

  /** Stop the receiver with a message */
  def stop(message: String): Unit

  /** Restart the receiver with a message */
  def restart(message: String): Unit

  /** Report an error */
  def reportError(message: String, throwable: Throwable): Unit

  /** Check if the receiver is started */
  def isStarted(): Boolean

  /** Check if the receiver is stopped */
  def isStopped(): Boolean

  /** Preferred location for the receiver */
  def preferredLocation: Option[String]
}
```

**Usage Examples:**

```scala
// Custom receiver for an external API
class ApiReceiver(apiUrl: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  private var client: ApiClient = _

  def onStart(): Unit = {
    client = new ApiClient(apiUrl)
    // Start background thread to fetch data
    new Thread(() => {
      while (!isStopped()) {
        try {
          val data = client.fetchData()
          store(data)
          Thread.sleep(1000)
        } catch {
          case e: Exception => reportError("API fetch failed", e)
        }
      }
    }).start()
  }

  def onStop(): Unit = {
    if (client != null) client.close()
  }
}

// Use the custom receiver
val apiStream = ssc.receiverStream(new ApiReceiver("https://api.example.com/stream"))
```
239
240
### Input Stream Classes
241
242
Base classes and implementations for different types of input streams.
243
244
```scala { .api }
245
/**
246
* Base class for all input streams
247
*/
248
abstract class InputDStream[T](ssc: StreamingContext) extends DStream[T] {
249
250
/** Duration between batches */
251
override def slideDuration: Duration = ssc.graph.batchDuration
252
253
/** Input streams have no dependencies */
254
override def dependencies: List[DStream[_]] = List()
255
256
/** Compute RDD for given time */
257
override def compute(validTime: Time): Option[RDD[T]]
258
}
259
260
/**
261
* Base class for receiver-based input streams
262
*/
263
abstract class ReceiverInputDStream[T](ssc: StreamingContext) extends InputDStream[T](ssc) {
264
265
/** Get the receiver for this input stream */
266
def getReceiver(): Receiver[T]
267
268
/** Start the receiver */
269
def start(): Unit
270
271
/** Stop the receiver */
272
def stop(): Unit
273
}
274
275
/**
276
* Input stream for file monitoring
277
*/
278
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
279
ssc: StreamingContext,
280
directory: String,
281
filter: Path => Boolean,
282
newFilesOnly: Boolean
283
) extends InputDStream[(K, V)](ssc)
284
285
/**
286
* Input stream for socket connections
287
*/
288
class SocketInputDStream[T](
289
ssc: StreamingContext,
290
host: String,
291
port: Int,
292
converter: (InputStream) => Iterator[T],
293
storageLevel: StorageLevel
294
) extends ReceiverInputDStream[T](ssc)
295
296
/**
297
* Input stream from RDD queue (for testing)
298
*/
299
class QueueInputDStream[T](
300
ssc: StreamingContext,
301
queue: Queue[RDD[T]],
302
oneAtATime: Boolean,
303
defaultRDD: RDD[T]
304
) extends InputDStream[T](ssc)
305
306
/**
307
* Input stream that generates constant RDD
308
*/
309
class ConstantInputDStream[T](
310
ssc: StreamingContext,
311
rdd: RDD[T]
312
) extends InputDStream[T](ssc)
313
```
314
315
## External Source Integration
316
317
Spark Streaming integrates with many external sources through additional libraries:
318
319
- **Kafka**: `spark-streaming-kafka` for Apache Kafka integration
320
- **Flume**: `spark-streaming-flume` for Apache Flume integration
321
- **Kinesis**: `spark-streaming-kinesis-asl` for Amazon Kinesis
322
- **Twitter**: `spark-streaming-twitter` for Twitter API
323
- **MQTT**: `spark-streaming-mqtt` for MQTT message brokers
324
- **ZeroMQ**: `spark-streaming-zeromq` for ZeroMQ messaging
325
326
Each integration provides specialized input stream methods added to StreamingContext through implicit conversions.