CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pyspark-streaming

PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

output-operations.mddocs/

Output Operations

Methods for writing processed stream data to external systems, triggering computations, and performing actions on DStreams.

Basic Output Operations

Print Operations

Print first 10 elements of each RDD:

def print(): Unit

Print first num elements of each RDD:

def print(num: Int): Unit

Example print operations:

val wordCounts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)

wordCounts.print()      // Print first 10 elements
wordCounts.print(20)    // Print first 20 elements

ForEach Operations

Apply function to each RDD:

def foreachRDD(foreachFunc: RDD[T] => Unit): Unit

Apply function to each RDD with time information:

def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

Example forEach operations:

val lines = ssc.socketTextStream("localhost", 9999)

// Process each RDD
lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    println(s"Processing ${rdd.count()} lines")
    rdd.collect().foreach(println)
  }
}

// Process with time information
lines.foreachRDD { (rdd, time) =>
  println(s"Processing batch at ${time.toString()}")
  rdd.foreach(line => processLine(line, time))
}

File Output Operations

Text File Output

Save as text files with prefix:

def saveAsTextFiles(prefix: String, suffix: String = null): Unit

Example text file output:

val processedLines = ssc.socketTextStream("localhost", 9999)
  .map(_.toUpperCase)

// Save to files like output-123456789.txt
processedLines.saveAsTextFiles("output", "txt")

// Save to files like processed-123456789
processedLines.saveAsTextFiles("processed")

Object File Output

Save as serialized object files:

def saveAsObjectFiles(prefix: String, suffix: String = null): Unit

Example object file output:

case class LogEntry(timestamp: Long, level: String, message: String)

val logEntries = ssc.socketTextStream("localhost", 9999)
  .map(parseLogEntry)

logEntries.saveAsObjectFiles("logs", "obj")

Hadoop File Output

Save using Hadoop OutputFormat:

def saveAsHadoopFiles[F <: OutputFormat[K, V]](
  prefix: String,
  suffix: String = null
)(implicit fm: ClassTag[F]): Unit  // On DStream[(K, V)]

Save using new Hadoop API:

def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
  prefix: String,
  suffix: String = null,
  conf: Configuration = ssc.sparkContext.hadoopConfiguration
)(implicit fm: ClassTag[F]): Unit  // On DStream[(K, V)]

Example Hadoop output:

import org.apache.hadoop.io.{Text, IntWritable}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

val keyValuePairs = ssc.socketTextStream("localhost", 9999)
  .map(line => (new Text(line), new IntWritable(1)))

// Using old Hadoop API
keyValuePairs.saveAsHadoopFiles[TextOutputFormat[Text, IntWritable]]("hadoop-output")

// Using new Hadoop API  
keyValuePairs.saveAsNewAPIHadoopFiles[NewTextOutputFormat[Text, IntWritable]]("new-hadoop-output")

Database Output Operations

JDBC Output

Example JDBC output using foreachRDD:

import java.sql.{Connection, DriverManager, PreparedStatement}

val processedData = ssc.socketTextStream("localhost", 9999)
  .map(parseData)

processedData.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Create connection per partition
    val connection = DriverManager.getConnection("jdbc:mysql://localhost/db", "user", "password")
    val statement = connection.prepareStatement("INSERT INTO table (col1, col2) VALUES (?, ?)")
    
    partition.foreach { case (col1, col2) =>
      statement.setString(1, col1)
      statement.setString(2, col2)
      statement.addBatch()
    }
    
    statement.executeBatch()
    connection.close()
  }
}

NoSQL Database Output

Example MongoDB output:

import com.mongodb.spark.MongoSpark

val documents = ssc.socketTextStream("localhost", 9999)
  .map(parseToDocument)

documents.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    MongoSpark.save(rdd)
  }
}

Message Queue Output

Kafka Output

Example Kafka producer output:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties

val messages = ssc.socketTextStream("localhost", 9999)

messages.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    
    val producer = new KafkaProducer[String, String](props)
    
    partition.foreach { message =>
      val record = new ProducerRecord[String, String]("output-topic", message)
      producer.send(record)
    }
    
    producer.close()
  }
}

Advanced Output Patterns

Batch-aware Output

Output with batch time information:

val timestampedData = ssc.socketTextStream("localhost", 9999)

