Describes: mavenpkg:maven/org.apache.spark/spark-streaming_2.11@1.6.x

Apache Spark Streaming library for processing live data streams with fault-tolerance and high throughput.


Paired DStream Operations

Paired DStreams (DStreams of key-value pairs) provide additional operations for working with keyed data, including joins, grouping, state management, and key-based aggregations.

Core Imports

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{HashPartitioner, Partitioner}

Key-Based Grouping

def groupByKey(): DStream[(K, Iterable[V])]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]

groupByKey() - Groups the values for each key within each batch, using hash partitioning with Spark's default number of partitions.

groupByKey(numPartitions) - Same, hash-partitioned into the specified number of partitions.

groupByKey(partitioner) - Same, using the provided partitioner to control how keys are distributed.

Examples:

val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .filter(_.nonEmpty) // split can yield empty strings; charAt(0) would throw on them
  .map(word => (word.charAt(0), word))

// Group words by first character
val grouped = pairs.groupByKey()
grouped.print()

// Group with specific partitioning
val groupedPartitioned = pairs.groupByKey(new HashPartitioner(4))

Key-Based Reduction

def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]

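reduceByKey(func) - Reduces the values for each key within each batch using an associative and commutative function, with Spark's default hash partitioning.

reduceByKey(func, numPartitions) - Same, hash-partitioned into the specified number of partitions.

reduceByKey(func, partitioner) - Same, using the provided partitioner.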

Examples:

// `lines` is assumed to be a DStream[String], e.g. from ssc.socketTextStream
val wordPairs = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))

val wordCounts = wordPairs.reduceByKey(_ + _) // Sum counts for each word

// With an explicit partitioner (8 hash partitions)
val balancedCounts = wordPairs
  .reduceByKey(_ + _, new HashPartitioner(8))
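
Spark reduces values within each partition first and then merges the partial results, so the reduce function should give the same answer however the values are grouped and ordered. The plain-Scala sketch below (no Spark involved; `ReduceOrderSketch` and the sample data are illustrative assumptions) models that two-step reduction and shows that addition is stable across partition layouts while subtraction is not.

```scala
object ReduceOrderSketch {
  // Reduce each non-empty partition locally, then reduce the partial results
  def reduceAcrossPartitions(partitions: Seq[Seq[Int]], f: (Int, Int) => Int): Int =
    partitions.filter(_.nonEmpty).map(_.reduce(f)).reduce(f)

  def main(args: Array[String]): Unit = {
    // The same four values, split across partitions in two different ways
    val layoutA = Seq(Seq(1, 2), Seq(3, 4))
    val layoutB = Seq(Seq(1), Seq(2, 3, 4))

    println(reduceAcrossPartitions(layoutA, _ + _)) // 10
    println(reduceAcrossPartitions(layoutB, _ + _)) // 10
    println(reduceAcrossPartitions(layoutA, _ - _)) // 0
    println(reduceAcrossPartitions(layoutB, _ - _)) // 6
  }
}
```

Because the partition layout of a batch is not something the reduce function can rely on, a function like subtraction would produce results that vary from run to run.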

Advanced Reduction

def combineByKey[C: ClassTag](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiner: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true
): DStream[(K, C)]

combineByKey - Generic key-based aggregation with custom combiner functions. In Spark 1.6 the DStream version has a single overload, and the partitioner argument is required.

Examples:

// Calculate average per key (assumes pairs: DStream[(String, Int)])
val averages = pairs.combineByKey(
  (value: Int) => (value, 1),              // Create combiner: (sum, count) for the first value
  (acc: (Int, Int), value: Int) => (acc._1 + value, acc._2 + 1), // Merge a value into a combiner
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // Merge two combiners
  new HashPartitioner(4)                   // Partitioner is mandatory for DStream.combineByKey
).map { case (key, (sum, count)) => (key, sum.toDouble / count) }
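
To make the roles of the three functions concrete, the sketch below reproduces the average-per-key aggregation above on plain Scala collections, without Spark. Each inner sequence stands in for one partition of a batch; the object name `CombineByKeySketch` and the sample data are illustrative assumptions, not part of the Spark API.

```scala
object CombineByKeySketch {
  type Acc = (Int, Int) // (running sum, running count)

  val createCombiner: Int => Acc = v => (v, 1)
  val mergeValue: (Acc, Int) => Acc = (acc, v) => (acc._1 + v, acc._2 + 1)
  val mergeCombiners: (Acc, Acc) => Acc = (a, b) => (a._1 + b._1, a._2 + b._2)

  // Within one partition: the first value seen for a key goes through
  // createCombiner, every later value through mergeValue.
  def combinePartition(records: Seq[(String, Int)]): Map[String, Acc] =
    records.foldLeft(Map.empty[String, Acc]) { case (accs, (k, v)) =>
      val updated = accs.get(k).map(acc => mergeValue(acc, v)).getOrElse(createCombiner(v))
      accs + (k -> updated)
    }

  // Across partitions: merge the partial results with mergeCombiners,
  // then turn each (sum, count) into an average.
  def averages(partitions: Seq[Seq[(String, Int)]]): Map[String, Double] = {
    val merged = partitions.map(combinePartition).foldLeft(Map.empty[String, Acc]) { (left, right) =>
      right.foldLeft(left) { case (m, (k, acc)) =>
        m + (k -> m.get(k).map(prev => mergeCombiners(prev, acc)).getOrElse(acc))
      }
    }
    merged.map { case (k, (sum, count)) => (k, sum.toDouble / count) }
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq(Seq("a" -> 1, "b" -> 4, "a" -> 3), Seq("a" -> 5, "b" -> 6))
    println(averages(batch)) // averages: a -> 3.0, b -> 5.0
  }
}
```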

Join Operations

def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, Option[W]))]
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Option[V], W))]
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Option[V], Option[W]))]

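join - Inner join of two DStreams by key, returning pairs for keys that exist in both streams. The join is applied batch by batch, so only records that arrive in the same batch interval of both streams are paired.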

leftOuterJoin - Left outer join, keeping all keys from left stream.

rightOuterJoin - Right outer join, keeping all keys from right stream.

fullOuterJoin - Full outer join, keeping keys from both streams.

Examples:

val userProfiles = ssc.socketTextStream("localhost", 9999)
  .map(line => { val parts = line.split(","); (parts(0), parts(1)) })

val userActions = ssc.socketTextStream("localhost", 9998)
  .map(line => { val parts = line.split(","); (parts(0), parts(1)) })

// Inner join - only users with both profile and action
val enrichedActions = userActions.join(userProfiles)
enrichedActions.print()

// Left outer join - all actions, with profile if available
val actionsWithProfiles = userActions.leftOuterJoin(userProfiles)
actionsWithProfiles.foreachRDD { rdd =>
  rdd.collect().foreach {
    case (userId, (action, Some(profile))) => println(s"$userId: $action ($profile)")
    case (userId, (action, None)) => println(s"$userId: $action (no profile)")
  }
}
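
DStream joins pair up the RDDs that the two streams produce for the same batch interval. The plain-Scala sketch below (no Spark; a stream is modelled as a sequence of batches, and all names are illustrative assumptions) shows the consequence: a profile that arrived in an earlier batch is not matched against an action in a later one.

```scala
object PerBatchJoinSketch {
  type Batch[A] = Seq[(String, A)]

  // Inner join of two batches: emit a pair for every matching key
  def join[V, W](left: Batch[V], right: Batch[W]): Seq[(String, (V, W))] =
    for ((k, v) <- left; (k2, w) <- right if k == k2) yield (k, (v, w))

  // Left outer join: keep every left record, with None when the right side has no match
  def leftOuterJoin[V, W](left: Batch[V], right: Batch[W]): Seq[(String, (V, Option[W]))] =
    left.flatMap { case (k, v) =>
      val matches: Seq[Option[W]] = right.collect { case (key, w) if key == k => Option(w) }
      val padded = if (matches.isEmpty) Seq(Option.empty[W]) else matches
      padded.map(w => (k, (v, w)))
    }

  // Batch i of the left stream is only ever joined with batch i of the right stream
  def joinStreams[V, W](left: Seq[Batch[V]], right: Seq[Batch[W]]): Seq[Seq[(String, (V, W))]] =
    left.zip(right).map { case (l, r) => join(l, r) }

  def main(args: Array[String]): Unit = {
    val actions  = Seq(Seq("u1" -> "click"), Seq("u2" -> "view"))
    val profiles = Seq(Seq("u1" -> "Ann", "u2" -> "Bob"), Seq.empty[(String, String)])
    // u2's profile arrived in the first batch, its action in the second: no inner-join match
    println(joinStreams(actions, profiles))
    println(leftOuterJoin(actions(1), profiles(1)))
  }
}
```

When one side is slowly changing reference data, a windowed or stateful version of that stream is usually needed so that its records are still present in later batches.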

State Management

Update State By Key

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S],
  partitioner: Partitioner
): DStream[(K, S)]
def updateStateByKey[S: ClassTag](
  updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
  partitioner: Partitioner,
  rememberPartitioner: Boolean
): DStream[(K, S)]

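updateStateByKey - Maintains state for each key across batches. The update function is called in every batch for each key that has new values or existing state; returning None removes the key's state. Requires a checkpoint directory (ssc.checkpoint(...)).

Parameters:

  • updateFunc - Function that computes the new state from the batch's new values and the previous state
  • partitioner - Partitioner that controls how the state RDDs are distributed
  • rememberPartitioner - Whether to remember the partitioner object in the generated RDDs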

Examples:

// Running word count totals
val runningCounts = words.map((_, 1))
  .updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
    Some(state.getOrElse(0) + values.sum)
  }

// User session tracking with timeout
// (Event, UserSession and SESSION_TIMEOUT are user-defined and not shown here)
val userSessions = userEvents
  .updateStateByKey[UserSession] { (events: Seq[Event], session: Option[UserSession]) =>
    val currentTime = System.currentTimeMillis()
    val currentSession = session.getOrElse(UserSession(currentTime))

    // Expire only idle sessions; a session that just received events stays alive
    if (events.isEmpty && currentTime - currentSession.lastActivity > SESSION_TIMEOUT) {
      None // Remove expired session
    } else {
      Some(currentSession.addEvents(events))
    }
  }
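
The update function is an ordinary pure function, so its behavior across batches can be checked without a running StreamingContext. The sketch below folds two update functions over three hypothetical batches for a single key. An empty `Seq` models a batch in which the key received no new values; `UpdateStateSketch`, `dropWhenIdle` and `run` are illustrative names, not Spark API.

```scala
object UpdateStateSketch {
  type Update = (Seq[Int], Option[Int]) => Option[Int]

  // Same shape as the updateFunc parameter: (new values, previous state) => next state
  val updateCount: Update =
    (values, state) => Some(state.getOrElse(0) + values.sum)

  // Variant that drops the key's state as soon as a batch brings no new values
  val dropWhenIdle: Update =
    (values, state) => if (values.isEmpty) None else Some(state.getOrElse(0) + values.sum)

  // Apply an update function batch by batch for one key
  def run(update: Update, batches: Seq[Seq[Int]]): Option[Int] =
    batches.foldLeft(Option.empty[Int]) { (state, values) =>
      // A key with neither state nor new values is unknown, so the function is not called
      if (state.isEmpty && values.isEmpty) None
      else update(values, state)
    }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq(1, 1), Seq.empty[Int], Seq(1))
    println(run(updateCount, batches))  // Some(3)
    println(run(dropWhenIdle, batches)) // Some(1): the state was dropped in the second batch
  }
}
```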

Map With State

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
  spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType]

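mapWithState - Stateful processing that only touches keys with new data or expiring state, with control over initial state, partitioning, and timeouts through StateSpec. Marked experimental in Spark 1.6 and requires checkpointing.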

Examples:

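// Track user activity with mapWithState
// Activity, UserState, ActivityResult and TimeoutResult are user-defined types (not shown);
// initialUserStates is assumed to be an RDD[(String, UserState)]
import org.apache.spark.streaming.{Seconds, State, StateSpec}

val mappingFunction = (key: String, value: Option[Activity], state: State[UserState]) => {
  value match {
    case Some(activity) =>
      val updatedState = state.getOption().getOrElse(UserState()).addActivity(activity)
      state.update(updatedState)
      Some(ActivityResult(key, updatedState.totalActivities))
    case None =>
      // No new value: the call is made because the key is timing out.
      // The state must not be updated here; it is removed after this call.
      if (state.isTimingOut()) Some(TimeoutResult(key)) else None
  }
}

val spec = StateSpec.function(mappingFunction)
  .initialState(initialUserStates)
  .numPartitions(10)
  .timeout(Seconds(60)) // Expire keys that receive no data for 60 seconds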

val results = userActivities.mapWithState(spec)

Window Operations for Paired DStreams

Grouped Window Operations

def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration, 
  slideDuration: Duration,
  numPartitions: Int
): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration, 
  partitioner: Partitioner
): DStream[(K, Iterable[V])]

Reduced Window Operations

def reduceByKeyAndWindow(
  func: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, V)]
def reduceByKeyAndWindow(
  func: (V, V) => V,
  invFunc: (V, V) => V,
  windowDuration: Duration, 
  slideDuration: Duration
): DStream[(K, V)]
def reduceByKeyAndWindow(
  func: (V, V) => V,
  invFunc: (V, V) => V,
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int,
  filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)]

Count Window Operations

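PairDStreamFunctions in Spark 1.6 does not define a countByKeyAndWindow method. Per-key counts over a window can be computed by mapping every value to 1L and reducing:

pairs.mapValues(_ => 1L).reduceByKeyAndWindow(_ + _, windowDuration, slideDuration) // DStream[(K, Long)]

To count occurrences of whole elements rather than keys, DStream provides:

def countByValueAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int = ssc.sc.defaultParallelism
)(implicit ord: Ordering[T] = null): DStream[(T, Long)]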

Examples:

val wordPairs = lines.flatMap(_.split(" ")).map((_, 1))

// Count words over sliding window  
val windowedCounts = wordPairs
  .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

// Efficient windowed counts with inverse function
// (requires ssc.checkpoint(...); window and slide must be multiples of the batch interval)
val efficientCounts = wordPairs
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(20))

// Group events by key over window
val windowedEvents = eventPairs
  .groupByKeyAndWindow(Minutes(5), Minutes(1))
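
The inverse-function overload avoids re-reducing the whole window on every slide: the new window value is the previous one, combined with the batches that entered and "un-combined" from the batches that left. A minimal plain-Scala sketch of that arithmetic for a single key, assuming one count per batch and window and slide lengths expressed in numbers of batches (all names are illustrative, and this is not Spark's internal implementation):

```scala
object InverseWindowSketch {
  val func: (Int, Int) => Int = _ + _
  val invFunc: (Int, Int) => Int = _ - _

  // Recompute from scratch: reduce every batch in the window ending at `end` (exclusive).
  // Assumes the window contains at least one batch.
  def fullWindow(perBatch: Vector[Int], end: Int, window: Int): Int =
    perBatch.slice(math.max(0, end - window), end).reduce(func)

  // Incremental update: previous window value + entering batches - leaving batches.
  // `prev` is the value of the window that ended at `end - slide`.
  def slideWindow(perBatch: Vector[Int], prev: Int, end: Int, window: Int, slide: Int): Int = {
    val entering = perBatch.slice(end - slide, end)
    val leaving  = perBatch.slice(math.max(0, end - slide - window), math.max(0, end - window))
    val added = entering.foldLeft(prev)(func)
    leaving.foldLeft(added)(invFunc)
  }

  def main(args: Array[String]): Unit = {
    val counts = Vector(3, 1, 4, 1, 5, 9, 2, 6)
    val w4 = fullWindow(counts, end = 4, window = 3)                       // 1 + 4 + 1 = 6
    val w6 = slideWindow(counts, w4, end = 6, window = 3, slide = 2)       // 6 + 5 + 9 - 1 - 4 = 15
    println(w6 == fullWindow(counts, end = 6, window = 3))                 // true
  }
}
```

The incremental path touches only `2 * slide` batches per step instead of `window` batches, which is where the saving comes from when the window is much longer than the slide. It only works when `invFunc` really undoes `func`, as subtraction does for addition.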

Additional Operations

Key and Value Operations

def keys: DStream[K]
def values: DStream[V]
def mapValues[U: ClassTag](f: V => U): DStream[(K, U)]
def flatMapValues[U: ClassTag](f: V => TraversableOnce[U]): DStream[(K, U)]
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Iterable[V], Iterable[W]))]

Examples:

val pairs = lines.map(line => {
  val parts = line.split(",")
  (parts(0), parts(1))
})

// Extract keys and values
val keys = pairs.keys
val values = pairs.values

// Transform values while preserving keys
val upperValues = pairs.mapValues(_.toUpperCase)
val wordLists = pairs.flatMapValues(_.split(" "))

// Cogroup two streams
val stream1 = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1)))
val stream2 = ssc.socketTextStream("localhost", 9998)  
  .map(line => (line.split(",")(0), line.split(",")(1)))

val cogrouped = stream1.cogroup(stream2)
cogrouped.foreachRDD { rdd =>
  rdd.collect().foreach { case (key, (values1, values2)) =>
    println(s"Key: $key, Stream1: ${values1.mkString("[", ",", "]")}, Stream2: ${values2.mkString("[", ",", "]")}")
  }
}

Complete Usage Example

import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.{SparkConf, HashPartitioner}

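// Each of the two socket receivers below occupies one thread, so the local master
// needs more than two threads or no batches would ever be processed
val conf = new SparkConf().setAppName("PairedDStreamExample").setMaster("local[4]")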
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("checkpoint")

// Input streams (parseUserEvent and parseUserProfile are user-supplied parsers, not shown)
val userEvents = ssc.socketTextStream("localhost", 9999)
  .map(parseUserEvent) // Returns (userId, event)

val userProfiles = ssc.socketTextStream("localhost", 9998)
  .map(parseUserProfile) // Returns (userId, profile)

// Join streams to enrich events
val enrichedEvents = userEvents.leftOuterJoin(userProfiles)

// Maintain user session state
val userSessions = enrichedEvents.updateStateByKey[UserSession](
  (events: Seq[(Event, Option[Profile])], session: Option[UserSession]) => {
    val currentSession = session.getOrElse(UserSession())
    Some(currentSession.addEvents(events))
  },
  new HashPartitioner(10)
)

// Window-based analytics
val hourlyStats = enrichedEvents
  .map { case (userId, (event, profile)) => (event.eventType, 1) }
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(3600), Seconds(60))

// Output results
userSessions.print()
hourlyStats.print()

ssc.start()
ssc.awaitTermination()
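
The example above relies on several user-defined names (`Event`, `Profile`, `UserSession`, `parseUserEvent`, `parseUserProfile`) that are not defined on this page. One possible set of definitions is sketched below, purely as an assumption about their shape; the comma-separated line formats and the fields of each class are invented for illustration.

```scala
object ExampleTypes {
  case class Event(eventType: String, timestamp: Long)
  case class Profile(name: String)

  // Session state accumulated by updateStateByKey (hypothetical fields)
  case class UserSession(eventCount: Int = 0, lastActivity: Long = 0L) {
    def addEvents(events: Seq[(Event, Option[Profile])]): UserSession =
      if (events.isEmpty) this
      else UserSession(
        eventCount + events.size,
        math.max(lastActivity, events.map(_._1.timestamp).max)
      )
  }

  // Assumed line format: "userId,eventType,timestampMillis"
  // Throws on malformed input; production code would validate instead.
  def parseUserEvent(line: String): (String, Event) = {
    val Array(userId, eventType, ts) = line.split(",", 3)
    (userId, Event(eventType, ts.trim.toLong))
  }

  // Assumed line format: "userId,name"
  def parseUserProfile(line: String): (String, Profile) = {
    val Array(userId, name) = line.split(",", 2)
    (userId, Profile(name))
  }
}
```

With `import ExampleTypes._` placed before the stream definitions, the names used in the example resolve to these definitions. Case classes are serializable, which the state and shuffle operations require.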