PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python
Methods for ingesting data streams from various external sources including network sockets, file systems, in-memory queues, and custom receivers.
Create DStream from socket text data:
def socketTextStream(
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]

Example socket text stream:
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.print()

Create DStream with custom converter function:
def socketStream[T: ClassTag](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T]

Example with custom converter:
import java.io.InputStream
import scala.io.Source
def jsonConverter(inputStream: InputStream): Iterator[MyJsonObject] = {
  Source.fromInputStream(inputStream).getLines().map(parseJson)
}
val jsonStream = ssc.socketStream("localhost", 8080, jsonConverter, StorageLevel.MEMORY_ONLY)

Create DStream for raw binary data:
def rawSocketStream[T: ClassTag](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]

Monitor directory for new text files:
def textFileStream(directory: String): DStream[String]

Example text file monitoring:
val fileStream = ssc.textFileStream("/data/streaming-input")
val processedLines = fileStream.filter(_.nonEmpty).map(_.toUpperCase)
processedLines.print()

Monitor directory with custom input format:
def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String
)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): InputDStream[(K, V)]

Overloaded versions:
def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean
)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): InputDStream[(K, V)]

def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean,
  conf: Configuration
)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): InputDStream[(K, V)]

Example with Hadoop input format:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val hadoopStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/data/input")
val textStream = hadoopStream.map(_._2.toString)

Read fixed-length binary records:
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

Example binary records:
val binaryStream = ssc.binaryRecordsStream("/data/binary", 1024)
val processedRecords = binaryStream.map { bytes =>
  // Process fixed 1024-byte records
  processRecord(bytes)
}

Create DStream from queue of RDDs:
def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true
): InputDStream[T]

def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean,
  defaultRDD: RDD[T]
): InputDStream[T]

Example queue stream:
import scala.collection.mutable.Queue
val rddQueue = new Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
// Add RDDs to queue in another thread
new Thread {
  override def run(): Unit = {
    for (i <- 1 to 100) {
      rddQueue += ssc.sparkContext.parallelize(1 to 10)
      Thread.sleep(1000)
    }
  }
}.start()
queueStream.print()

Note: Queue streams do not support checkpointing and should not be used in production for fault tolerance.
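The second queueStream overload takes a defaultRDD that is emitted in batch intervals when the queue has nothing to dequeue. A minimal sketch, assuming an existing StreamingContext `ssc`:

```scala
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

// Default RDD to use in batch intervals when the queue is empty
val fallbackRDD: RDD[Int] = ssc.sparkContext.parallelize(Seq.empty[Int])
val rddQueue = new Queue[RDD[Int]]()

// oneAtATime = true dequeues at most one RDD per batch interval;
// when the queue is empty, fallbackRDD is used for that batch instead
val stream = ssc.queueStream(rddQueue, oneAtATime = true, defaultRDD = fallbackRDD)
stream.count().print()
```

This keeps downstream output operations firing on every batch interval even when no test RDDs have been enqueued yet.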
Create DStream with custom receiver:
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]

Example custom receiver:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import java.util.concurrent.Executors
class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  private val executor = Executors.newSingleThreadExecutor()

  def onStart(): Unit = {
    executor.execute(new Runnable {
      def run(): Unit = {
        receive()
      }
    })
  }

  def onStop(): Unit = {
    executor.shutdown()
  }

  private def receive(): Unit = {
    try {
      while (!isStopped()) {
        // Simulate receiving data
        val data = generateData()
        store(data)
        Thread.sleep(100)
      }
    } catch {
      case e: Exception => restart("Error receiving data", e)
    }
  }

  private def generateData(): String = {
    // Custom data generation logic
    s"data-${System.currentTimeMillis()}"
  }
}

val customStream = ssc.receiverStream(new CustomReceiver())
customStream.print()

Alternative constructor for custom receivers:
class PluggableInputDStream[T: ClassTag](
  ssc: StreamingContext,
  receiver: Receiver[T]
) extends ReceiverInputDStream[T](ssc)

Common storage levels for input streams:
StorageLevel.MEMORY_ONLY - Store in memory only
StorageLevel.MEMORY_AND_DISK - Memory with disk fallback
StorageLevel.MEMORY_ONLY_SER - Memory with serialization
StorageLevel.MEMORY_AND_DISK_SER - Memory and disk with serialization
StorageLevel.MEMORY_AND_DISK_SER_2 - Replicated version

Example with custom storage level:
val stream = ssc.socketTextStream(
  "localhost",
  9999,
  StorageLevel.MEMORY_ONLY_SER
)

All input streams have unique identifiers:
abstract class InputDStream[T] extends DStream[T] {
  val id: Int
  val name: String
  def start(): Unit
  def stop(): Unit
}

Access input stream properties:
val fileStream = ssc.textFileStream("/data/input")
println(s"Stream ID: ${fileStream.id}")
println(s"Stream name: ${fileStream.name}")

File streams monitor directories for new files. Files are picked up based on their modification time and must appear in the directory atomically (for example, written elsewhere and then moved in), since partially written files may otherwise be processed.
Example atomic file writing pattern:
// Write to temporary file first
val tempFile = new File("/data/streaming-input/.temp-file")
writeDataToFile(tempFile, data)
// Atomically move to final location
val finalFile = new File("/data/streaming-input/data-file.txt")
tempFile.renameTo(finalFile)

For fault-tolerant processing, use receivers that support write-ahead logs:
// Enable write-ahead logs in Spark configuration
val conf = new SparkConf()
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("checkpoint-directory")

Socket streams and some custom receivers do not support write-ahead logs and may lose data on failure. For production use cases requiring fault tolerance, prefer reliable receivers that acknowledge the source only after data has been stored, or receiver-less direct streams where the source supports them.
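A reliable receiver stores a whole batch of records and acknowledges the source only after store() has returned; with the write-ahead log enabled, store(ArrayBuffer) blocks until the data is durably written, giving at-least-once delivery. A hypothetical sketch — SourceClient, fetchBatch, and ackBatch are stand-ins for a source-specific client API, not part of Spark:

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical source client; replace with a real acknowledging client
trait SourceClient {
  def fetchBatch(): ArrayBuffer[String]
  def ackBatch(): Unit
}

class ReliableSourceReceiver(client: SourceClient)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    new Thread("reliable-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          val batch = client.fetchBatch()
          // Blocks until the batch is stored (and logged, if WAL is enabled);
          // acknowledging only afterwards means unacknowledged data is
          // redelivered by the source after a failure
          store(batch)
          client.ackBatch()
        }
      }
    }.start()
  }

  def onStop(): Unit = ()
}
```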
Install with Tessl CLI
npx tessl i tessl/pypi-pyspark-streaming