tessl/pypi-pyspark-streaming

PySpark Streaming module that enables scalable, high-throughput, fault-tolerant stream processing of live data streams in Python

Input Sources

Methods for ingesting data streams from various external sources including network sockets, file systems, in-memory queues, and custom receivers.

Socket-based Input Streams

Text Socket Streams

Create DStream from socket text data:

def socketTextStream(
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]

Example socket text stream:

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.print()

Custom Socket Streams

Create DStream with custom converter function:

def socketStream[T: ClassTag](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T]

Example with custom converter:

import java.io.InputStream
import scala.io.Source

// MyJsonObject and parseJson are application-defined placeholders
def jsonConverter(inputStream: InputStream): Iterator[MyJsonObject] = {
  Source.fromInputStream(inputStream).getLines().map(parseJson)
}

val jsonStream = ssc.socketStream("localhost", 8080, jsonConverter, StorageLevel.MEMORY_ONLY)
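Because the converter is an ordinary function from InputStream to Iterator[T], it can be exercised locally without a running StreamingContext. A minimal sketch using a simple line-based converter (lineConverter is an illustrative name, not part of the API):

```scala
import java.io.{ByteArrayInputStream, InputStream}
import scala.io.Source

// A trivial converter: yields one String per line of the stream
def lineConverter(in: InputStream): Iterator[String] =
  Source.fromInputStream(in, "UTF-8").getLines()

// Exercise it against an in-memory stream
val input = new ByteArrayInputStream("alpha\nbeta\ngamma".getBytes("UTF-8"))
val parsed = lineConverter(input).toList
// parsed contains the three input lines, in order
```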

Raw Socket Streams

Create DStream for raw binary data:

def rawSocketStream[T: ClassTag](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]
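rawSocketStream reads data that is already in Spark's internal serialized block format, trading convenience for lower deserialization overhead; it is mainly useful for benchmarking. An untested sketch (host, port, and element type are illustrative):

```scala
// Assumes the sender pushes blocks serialized with Spark's own serializer
val rawStream = ssc.rawSocketStream[String]("localhost", 9999)
rawStream.count().print()
```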

File-based Input Streams

Text File Streams

Monitor directory for new text files:

def textFileStream(directory: String): DStream[String]

Example text file monitoring:

val fileStream = ssc.textFileStream("/data/streaming-input")
val processedLines = fileStream.filter(_.nonEmpty).map(_.toUpperCase)
processedLines.print()

Generic File Streams

Monitor directory with custom input format:

def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String
)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): InputDStream[(K, V)]

Overloaded versions:

def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean
)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): InputDStream[(K, V)]

def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean,
  conf: Configuration
)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): InputDStream[(K, V)]

Example with Hadoop input format:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val hadoopStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/data/input")
val textStream = hadoopStream.map(_._2.toString)
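The filtered overload restricts which files are picked up from the directory. A sketch, assuming we only want newly created CSV files (the `.csv` predicate is illustrative):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Only ingest new .csv files; other files in the directory are ignored
val csvStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/data/input",
  (path: Path) => path.getName.endsWith(".csv"),
  newFilesOnly = true
)
val csvLines = csvStream.map(_._2.toString)
```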

Binary Records Streams

Read fixed-length binary records:

def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

Example binary records:

val binaryStream = ssc.binaryRecordsStream("/data/binary", 1024)
val processedRecords = binaryStream.map { bytes =>
  // Process fixed 1024-byte records
  processRecord(bytes)
}

Queue-based Input Streams

RDD Queue Streams

Create DStream from queue of RDDs:

def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true
): InputDStream[T]

def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean,
  defaultRDD: RDD[T]
): InputDStream[T]

Example queue stream:

import scala.collection.mutable.Queue

val rddQueue = new Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)

// Add RDDs to queue in another thread
new Thread {
  override def run(): Unit = {
    for (i <- 1 to 100) {
      rddQueue += ssc.sparkContext.parallelize(1 to 10)
      Thread.sleep(1000)
    }
  }
}.start()

queueStream.print()

Note: Queue streams do not support checkpointing and should not be used in production for fault tolerance.
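The second overload supplies a defaultRDD that is used when the queue is empty. A sketch building on the example above (the fallback contents are illustrative):

```scala
val fallback = ssc.sparkContext.parallelize(Seq(0))

// With oneAtATime = false, all queued RDDs are consumed per batch;
// when the queue is empty, fallback is returned instead
val streamWithDefault = ssc.queueStream(rddQueue, oneAtATime = false, fallback)
```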

Custom Receiver Streams

Receiver-based Streams

Create DStream with custom receiver:

def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]

Example custom receiver:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import java.util.concurrent.Executors

class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  private val executor = Executors.newSingleThreadExecutor()

  def onStart(): Unit = {
    executor.execute(new Runnable {
      def run(): Unit = {
        receive()
      }
    })
  }

  def onStop(): Unit = {
    executor.shutdown()
  }

  private def receive(): Unit = {
    try {
      while (!isStopped()) {
        // Simulate receiving data
        val data = generateData()
        store(data)
        Thread.sleep(100)
      }
    } catch {
      case e: Exception => restart("Error receiving data", e)
    }
  }

  private def generateData(): String = {
    // Custom data generation logic
    s"data-${System.currentTimeMillis()}"
  }
}

val customStream = ssc.receiverStream(new CustomReceiver())
customStream.print()

Pluggable Input Streams

Custom receivers are wrapped in a PluggableInputDStream internally; in Spark this class is marked private[streaming], so receiverStream is the public way to create one:

class PluggableInputDStream[T: ClassTag](
  ssc: StreamingContext,
  receiver: Receiver[T]
) extends ReceiverInputDStream[T](ssc)

Input Stream Properties

Storage Levels

Common storage levels for input streams:

  • StorageLevel.MEMORY_ONLY - Deserialized objects in memory only
  • StorageLevel.MEMORY_AND_DISK - In memory, spilling to disk when memory is full
  • StorageLevel.MEMORY_ONLY_SER - Serialized objects in memory
  • StorageLevel.MEMORY_AND_DISK_SER - Serialized in memory, spilling to disk
  • StorageLevel.MEMORY_AND_DISK_SER_2 - Same, replicated to two nodes (the receiver default)

Example with custom storage level:

val stream = ssc.socketTextStream(
  "localhost", 
  9999, 
  StorageLevel.MEMORY_ONLY_SER
)

Input Stream Identification

All input streams have unique identifiers:

abstract class InputDStream[T] extends DStream[T] {
  val id: Int
  val name: String
  def start(): Unit
  def stop(): Unit
}

Access input stream properties:

val fileStream = ssc.textFileStream("/data/input")
println(s"Stream ID: ${fileStream.id}")
println(s"Stream name: ${fileStream.name}")

File Stream Configuration

File Monitoring Behavior

File streams monitor directories with these characteristics:

  • Only files in the monitored directory (not subdirectories) are processed
  • Files are processed based on modification time, not creation time
  • Files must be written atomically (e.g., move operation) to be processed correctly
  • File names should be consistent (lexicographically increasing) for best results

File Processing Guarantees

  • Each file is processed exactly once (assuming no failures)
  • Files are processed in order of modification time
  • Processing latency depends on batch interval and file discovery mechanism

Example atomic file writing pattern:

import java.nio.file.{Files, Paths, StandardCopyOption}

// Write to a dot-prefixed temporary file first; the default file-stream
// filter ignores hidden files, so a partial write is never picked up
val tempFile = Paths.get("/data/streaming-input/.temp-file")
writeDataToFile(tempFile, data)

// Atomically move to the final location (File.renameTo can fail silently)
val finalFile = Paths.get("/data/streaming-input/data-file.txt")
Files.move(tempFile, finalFile, StandardCopyOption.ATOMIC_MOVE)

Input Stream Reliability

Reliable Receivers

For fault-tolerant processing, use receivers that support write-ahead logs:

// Enable write-ahead logs in Spark configuration
val conf = new SparkConf()
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")

val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("checkpoint-directory")

Unreliable Receivers

Socket streams and some custom receivers do not support write-ahead logs and may lose data on failure. For production use cases requiring fault tolerance, prefer:

  • Kafka integration (external library)
  • File-based inputs with HDFS
  • Custom receivers with reliable storage
  • Message queue systems with acknowledgment
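As a sketch of the last option: a receiver can defer acknowledging its source until store() returns, because storing a batch of records blocks until Spark has persisted the block (and written it to the write-ahead log when enabled). Everything here except the Receiver API itself (AckingSource, fetchBatch, ack) is hypothetical:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import scala.collection.mutable.ArrayBuffer

// Hypothetical source that hands out batches plus an offset to acknowledge
trait AckingSource {
  def fetchBatch(): (Seq[String], Long)
  def ack(offset: Long): Unit
}

class ReliableSourceReceiver(source: AckingSource)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    new Thread("reliable-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = ()  // the receive loop exits once isStopped() is true

  private def receive(): Unit = {
    while (!isStopped()) {
      val (records, offset) = source.fetchBatch()
      if (records.nonEmpty) {
        // store(ArrayBuffer) blocks until the block is safely stored,
        // so acknowledging afterwards gives at-least-once delivery
        store(ArrayBuffer(records: _*))
        source.ack(offset)
      }
    }
  }
}
```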

Install with Tessl CLI

npx tessl i tessl/pypi-pyspark-streaming
