Spark Streaming - A scalable, fault-tolerant stream processing system that extends Apache Spark
Comprehensive data ingestion capabilities supporting various input sources including files, sockets, queues, and custom receivers. Provides both receiver-based and direct stream approaches for fault-tolerant data consumption.
Monitor directories for new files and process them as streaming data.
class StreamingContext {
/** Monitor directory for new text files */
def textFileStream(directory: String): DStream[String]
/** Monitor directory for files using Hadoop InputFormat */
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): InputDStream[(K, V)]
/** Monitor directory with custom key/value/format and filtering function */
def fileStream[K, V, F <: NewInputFormat[K, V]](
directory: String,
filter: Path => Boolean,
newFilesOnly: Boolean
): InputDStream[(K, V)]
/** Monitor directory with additional configuration */
def fileStream[K, V, F <: NewInputFormat[K, V]](
directory: String,
filter: Path => Boolean,
newFilesOnly: Boolean,
conf: Configuration
): InputDStream[(K, V)]
/** Monitor directory for binary records with fixed length */
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]
}

Usage Examples:
import org.apache.spark.streaming._
// Monitor directory for text files
val lines = ssc.textFileStream("/path/to/text/files")
// Monitor for JSON files using Hadoop TextInputFormat
val jsonFiles = ssc.fileStream[LongWritable, Text, TextInputFormat]("/path/to/json")
.map(_._2.toString)
// Monitor with custom filter for CSV files only
val csvFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
"/path/to/files",
(path: Path) => path.getName.endsWith(".csv"),
newFilesOnly = true
)
// Binary data with fixed record length
val binaryData = ssc.binaryRecordsStream("/path/to/binary", recordLength = 1024)

Receive streaming data from TCP socket connections.
class StreamingContext {
/** Receive text data from TCP socket */
def socketTextStream(hostname: String, port: Int): ReceiverInputDStream[String]
/** Receive text with custom storage level */
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel
): ReceiverInputDStream[String]
/** Receive data with custom converter function */
def socketStream[T](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T]
/** Receive raw bytes from socket */
def rawSocketStream[T](
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]
}

Usage Examples:
// Basic text socket stream
val socketLines = ssc.socketTextStream("localhost", 9999)
// Socket stream with persistence configuration
val persistentSocket = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_ONLY)
// Custom data format from socket
val customSocket = ssc.socketStream[MyData](
"localhost",
8888,
inputStream => {
// Custom deserialization logic
val buffer = new Array[Byte](1024)
Iterator.continually {
inputStream.read(buffer)
MyData.deserialize(buffer)
}.takeWhile(_ != null)
},
StorageLevel.MEMORY_ONLY
)

Create DStreams from a queue of RDDs, useful for testing and controlled data injection.
class StreamingContext {
/** Create DStream from queue of RDDs */
def queueStream[T](queue: Queue[RDD[T]]): InputDStream[T]
/** Create with oneAtATime processing flag */
def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean): InputDStream[T]
/** Create with default RDD for empty batches */
def queueStream[T](
queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
): InputDStream[T]
}

Usage Examples:
import scala.collection.mutable.Queue
// Create queue and RDDs
val rddQueue = new Queue[RDD[Int]]()
val rdd1 = ssc.sparkContext.parallelize(1 to 100)
val rdd2 = ssc.sparkContext.parallelize(101 to 200)
rddQueue.enqueue(rdd1, rdd2)
// Create queue stream
val queueStream = ssc.queueStream(rddQueue)
// Process one RDD at a time
val orderedStream = ssc.queueStream(rddQueue, oneAtATime = true)
// With default RDD for empty periods
val defaultRDD = ssc.sparkContext.parallelize(Seq(0))
val streamWithDefault = ssc.queueStream(rddQueue, oneAtATime = false, defaultRDD)

Create DStreams using custom Receiver implementations for specialized data sources.
class StreamingContext {
/** Create DStream from custom Receiver */
def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
}
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {
/** Called when receiver is started */
def onStart(): Unit
/** Called when receiver is stopped */
def onStop(): Unit
/** Store single data item */
def store(dataItem: T): Unit
/** Store collection of data items */
def store(dataBuffer: ArrayBuffer[T]): Unit
/** Store iterator of data items */
def store(dataIterator: Iterator[T]): Unit
/** Store raw bytes */
def store(bytes: ByteBuffer): Unit
/** Store with metadata */
def store(bytes: ByteBuffer, metadata: Any): Unit
/** Restart receiver with message */
def restart(message: String): Unit
/** Restart with message and error */
def restart(message: String, error: Throwable): Unit
/** Stop receiver with message */
def stop(message: String): Unit
/** Stop with message and error */
def stop(message: String, error: Throwable): Unit
/** Report error */
def reportError(message: String, throwable: Throwable): Unit
/** Check if receiver is started */
def isStarted(): Boolean
/** Check if receiver is stopped */
def isStopped(): Boolean
/** Preferred execution location */
def preferredLocation: Option[String]
/** Associated stream ID */
def streamId: Int
}

