or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-streaming.mdindex.mdinput-sources.mdjava-api.mdmonitoring-listeners.mdstate-management.mdweb-ui.md
tile.json

input-sources.mddocs/

Input Sources

Data ingestion capabilities for reading from TCP sockets, file systems, in-memory queues of RDDs, and custom receivers. Message brokers such as Kafka are supported through external libraries (see External Source Integration below).

Capabilities

Socket Streams

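Create DStreams from TCP socket connections for real-time text or binary data ingestion. The receiver acts as a TCP client: it connects to the given host and port, so a server (for example `nc -lk 9999`) must already be listening there.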

/**
 * Create a DStream from a TCP socket carrying text.
 *
 * The receiver connects to `hostname:port` as a TCP client (it does not listen),
 * so a server must already be accepting connections there. The bytes read are
 * decoded as UTF-8, newline-delimited lines.
 *
 * @param hostname - Host to connect to
 * @param port - Port number
 * @param storageLevel - Storage level for received data (default: serialized,
 *                       memory and disk, replicated on 2 nodes)
 * @return DStream with one element per received line (line terminator removed)
 */
def socketTextStream(
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String]

/**
 * Create a DStream from a TCP socket, decoding records with a custom converter.
 *
 * The receiver connects to `hostname:port` as a TCP client and hands the
 * socket's input stream to `converter`. Every element of the returned iterator
 * becomes one record of the stream.
 *
 * @tparam T - Type of the decoded records
 * @param hostname - Host to connect to
 * @param port - Port number
 * @param converter - Function that decodes the socket's byte stream into records
 * @param storageLevel - Storage level for received data (required; unlike
 *                       socketTextStream there is no default)
 * @return DStream of the records produced by `converter`
 */
def socketStream[T](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): DStream[T]

/**
 * Create a DStream from a TCP socket that carries pre-serialized data blocks.
 *
 * Despite the name, the data is not arbitrary raw bytes. The server must send
 * blocks serialized with Spark's configured serializer, each prefixed with its
 * length. The blocks are stored without being deserialized, which makes this the
 * cheapest socket source when the sender can produce that format.
 *
 * @tparam T - Type of the elements contained in the serialized blocks
 * @param hostname - Host to connect to
 * @param port - Port number
 * @param storageLevel - Storage level for received data
 * @return DStream of the elements contained in the received blocks
 */
def rawSocketStream[T](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T]

Usage Examples:

// Basic text stream from socket (one element per received line)
val lines = ssc.socketTextStream("localhost", 9999)

// Custom object stream with converter
val events = ssc.socketStream(
  "event-server", 8080,
  (inputStream: InputStream) => {
    // Decode explicitly as UTF-8: an InputStreamReader without a charset uses the
    // JVM's platform default encoding, which can differ from machine to machine.
    val reader = new BufferedReader(
      new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
      .map(line => parseEvent(line))
  },
  StorageLevel.MEMORY_AND_DISK
)

File Streams

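Monitor a directory on a Hadoop-compatible file system and create DStreams from files newly placed in it. Files must appear atomically: write them elsewhere on the same file system, then move or rename them into the monitored directory. File names starting with `.` are ignored by default.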

/**
 * Monitor a directory for new text files and read them line by line.
 *
 * Files must appear in the directory atomically: write them elsewhere on the
 * same file system, then move or rename them in. Files whose names start with
 * `.` are ignored.
 *
 * @param directory - Directory to monitor (a Hadoop-compatible file system path)
 * @return DStream with one element per line of each new file
 */
def textFileStream(directory: String): DStream[String]

/**
 * Monitor a directory for new files and read them with a Hadoop InputFormat.
 *
 * Files must appear in the directory atomically: write them elsewhere on the
 * same file system, then move or rename them in.
 *
 * @tparam K - Key type produced by the input format
 * @tparam V - Value type produced by the input format
 * @tparam F - Hadoop (new API) InputFormat used to read each file
 * @param directory - Directory to monitor
 * @param filter - Predicate selecting which files to read. By default hidden
 *                 files (names starting with `.`) are skipped.
 * @param newFilesOnly - If true, ignore files already present when the stream
 *                       starts and process only files that arrive afterwards
 * @return DStream of the (key, value) pairs read from the selected files
 */
def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String,
  filter: Path => Boolean = (path: Path) => !path.getName.startsWith("."),
  newFilesOnly: Boolean = true
): DStream[(K, V)]

/**
 * Monitor a directory for new flat binary files made of fixed-length records.
 *
 * Each new file is split into consecutive records of exactly `recordLength`
 * bytes, producing one byte array per record. As with the other file streams,
 * files must be moved or renamed into the directory atomically, and names
 * starting with `.` are ignored.
 *
 * @param directory - Directory to monitor
 * @param recordLength - Length of each record, in bytes
 * @return DStream with one `Array[Byte]` per record
 */
def binaryRecordsStream(
  directory: String,
  recordLength: Int
): DStream[Array[Byte]]

Usage Examples:

// Monitor directory for text files
val logs = ssc.textFileStream("hdfs://namenode/logs")

// Read only *.json files; TextInputFormat yields (byte offset, line) pairs
val jsonFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs://namenode/json-data",
  (path: Path) => path.getName.endsWith(".json"),
  newFilesOnly = true
)
// Hadoop record readers reuse a single Text instance for every record, so copy
// each value into a String before caching, collecting, or windowing the data.
val jsonLines = jsonFiles.map { case (_, line) => line.toString }

// Process binary log files made of fixed 1024-byte records
val binaryLogs = ssc.binaryRecordsStream("hdfs://namenode/binary-logs", 1024)

Queue Streams

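Create DStreams from queues of RDDs, primarily used for testing and debugging streaming applications. Queue streams do not support checkpointing, and the queue must be synchronized whenever it is modified while the stream runs.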

/**
 * Create a DStream from a queue of RDDs.
 *
 * Intended for testing: queue streams cannot be recovered from a checkpoint,
 * because the queued RDDs are arbitrary and their data cannot be recomputed.
 *
 * @tparam T - Element type of the queued RDDs
 * @param queue - Queue of RDDs to consume. The stream dequeues from it while
 *                running, so synchronize on the queue whenever you modify it.
 * @param oneAtATime - If true, consume one RDD per batch; otherwise consume all
 *                     RDDs queued since the previous batch
 * @param defaultRDD - RDD to emit for batches that find the queue empty; if null,
 *                     such batches contain no data
 * @return DStream over the queued RDDs
 */
def queueStream[T](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true,
  defaultRDD: RDD[T] = null
): DStream[T]

Usage Examples:

import scala.collection.mutable.Queue

// Create test data queue
val rddQueue = new Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)

// Add test RDDs to the queue. The stream dequeues from this queue on every
// batch, so always synchronize on it when modifying it.
for (i <- 1 to 10) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.parallelize(Seq(i, i + 1, i + 2))
  }
}

// Process the queue stream
queueStream.map(_ * 2).print()

Receiver-based Streams

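Framework for creating custom receivers to ingest data from external sources. A receiver subclass implements `onStart()` and `onStop()`. `onStart()` must not block, so data should be received on a separate thread that calls `store(...)` and exits once `isStopped()` returns true.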

/**
 * Create an input stream fed by a user-defined receiver.
 *
 * The receiver object is serialized and shipped to a worker node, where Spark
 * calls its `onStart()`; whatever it passes to `store(...)` becomes stream data.
 *
 * @tparam T - Type of the records produced by the receiver
 * @param receiver - Custom receiver implementation
 * @return DStream of the records stored by the receiver
 */
def receiverStream[T](receiver: Receiver[T]): DStream[T]

/**
 * Abstract base class for custom receivers.
 *
 * A receiver runs on a worker node and pushes the data it receives into Spark.
 * Subclasses must implement `onStart()` and `onStop()` and may override
 * `preferredLocation`. The other methods are implemented by Spark and are meant
 * to be called from the subclass; they are shown here as declarations only.
 *
 * @tparam T - Type of the received records
 * @param storageLevel - Storage level used for the data blocks this receiver stores
 */
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

  /**
   * Called when the receiver is started. Must set up everything needed to
   * receive data (threads, connections, buffers) and must NOT block: the actual
   * receiving has to run on a separate thread.
   */
  def onStart(): Unit

  /** Called when the receiver is stopped. Must release everything set up in `onStart()`. */
  def onStop(): Unit

  /** Store a single record; single records are batched into blocks before being pushed into Spark. */
  def store(dataItem: T): Unit

  /** Store a buffer of records as one data block. */
  def store(dataBuffer: ArrayBuffer[T]): Unit

  /** Store the records produced by an iterator as one data block. */
  def store(dataIterator: Iterator[T]): Unit

  /**
   * Store already-serialized bytes as one data block. The bytes must have been
   * serialized with the same serializer Spark is configured to use.
   */
  def store(bytes: ByteBuffer): Unit

  /** Stop the receiver completely. */
  def stop(message: String): Unit

  /**
   * Restart the receiver. Returns immediately; `onStop()` and then `onStart()` are
   * called asynchronously, separated by `spark.streaming.receiverRestartDelay`.
   */
  def restart(message: String): Unit

  /** Report an error to the driver. The receiver keeps running. */
  def reportError(message: String, throwable: Throwable): Unit

  /** Whether the receiver has been started. */
  def isStarted(): Boolean

  /**
   * Whether the receiver has been marked for stopping. Receiving threads should
   * check this regularly and exit once it returns true.
   */
  def isStopped(): Boolean

  /** Preferred host to run this receiver on; `None` (the default) lets Spark choose. */
  def preferredLocation: Option[String] = None
}

Usage Examples:

// Custom receiver for external API
class ApiReceiver(apiUrl: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  // Assigned in onStart()/onStop() but read by the fetch thread, so it is
  // @volatile to make the current client visible across threads.
  @volatile private var client: ApiClient = _

  def onStart(): Unit = {
    client = new ApiClient(apiUrl)
    // onStart() must return quickly, so fetching happens on its own thread.
    val fetcher = new Thread("ApiReceiver-fetcher") {
      override def run(): Unit = fetchLoop()
    }
    fetcher.setDaemon(true)
    fetcher.start()
  }

  def onStop(): Unit = {
    // The fetch thread observes isStopped() and exits on its own.
    val current = client
    if (current != null) current.close()
  }

  /**
   * Polls the API roughly once per second until the receiver is stopped.
   * On a failure it reports the error and asks Spark to restart the receiver
   * (onStop() then onStart()) instead of retrying immediately in a tight loop.
   */
  private def fetchLoop(): Unit = {
    try {
      while (!isStopped()) {
        val data = client.fetchData()
        store(data)
        Thread.sleep(1000)
      }
    } catch {
      case _: InterruptedException =>
        // Interrupted (e.g. while sleeping): treat as shutdown and exit quietly.
      case scala.util.control.NonFatal(_) if isStopped() =>
        // Presumably caused by onStop() closing the client mid-fetch; nothing to recover.
      case scala.util.control.NonFatal(e) =>
        reportError("API fetch failed", e)
        restart("API fetch failed")
    }
  }
}

// Use custom receiver
val apiStream = ssc.receiverStream(new ApiReceiver("https://api.example.com/stream"))

Input Stream Classes

Base classes and implementations for different types of input streams.

/**
 * Base class for all input streams.
 *
 * An input stream is a source DStream: it has no parent DStreams and produces
 * one RDD per batch interval. Streams that can generate their RDDs on the driver
 * without a receiver (files, RDD queues, constant RDDs) extend this class directly.
 * Streams fed by a `Receiver` on a worker node extend `ReceiverInputDStream`.
 *
 * @tparam T - Type of the elements in the stream
 * @param ssc - StreamingContext this input stream belongs to
 */
abstract class InputDStream[T](ssc: StreamingContext) extends DStream[T](ssc) {

  /** Input streams emit one RDD per batch, so they slide by the batch duration. */
  override def slideDuration: Duration = ssc.graph.batchDuration

  /** Input streams have no parent DStreams. */
  override def dependencies: List[DStream[_]] = List()

  /** Generate the RDD for the batch at `validTime`, or `None` if there is none. */
  override def compute(validTime: Time): Option[RDD[T]]

  /** Called by the streaming system to start receiving data. */
  def start(): Unit

  /** Called by the streaming system to stop receiving data. */
  def stop(): Unit
}

/**
 * Base class for input streams whose data is received by a `Receiver` running
 * on a worker node.
 *
 * Concrete subclasses only supply the receiver through `getReceiver()`. Spark
 * Streaming's ReceiverTracker launches receivers on executors and stops them, so
 * this stream's own `start()` and `stop()` have nothing to do.
 *
 * @tparam T - Type of the received records
 * @param ssc - StreamingContext this input stream belongs to
 */
abstract class ReceiverInputDStream[T](ssc: StreamingContext) extends InputDStream[T](ssc) {

  /**
   * Create the receiver that Spark ships to a worker node to receive data for
   * this stream. Every concrete receiver-based stream must implement this.
   */
  def getReceiver(): Receiver[T]

  /** No-op: the ReceiverTracker starts the receiver on an executor. */
  def start(): Unit = {}

  /** No-op: the ReceiverTracker stops the receiver. */
  def stop(): Unit = {}
}

/**
 * Input stream that watches a directory and turns newly arrived files into RDDs
 * of (key, value) pairs, read with the Hadoop input format `F`.
 *
 * @param ssc - StreamingContext this stream belongs to
 * @param directory - Directory to watch
 * @param filter - Predicate deciding which files are read
 * @param newFilesOnly - Whether files present before the stream started are skipped
 */
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
    ssc: StreamingContext,
    directory: String,
    filter: Path => Boolean,
    newFilesOnly: Boolean)
  extends InputDStream[(K, V)](ssc)

/**
 * Receiver-based input stream that reads from a TCP connection to `host:port`
 * and turns the incoming bytes into records with `converter`.
 *
 * @param ssc - StreamingContext this stream belongs to
 * @param host - Host to connect to
 * @param port - Port to connect to
 * @param converter - Decodes the socket's input stream into records
 * @param storageLevel - Storage level for received data
 */
class SocketInputDStream[T](
    ssc: StreamingContext,
    host: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel)
  extends ReceiverInputDStream[T](ssc)

/**
 * Input stream backed by a queue of RDDs; meant for tests.
 *
 * @param ssc - StreamingContext this stream belongs to
 * @param queue - RDDs to hand out, in queue order
 * @param oneAtATime - Take one RDD per batch (true) or everything queued (false)
 * @param defaultRDD - RDD used for batches that find the queue empty
 */
class QueueInputDStream[T](
    ssc: StreamingContext,
    queue: Queue[RDD[T]],
    oneAtATime: Boolean,
    defaultRDD: RDD[T])
  extends InputDStream[T](ssc)

/**
 * Input stream that yields the same RDD for every batch; handy in tests.
 *
 * @param ssc - StreamingContext this stream belongs to
 * @param rdd - RDD returned for each batch
 */
class ConstantInputDStream[T](ssc: StreamingContext, rdd: RDD[T])
  extends InputDStream[T](ssc)

External Source Integration

Spark Streaming integrates with many external sources through additional libraries:

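  • Kafka: spark-streaming-kafka-0-10 for Apache Kafka integration
  • Flume: spark-streaming-flume for Apache Flume integration (deprecated in Spark 2.3, removed in Spark 3.0)
  • Kinesis: spark-streaming-kinesis-asl for Amazon Kinesis
  • Twitter: spark-streaming-twitter for the Twitter API (moved out of Spark in 2.0; now maintained by Apache Bahir)
  • MQTT: spark-streaming-mqtt for MQTT message brokers (moved out of Spark in 2.0; now maintained by Apache Bahir)
  • ZeroMQ: spark-streaming-zeromq for ZeroMQ messaging (moved out of Spark in 2.0; now maintained by Apache Bahir)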

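Each integration provides its own factory methods, typically on a utility object (for example `KafkaUtils.createDirectStream` or `TwitterUtils.createStream`), that take a `StreamingContext` and return an input DStream. They do not add methods to `StreamingContext` itself.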