Input Sources

Input sources in Spark Streaming provide mechanisms for ingesting data from external systems. These sources create InputDStreams that continuously receive data and convert it into a stream of RDDs for processing.
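The examples throughout this page assume an existing StreamingContext named ssc. A minimal setup sketch (the application name, master, and batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Two local threads: one for a receiver, one for processing
val conf = new SparkConf().setAppName("InputSourcesExamples").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))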

Capabilities

Socket-Based Input Streams

Input streams that connect to TCP sockets to receive data.

/**
 * Create text input stream from TCP socket
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to  
 * @param storageLevel - Storage level for received data (default: MEMORY_AND_DISK_SER_2)
 * @returns ReceiverInputDStream of strings
 */
def socketTextStream(
  hostname: String, 
  port: Int, 
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]

/**
 * Create binary input stream from TCP socket with custom converter
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param converter - Function to convert InputStream to Iterator[T]
 * @param storageLevel - Storage level for received data
 * @returns ReceiverInputDStream of converted type T
 */
def socketStream[T: ClassTag](
  hostname: String,
  port: Int, 
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T]

/**
 * Create raw TCP socket stream where data is received as serialized blocks
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param storageLevel - Storage level for received data (default: MEMORY_AND_DISK_SER_2)
 * @returns ReceiverInputDStream of type T (blocks can be pushed to the block manager without deserialization)
 */
def rawSocketStream[T: ClassTag](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]

Usage Examples:

// Basic text socket stream
val lines = ssc.socketTextStream("localhost", 9999)

// Custom converter for JSON parsing
val jsonStream = ssc.socketStream(
  "data-server", 8080,
  (inputStream: InputStream) => {
    scala.io.Source.fromInputStream(inputStream)
      .getLines()
      .map(parseJson) // parseJson: user-supplied JSON parsing function
  },
  StorageLevel.MEMORY_ONLY
)

// Raw socket for binary protocols
val binaryStream = ssc.rawSocketStream[Array[Byte]]("binary-server", 7777)
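A sketch of how a socket source feeds the rest of a streaming job (assumes a text server such as nc -lk 9999 running on localhost):

val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

wordCounts.print()        // print a few counts from each batch

ssc.start()               // start receiving and processing
ssc.awaitTermination()    // block until the job is stopped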

File System Input Streams

Input streams that monitor file systems for new files and process them as they arrive.

/**
 * Create input stream from text files in a directory
 * @param directory - Directory path to monitor for new files
 * @returns DStream of strings (file contents line by line)
 */
def textFileStream(directory: String): DStream[String]

/**
 * Create input stream from binary files with fixed record length
 * @param directory - Directory path to monitor for new files
 * @param recordLength - Length of each record in bytes
 * @returns DStream of byte arrays
 */
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

/**
 * Create generic file input stream using Hadoop InputFormat
 * @param directory - Directory path to monitor for new files
 * @returns InputDStream of key-value pairs based on input format F
 */
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
  directory: String
): InputDStream[(K, V)]

/**
 * Create file input stream with a file filter and control over which files are processed
 * @param directory - Directory path to monitor
 * @param filter - Function that decides which files to process
 * @param newFilesOnly - Whether to process only files created after the stream starts
 * @returns InputDStream of key-value pairs
 */
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean
): InputDStream[(K, V)]

Usage Examples:

// Monitor directory for text files
val logFiles = ssc.textFileStream("/var/log/app")

// Process binary data files
val binaryData = ssc.binaryRecordsStream("/data/binary", 1024)

// Process Hadoop sequence files
val sequenceFiles = ssc.fileStream[Text, Text, SequenceFileInputFormat[Text, Text]]("/data/sequence")

// Custom file processing with filter
val csvFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/data/csv",
  (path: Path) => path.getName.endsWith(".csv"),
  newFilesOnly = true
)

Queue-Based Input Streams

Input streams created from queues of RDDs, useful for testing and programmatic data injection.

/**
 * Create input stream from queue of RDDs
 * @param queue - Queue containing RDDs to process
 * @param oneAtATime - Whether to process one RDD per batch (default: true)
 * @param defaultRDD - Default RDD when queue is empty (optional)
 * @returns InputDStream of queue elements
 */
def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true,
  defaultRDD: RDD[T] = null
): InputDStream[T]

Usage Examples:

import scala.collection.mutable.Queue

// Create queue and add RDDs
val rddQueue = Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)

// Add data to queue (typically done in another thread)
for (i <- 1 to 10) {
  rddQueue += ssc.sparkContext.parallelize(1 to 100, 2)
}

// Process one RDD at a time
val singleRDDStream = ssc.queueStream(rddQueue, oneAtATime = true)
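Putting it together, a typical test flow defines a computation over the queue stream and runs the context for a bounded time (a sketch; the 10-second timeout is arbitrary):

// Count elements per key for each queued RDD
val counts = queueStream.map(x => (x % 10, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()
ssc.awaitTerminationOrTimeout(10000)   // run for at most 10 seconds
ssc.stop()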

Custom Receiver Input Streams

Input streams using custom receiver implementations for specialized data sources.

/**
 * Create input stream from custom receiver
 * @param receiver - Custom receiver implementation extending Receiver[T]
 * @returns ReceiverInputDStream from the receiver
 */
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]

/**
 * Combine multiple receiver streams into a single stream (union of all streams)
 * @param streams - Sequence of DStreams (for example, receiver input streams) to union
 * @returns Combined DStream containing data from all streams
 */
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

Custom Receiver Implementation:

abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {
  /**
   * Start the receiver - implement data receiving logic
   */
  def onStart(): Unit
  
  /**
   * Stop the receiver - implement cleanup logic  
   */
  def onStop(): Unit
  
  /**
   * Store received data
   * @param data - Single data item to store
   */
  def store(data: T): Unit
  
  /**
   * Store multiple data items
   * @param data - Iterator of data items to store
   */  
  def store(data: Iterator[T]): Unit
  
  /**
   * Report error to driver
   * @param message - Error message
   * @param throwable - Exception that occurred
   */
  def reportError(message: String, throwable: Throwable): Unit
  
  /**
   * Check if receiver is stopped
   * @returns true if receiver has been stopped
   */
  def isStopped(): Boolean
}

Usage Examples:

// Custom HTTP receiver (HttpClient and its methods stand in for any HTTP client library)
class HttpReceiver(url: String, storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) {

  var httpClient: HttpClient = _
  
  def onStart() {
    httpClient = new HttpClient()
    // Start background thread to poll HTTP endpoint
    new Thread("Http Receiver") {
      override def run() {
        while (!isStopped()) {
          val response = httpClient.get(url)
          store(response.body)
          Thread.sleep(1000)
        }
      }
    }.start()
  }
  
  def onStop() {
    if (httpClient != null) httpClient.close()
  }
}

// Use custom receiver
val httpStream = ssc.receiverStream(new HttpReceiver("http://api.example.com/data", StorageLevel.MEMORY_ONLY))

Union Operations for Input Streams

Combine multiple input streams into a single stream.

/**
 * Union multiple DStreams of the same type
 * @param streams - Sequence of DStreams to union
 * @returns Single DStream containing data from all input streams
 */
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

Usage Examples:

// Combine multiple socket streams
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)  
val stream3 = ssc.socketTextStream("host3", 9999)

val combinedStream = ssc.union(Seq(stream1, stream2, stream3))

// Combine different types of streams
val fileStream = ssc.textFileStream("/data/logs")
val socketStream = ssc.socketTextStream("localhost", 9999)
val allLogs = ssc.union(Seq(fileStream, socketStream))

Input Stream Types and Properties

ReceiverInputDStream

Base class for receiver-based input streams that actively receive data.

abstract class ReceiverInputDStream[T: ClassTag](ssc: StreamingContext)
  extends InputDStream[T](ssc) {
  
  /**
   * Get the receiver for this input stream
   * @returns Receiver instance used by this stream
   */
  def getReceiver(): Receiver[T]
}
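
The socket sources above return ReceiverInputDStreams. Each one runs a single long-lived receiver task, so the application needs more cores than receivers; a brief illustration (a usage note rather than additional API):

import org.apache.spark.streaming.dstream.ReceiverInputDStream

// socketTextStream returns a ReceiverInputDStream: one receiver task per stream,
// so a local master needs at least two threads (for example, local[2])
val receiverBased: ReceiverInputDStream[String] =
  ssc.socketTextStream("localhost", 9999)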

InputDStream

Base class for all input streams.

abstract class InputDStream[T: ClassTag](ssc: StreamingContext)
  extends DStream[T](ssc) {
  
  /**
   * Start the input stream (called automatically by StreamingContext)
   */
  def start(): Unit
  
  /**
   * Stop the input stream (called automatically by StreamingContext)
   */
  def stop(): Unit
}
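
As a sketch of what subclassing looks like for a non-receiver source, the hypothetical stream below computes a constant RDD on the driver for every batch (similar in spirit to Spark's own ConstantInputDStream); start, stop, and compute are the only members a subclass must provide:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Driver-side input stream that emits the same small RDD every batch;
// a real source would pull from an external system inside compute()
class ConstantIntStream(_ssc: StreamingContext, value: Int)
  extends InputDStream[Int](_ssc) {

  override def start(): Unit = {}   // nothing to set up
  override def stop(): Unit = {}    // nothing to tear down

  override def compute(validTime: Time): Option[RDD[Int]] =
    Some(context.sparkContext.parallelize(Seq(value)))
}

val constants = new ConstantIntStream(ssc, 42)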

Storage Levels

Constants for controlling how received data is stored and replicated.

import org.apache.spark.storage.StorageLevel

// Common storage levels for input streams
StorageLevel.MEMORY_ONLY              // Store in memory only
StorageLevel.MEMORY_ONLY_2            // Store in memory, replicated 2x  
StorageLevel.MEMORY_AND_DISK          // Store in memory, spill to disk
StorageLevel.MEMORY_AND_DISK_2        // Store in memory and disk, replicated 2x
StorageLevel.MEMORY_AND_DISK_SER      // Store serialized in memory, spill to disk
StorageLevel.MEMORY_AND_DISK_SER_2    // Store serialized, replicated 2x (default)

Choosing Storage Levels:

// High performance, risk of data loss
val fastStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_ONLY)

// Fault tolerant, slower
val reliableStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

// Memory efficient
val compactStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_AND_DISK_SER)

Advanced Input Stream Configuration

File Stream Monitoring Behavior

File streams have specific behavior for monitoring directories:

  • New Files Only: By default, only files created after the stream started are processed
  • File Atomicity: Files should be moved into the monitored directory atomically (a rename into place); see the sketch after this list
  • File Formats: Support for text files, binary files, and Hadoop InputFormats
  • Nested Directories: Recursive monitoring of subdirectories is not supported
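
A sketch of the write-then-rename pattern on a local or POSIX-style file system (the paths are placeholders; on HDFS the equivalent is writing to a temporary directory and calling FileSystem.rename):

import java.nio.file.{Files, Paths, StandardCopyOption}

// Write the file somewhere outside the monitored directory first...
val staged    = Paths.get("/data/staging/part-0001.csv")   // hypothetical staging location
val monitored = Paths.get("/data/csv/part-0001.csv")       // directory watched by the file stream

// ...then move it into place atomically so the stream never sees a half-written file.
// ATOMIC_MOVE requires source and target to be on the same file system.
Files.move(staged, monitored, StandardCopyOption.ATOMIC_MOVE)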

Receiver Reliability

Receivers can be reliable or unreliable:

  • Reliable Receivers: Acknowledge data receipt and can replay data on failure
  • Unreliable Receivers: Do not acknowledge receipt, data may be lost on failure

// Reliable receiver pattern
class ReliableReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart() {
    // Connect to the source and track the last acknowledged offset so data
    // can be replayed after a failure
  }
  def onStop() {
    // Release the connection and persist the last acknowledged offset
  }
}

// Unreliable receiver (simpler but less fault-tolerant)
class SimpleReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart() {
    // Simple implementation; data received but not yet replicated may be lost on failure
  }
  def onStop() {}
}

Backpressure and Rate Limiting

Control the rate of data ingestion to prevent overwhelming the system:

// Enable backpressure (automatically adjusts receiving rate)
val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "1000")

// Set maximum rate for receivers  
val conf2 = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "1000")

Configuration Examples:

// Production configuration for reliable file processing
val conf = new SparkConf()
  .setAppName("LogProcessor")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs://checkpoint")

val logStream = ssc.textFileStream("/var/log/app")
  .filter(_.contains("ERROR"))
  .cache()

logStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"hdfs://processed/${System.currentTimeMillis()}")
  }
}

ssc.start()
ssc.awaitTermination()