Advanced Transformations and Windowing

Advanced streaming operations, including windowed computations, stateful transformations, and complex processing patterns for streaming analytics and temporal data analysis.

Capabilities

Window Operations

Apply operations over sliding windows of data for temporal aggregations and time-based analytics.

abstract class DStream[T] {
  /** Create windowed DStream with specified window and slide durations */
  def window(windowDuration: Duration): DStream[T]
  
  /** Create windowed DStream with custom slide duration */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
  
  /** Reduce elements over a sliding window */
  def reduceByWindow(
    reduceFunc: (T, T) => T, 
    windowDuration: Duration, 
    slideDuration: Duration
  ): DStream[T]
  
  /** Incremental reduce with inverse function for efficiency */
  def reduceByWindow(
    reduceFunc: (T, T) => T,
    invReduceFunc: (T, T) => T, 
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[T]
  
  /** Count elements in sliding window */
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
  
  /** Count occurrences of each distinct value in sliding window */
  def countByValueAndWindow(
    windowDuration: Duration, 
    slideDuration: Duration
  ): DStream[(T, Long)]
  
  /** Count occurrences of each distinct value with a custom partition count */
  def countByValueAndWindow(
    windowDuration: Duration, 
    slideDuration: Duration,
    numPartitions: Int
  ): DStream[(T, Long)]
}

Usage Examples:

import org.apache.spark.streaming._

val numbers: DStream[Int] = // stream of integers

// Basic windowing - collect data over 30 seconds, slide every 10 seconds
val windowed = numbers.window(Seconds(30), Seconds(10))

// Reduce over window - sum all numbers in window
val windowSum = numbers.reduceByWindow(_ + _, Seconds(30), Seconds(10))

// Incremental reduce for efficiency
val efficientSum = numbers.reduceByWindow(
  _ + _,        // reduce function
  _ - _,        // inverse reduce function  
  Seconds(30),  // window duration
  Seconds(10)   // slide duration
)

// Count elements in window
val windowCounts = numbers.countByWindow(Seconds(30), Seconds(10))

// Window operations on text
val words: DStream[String] = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val wordCounts = words.countByValueAndWindow(Seconds(30), Seconds(10))

wordCounts.print()
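
These examples assume a StreamingContext `ssc` whose batch interval evenly divides both the window and slide durations (a Spark Streaming requirement). The incremental reduceByWindow variant with an inverse function also keeps intermediate state across batches and needs checkpointing enabled. A minimal setup sketch, with an illustrative application name and checkpoint path:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("WindowExamples").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10)) // 10s batches evenly divide the 30s/10s windows

// Required before using inverse-function window operations such as
// reduceByWindow(_ + _, _ - _, ...) above (path is illustrative)
ssc.checkpoint("/tmp/streaming-checkpoint")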

Windowed Operations on Pair DStreams

Windowed operations specifically for key-value pairs, enabling temporal aggregations by key.

class PairDStreamFunctions[K, V] {
  /** Group by key over sliding window */
  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
  
  /** Group by key with custom slide duration */
  def groupByKeyAndWindow(
    windowDuration: Duration, 
    slideDuration: Duration
  ): DStream[(K, Iterable[V])]
  
  /** Group by key with custom partitioner */
  def groupByKeyAndWindow(
    windowDuration: Duration,
    slideDuration: Duration, 
    partitioner: Partitioner
  ): DStream[(K, Iterable[V])]
  
  /** Group by key with partition count */
  def groupByKeyAndWindow(
    windowDuration: Duration,
    slideDuration: Duration,
    numPartitions: Int
  ): DStream[(K, Iterable[V])]
  
  /** Reduce by key over sliding window */
  def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[(K, V)]
  
  /** Reduce by key with custom partitioner */
  def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration, 
    slideDuration: Duration,
    partitioner: Partitioner
  ): DStream[(K, V)]
  
  /** Incremental reduce by key with inverse function */
  def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[(K, V)]
  
  /** Incremental reduce with custom partitioner */
  def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V, 
    windowDuration: Duration,
    slideDuration: Duration,
    partitioner: Partitioner
  ): DStream[(K, V)]
  
  /** Incremental reduce with partition count and filtering */
  def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    windowDuration: Duration, 
    slideDuration: Duration,
    numPartitions: Int,
    filterFunc: ((K, V)) => Boolean
  ): DStream[(K, V)]
}

Usage Examples:

val pairs: DStream[(String, Int)] = words.map(word => (word, 1))

// Group words by key over 1-minute windows
val groupedByWindow = pairs.groupByKeyAndWindow(Minutes(1), Seconds(10))

// Word count over sliding window
val wordCountsWindow = pairs.reduceByKeyAndWindow(_ + _, Minutes(1), Seconds(10))

// Efficient incremental word counting
val efficientWordCounts = pairs.reduceByKeyAndWindow(
  _ + _,        // add new words
  _ - _,        // subtract old words
  Minutes(1),   // window duration
  Seconds(10)   // slide duration
)

// Window with filtering - only words with count > 5
val filteredCounts = pairs.reduceByKeyAndWindow(
  _ + _,
  _ - _,
  Minutes(1),
  Seconds(10), 
  10, // numPartitions
  { case (word, count) => count > 5 }
)
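
As with the plain windowed reduces, the inverse-function variants above require checkpointing to be enabled on the StreamingContext. groupByKeyAndWindow is typically used when the per-key values themselves are needed, for example to compute a per-key average over the window; a sketch assuming an illustrative keyed stream of numeric readings:

// Per-key average over a one-minute window, recomputed every ten seconds
val sensorReadings: DStream[(String, Double)] = // keyed numeric readings (illustrative)
val avgByWindow = sensorReadings
  .groupByKeyAndWindow(Minutes(1), Seconds(10))
  .mapValues(values => values.sum / values.size)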

Stateful Transformations

Maintain state across batches for sophisticated streaming computations that require memory of past events.

class PairDStreamFunctions[K, V] {
  /** Update state by key using update function */
  def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  
  /** Update state with custom partitioner */
  def updateStateByKey[S](
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner
  ): DStream[(K, S)]
  
  /** Update state with partition count */
  def updateStateByKey[S](
    updateFunc: (Seq[V], Option[S]) => Option[S], 
    numPartitions: Int
  ): DStream[(K, S)]
  
  /** Update state with initial RDD */
  def updateStateByKey[S](
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner,
    initialRDD: RDD[(K, S)]
  ): DStream[(K, S)]
  
  /** Advanced stateful operations using mapWithState */
  def mapWithState[StateType, MappedType](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType]
}

Usage Examples:

// Running word count with updateStateByKey
val runningCounts = pairs.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}

// Session tracking example
case class Session(startTime: Long, lastSeen: Long, eventCount: Int)

val sessionUpdates = userEvents.updateStateByKey[Session] { 
  (events: Seq[UserEvent], session: Option[Session]) =>
    if (events.nonEmpty) {
      val now = System.currentTimeMillis()
      session match {
        case Some(s) => 
          // Update existing session
          Some(s.copy(lastSeen = now, eventCount = s.eventCount + events.length))
        case None => 
          // New session
          Some(Session(now, now, events.length))
      }
    } else {
      // No new events - check if session should timeout
      session.filter(s => System.currentTimeMillis() - s.lastSeen < 300000) // 5 min timeout
    }
}

// Complex state with custom partitioner
val partitioner = new HashPartitioner(8)
val statefulStream = pairs.updateStateByKey(updateFunc, partitioner)
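
updateStateByKey keeps state for every key across batches and fails at runtime unless a checkpoint directory has been set; a minimal sketch (checkpoint path is illustrative):

// Stateful transformations require checkpointing
ssc.checkpoint("/tmp/streaming-checkpoint")

// The running totals accumulate as batches arrive; print a sample each batch
runningCounts.print()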

MapWithState Operations

A more efficient and flexible alternative to updateStateByKey, with built-in timeout support for idle keys.

/**
 * Specification for mapWithState operation
 */
class StateSpec[KeyType, ValueType, StateType, MappedType] private (
  mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
) {
  /** Set initial state RDD */
  def initialState(rdd: RDD[(KeyType, StateType)]): StateSpec[KeyType, ValueType, StateType, MappedType]
  
  /** Set number of partitions */
  def numPartitions(numPartitions: Int): StateSpec[KeyType, ValueType, StateType, MappedType]
  
  /** Set custom partitioner */
  def partitioner(partitioner: Partitioner): StateSpec[KeyType, ValueType, StateType, MappedType]
  
  /** Set timeout duration for idle keys */
  def timeout(idleDuration: Duration): StateSpec[KeyType, ValueType, StateType, MappedType]
}

object StateSpec {
  /** Create StateSpec with mapping function */
  def function[KeyType, ValueType, StateType, MappedType](
    mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}

/**
 * State object for managing state in mapWithState operations
 */
abstract class State[S] {
  /** Check if state exists */
  def exists(): Boolean
  
  /** Get current state value */
  def get(): S
  
  /** Update state with new value */
  def update(newState: S): Unit
  
  /** Remove state */
  def remove(): Unit
  
  /** Check if state is timing out */
  def isTimingOut(): Boolean
}

/**
 * DStream returned by mapWithState operation
 */
class MapWithStateDStream[K, V, StateType, MappedType] extends DStream[MappedType] {
  /** Get snapshots of current state */
  def stateSnapshots(): DStream[(K, StateType)]
}

Usage Examples:

import org.apache.spark.streaming._

// Efficient session tracking with mapWithState
val trackSessions = StateSpec.function(
  (userId: String, event: Option[UserEvent], state: State[Session]) => {
    val currentTime = System.currentTimeMillis()
    
    event match {
      case Some(e) =>
        // Update or create session
        if (state.exists()) {
          val session = state.get()
          state.update(session.copy(lastSeen = currentTime, eventCount = session.eventCount + 1))
          (userId, session.eventCount + 1) // return event count
        } else {
          val newSession = Session(currentTime, currentTime, 1)
          state.update(newSession)
          (userId, 1)
        }
      case None =>
        // Timeout case: Spark removes the timed-out state automatically,
        // so it must not be updated or removed here
        if (state.isTimingOut()) {
          (userId, -1) // indicate session ended
        } else {
          (userId, 0) // no change
        }
    }
  }
).timeout(Minutes(30)) // 30-minute timeout

val sessionEvents: MapWithStateDStream[String, UserEvent, Session, (String, Int)] = userEvents.mapWithState(trackSessions)

// Word count with mapWithState
val wordCountSpec = StateSpec.function(
  (word: String, count: Option[Int], state: State[Int]) => {
    val currentCount = count.getOrElse(0)               // value of the incoming record
    val previousCount = state.getOption().getOrElse(0)  // existing state, if any
    val newCount = currentCount + previousCount
    state.update(newCount)
    (word, newCount)
  }
).numPartitions(10)

val wordCounts: MapWithStateDStream[String, Int, Int, (String, Int)] = pairs.mapWithState(wordCountSpec)

// Get snapshot of current state
val currentState: DStream[(String, Int)] = wordCounts.stateSnapshots()
currentState.print()
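
The StateSpec builder can also seed the computation with an initial state RDD, for example to warm-start counts from a previous run. A sketch assuming an illustrative `previousCounts` RDD:

import org.apache.spark.rdd.RDD

// Previously computed counts used to seed the state (illustrative data)
val previousCounts: RDD[(String, Int)] =
  ssc.sparkContext.parallelize(Seq(("spark", 100), ("streaming", 50)))

val seededSpec = StateSpec.function(
  (word: String, count: Option[Int], state: State[Int]) => {
    val newCount = count.getOrElse(0) + state.getOption().getOrElse(0)
    state.update(newCount)
    (word, newCount)
  }
).initialState(previousCounts).timeout(Minutes(60))

val seededCounts = pairs.mapWithState(seededSpec)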

Transform Operations

Apply arbitrary RDD transformations to DStreams for custom processing logic.

abstract class DStream[T] {
  /** Transform DStream using RDD operations */
  def transform[U](transformFunc: RDD[T] => RDD[U]): DStream[U]
  
  /** Transform with access to batch time */
  def transform[U](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
  
  /** Transform with multiple DStreams */
  def transformWith[U, V](
    other: DStream[U], 
    transformFunc: (RDD[T], RDD[U]) => RDD[V]
  ): DStream[V]
  
  /** Transform with multiple DStreams and time access */
  def transformWith[U, V](
    other: DStream[U],
    transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
  ): DStream[V]
}

class StreamingContext {
  /** Transform multiple DStreams together */
  def transform[T](
    dstreams: Seq[DStream[_]],
    transformFunc: (Seq[RDD[_]], Time) => RDD[T]
  ): DStream[T]
}

Usage Examples:

val lines: DStream[String] = ssc.textFileStream("/path/to/files")

// Custom transformation with RDD operations
val processed = lines.transform { rdd =>
  // Use any RDD operation
  rdd.filter(_.nonEmpty)
     .map(_.split(","))
     .filter(_.length >= 3)
     .map(fields => (fields(0), fields(1).toInt))
     .reduceByKey(_ + _)
}

// Transform with time information
val timeAware = lines.transform { (rdd, time) =>
  println(s"Processing batch at time: $time")
  rdd.map(line => s"$time: $line")
}

// Transform two DStreams together
val stream1: DStream[String] = // first stream
val stream2: DStream[Int] = // second stream

val combined = stream1.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[Int]) => {
  val count1 = rdd1.count()
  val count2 = rdd2.count()
  rdd1.sparkContext.parallelize(Seq(s"Stream1: $count1, Stream2: $count2"))
})

// Multi-stream transformation (ssc.transform takes the DStreams and a function over their RDDs)
val multiTransform = ssc.transform(Seq(stream1, stream2), (rdds: Seq[RDD[_]], time: Time) => {
  val rdd1 = rdds(0).asInstanceOf[RDD[String]]
  val rdd2 = rdds(1).asInstanceOf[RDD[Int]]
  // Key both RDDs by element index, join on the index, and keep the paired values
  rdd1.zipWithIndex().map(_.swap)
      .join(rdd2.zipWithIndex().map(_.swap))
      .map(_._2)
})
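
A common use of transform is joining each batch against static reference data that is not itself a stream. A sketch assuming an illustrative blacklist RDD and a keyed stream of user actions:

import org.apache.spark.rdd.RDD

// Static reference data loaded once (illustrative)
val blacklist: RDD[(String, Boolean)] =
  ssc.sparkContext.parallelize(Seq(("spammer1", true), ("spammer2", true)))

val userActions: DStream[(String, String)] = // (user, action) pairs (illustrative)

// Drop any pair whose user appears in the blacklist
val cleaned = userActions.transform { rdd =>
  rdd.leftOuterJoin(blacklist)
     .filter { case (_, (_, flagged)) => flagged.isEmpty } // keep users not in the blacklist
     .map { case (user, (action, _)) => (user, action) }
}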

Advanced Data Processing Patterns

Complex streaming patterns for sophisticated data processing scenarios.

import scala.reflect.ClassTag

/**
 * Pattern: Deduplication over time window
 */
def deduplicateOverWindow[T: ClassTag](
  stream: DStream[T],
  windowDuration: Duration,
  slideDuration: Duration
)(keyFunc: T => String): DStream[T] = {
  stream.window(windowDuration, slideDuration)
        .map(item => (keyFunc(item), item))
        .groupByKey()
        .map(_._2.head) // Keep one occurrence per key
}

/**
 * Pattern: Top-K elements by window
 */
def topKByWindow[T: ClassTag](
  stream: DStream[T],
  k: Int,
  windowDuration: Duration,
  slideDuration: Duration
)(implicit ord: Ordering[T]): DStream[T] = {
  stream.window(windowDuration, slideDuration)
        .transform { rdd =>
          // takeOrdered with the reversed ordering yields the k largest elements
          rdd.sparkContext.parallelize(rdd.takeOrdered(k)(ord.reverse).toSeq, numSlices = 1)
        }
}

/**
 * Pattern: Threshold-based alerting
 */
def thresholdAlert[K, V](
  stream: DStream[(K, V)],
  threshold: V
)(implicit ord: Ordering[V]): DStream[(K, V)] = {
  stream.filter { case (_, value) => ord.gteq(value, threshold) }
}

Usage Examples:

// Deduplication example
val events: DStream[Event] = // stream of events
val uniqueEvents = deduplicateOverWindow(events, Minutes(5), Seconds(30))(_.id)

// Top-K pattern
val scores: DStream[Int] = // stream of scores  
val topScores = topKByWindow(scores, 10, Minutes(1), Seconds(10))

// Threshold alerting
val metrics: DStream[(String, Double)] = // stream of metrics
val alerts = thresholdAlert(metrics, 90.0) // alert when value >= 90

alerts.foreachRDD { rdd =>
  rdd.collect().foreach { case (metric, value) =>
    println(s"ALERT: $metric exceeded threshold with value $value")
  }
}
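
None of the pipelines above process any data until the streaming context is started; the standard lifecycle calls close out a streaming application:

// Start the computation and block until it is stopped or fails
ssc.start()
ssc.awaitTermination()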