Data ingestion capabilities for reading from various external sources including files, sockets, message queues, and custom receivers.
Create DStreams from TCP socket connections for real-time text or binary data ingestion.
/**
* Create a DStream from TCP socket text stream
* @param hostname - Host to connect to
* @param port - Port number
* @param storageLevel - Storage level for received data
* @return DStream of strings from socket
*/
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String]
/**
* Create a DStream from a TCP socket with a custom converter
* @param hostname - Host to connect to
* @param port - Port number
* @param converter - Function that turns the raw input stream into an iterator of objects
* @param storageLevel - Storage level for received data
* @return DStream of converted objects
*/
def socketStream[T](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
): DStream[T]
/**
* Create a DStream from a TCP socket where data arrives as serialized blocks
* that can be pushed into the block manager without deserialization
*/
def rawSocketStream[T](
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T]

Usage Examples:
// Basic text stream from socket
val lines = ssc.socketTextStream("localhost", 9999)
// Custom object stream with converter
val events = ssc.socketStream(
  "event-server", 8080,
  (inputStream: InputStream) => {
    val reader = new BufferedReader(new InputStreamReader(inputStream))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
      .map(line => parseEvent(line))
  },
  StorageLevel.MEMORY_AND_DISK
)
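// Raw socket stream (sketch): assumes the sender writes blocks already
// serialized with Spark's serializer, so nothing is deserialized on receive
val rawBlocks = ssc.rawSocketStream[String]("localhost", 9999)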

Monitor file systems and create DStreams from new files appearing in directories.
/**
* Monitor directory for new text files
* @param directory - Directory to monitor
* @return DStream of file contents as strings
*/
def textFileStream(directory: String): DStream[String]
/**
* Monitor directory for files of specific format
* @param directory - Directory to monitor
* @param filter - Optional file filter function
* @param newFilesOnly - Process only new files vs all files
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](
    directory: String,
    filter: Path => Boolean = acceptAllFiles,
    newFilesOnly: Boolean = true
): DStream[(K, V)]
/**
* Read binary records from files
* @param directory - Directory to monitor
* @param recordLength - Length of each binary record
*/
def binaryRecordsStream(
    directory: String,
    recordLength: Int
): DStream[Array[Byte]]

Usage Examples:
// Monitor directory for text files
val logs = ssc.textFileStream("hdfs://namenode/logs")
// Monitor for JSON files with custom processing
val jsonFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs://namenode/json-data",
  (path: Path) => path.getName.endsWith(".json"),
  newFilesOnly = true
)
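// fileStream yields Hadoop (key, value) pairs; a common next step (sketch) is
// extracting the Text value as a plain String for downstream parsing
val jsonLines = jsonFiles.map { case (_, text) => text.toString }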
// Process binary log files
val binaryLogs = ssc.binaryRecordsStream("hdfs://namenode/binary-logs", 1024)

Create DStreams from queues of RDDs, primarily used for testing and debugging streaming applications.
/**
* Create a DStream from a queue of RDDs
* @param queue - Queue containing RDDs to process
* @param oneAtATime - Process one RDD per batch vs all available
* @param defaultRDD - Default RDD when queue is empty
*/
def queueStream[T](
    queue: Queue[RDD[T]],
    oneAtATime: Boolean = true,
    defaultRDD: RDD[T] = null
): DStream[T]

Usage Examples:
import scala.collection.mutable.Queue
// Create test data queue
val rddQueue = new Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
// Add test RDDs to queue
for (i <- 1 to 10) {
  rddQueue += ssc.sparkContext.parallelize(Seq(i, i+1, i+2))
}
// Process the queue stream
queueStream.map(_ * 2).print()
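// Variant (sketch): pass oneAtATime = false to drain every queued RDD in each
// batch, with an empty RDD as the fallback when the queue is empty
val drainingStream = ssc.queueStream(
  rddQueue,
  oneAtATime = false,
  defaultRDD = ssc.sparkContext.emptyRDD[Int]
)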

Framework for creating custom receivers to ingest data from external sources.
/**
* Create input stream using a custom receiver
* @param receiver - Custom receiver implementation
*/
def receiverStream[T](receiver: Receiver[T]): DStream[T]
/**
* Abstract base class for custom receivers
*/
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {
  /** Called when receiver is started */
  def onStart(): Unit
  /** Called when receiver is stopped */
  def onStop(): Unit
  /** Store single data item */
  def store(dataItem: T): Unit
  /** Store multiple data items */
  def store(dataBuffer: ArrayBuffer[T]): Unit
  /** Store data from iterator */
  def store(dataIterator: Iterator[T]): Unit
  /** Store raw bytes */
  def store(bytes: ByteBuffer): Unit
  /** Stop receiver with message */
  def stop(message: String): Unit
  /** Restart receiver with message */
  def restart(message: String): Unit
  /** Report error */
  def reportError(message: String, throwable: Throwable): Unit
  /** Check if receiver is started */
  def isStarted(): Boolean
  /** Check if receiver is stopped */
  def isStopped(): Boolean
  /** Preferred location for receiver */
  def preferredLocation: Option[String]
}

Usage Examples:
// Custom receiver for external API
class ApiReceiver(apiUrl: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  private var client: ApiClient = _

  def onStart(): Unit = {
    client = new ApiClient(apiUrl)
    // Start background thread to fetch data
    new Thread(() => {
      while (!isStopped()) {
        try {
          val data = client.fetchData()
          store(data)
          Thread.sleep(1000)
        } catch {
          case e: Exception => reportError("API fetch failed", e)
        }
      }
    }).start()
  }

  def onStop(): Unit = {
    if (client != null) client.close()
  }
}
// Use custom receiver
val apiStream = ssc.receiverStream(new ApiReceiver("https://api.example.com/stream"))
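// Each receiver occupies one executor core; to scale ingestion, a common
// pattern (sketch, receiver count is arbitrary) is to run several receivers
// and union their streams
val receivers = (1 to 3).map(_ =>
  ssc.receiverStream(new ApiReceiver("https://api.example.com/stream")))
val merged = ssc.union(receivers)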

Base classes and implementations for different types of input streams.
/**
* Base class for all input streams
*/
abstract class InputDStream[T](ssc: StreamingContext) extends DStream[T] {
  /** Duration between batches */
  override def slideDuration: Duration = ssc.graph.batchDuration
  /** Input streams have no dependencies */
  override def dependencies: List[DStream[_]] = List()
  /** Compute RDD for given time */
  override def compute(validTime: Time): Option[RDD[T]]
}
/**
* Base class for receiver-based input streams
*/
abstract class ReceiverInputDStream[T](ssc: StreamingContext) extends InputDStream[T](ssc) {
  /** Get the receiver for this input stream */
  def getReceiver(): Receiver[T]
  /** Start the receiver */
  def start(): Unit
  /** Stop the receiver */
  def stop(): Unit
}
/**
* Input stream for file monitoring
*/
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
    ssc: StreamingContext,
    directory: String,
    filter: Path => Boolean,
    newFilesOnly: Boolean
) extends InputDStream[(K, V)](ssc)
/**
* Input stream for socket connections
*/
class SocketInputDStream[T](
    ssc: StreamingContext,
    host: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc)
/**
* Input stream from RDD queue (for testing)
*/
class QueueInputDStream[T](
    ssc: StreamingContext,
    queue: Queue[RDD[T]],
    oneAtATime: Boolean,
    defaultRDD: RDD[T]
) extends InputDStream[T](ssc)
/**
* Input stream that generates constant RDD
*/
class ConstantInputDStream[T](
    ssc: StreamingContext,
    rdd: RDD[T]
) extends InputDStream[T](ssc)
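
Of these, ConstantInputDStream is directly usable and convenient in tests; a minimal sketch (the data values are arbitrary):

import org.apache.spark.streaming.dstream.ConstantInputDStream

val constantRDD = ssc.sparkContext.parallelize(Seq(1, 2, 3))
val constantStream = new ConstantInputDStream(ssc, constantRDD)
constantStream.count().print()  // every batch re-emits the same RDD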

Spark Streaming integrates with many external sources through additional libraries:

- spark-streaming-kafka for Apache Kafka integration
- spark-streaming-flume for Apache Flume integration
- spark-streaming-kinesis-asl for Amazon Kinesis
- spark-streaming-twitter for the Twitter API
- spark-streaming-mqtt for MQTT message brokers
- spark-streaming-zeromq for ZeroMQ messaging

Each integration provides specialized input stream factory methods through utility objects such as KafkaUtils and FlumeUtils rather than directly on StreamingContext.
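
For example, the Kafka library exposes KafkaUtils; a minimal sketch against the classic spark-streaming-kafka (0.8) connector, where the ZooKeeper quorum, consumer group, and topic map are placeholder values (the newer kafka-0-10 artifact uses a different createDirectStream API):

import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based Kafka stream; the topic map is (topic -> receiver thread count)
val kafkaStream = KafkaUtils.createStream(
  ssc,
  "zk1:2181,zk2:2181",    // ZooKeeper quorum (placeholder)
  "log-consumer-group",   // consumer group id (placeholder)
  Map("events" -> 1)      // topics and thread counts (placeholder)
)
kafkaStream.map { case (_, message) => message }.print()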