Scalable, high-throughput, fault-tolerant stream-processing library for real-time data on Apache Spark
Input sources in Spark Streaming provide mechanisms for ingesting data from external systems. These sources create InputDStreams that continuously receive data and convert it into a stream of RDDs for processing.
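To situate the APIs below, here is a minimal end-to-end sketch (hostname, port, and batch interval are placeholders) showing how an input stream fits into a streaming application:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Build the streaming context with a 1-second batch interval.
// local[2]: at least one core for the receiver, one for processing
val conf = new SparkConf().setAppName("InputSourceDemo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Create an input stream; each batch interval becomes one RDD of lines
val lines = ssc.socketTextStream("localhost", 9999)
lines.map(_.length).print()

// Nothing runs until start(); awaitTermination() blocks the driver
ssc.start()
ssc.awaitTermination()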
Input streams that connect to TCP sockets to receive data.
/**
* Create text input stream from TCP socket
* @param hostname - Hostname to connect to
* @param port - Port number to connect to
* @param storageLevel - Storage level for received data (default: MEMORY_AND_DISK_SER_2)
* @returns ReceiverInputDStream of strings
*/
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
/**
* Create binary input stream from TCP socket with custom converter
* @param hostname - Hostname to connect to
* @param port - Port number to connect to
* @param converter - Function to convert InputStream to Iterator[T]
* @param storageLevel - Storage level for received data
* @returns ReceiverInputDStream of converted type T
*/
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T]
/**
* Create raw TCP socket stream where data arrives as blocks serialized with Spark's serializer, ready to be pushed into the block manager without deserialization
* @param hostname - Hostname to connect to
* @param port - Port number to connect to
* @param storageLevel - Storage level for received data (default: MEMORY_AND_DISK_SER_2)
* @returns ReceiverInputDStream of type T
*/
def rawSocketStream[T: ClassTag](
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]

Usage Examples:
// Basic text socket stream
val lines = ssc.socketTextStream("localhost", 9999)
// Custom converter for JSON parsing
val jsonStream = ssc.socketStream(
"data-server", 8080,
(inputStream: InputStream) => {
scala.io.Source.fromInputStream(inputStream)
.getLines()
.map(parseJson)
},
StorageLevel.MEMORY_ONLY
)
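// Note: parseJson above is user code, not a Spark API -- any function that
// turns the raw InputStream into an Iterator[T] works as the converter.
// A trivial hypothetical stand-in:
def parseJson(line: String): String = line.trim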
// Raw socket for data pre-serialized with Spark's serializer
val binaryStream = ssc.rawSocketStream[Array[Byte]]("binary-server", 7777)

Input streams that monitor file systems for new files and process them as they arrive.
/**
* Create input stream from text files in a directory
* @param directory - Directory path to monitor for new files
* @returns DStream of strings (file contents line by line)
*/
def textFileStream(directory: String): DStream[String]
/**
* Create input stream from binary files with fixed record length
* @param directory - Directory path to monitor for new files
* @param recordLength - Length of each record in bytes
* @returns DStream of byte arrays
*/
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]
/**
* Create generic file input stream using Hadoop InputFormat
* @param directory - Directory path to monitor for new files
* @returns InputDStream of key-value pairs based on input format F
*/
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
directory: String
): InputDStream[(K, V)]
/**
* Create file input stream with custom key, value, and input format classes
* @param directory - Directory path to monitor
* @param keyClass - Class of keys
* @param valueClass - Class of values
* @param inputFormatClass - Hadoop InputFormat class
* @param filter - Function to filter files (optional)
* @param newFilesOnly - Whether to process only new files (default: true)
* @returns InputDStream of key-value pairs
*/
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
directory: String,
keyClass: Class[K],
valueClass: Class[V],
inputFormatClass: Class[F],
filter: Path => Boolean = null,
newFilesOnly: Boolean = true
): InputDStream[(K, V)]

Usage Examples:
// Monitor directory for text files
val logFiles = ssc.textFileStream("/var/log/app")
// Process binary data files
val binaryData = ssc.binaryRecordsStream("/data/binary", 1024)
// Process Hadoop sequence files
val sequenceFiles = ssc.fileStream[Text, Text, SequenceFileInputFormat[Text, Text]]("/data/sequence")
// Custom file processing with filter
val csvFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
"/data/csv",
classOf[LongWritable],
classOf[Text],
classOf[TextInputFormat],
(path: Path) => path.getName.endsWith(".csv")
)

Input streams created from queues of RDDs, useful for testing and programmatic data injection.
/**
* Create input stream from queue of RDDs
* @param queue - Queue containing RDDs to process
* @param oneAtATime - Whether to process one RDD per batch (default: true)
* @param defaultRDD - Default RDD when queue is empty (optional)
* @returns InputDStream of queue elements
*/
def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
oneAtATime: Boolean = true,
defaultRDD: RDD[T] = null
): InputDStream[T]

Usage Examples:
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD
// Create queue and add RDDs
val rddQueue = Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
// Add data to queue (typically done in another thread)
for (i <- 1 to 10) {
rddQueue += ssc.sparkContext.parallelize(1 to 100, 2)
}
// Process one RDD at a time
val singleRDDStream = ssc.queueStream(rddQueue, oneAtATime = true)
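Because queueStream runs entirely in the driver, it is handy for unit tests. A minimal sketch of a test harness (the timing values are arbitrary) that pushes batches while the context runs:

import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

val testQueue = Queue[RDD[Int]]()
val counts = ssc.queueStream(testQueue).count()
counts.print()

ssc.start()
// Push one RDD per second; with oneAtATime = true each batch consumes one
for (i <- 1 to 5) {
  testQueue.synchronized {
    testQueue += ssc.sparkContext.parallelize(1 to 100)
  }
  Thread.sleep(1000)
}
ssc.awaitTerminationOrTimeout(10000)
ssc.stop(stopSparkContext = false)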
Input streams using custom receiver implementations for specialized data sources.

/**
* Create input stream from custom receiver
* @param receiver - Custom receiver implementation extending Receiver[T]
* @returns ReceiverInputDStream from the receiver
*/
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
/**
* Create input stream from multiple receivers (union of all receivers)
* @param receivers - Sequence of receivers to union
* @returns Combined ReceiverInputDStream from all receivers
*/
def union[T: ClassTag](receivers: Seq[ReceiverInputDStream[T]]): DStream[T]

Custom Receiver Implementation:
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {
/**
* Start the receiver - implement data receiving logic
*/
def onStart(): Unit
/**
* Stop the receiver - implement cleanup logic
*/
def onStop(): Unit
/**
* Store received data
* @param data - Single data item to store
*/
def store(data: T): Unit
/**
* Store multiple data items
* @param data - Iterator of data items to store
*/
def store(data: Iterator[T]): Unit
/**
* Report error to driver
* @param message - Error message
* @param throwable - Exception that occurred
*/
def reportError(message: String, throwable: Throwable): Unit
/**
* Check if receiver is stopped
* @returns true if receiver has been stopped
*/
def isStopped(): Boolean
}

Usage Examples:
// Custom receiver polling an HTTP endpoint (HttpClient and response.body
// are placeholders for whatever HTTP library you use)
class HttpReceiver(url: String, storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) {
  @volatile private var httpClient: HttpClient = _
  def onStart() {
    httpClient = new HttpClient()
    // onStart() must not block: poll on a background thread
    new Thread("Http Receiver") {
      override def run() {
        try {
          while (!isStopped()) {
            val response = httpClient.get(url)
            store(response.body)
            Thread.sleep(1000)
          }
        } catch {
          case e: Exception => restart("Error polling " + url, e)
        }
      }
    }.start()
  }
  def onStop() {
    if (httpClient != null) httpClient.close()
  }
}
// Use custom receiver
val httpStream = ssc.receiverStream(new HttpReceiver("http://api.example.com/data", StorageLevel.MEMORY_ONLY))

Combine multiple input streams into a single stream.
/**
* Union multiple DStreams of the same type
* @param streams - Sequence of DStreams to union
* @returns Single DStream containing data from all input streams
*/
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

Usage Examples:
// Combine multiple socket streams
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val stream3 = ssc.socketTextStream("host3", 9999)
val combinedStream = ssc.union(Seq(stream1, stream2, stream3))
// Combine streams from different sources (element types must match)
val fileStream = ssc.textFileStream("/data/logs")
val socketStream = ssc.socketTextStream("localhost", 9999)
val allLogs = ssc.union(Seq(fileStream, socketStream))

Base class for receiver-based input streams that actively receive data.
abstract class ReceiverInputDStream[T: ClassTag](ssc: StreamingContext)
extends InputDStream[T](ssc) {
/**
* Get the receiver for this input stream
* @returns Receiver instance used by this stream
*/
def getReceiver(): Receiver[T]
}

Base class for all input streams.
abstract class InputDStream[T: ClassTag](ssc: StreamingContext)
extends DStream[T](ssc) {
/**
* Start the input stream (called automatically by StreamingContext)
*/
def start(): Unit
/**
* Stop the input stream (called automatically by StreamingContext)
*/
def stop(): Unit
}
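Receiverless sources extend InputDStream directly. Besides start() and stop(), a subclass must also implement compute(validTime), inherited from DStream, which produces the RDD for each batch. A minimal sketch that emits the same RDD every batch (similar in spirit to Spark's ConstantInputDStream):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import scala.reflect.ClassTag

class ConstantStream[T: ClassTag](ssc: StreamingContext, rdd: RDD[T])
  extends InputDStream[T](ssc) {
  override def start(): Unit = {}  // nothing to set up
  override def stop(): Unit = {}   // nothing to tear down
  // Called once per batch interval; None means "no data this batch"
  override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
}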
Constants for controlling how received data is stored and replicated.

import org.apache.spark.storage.StorageLevel
// Common storage levels for input streams
StorageLevel.MEMORY_ONLY // Store in memory only
StorageLevel.MEMORY_ONLY_2 // Store in memory, replicated 2x
StorageLevel.MEMORY_AND_DISK // Store in memory, spill to disk
StorageLevel.MEMORY_AND_DISK_2 // Store in memory and disk, replicated 2x
StorageLevel.MEMORY_AND_DISK_SER // Store serialized in memory, spill to disk
StorageLevel.MEMORY_AND_DISK_SER_2 // Store serialized, replicated 2x (default)

Choosing Storage Levels:
// High performance, risk of data loss
val fastStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_ONLY)
// Fault tolerant, slower
val reliableStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
// Memory efficient
val compactStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_AND_DISK_SER)

File streams have specific behavior for monitoring directories:
- Files must appear atomically: write them elsewhere, then move or rename them into the monitored directory
- Files are picked up based on their modification time, once per batch interval
- Files must not be changed after they are detected; later updates are ignored
- With newFilesOnly = true (the default), files already present when the stream starts are skipped
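For example, a producer can stage a file outside the monitored directory and then move it in atomically (paths here are illustrative):

import java.nio.file.{Files, Paths, StandardCopyOption}

// Write the complete file somewhere else first...
val staged = Paths.get("/tmp/staging/events-001.log")
// ...then move it into the monitored directory in one atomic step, so the
// stream never sees a half-written file (atomic moves require source and
// target to be on the same filesystem)
Files.move(staged, Paths.get("/var/log/app/events-001.log"),
  StandardCopyOption.ATOMIC_MOVE)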
Receivers can be reliable or unreliable:
// Reliable receiver pattern: acknowledge the source only after store() returns,
// so acknowledged data is already stored and replicated inside Spark
class ReliableReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart() {
    // Connect to the source; for each block, call the blocking
    // store(iterator) variant, and only then send the acknowledgment,
    // so unacknowledged data can be replayed after a failure
  }
  def onStop() {
    // Disconnect from the source
  }
}
// Unreliable receiver (simpler, but data buffered in the receiver may be lost)
class SimpleReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart() {
    // Receive and store(record) as data arrives; there is no acknowledgment,
    // so data received but not yet stored is lost if the receiver fails
  }
  def onStop() {}
}

Control the rate of data ingestion to prevent overwhelming the system:
// Enable backpressure (automatically adjusts receiving rate)
val conf = new SparkConf()
.set("spark.streaming.backpressure.enabled", "true")
.set("spark.streaming.backpressure.initialRate", "1000")
// Set maximum rate for receivers
val conf2 = new SparkConf()
.set("spark.streaming.receiver.maxRate", "1000")Configuration Examples:
// Production configuration for reliable file processing
val conf = new SparkConf()
.setAppName("LogProcessor")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
.set("spark.streaming.backpressure.enabled", "true")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs://checkpoint")
val logStream = ssc.textFileStream("/var/log/app")
.filter(_.contains("ERROR"))
.cache()
logStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
rdd.saveAsTextFile(s"hdfs://processed/${System.currentTimeMillis()}")
}
}

// Start the computation and block until terminated
ssc.start()
ssc.awaitTermination()

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-2-11