tessl/maven-org-apache-spark--spark-streaming_2-11

tessl install tessl/maven-org-apache-spark--spark-streaming_2-11@1.6.0

Apache Spark Streaming library for processing live data streams with fault-tolerance and high throughput.

docs/input-sources.md

Input Sources

Spark Streaming provides various input sources for ingesting streaming data from external systems. These input sources create InputDStreams that serve as the starting point for stream processing pipelines.

Core Imports

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
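
The examples on this page assume an active StreamingContext named ssc. A minimal sketch of how one might be created (the master setting, application name, and batch interval below are illustrative, not prescribed):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Local master with two threads so a receiver and the processing tasks can run together
val conf = new SparkConf().setMaster("local[2]").setAppName("input-sources-examples")
val ssc = new StreamingContext(conf, Seconds(1))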

Socket-Based Input Sources

Text Socket Stream

def socketTextStream(
  hostname: String,
  port: Int, 
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]

Creates an input stream that reads text data from a TCP socket connection.

Parameters:

  • hostname - Hostname to connect to
  • port - Port number to connect to
  • storageLevel - Storage level for received data

Example:

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))

Custom Socket Stream

def socketStream[T: ClassTag](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T]

Creates an input stream that applies a custom conversion function to data read from a TCP socket.

Parameters:

  • converter - Function to convert InputStream to Iterator[T]

Example:

// MyData and parseCustomProtocol are application-specific placeholders
val customStream = ssc.socketStream[MyData](
  "localhost", 8888,
  inputStream => parseCustomProtocol(inputStream),
  StorageLevel.MEMORY_AND_DISK_SER
)

Raw Socket Stream

def rawSocketStream[T: ClassTag](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]

Creates an input stream from a TCP socket where data is received as serialized blocks (serialized with Spark's serializer) that can be pushed into the block manager without deserialization.

Example:

val binaryStream = ssc.rawSocketStream[ByteBuffer]("localhost", 8080)

File-Based Input Sources

Text File Stream

def textFileStream(directory: String): DStream[String]

Monitors a directory for new text files and reads them line by line.

Example:

val textFiles = ssc.textFileStream("hdfs://data/input")
val processedLines = textFiles.filter(_.nonEmpty).map(_.toUpperCase)

Generic File Stream

def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean
): InputDStream[(K, V)]

Generic file input stream using a Hadoop InputFormat. Overloads also exist that take only the directory, or that additionally accept a Hadoop Configuration.

Parameters:

  • filter - Function to filter which files to process
  • newFilesOnly - Whether to process only new files

Example:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val csvFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs://data/csv",
  (path: Path) => path.getName.endsWith(".csv"),
  newFilesOnly = true
)

Binary Records Stream

def binaryRecordsStream(
  directory: String,
  recordLength: Int
): DStream[Array[Byte]]

Reads binary files with fixed-length records.

Example:

val binaryData = ssc.binaryRecordsStream("hdfs://data/binary", 1024)
val parsedRecords = binaryData.map(parseFixedLengthRecord)

Queue-Based Input Sources

Queue Stream

def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true
): InputDStream[T]

def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean,
  defaultRDD: RDD[T]
): InputDStream[T]

Creates a stream from a queue of RDDs, primarily for testing. Depending on oneAtATime, each batch processes either one RDD from the queue or all RDDs currently queued.

Parameters:

  • oneAtATime - Whether to process one RDD per batch (true) or all queued RDDs at once (false)
  • defaultRDD - RDD to use when the queue is empty (see the second example below)

Example:

import scala.collection.mutable.Queue

val rddQueue = new Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)

// Add RDDs to queue (typically in another thread)
for (i <- 1 to 10) {
  rddQueue += ssc.sparkContext.parallelize(1 to 100)
}
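
A variant of the same sketch showing the defaultRDD parameter (the fallback contents here are illustrative): when the queue is empty, the stream emits the default RDD instead of an empty batch.

val fallback = ssc.sparkContext.parallelize(Seq(0))  // emitted whenever rddQueue is empty
val streamWithDefault = ssc.queueStream(rddQueue, oneAtATime = false, defaultRDD = fallback)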

Receiver-Based Input Sources

Custom Receiver Stream

def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]

Creates a stream using a custom receiver implementation.

Example:

class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart() {
    // Start receiving data
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }
  
  def onStop() {
    // Stop receiving
  }
  
  private def receive() {
    while (!isStopped()) {
      val data = receiveDataFromSomewhere()  // placeholder for application-specific logic
      store(data)
    }
  }
}

val customStream = ssc.receiverStream(new CustomReceiver())

Actor Stream

def actorStream[T: ClassTag](
  props: Props,
  name: String,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
  supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T]

Creates a stream that uses an Akka actor as the receiver.

Example:

import akka.actor.{Actor, Props}
import org.apache.spark.streaming.receiver.ActorHelper

class MyActor extends Actor with ActorHelper {
  def receive = {
    case data: String => store(data)
  }
}

val actorStream = ssc.actorStream[String](
  Props[MyActor], 
  "MyReceiver"
)

Advanced Input Sources

Union of Streams

def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

Combines multiple input streams of the same type.

Example:

val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val stream3 = ssc.textFileStream("hdfs://data")

val combinedStream = ssc.union(Seq(stream1, stream2, stream3))

Constant Input Stream

// Public class in org.apache.spark.streaming.dstream, constructed directly
class ConstantInputDStream[T: ClassTag](
  ssc: StreamingContext,
  rdd: RDD[T]
) extends InputDStream[T](ssc)

Stream that repeatedly produces the same RDD (useful for testing).
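
Example (a sketch; the RDD contents are illustrative):

import org.apache.spark.streaming.dstream.ConstantInputDStream

val staticRdd = ssc.sparkContext.parallelize(Seq("a", "b", "c"))
val constantStream = new ConstantInputDStream(ssc, staticRdd)
constantStream.print()  // prints the same elements in every batch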

Input Source Configuration

Storage Levels

Common storage levels for input streams:

StorageLevel.MEMORY_ONLY          // Store in memory only
StorageLevel.MEMORY_AND_DISK      // Memory first, disk if needed  
StorageLevel.MEMORY_AND_DISK_SER  // Serialized in memory and disk
StorageLevel.MEMORY_ONLY_2        // Memory with 2x replication
StorageLevel.MEMORY_AND_DISK_2    // Memory and disk with 2x replication

Example:

val replicatedStream = ssc.socketTextStream(
  "localhost", 9999,
  StorageLevel.MEMORY_AND_DISK_2  // Replicated for fault tolerance
)

Rate Limiting

Control data ingestion rate:

// Set in SparkConf
val conf = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "1000")  // Max records/sec per receiver
  .set("spark.streaming.backpressure.enabled", "true")  // Enable backpressure

val ssc = new StreamingContext(conf, Seconds(1))

Usage Examples

Multi-Source Data Pipeline

// Multiple input sources
val socketData = ssc.socketTextStream("localhost", 9999)
val fileData = ssc.textFileStream("hdfs://data/input")
val queueData = ssc.queueStream(testQueue)  // testQueue: a Queue[RDD[String]] defined elsewhere

// Combine and process
val allData = ssc.union(Seq(socketData, fileData, queueData))
val processed = allData
  .filter(_.nonEmpty)
  .map(parseRecord)
  .filter(_.isValid)

processed.print()

Fault-Tolerant File Processing

val reliableFileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs://data/input",
  // Only process .log files modified in the last hour
  (path: Path) => {
    val fs = path.getFileSystem(new org.apache.hadoop.conf.Configuration())
    path.getName.endsWith(".log") &&
      (System.currentTimeMillis() - fs.getFileStatus(path).getModificationTime) < 3600000
  },
  newFilesOnly = true
)

val processedLogs = reliableFileStream
  .map(_._2.toString)  // Extract text content
  .filter(isValidLogLine)
  .map(parseLogLine)

processedLogs.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"hdfs://output/${System.currentTimeMillis()}")
  }
}

Custom Protocol Receiver

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

// MyMessage and parseCustomProtocol are application-specific placeholders
class CustomProtocolReceiver(host: String, port: Int)
  extends Receiver[MyMessage](StorageLevel.MEMORY_AND_DISK_2) {

  private var socket: Socket = _
  private var thread: Thread = _
  
  def onStart() {
    thread = new Thread("Custom Protocol Receiver") {
      override def run() { receive() }
    }
    thread.start()
  }
  
  def onStop() {
    if (socket != null) {
      socket.close()
    }
    if (thread != null) {
      thread.interrupt()
    }
  }
  
  private def receive() {
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
      
      var line: String = null
      while (!isStopped() && { line = reader.readLine(); line != null }) {
        val message = parseCustomProtocol(line)
        store(message)
      }
    } catch {
      case e: Exception =>
        if (!isStopped()) {
          restart("Error receiving data", e)
        }
    } finally {
      if (socket != null) socket.close()
    }
  }
}

// Use custom receiver
val customStream = ssc.receiverStream(new CustomProtocolReceiver("host", 1234))
val processed = customStream.map(processMessage)

Testing with Queue Stream

import scala.collection.mutable.Queue

// Create test data
val testQueue = new Queue[RDD[String]]()
val queueStream = ssc.queueStream(testQueue)

val processed = queueStream
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

// Add test data in separate thread
val dataFeeder = new Thread(new Runnable {
  def run() {
    for (i <- 1 to 100) {
      testQueue += ssc.sparkContext.parallelize(Seq(s"data $i", s"test $i"))
      Thread.sleep(1000)
    }
  }
})
dataFeeder.start()

ssc.start()
ssc.awaitTermination()

Dynamic File Processing

val dynamicFileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs://data/input",
  // Dynamic filter based on current time and file name patterns
  (path: Path) => {
    val fileName = path.getName
    val currentHour = java.time.LocalDateTime.now().getHour

    // Process different file types based on time of day
    if (currentHour >= 9 && currentHour <= 17) {
      fileName.contains("business") || fileName.contains("transactions")
    } else {
      fileName.contains("batch") || fileName.contains("maintenance")
    }
  },
  newFilesOnly = true
)

val smartProcessing = dynamicFileStream
  .map(_._2.toString)
  .transform { (rdd, time) =>
    val hour = java.time.Instant.ofEpochMilli(time.milliseconds)
      .atZone(java.time.ZoneId.systemDefault()).getHour
    if (hour >= 9 && hour <= 17) {
      rdd.map(processBusinessData)
    } else {
      rdd.map(processBatchData)
    }
  }

smartProcessing.print()