Apache Spark Streaming extension for scalable, fault-tolerant stream processing of live data streams.
Data ingestion capabilities for reading from various external sources including files, sockets, message queues, and custom receivers.
Create DStreams from TCP socket connections for real-time text or binary data ingestion.
/**
* Create a DStream from TCP socket text stream
* @param hostname - Host to connect to
* @param port - Port number
* @param storageLevel - Storage level for received data
* @return DStream of strings from socket
*/
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String]
/**
 * Create a DStream from a TCP socket with a custom converter
 * @param hostname - Host to connect to
 * @param port - Port number
 * @param converter - Function that turns the raw input stream into an iterator of objects
 * @param storageLevel - Storage level for received data
 * @return DStream of converted objects
 */
def socketStream[T](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
): DStream[T]
/**
 * Create a DStream from a TCP socket where data arrives as serialized blocks
 * (serialized with Spark's serializer) that can be pushed into the block
 * manager without deserialization
 */
def rawSocketStream[T](
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T]

Usage Examples:
// Basic text stream from socket
val lines = ssc.socketTextStream("localhost", 9999)

// Custom object stream with converter
import java.io.{BufferedReader, InputStream, InputStreamReader}

val events = ssc.socketStream(
  "event-server", 8080,
  (inputStream: InputStream) => {
    val reader = new BufferedReader(new InputStreamReader(inputStream))
    // Read lines until the connection closes, parsing each into an event
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
      .map(line => parseEvent(line)) // parseEvent is application-defined
  },
  StorageLevel.MEMORY_AND_DISK
)
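
All of the snippets on this page assume an active StreamingContext named ssc. A minimal end-to-end sketch, in which the master, app name, host, port, and batch interval are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Local two-thread context with a 5-second batch interval
val conf = new SparkConf().setMaster("local[2]").setAppName("SocketIngest")
val ssc = new StreamingContext(conf, Seconds(5))

// Count words arriving on the socket in each batch
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
words.map((_, 1)).reduceByKey(_ + _).print()

ssc.start()            // start receiving and processing
ssc.awaitTermination() // block until the application is stopped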

Monitor file systems and create DStreams from new files appearing in directories.

/**
 * Monitor directory for new text files
 * @param directory - Directory to monitor
 * @return DStream of file contents as strings
 */
def textFileStream(directory: String): DStream[String]
/**
 * Monitor directory for files of a specific Hadoop input format
 * @param directory - Directory to monitor
 * @param filter - Optional file filter function
 * @param newFilesOnly - Process only new files vs all existing files
 */
def fileStream[K, V, F <: NewInputFormat[K, V]](
    directory: String,
    filter: Path => Boolean = acceptAllFiles,
    newFilesOnly: Boolean = true
): DStream[(K, V)]
/**
 * Read fixed-length binary records from files
 * @param directory - Directory to monitor
 * @param recordLength - Length in bytes of each binary record
 */
def binaryRecordsStream(
    directory: String,
    recordLength: Int
): DStream[Array[Byte]]

Usage Examples:
// Monitor directory for text files
val logs = ssc.textFileStream("hdfs://namenode/logs")

// Monitor for JSON files with custom processing
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val jsonFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs://namenode/json-data",
  (path: Path) => path.getName.endsWith(".json"),
  newFilesOnly = true
)

// Process fixed-length binary log records (1024 bytes each)
val binaryLogs = ssc.binaryRecordsStream("hdfs://namenode/binary-logs", 1024)
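
Each element of the binaryRecordsStream DStream is one fixed-length Array[Byte]; decoding is left to the application. A sketch, assuming a hypothetical record layout of an 8-byte timestamp followed by an 8-byte measurement:

import java.nio.ByteBuffer

// Hypothetical layout: 8-byte long timestamp + 8-byte double measurement,
// padded out to the 1024-byte record length used above
val readings = binaryLogs.map { record =>
  val buf = ByteBuffer.wrap(record)
  (buf.getLong(), buf.getDouble())
}
readings.print()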

Create DStreams from queues of RDDs, primarily used for testing and debugging streaming applications.

/**
 * Create a DStream from a queue of RDDs
 * @param queue - Queue containing RDDs to process
 * @param oneAtATime - Process one RDD per batch vs all available RDDs
 * @param defaultRDD - Default RDD to use when the queue is empty
 */
def queueStream[T](
    queue: Queue[RDD[T]],
    oneAtATime: Boolean = true,
    defaultRDD: RDD[T] = null
): DStream[T]

Usage Examples:
import scala.collection.mutable.Queue

// Create test data queue (named testStream to avoid shadowing the queueStream method)
val rddQueue = new Queue[RDD[Int]]()
val testStream = ssc.queueStream(rddQueue)

// Add test RDDs to the queue
for (i <- 1 to 10) {
  rddQueue += ssc.sparkContext.parallelize(Seq(i, i + 1, i + 2))
}

// Process the queue stream
testStream.map(_ * 2).print()
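
By default one queued RDD is consumed per batch; passing oneAtATime = false drains every queued RDD into a single batch, and defaultRDD supplies data for batches when the queue is empty. A sketch of the non-default form, reusing rddQueue from above:

// Drain all queued RDDs each batch; fall back to an empty RDD when idle
val emptyRDD = ssc.sparkContext.emptyRDD[Int]
val drained = ssc.queueStream(rddQueue, oneAtATime = false, defaultRDD = emptyRDD)
drained.count().print()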

Framework for creating custom receivers to ingest data from external sources.

/**
 * Create an input stream using a custom receiver
 * @param receiver - Custom receiver implementation
 */
def receiverStream[T](receiver: Receiver[T]): DStream[T]
/**
 * Abstract base class for custom receivers
 */
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {
  /** Called when receiver is started */
  def onStart(): Unit
  /** Called when receiver is stopped */
  def onStop(): Unit
  /** Store a single data item */
  def store(dataItem: T): Unit
  /** Store multiple data items */
  def store(dataBuffer: ArrayBuffer[T]): Unit
  /** Store data from an iterator */
  def store(dataIterator: Iterator[T]): Unit
  /** Store raw bytes */
  def store(bytes: ByteBuffer): Unit
  /** Stop the receiver with a message */
  def stop(message: String): Unit
  /** Restart the receiver with a message */
  def restart(message: String): Unit
  /** Report an error */
  def reportError(message: String, throwable: Throwable): Unit
  /** Check whether the receiver has started */
  def isStarted(): Boolean
  /** Check whether the receiver has been stopped */
  def isStopped(): Boolean
  /** Preferred location (executor host) for running this receiver */
  def preferredLocation: Option[String]
}

Usage Examples:
// Custom receiver for an external API (ApiClient is an application-defined client)
class ApiReceiver(apiUrl: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  private var client: ApiClient = _

  def onStart(): Unit = {
    client = new ApiClient(apiUrl)
    // Fetch on a background thread so onStart() returns immediately
    new Thread(() => {
      while (!isStopped()) {
        try {
          val data = client.fetchData()
          store(data)
          Thread.sleep(1000)
        } catch {
          case e: Exception => reportError("API fetch failed", e)
        }
      }
    }).start()
  }

  def onStop(): Unit = {
    if (client != null) client.close()
  }
}

// Use the custom receiver
val apiStream = ssc.receiverStream(new ApiReceiver("https://api.example.com/stream"))
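
For transient failures, restart(message) is usually preferable to reporting the error alone: Spark stops the receiver and invokes onStart() again after a delay. A sketch of that pattern inside the fetch loop, with ApiClient remaining the hypothetical client from the example above:

while (!isStopped()) {
  try {
    store(client.fetchData())
  } catch {
    case e: java.net.ConnectException =>
      // Transient: have Spark tear the receiver down and call onStart() again
      restart("Connection lost, restarting receiver")
    case e: Exception =>
      // Unexpected: surface the error to the driver and keep receiving
      reportError("API fetch failed", e)
  }
}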

Base classes and implementations for different types of input streams.

/**
 * Base class for all input streams
 */
abstract class InputDStream[T](ssc: StreamingContext) extends DStream[T] {
  /** Duration between batches */
  override def slideDuration: Duration = ssc.graph.batchDuration
  /** Input streams have no upstream dependencies */
  override def dependencies: List[DStream[_]] = List()
  /** Compute the RDD for the given batch time */
  override def compute(validTime: Time): Option[RDD[T]]
}
/**
 * Base class for receiver-based input streams
 */
abstract class ReceiverInputDStream[T](ssc: StreamingContext) extends InputDStream[T](ssc) {
  /** Get the receiver for this input stream */
  def getReceiver(): Receiver[T]
  /** Start the receiver */
  def start(): Unit
  /** Stop the receiver */
  def stop(): Unit
}
/**
 * Input stream for file monitoring
 */
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
    ssc: StreamingContext,
    directory: String,
    filter: Path => Boolean,
    newFilesOnly: Boolean
) extends InputDStream[(K, V)](ssc)

/**
 * Input stream for socket connections
 */
class SocketInputDStream[T](
    ssc: StreamingContext,
    host: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc)

/**
 * Input stream from an RDD queue (for testing)
 */
class QueueInputDStream[T](
    ssc: StreamingContext,
    queue: Queue[RDD[T]],
    oneAtATime: Boolean,
    defaultRDD: RDD[T]
) extends InputDStream[T](ssc)

/**
 * Input stream that yields the same RDD in every batch
 */
class ConstantInputDStream[T](
    ssc: StreamingContext,
    rdd: RDD[T]
) extends InputDStream[T](ssc)
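
These classes are normally created through the StreamingContext factory methods above, but ConstantInputDStream can be instantiated directly, which is convenient for smoke-testing a pipeline. A minimal sketch:

import org.apache.spark.streaming.dstream.ConstantInputDStream

// Replay the same fixture RDD in every batch to exercise downstream logic
val fixtureRDD = ssc.sparkContext.parallelize(Seq("a", "b", "c"))
val constantStream = new ConstantInputDStream(ssc, fixtureRDD)
constantStream.count().print()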

Spark Streaming integrates with many external sources through additional libraries:

- spark-streaming-kafka for Apache Kafka
- spark-streaming-flume for Apache Flume
- spark-streaming-kinesis-asl for Amazon Kinesis
- spark-streaming-twitter for the Twitter API
- spark-streaming-mqtt for MQTT message brokers
- spark-streaming-zeromq for ZeroMQ messaging

Each integration provides specialized stream-creation methods through its own utility object (for example KafkaUtils or FlumeUtils), as shown in the sketch below.
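
As an illustration, a direct Kafka stream using the spark-streaming-kafka-0-10 artifact; the broker address, group id, and topic name are placeholders:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest"
)

// Receiver-less direct stream: executors consume Kafka partitions directly
val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("events"), kafkaParams)
)
kafkaStream.map(_.value).print()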
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-streaming-2-13