timestampedData.foreachRDD { (rdd, time) =>
  val batchTime = time.milliseconds
  val outputPath = s"output/batch-$batchTime"
  
  if (!rdd.isEmpty()) {
    rdd.coalesce(1).saveAsTextFile(outputPath)
  }
}

Conditional Output

Output only when conditions are met:

val filteredData = ssc.socketTextStream("localhost", 9999)
  .filter(_.nonEmpty)

filteredData.foreachRDD { rdd =>
  val count = rdd.count()
  
  if (count > 100) {
    println(s"Processing large batch: $count records")
    rdd.saveAsTextFile(s"large-batches/batch-${System.currentTimeMillis()}")
  } else if (count > 0) {
    println(s"Processing small batch: $count records")
    rdd.collect().foreach(println)
  }
}

Multi-destination Output

Output to multiple destinations:

val processedData = ssc.socketTextStream("localhost", 9999)
  .map(processLine)

processedData.foreachRDD { rdd =>
  // Cache for multiple outputs
  rdd.cache()
  
  // Output to file system
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"output/batch-${System.currentTimeMillis()}")
  }
  
  // Output to database
  rdd.foreachPartition { partition =>
    // Database write logic
    writeToDatabase(partition)
  }
  
  // Output metrics
  val count = rdd.count()
  println(s"Processed $count records")
  
  // Unpersist to free memory
  rdd.unpersist()
}

Output Operation Properties

Exactly-once Semantics

Ensure exactly-once output semantics:

val idempotentOutput = ssc.socketTextStream("localhost", 9999)

idempotentOutput.foreachRDD { (rdd, time) =>
  val batchId = time.milliseconds
  
  // Check if batch already processed
  if (!isProcessed(batchId)) {
    rdd.foreachPartition { partition =>
      // Atomic write operation
      writeAtomically(partition, batchId)
    }
    markProcessed(batchId)
  }
}

def isProcessed(batchId: Long): Boolean = {
  // Check external system for processing status
  checkProcessingStatus(batchId)
}

def markProcessed(batchId: Long): Unit = {
  // Mark batch as processed in external system
  updateProcessingStatus(batchId)
}

Error Handling in Output

Handle output errors gracefully:

val reliableOutput = ssc.socketTextStream("localhost", 9999)

reliableOutput.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    var retries = 3
    var success = false
    
    while (retries > 0 && !success) {
      try {
        writeToExternalSystem(partition)
        success = true
      } catch {
        case e: Exception =>
          retries -= 1
          if (retries > 0) {
            Thread.sleep(1000) // Wait before retry
          } else {
            // Log error and potentially write to dead letter queue
            logError(e, partition)
          }
      }
    }
  }
}

Performance Optimization

Optimize output operations:

val optimizedOutput = ssc.socketTextStream("localhost", 9999)
  .filter(_.nonEmpty)

optimizedOutput.foreachRDD { rdd =>
  // Coalesce to reduce number of output files
  val coalescedRDD = if (rdd.getNumPartitions > 10) {
    rdd.coalesce(10)
  } else {
    rdd
  }
  
  // Only process non-empty RDDs
  if (!coalescedRDD.isEmpty()) {
    coalescedRDD.saveAsTextFile(s"output/${System.currentTimeMillis()}")
  }
}

Output Operation Best Practices

Connection Management

Manage external connections efficiently:

val dataStream = ssc.socketTextStream("localhost", 9999)

dataStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Create connection per partition (not per record)
    val connection = createConnection()
    
    try {
      partition.foreach { record =>
        writeRecord(connection, record)
      }
    } finally {
      connection.close()
    }
  }
}

Batch Processing

Process records in batches for better performance:

val batchedOutput = ssc.socketTextStream("localhost", 9999)

batchedOutput.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val batchSize = 1000
    val buffer = scala.collection.mutable.ArrayBuffer[String]()
    
    partition.foreach { record =>
      buffer += record
      
      if (buffer.size >= batchSize) {
        writeBatch(buffer.toList)
        buffer.clear()
      }
    }
    
    // Write remaining records
    if (buffer.nonEmpty) {
      writeBatch(buffer.toList)
    }
  }
}

Install with Tessl CLI

npx tessl i tessl/pypi-pyspark-streaming@2.4.3

docs

core-operations.md

index.md

input-sources.md

java-api.md

output-operations.md

stateful-operations.md

transformations.md

tile.json