Usage Examples:
import java.net._
import java.io._
// Custom receiver for UDP data
// Receives UDP datagrams on `port` and stores each packet's payload as a
// String (platform default charset) into Spark Streaming.
// NOTE(review): onStop() closes the socket from a different thread than the
// one blocked in socket.receive(); the close makes receive() throw, and the
// `!isStopped()` guard in the catch then suppresses that shutdown exception.
class UDPReceiver(port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
// Shared between the receiving thread (creates/reads it) and onStop (closes/nulls it).
private var socket: DatagramSocket = _
// Called by Spark when the receiver starts. Must return promptly, so the
// blocking receive loop runs on its own thread.
def onStart(): Unit = {
new Thread("UDP Receiver") {
override def run(): Unit = receive()
}.start()
}
// Called by Spark when the receiver stops; closing the socket unblocks
// any in-progress socket.receive() call in the receiver thread.
def onStop(): Unit = {
if (socket != null) {
socket.close()
socket = null
}
}
// Blocking loop: one datagram per iteration until the receiver is stopped.
private def receive(): Unit = {
try {
socket = new DatagramSocket(port)
val buffer = new Array[Byte](1024)
while (!isStopped()) {
val packet = new DatagramPacket(buffer, buffer.length)
socket.receive(packet)
// Copy only the bytes of this packet — `buffer` is reused across iterations.
val data = new String(packet.getData, 0, packet.getLength)
store(data)
}
} catch {
// Errors during normal operation ask Spark to restart the receiver;
// exceptions caused by onStop()'s close are ignored via the guard.
case e: Exception if !isStopped() =>
restart("Error receiving data", e)
}
}
}
// Use custom receiver
val udpStream = ssc.receiverStream(new UDPReceiver(9999))
// Custom receiver with batching
// Receiver that accumulates items and hands them to Spark in batches of
// `batchSize`, flushing any remainder on shutdown.
// FIX(review): the original called store(buffer.toArray), but Receiver.store
// has no Array[T] overload (see the API listing above: T, ArrayBuffer[T],
// Iterator[T], ByteBuffer), so that call does not type-check. We store a
// defensive copy (clone) of the ArrayBuffer so that clearing the live buffer
// immediately afterwards cannot affect data Spark may still hold a reference to.
class BatchedReceiver[T](batchSize: Int) extends Receiver[T](StorageLevel.MEMORY_ONLY) {
// Items accumulated since the last flush.
private val buffer = new ArrayBuffer[T]()
def onStart(): Unit = {
// Implementation
}
def onStop(): Unit = {
// Flush remaining (partial) batch before shutting down.
if (buffer.nonEmpty) {
store(buffer.clone())
buffer.clear()
}
}
// Append one item; flush as a batch once the buffer reaches batchSize.
private def addData(item: T): Unit = {
buffer += item
if (buffer.size >= batchSize) {
store(buffer.clone())
buffer.clear()
}
}
}

Combine multiple input streams into a single DStream.
class StreamingContext {
/** Union multiple DStreams */
def union[T](streams: Seq[DStream[T]]): DStream[T]
}

Usage Examples:
// Multiple input sources
val fileStream = ssc.textFileStream("/path/to/files")
val socketStream = ssc.socketTextStream("localhost", 9999)
val queueStream = ssc.queueStream(rddQueue)
// Union all streams
val combinedStream = ssc.union(Seq(fileStream, socketStream, queueStream))
// Process combined data
combinedStream.foreachRDD { rdd =>
println(s"Combined batch size: ${rdd.count()}")
}

Base abstractions for implementing custom input streams.
/**
* Base class for input streams that receive data into Spark Streaming
*/
abstract class InputDStream[T](ssc: StreamingContext) extends DStream[T](ssc) {
/** Start the stream */
def start(): Unit
/** Stop the stream */
def stop(): Unit
}
/**
* Input streams that use receivers to receive data
*/
abstract class ReceiverInputDStream[T](ssc: StreamingContext) extends InputDStream[T](ssc) {
/** Get receiver for this input stream */
def getReceiver(): Receiver[T]
}
/**
* File-based input stream implementation
*/
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
ssc: StreamingContext,
directory: String,
filter: Path => Boolean,
newFilesOnly: Boolean,
conf: Option[Configuration]
) extends InputDStream[(K, V)](ssc)
/**
* Socket-based input stream implementation
*/
class SocketInputDStream[T](
ssc: StreamingContext,
host: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc)
/**
* Queue-based input stream implementation
*/
class QueueInputDStream[T](
ssc: StreamingContext,
queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: Option[RDD[T]]
) extends InputDStream[T](ssc)

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-2-12