CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-streaming-2-12

Spark Streaming - A scalable, fault-tolerant stream processing system that extends Apache Spark

Overview
Eval results
Files

docs/input-sources.md

Input Sources and Data Ingestion

Comprehensive data ingestion capabilities supporting various input sources including files, sockets, queues, and custom receivers. Provides both receiver-based and direct stream approaches for fault-tolerant data consumption.

Capabilities

File-based Input Streams

Monitor directories for new files and process them as streaming data.

class StreamingContext {
  /** Monitor directory for new text files */
  def textFileStream(directory: String): DStream[String]
  
  /** Monitor directory for files using Hadoop InputFormat */
  def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): InputDStream[(K, V)]
  
  /** Monitor directory with custom key/value/format and filtering function */
  def fileStream[K, V, F <: NewInputFormat[K, V]](
    directory: String,
    filter: Path => Boolean,
    newFilesOnly: Boolean
  ): InputDStream[(K, V)]
  
  /** Monitor directory with additional configuration */
  def fileStream[K, V, F <: NewInputFormat[K, V]](
    directory: String,
    filter: Path => Boolean, 
    newFilesOnly: Boolean,
    conf: Configuration
  ): InputDStream[(K, V)]
  
  /** Monitor directory for binary records with fixed length */
  def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]
}

Usage Examples:

import org.apache.spark.streaming._

// Monitor directory for text files
val lines = ssc.textFileStream("/path/to/text/files")

// Monitor for JSON files using Hadoop TextInputFormat
val jsonFiles = ssc.fileStream[LongWritable, Text, TextInputFormat]("/path/to/json")
  .map(_._2.toString)

// Monitor with custom filter for CSV files only
val csvFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/path/to/files",
  (path: Path) => path.getName.endsWith(".csv"),
  newFilesOnly = true
)

// Binary data with fixed record length
val binaryData = ssc.binaryRecordsStream("/path/to/binary", recordLength = 1024)

Socket-based Input Streams

Receive streaming data from TCP socket connections.

class StreamingContext {
  /** Receive text data from TCP socket */
  def socketTextStream(hostname: String, port: Int): ReceiverInputDStream[String]
  
  /** Receive text with custom storage level */
  def socketTextStream(
    hostname: String, 
    port: Int, 
    storageLevel: StorageLevel
  ): ReceiverInputDStream[String]
  
  /** Receive data with custom converter function */
  def socketStream[T](
    hostname: String,
    port: Int, 
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T]
  
  /** Receive raw bytes from socket */
  def rawSocketStream[T](
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[T]
}

Usage Examples:

// Basic text socket stream
val socketLines = ssc.socketTextStream("localhost", 9999)

// Socket stream with persistence configuration
val persistentSocket = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_ONLY)

// Custom data format from socket
val customSocket = ssc.socketStream[MyData](
  "localhost", 
  8888,
  inputStream => {
    // Custom deserialization logic
    val buffer = new Array[Byte](1024)
    Iterator.continually {
      inputStream.read(buffer)
      MyData.deserialize(buffer)
    }.takeWhile(_ != null)
  },
  StorageLevel.MEMORY_ONLY
)

Queue-based Input Streams

Create DStreams from a queue of RDDs, useful for testing and controlled data injection.

class StreamingContext {
  /** Create DStream from queue of RDDs */
  def queueStream[T](queue: Queue[RDD[T]]): InputDStream[T]
  
  /** Create with oneAtATime processing flag */
  def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean): InputDStream[T]
  
  /** Create with default RDD for empty batches */
  def queueStream[T](
    queue: Queue[RDD[T]], 
    oneAtATime: Boolean, 
    defaultRDD: RDD[T]
  ): InputDStream[T]
}

Usage Examples:

import scala.collection.mutable.Queue

// Create queue and RDDs
val rddQueue = new Queue[RDD[Int]]()
val rdd1 = ssc.sparkContext.parallelize(1 to 100)
val rdd2 = ssc.sparkContext.parallelize(101 to 200)

rddQueue.enqueue(rdd1)
rddQueue.enqueue(rdd2)

// Create queue stream
val queueStream = ssc.queueStream(rddQueue)

// Process one RDD at a time
val orderedStream = ssc.queueStream(rddQueue, oneAtATime = true)

// With default RDD for empty periods
val defaultRDD = ssc.sparkContext.parallelize(Seq(0))
val streamWithDefault = ssc.queueStream(rddQueue, oneAtATime = false, defaultRDD)

Custom Receiver Input Streams

Create DStreams using custom Receiver implementations for specialized data sources.

class StreamingContext {
  /** Create DStream from custom Receiver */
  def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
}

abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {
  /** Called when receiver is started */
  def onStart(): Unit
  
  /** Called when receiver is stopped */  
  def onStop(): Unit
  
  /** Store single data item */
  def store(dataItem: T): Unit
  
  /** Store collection of data items */
  def store(dataBuffer: ArrayBuffer[T]): Unit
  
  /** Store iterator of data items */
  def store(dataIterator: Iterator[T]): Unit
  
  /** Store raw bytes */
  def store(bytes: ByteBuffer): Unit
  
  /** Store with metadata */
  def store(bytes: ByteBuffer, metadata: Any): Unit
  
  /** Restart receiver with message */
  def restart(message: String): Unit
  
  /** Restart with message and error */
  def restart(message: String, error: Throwable): Unit
  
  /** Stop receiver with message */
  def stop(message: String): Unit
  
  /** Stop with message and error */
  def stop(message: String, error: Throwable): Unit
  
  /** Report error */
  def reportError(message: String, throwable: Throwable): Unit
  
  /** Check if receiver is started */
  def isStarted(): Boolean
  
  /** Check if receiver is stopped */
  def isStopped(): Boolean
  
  /** Preferred execution location */
  def preferredLocation: Option[String]
  
  /** Associated stream ID */
  def streamId: Int
}

Usage Examples:

import java.net._
import java.io._

// Custom receiver for UDP data
class UDPReceiver(port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  private var socket: DatagramSocket = _
  
  def onStart(): Unit = {
    new Thread("UDP Receiver") {
      override def run(): Unit = receive()
    }.start()
  }
  
  def onStop(): Unit = {
    if (socket != null) {
      socket.close()
      socket = null
    }
  }
  
  private def receive(): Unit = {
    try {
      socket = new DatagramSocket(port)
      val buffer = new Array[Byte](1024)
      
      while (!isStopped()) {
        val packet = new DatagramPacket(buffer, buffer.length)
        socket.receive(packet)
        val data = new String(packet.getData, 0, packet.getLength)
        store(data)
      }
    } catch {
      case e: Exception if !isStopped() =>
        restart("Error receiving data", e)
    }
  }
}

// Use custom receiver
val udpStream = ssc.receiverStream(new UDPReceiver(9999))

// Custom receiver with batching
class BatchedReceiver[T](batchSize: Int) extends Receiver[T](StorageLevel.MEMORY_ONLY) {
  private val buffer = new ArrayBuffer[T]()
  
  def onStart(): Unit = {
    // Implementation
  }
  
  def onStop(): Unit = {
    // Flush remaining data
    if (buffer.nonEmpty) {
      store(buffer)
      buffer.clear()
    }
  }
  
  private def addData(item: T): Unit = {
    buffer += item
    if (buffer.size >= batchSize) {
      store(buffer)
      buffer.clear()
    }
  }
}

Union Operations

Combine multiple input streams into a single DStream.

class StreamingContext {
  /** Union multiple DStreams */
  def union[T](streams: Seq[DStream[T]]): DStream[T]
}

Usage Examples:

// Multiple input sources
val fileStream = ssc.textFileStream("/path/to/files")  
val socketStream = ssc.socketTextStream("localhost", 9999)
val queueStream = ssc.queueStream(rddQueue)

// Union all streams
val combinedStream = ssc.union(Seq(fileStream, socketStream, queueStream))

// Process combined data
combinedStream.foreachRDD { rdd =>
  println(s"Combined batch size: ${rdd.count()}")
}

Input Stream Base Classes

Base abstractions for implementing custom input streams.

/**
 * Base class for input streams that receive data into Spark Streaming
 */
abstract class InputDStream[T](ssc: StreamingContext) extends DStream[T](ssc) {
  /** Start the stream */
  def start(): Unit
  
  /** Stop the stream */
  def stop(): Unit
}

/**
 * Input streams that use receivers to receive data
 */
abstract class ReceiverInputDStream[T](ssc: StreamingContext) extends InputDStream[T](ssc) {
  /** Get receiver for this input stream */
  def getReceiver(): Receiver[T]
}

/**
 * File-based input stream implementation
 */
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
  ssc: StreamingContext,
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean,
  conf: Option[Configuration]
) extends InputDStream[(K, V)](ssc)

/**
 * Socket-based input stream implementation  
 */
class SocketInputDStream[T](
  ssc: StreamingContext,
  host: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc)

/**
 * Queue-based input stream implementation
 */
class QueueInputDStream[T](
  ssc: StreamingContext,
  queue: Queue[RDD[T]],
  oneAtATime: Boolean,
  defaultRDD: Option[RDD[T]]
) extends InputDStream[T](ssc)

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-2-12

docs

advanced-transformations.md

core-streaming.md

index.md

input-sources.md

java-api.md

tile.json