Input sources in Spark Streaming provide mechanisms for ingesting data from external systems. These sources create InputDStreams that continuously receive data and convert it into a stream of RDDs for processing.
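Regardless of the source, the lifecycle is the same: create a StreamingContext, define one or more input DStreams, apply transformations and output operations, then start the context and wait for termination. A minimal sketch of that pattern (it assumes a text source such as netcat serving localhost:9999; the app name and master are placeholders):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("InputSourceExample")
  .setMaster("local[2]")  // at least two threads: one for the receiver, one for processing
val ssc = new StreamingContext(conf, Seconds(1))      // 1-second batch interval
val lines = ssc.socketTextStream("localhost", 9999)   // input DStream
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.print()          // output operation; at least one is required
ssc.start()             // start receiving and processing
ssc.awaitTermination()  // block until stopped or an error occurs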
Input streams that connect to TCP sockets to receive data.
/**
* Create text input stream from TCP socket
* @param hostname - Hostname to connect to
* @param port - Port number to connect to
* @param storageLevel - Storage level for received data (default: MEMORY_AND_DISK_SER_2)
* @returns ReceiverInputDStream of strings
*/
def socketTextStream(
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
/**
* Create binary input stream from TCP socket with custom converter
* @param hostname - Hostname to connect to
* @param port - Port number to connect to
* @param converter - Function to convert InputStream to Iterator[T]
* @param storageLevel - Storage level for received data
* @returns ReceiverInputDStream of converted type T
*/
def socketStream[T: ClassTag](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T]
/**
* Create raw TCP socket stream for binary data
* @param hostname - Hostname to connect to
* @param port - Port number to connect to
* @param storageLevel - Storage level for received data (default: MEMORY_AND_DISK_SER_2)
* @returns ReceiverInputDStream of type T (data is received as serialized blocks)
*/
def rawSocketStream[T: ClassTag](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]
Usage Examples:
// Basic text socket stream
val lines = ssc.socketTextStream("localhost", 9999)
// Custom converter for JSON parsing
val jsonStream = ssc.socketStream(
  "data-server", 8080,
  (inputStream: InputStream) => {
    scala.io.Source.fromInputStream(inputStream)
      .getLines()
      .map(parseJson)
  },
  StorageLevel.MEMORY_ONLY
)
// Raw socket for binary protocols
val binaryStream = ssc.rawSocketStream[Array[Byte]]("binary-server", 7777)
Input streams that monitor file systems for new files and process them as they arrive.
/**
* Create input stream from text files in a directory
* @param directory - Directory path to monitor for new files
* @returns DStream of strings (file contents line by line)
*/
def textFileStream(directory: String): DStream[String]
/**
* Create input stream from binary files with fixed record length
* @param directory - Directory path to monitor for new files
* @param recordLength - Length of each record in bytes
* @returns DStream of byte arrays
*/
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]
/**
* Create generic file input stream using Hadoop InputFormat
* @param directory - Directory path to monitor for new files
* @returns InputDStream of key-value pairs based on input format F
*/
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
  directory: String
): InputDStream[(K, V)]
/**
* Create file input stream with custom key, value, and input format classes
* @param directory - Directory path to monitor
* @param keyClass - Class of keys
* @param valueClass - Class of values
* @param inputFormatClass - Hadoop InputFormat class
* @param filter - Function to filter files (optional)
* @param newFilesOnly - Whether to process only new files (default: true)
* @returns InputDStream of key-value pairs
*/
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
  directory: String,
  keyClass: Class[K],
  valueClass: Class[V],
  inputFormatClass: Class[F],
  filter: Path => Boolean = null,
  newFilesOnly: Boolean = true
): InputDStream[(K, V)]
Usage Examples:
// Monitor directory for text files
val logFiles = ssc.textFileStream("/var/log/app")
// Process binary data files
val binaryData = ssc.binaryRecordsStream("/data/binary", 1024)
// Process Hadoop sequence files
val sequenceFiles = ssc.fileStream[Text, Text, SequenceFileInputFormat[Text, Text]]("/data/sequence")
// Custom file processing with filter
val csvFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/data/csv",
  classOf[LongWritable],
  classOf[Text],
  classOf[TextInputFormat],
  (path: Path) => path.getName.endsWith(".csv")
)
Input streams created from queues of RDDs, useful for testing and programmatic data injection.
/**
* Create input stream from queue of RDDs
* @param queue - Queue containing RDDs to process
* @param oneAtATime - Whether to process one RDD per batch (default: true)
* @param defaultRDD - Default RDD when queue is empty (optional)
* @returns InputDStream of queue elements
*/
def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true,
  defaultRDD: RDD[T] = null
): InputDStream[T]
Usage Examples:
import scala.collection.mutable.Queue
// Create queue and add RDDs
val rddQueue = Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
// Add data to queue (typically done in another thread)
for (i <- 1 to 10) {
  rddQueue += ssc.sparkContext.parallelize(1 to 100, 2)
}
// Process one RDD at a time
val singleRDDStream = ssc.queueStream(rddQueue, oneAtATime = true)
Input streams using custom receiver implementations for specialized data sources.
/**
* Create input stream from custom receiver
* @param receiver - Custom receiver implementation extending Receiver[T]
* @returns ReceiverInputDStream from the receiver
*/
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
/**
* Create input stream from multiple receivers (union of all receivers)
* @param receivers - Sequence of receivers to union
* @returns Combined ReceiverInputDStream from all receivers
*/
def union[T: ClassTag](receivers: Seq[ReceiverInputDStream[T]]): DStream[T]
Custom Receiver Implementation:
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {
  /**
  * Start the receiver - implement data receiving logic
  */
  def onStart(): Unit
  /**
  * Stop the receiver - implement cleanup logic
  */
  def onStop(): Unit
  /**
  * Store received data
  * @param data - Single data item to store
  */
  def store(data: T): Unit
  /**
  * Store multiple data items
  * @param data - Iterator of data items to store
  */
  def store(data: Iterator[T]): Unit
  /**
  * Report error to driver
  * @param message - Error message
  * @param throwable - Exception that occurred (optional)
  */
  def reportError(message: String, throwable: Throwable = null): Unit
  /**
  * Check if receiver is stopped
  * @returns true if receiver has been stopped
  */
  def isStopped(): Boolean
}
Usage Examples:
// Custom HTTP receiver
class HttpReceiver(url: String, storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) {
  var httpClient: HttpClient = _
  def onStart() {
    httpClient = new HttpClient()
    // Start background thread to poll HTTP endpoint
    new Thread("Http Receiver") {
      override def run() {
        while (!isStopped()) {
          val response = httpClient.get(url)
          store(response.body)
          Thread.sleep(1000)
        }
      }
    }.start()
  }
  def onStop() {
    if (httpClient != null) httpClient.close()
  }
}
// Use custom receiver
val httpStream = ssc.receiverStream(new HttpReceiver("http://api.example.com/data", StorageLevel.MEMORY_ONLY))
Combine multiple input streams into a single stream.
/**
* Union multiple DStreams of the same type
* @param streams - Sequence of DStreams to union
* @returns Single DStream containing data from all input streams
*/
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]
Usage Examples:
// Combine multiple socket streams
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val stream3 = ssc.socketTextStream("host3", 9999)
val combinedStream = ssc.union(Seq(stream1, stream2, stream3))
// Combine streams of the same element type from different sources
val fileStream = ssc.textFileStream("/data/logs")
val socketStream = ssc.socketTextStream("localhost", 9999)
val allLogs = ssc.union(Seq(fileStream, socketStream))
Base class for receiver-based input streams that actively receive data.
abstract class ReceiverInputDStream[T: ClassTag](ssc: StreamingContext)
  extends InputDStream[T](ssc) {
  /**
  * Get the receiver for this input stream
  * @returns Receiver instance used by this stream
  */
  def getReceiver(): Receiver[T]
}
Base class for all input streams.
abstract class InputDStream[T: ClassTag](ssc: StreamingContext)
  extends DStream[T](ssc) {
  /**
  * Start the input stream (called automatically by StreamingContext)
  */
  def start(): Unit
  /**
  * Stop the input stream (called automatically by StreamingContext)
  */
  def stop(): Unit
}
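InputDStream is also the extension point for receiver-less sources that build their data on the driver; in the full API a subclass implements DStream's compute(validTime) in addition to start() and stop(). A minimal sketch of a hypothetical subclass (the class name and the counter source are illustrative only, not part of the Spark API):
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical receiver-less input stream: produces one small RDD per batch on the driver
class CounterInputDStream(_ssc: StreamingContext) extends InputDStream[Long](_ssc) {
  private var counter = 0L
  override def start(): Unit = {}   // nothing to set up
  override def stop(): Unit = {}    // nothing to tear down
  override def compute(validTime: Time): Option[RDD[Long]] = {
    counter += 1
    Some(context.sparkContext.parallelize(Seq(counter)))  // context is inherited from DStream
  }
}
// The stream registers itself with the context when constructed, so it can be used directly
new CounterInputDStream(ssc).print()
Constants for controlling how received data is stored and replicated.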
import org.apache.spark.storage.StorageLevel
// Common storage levels for input streams
StorageLevel.MEMORY_ONLY // Store in memory only
StorageLevel.MEMORY_ONLY_2 // Store in memory, replicated 2x
StorageLevel.MEMORY_AND_DISK // Store in memory, spill to disk
StorageLevel.MEMORY_AND_DISK_2 // Store in memory and disk, replicated 2x
StorageLevel.MEMORY_AND_DISK_SER // Store serialized in memory, spill to disk
StorageLevel.MEMORY_AND_DISK_SER_2 // Store serialized, replicated 2x (default)
Choosing Storage Levels:
// High performance, risk of data loss
val fastStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_ONLY)
// Fault tolerant, slower
val reliableStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
// Memory efficient
val compactStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_AND_DISK_SER)
File streams have specific behavior for monitoring directories: files are selected by their modification time, must be written atomically (typically by moving or renaming a completed file into the monitored directory), must not be modified after they are picked up, and must all use the same data format.
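Because only complete, unmodified files are picked up, a common pattern is to stage a file elsewhere and rename it into the monitored directory once it is fully written. A minimal sketch using the Hadoop FileSystem API (the paths are hypothetical):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val staged = new Path("/data/staging/events-0001.log")   // written and closed first
val target = new Path("/var/log/app/events-0001.log")    // directory watched by textFileStream
fs.rename(staged, target)  // single atomic move; the stream sees a complete file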
Receivers can be reliable (they acknowledge the source only after received data has been stored and replicated in Spark) or unreliable (simpler, but data buffered in the receiver can be lost on failure):
// Reliable receiver pattern: store records in blocks and acknowledge the source
// only after the blocking store(...) call has returned
class ReliableReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart() {
    // Connect to the source, then repeatedly:
    //   store(records.iterator)  // blocks until the block is stored in Spark
    //   acknowledge the source (e.g. commit an offset) so it can replay on failure
  }
  def onStop() {
    // Disconnect from the source
  }
}
// Unreliable receiver (simpler but less fault-tolerant)
class SimpleReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart() {
    // Call store(record) for each record; data buffered in the receiver may be lost on failure
  }
  def onStop() { }
}
Control the rate of data ingestion to prevent overwhelming the system:
// Enable backpressure (automatically adjusts receiving rate)
val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "1000")
// Set maximum rate for receivers
val conf2 = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "1000")
Configuration Examples:
// Production configuration for reliable file processing
val conf = new SparkConf()
  .setAppName("LogProcessor")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs://checkpoint")
val logStream = ssc.textFileStream("/var/log/app")
  .filter(_.contains("ERROR"))
  .cache()
logStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"hdfs://processed/${System.currentTimeMillis()}")
  }
}
ssc.start()
ssc.awaitTermination()