PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Methods for writing processed stream data to external systems, triggering computations, and performing actions on DStreams.
Print first 10 elements of each RDD:
def print(): Unit

Print first num elements of each RDD:
def print(num: Int): Unit

Example print operations:
val wordCounts = ssc.socketTextStream("localhost", 9999)
.flatMap(_.split("\\s+"))
.map((_, 1))
.reduceByKey(_ + _)
wordCounts.print() // Print first 10 elements
wordCounts.print(20) // Print first 20 elements

Apply function to each RDD:
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit

Apply function to each RDD with time information:
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

Example foreachRDD operations:
val lines = ssc.socketTextStream("localhost", 9999)
// Process each RDD
lines.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
println(s"Processing ${rdd.count()} lines")
rdd.collect().foreach(println)
}
}
// Process with time information
lines.foreachRDD { (rdd, time) =>
println(s"Processing batch at ${time.toString()}")
rdd.foreach(line => processLine(line, time))
}

Save as text files with prefix:
def saveAsTextFiles(prefix: String, suffix: String = null): Unit

Example text file output:
val processedLines = ssc.socketTextStream("localhost", 9999)
.map(_.toUpperCase)
// Save to files like output-123456789.txt
processedLines.saveAsTextFiles("output", "txt")
// Save to files like processed-123456789
processedLines.saveAsTextFiles("processed")

Save as serialized object files:
def saveAsObjectFiles(prefix: String, suffix: String = null): Unit

Example object file output:
case class LogEntry(timestamp: Long, level: String, message: String)
val logEntries = ssc.socketTextStream("localhost", 9999)
.map(parseLogEntry)
logEntries.saveAsObjectFiles("logs", "obj")

Save using Hadoop OutputFormat:
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
prefix: String,
suffix: String = null
)(implicit fm: ClassTag[F]): Unit // On DStream[(K, V)]

Save using new Hadoop API:
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
prefix: String,
suffix: String = null,
conf: Configuration = ssc.sparkContext.hadoopConfiguration
)(implicit fm: ClassTag[F]): Unit // On DStream[(K, V)]

Example Hadoop output:
import org.apache.hadoop.io.{Text, IntWritable}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
val keyValuePairs = ssc.socketTextStream("localhost", 9999)
.map(line => (new Text(line), new IntWritable(1)))
// Using old Hadoop API
keyValuePairs.saveAsHadoopFiles[TextOutputFormat[Text, IntWritable]]("hadoop-output")
// Using new Hadoop API
keyValuePairs.saveAsNewAPIHadoopFiles[NewTextOutputFormat[Text, IntWritable]]("new-hadoop-output")

Example JDBC output using foreachRDD:
import java.sql.{Connection, DriverManager, PreparedStatement}
val processedData = ssc.socketTextStream("localhost", 9999)
.map(parseData)
processedData.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
// Create connection per partition
val connection = DriverManager.getConnection("jdbc:mysql://localhost/db", "user", "password")
val statement = connection.prepareStatement("INSERT INTO table (col1, col2) VALUES (?, ?)")
partition.foreach { case (col1, col2) =>
statement.setString(1, col1)
statement.setString(2, col2)
statement.addBatch()
}
statement.executeBatch()
connection.close()
}
}

Example MongoDB output:
import com.mongodb.spark.MongoSpark
val documents = ssc.socketTextStream("localhost", 9999)
.map(parseToDocument)
documents.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
MongoSpark.save(rdd)
}
}

Example Kafka producer output:
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties
val messages = ssc.socketTextStream("localhost", 9999)
messages.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
partition.foreach { message =>
val record = new ProducerRecord[String, String]("output-topic", message)
producer.send(record)
}
producer.close()
}
}

Output with batch time information:
val timestampedData = ssc.socketTextStream("localhost", 9999)
timestampedData.foreachRDD { (rdd, time) =>
val batchTime = time.milliseconds
val outputPath = s"output/batch-$batchTime"
if (!rdd.isEmpty()) {
rdd.coalesce(1).saveAsTextFile(outputPath)
}
}

Output only when conditions are met:
val filteredData = ssc.socketTextStream("localhost", 9999)
.filter(_.nonEmpty)
filteredData.foreachRDD { rdd =>
val count = rdd.count()
if (count > 100) {
println(s"Processing large batch: $count records")
rdd.saveAsTextFile(s"large-batches/batch-${System.currentTimeMillis()}")
} else if (count > 0) {
println(s"Processing small batch: $count records")
rdd.collect().foreach(println)
}
}

Output to multiple destinations:
val processedData = ssc.socketTextStream("localhost", 9999)
.map(processLine)
processedData.foreachRDD { rdd =>
// Cache for multiple outputs
rdd.cache()
// Output to file system
if (!rdd.isEmpty()) {
rdd.saveAsTextFile(s"output/batch-${System.currentTimeMillis()}")
}
// Output to database
rdd.foreachPartition { partition =>
// Database write logic
writeToDatabase(partition)
}
// Output metrics
val count = rdd.count()
println(s"Processed $count records")
// Unpersist to free memory
rdd.unpersist()
}

Ensure exactly-once output semantics:
val idempotentOutput = ssc.socketTextStream("localhost", 9999)
idempotentOutput.foreachRDD { (rdd, time) =>
val batchId = time.milliseconds
// Check if batch already processed
if (!isProcessed(batchId)) {
rdd.foreachPartition { partition =>
// Atomic write operation
writeAtomically(partition, batchId)
}
markProcessed(batchId)
}
}
def isProcessed(batchId: Long): Boolean = {
// Check external system for processing status
checkProcessingStatus(batchId)
}
def markProcessed(batchId: Long): Unit = {
// Mark batch as processed in external system
updateProcessingStatus(batchId)
}

Handle output errors gracefully:
val reliableOutput = ssc.socketTextStream("localhost", 9999)
reliableOutput.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
var retries = 3
var success = false
while (retries > 0 && !success) {
try {
writeToExternalSystem(partition)
success = true
} catch {
case e: Exception =>
retries -= 1
if (retries > 0) {
Thread.sleep(1000) // Wait before retry
} else {
// Log error and potentially write to dead letter queue
logError(e, partition)
}
}
}
}
}

Optimize output operations:
val optimizedOutput = ssc.socketTextStream("localhost", 9999)
.filter(_.nonEmpty)
optimizedOutput.foreachRDD { rdd =>
// Coalesce to reduce number of output files
val coalescedRDD = if (rdd.getNumPartitions > 10) {
rdd.coalesce(10)
} else {
rdd
}
// Only process non-empty RDDs
if (!coalescedRDD.isEmpty()) {
coalescedRDD.saveAsTextFile(s"output/${System.currentTimeMillis()}")
}
}

Manage external connections efficiently:
val dataStream = ssc.socketTextStream("localhost", 9999)
dataStream.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
// Create connection per partition (not per record)
val connection = createConnection()
try {
partition.foreach { record =>
writeRecord(connection, record)
}
} finally {
connection.close()
}
}
}

Process records in batches for better performance:
val batchedOutput = ssc.socketTextStream("localhost", 9999)
batchedOutput.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
val batchSize = 1000
val buffer = scala.collection.mutable.ArrayBuffer[String]()
partition.foreach { record =>
buffer += record
if (buffer.size >= batchSize) {
writeBatch(buffer.toList)
buffer.clear()
}
}
// Write remaining records
if (buffer.nonEmpty) {
writeBatch(buffer.toList)
}
}
}

Install with Tessl CLI
npx tessl i tessl/pypi-pyspark-streaming