tessl install tessl/maven-org-apache-spark--spark-streaming_2-11@1.6.0

Apache Spark Streaming library for processing live data streams with fault-tolerance and high throughput.
Paired DStreams (DStreams of key-value pairs) provide additional operations for working with keyed data, including joins, grouping, state management, and key-based aggregations.
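The examples in the sections below assume a StreamingContext and an input stream are already defined; a minimal setup, mirroring the complete example at the end of this page:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("PairedDStreamExample").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1)) // 1-second batch interval
val lines = ssc.socketTextStream("localhost", 9999)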
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{HashPartitioner, Partitioner}

def groupByKey(): DStream[(K, Iterable[V])]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]

groupByKey() - Group values by key using default partitioner.
groupByKey(numPartitions) - Group values by key with specified number of partitions.
groupByKey(partitioner) - Group values by key using provided partitioner.
Examples:
val pairs = ssc.socketTextStream("localhost", 9999)
.flatMap(_.split(" "))
.map(word => (word.charAt(0), word))
// Group words by first character
val grouped = pairs.groupByKey()
grouped.print()
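The numPartitions overload is called the same way; for instance (the partition count and val name here are illustrative):
val groupedCoarse = pairs.groupByKey(8)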
// Group with specific partitioning
val groupedPartitioned = pairs.groupByKey(new HashPartitioner(4))

def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]

reduceByKey(func) - Reduce values for each key using associative function.
reduceByKey(func, numPartitions) - Reduce with specified partitioning.
reduceByKey(func, partitioner) - Reduce using custom partitioner.
Examples:
val wordCounts = lines
.flatMap(_.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _) // Sum counts for each word
// With custom partitioning for better load balancing
val balancedCounts = wordPairs
.reduceByKey(_ + _, new HashPartitioner(8))

def combineByKey[C: ClassTag](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C
): DStream[(K, C)]
def combineByKey[C: ClassTag](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner
): DStream[(K, C)]

combineByKey - Generic key-based aggregation with custom combiner functions.
Examples:
// Calculate average per key
val averages = pairs.combineByKey(
(value: Int) => (value, 1), // Create combiner
(acc: (Int, Int), value: Int) => (acc._1 + value, acc._2 + 1), // Merge value
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // Merge combiners
new HashPartitioner(4) // Partitioner for the combined stream
).map { case (key, (sum, count)) => (key, sum.toDouble / count) }

def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, Option[W]))]
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Option[V], W))]
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Option[V], Option[W]))]

join - Inner join two DStreams by key, returning pairs where keys exist in both streams.
leftOuterJoin - Left outer join, keeping all keys from left stream.
rightOuterJoin - Right outer join, keeping all keys from right stream.
fullOuterJoin - Full outer join, keeping keys from both streams.
Examples:
val userProfiles = ssc.socketTextStream("localhost", 9999)
.map(line => { val parts = line.split(","); (parts(0), parts(1)) })
val userActions = ssc.socketTextStream("localhost", 9998)
.map(line => { val parts = line.split(","); (parts(0), parts(1)) })
// Inner join - only users with both profile and action
val enrichedActions = userActions.join(userProfiles)
enrichedActions.print()
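rightOuterJoin and fullOuterJoin follow the same pattern, wrapping whichever side may be missing in Option; a sketch of fullOuterJoin on the same two streams (the val name is illustrative):
val allUserRecords = userActions.fullOuterJoin(userProfiles)
allUserRecords.foreachRDD { rdd =>
  rdd.collect().foreach {
    case (userId, (Some(action), Some(profile))) => println(s"$userId: $action ($profile)")
    case (userId, (Some(action), None)) => println(s"$userId: $action (no profile)")
    case (userId, (None, Some(profile))) => println(s"$userId: inactive user ($profile)")
    case (userId, (None, None)) => () // not emitted by fullOuterJoin
  }
}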
// Left outer join - all actions, with profile if available
val actionsWithProfiles = userActions.leftOuterJoin(userProfiles)
actionsWithProfiles.foreachRDD { rdd =>
rdd.collect().foreach {
case (userId, (action, Some(profile))) => println(s"$userId: $action ($profile)")
case (userId, (action, None)) => println(s"$userId: $action (no profile)")
}
}

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
): DStream[(K, S)]
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner,
rememberPartitioner: Boolean
): DStream[(K, S)]

updateStateByKey - Maintain state across batches for each key. Requires checkpointing.
Parameters:
updateFunc - Function to update state given new values and current state
partitioner - Custom partitioner for state distribution
rememberPartitioner - Whether to remember partitioner across batches

Examples:
// Running word count totals
val runningCounts = words.map((_, 1))
.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
Some(state.getOrElse(0) + values.sum)
}
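The state examples above and below only run if a checkpoint directory has been set on the context before it is started (see the complete example at the end); the path here is illustrative:
ssc.checkpoint("/tmp/streaming-checkpoint") // required for stateful operations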
// User session tracking with timeout
val userSessions = userEvents
.updateStateByKey[UserSession] { (events: Seq[Event], session: Option[UserSession]) =>
val currentTime = System.currentTimeMillis()
val updatedSession = session.getOrElse(UserSession(currentTime))
if (currentTime - updatedSession.lastActivity > SESSION_TIMEOUT) {
None // Remove expired session
} else {
Some(updatedSession.addEvents(events))
}
}

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType]

mapWithState - More efficient stateful processing than updateStateByKey, with fine-grained control over per-key state, initial state, and timeouts.
Examples:
// Track user activity with mapWithState
val mappingFunction = (key: String, value: Option[Activity], state: State[UserState]) => {
val userState = if (state.exists()) state.get() else UserState()
value match {
case Some(activity) =>
val updatedState = userState.addActivity(activity)
state.update(updatedState)
Some(ActivityResult(key, updatedState.totalActivities))
case None =>
if (state.exists()) Some(TimeoutResult(key))
else None
}
}
val spec = StateSpec.function(mappingFunction)
.initialState(initialUserStates)
.numPartitions(10)
.timeout(Seconds(60))
val results = userActivities.mapWithState(spec)

def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
): DStream[(K, Iterable[V])]

def reduceByKeyAndWindow(
func: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration
): DStream[(K, V)]
def reduceByKeyAndWindow(
func: (V, V) => V,
invFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration
): DStream[(K, V)]
def reduceByKeyAndWindow(
func: (V, V) => V,
invFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int,
filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)]

def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Long)]
def countByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
): DStream[(K, Long)]

Examples:
val wordPairs = lines.flatMap(_.split(" ")).map((_, 1))
// Count words over sliding window
val windowedCounts = wordPairs
.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
// Efficient windowed counts with inverse function
val efficientCounts = wordPairs
.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(20))
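The full signature above also accepts a filterFunc to drop entries from the window state, which is useful for removing keys whose count has fallen to zero when the inverse-function form is used; a sketch, with the partition count chosen arbitrarily:
val nonZeroCounts = wordPairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a - b,
  Seconds(60),
  Seconds(20),
  4, // numPartitions, arbitrary
  { case (_, count) => count > 0 } // keep only keys still present in the window
)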
// Group events by key over window
val windowedEvents = eventPairs
.groupByKeyAndWindow(Minutes(5), Minutes(1))

def keys: DStream[K]
def values: DStream[V]
def mapValues[U: ClassTag](f: V => U): DStream[(K, U)]
def flatMapValues[U: ClassTag](f: V => TraversableOnce[U]): DStream[(K, U)]
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Iterable[V], Iterable[W]))]

Examples:
val pairs = lines.map(line => {
val parts = line.split(",")
(parts(0), parts(1))
})
// Extract keys and values
val keys = pairs.keys
val values = pairs.values
// Transform values while preserving keys
val upperValues = pairs.mapValues(_.toUpperCase)
val wordLists = pairs.flatMapValues(_.split(" "))
// Cogroup two streams
val stream1 = ssc.socketTextStream("localhost", 9999)
.map(line => (line.split(",")(0), line.split(",")(1)))
val stream2 = ssc.socketTextStream("localhost", 9998)
.map(line => (line.split(",")(0), line.split(",")(1)))
val cogrouped = stream1.cogroup(stream2)
cogrouped.foreachRDD { rdd =>
rdd.collect().foreach { case (key, (values1, values2)) =>
println(s"Key: $key, Stream1: ${values1.mkString("[", ",", "]")}, Stream2: ${values2.mkString("[", ",", "]")}")
}
}

Complete example:

import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.{SparkConf, HashPartitioner}
val conf = new SparkConf().setAppName("PairedDStreamExample").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("checkpoint")
// Input streams
val userEvents = ssc.socketTextStream("localhost", 9999)
.map(parseUserEvent) // Returns (userId, event)
val userProfiles = ssc.socketTextStream("localhost", 9998)
.map(parseUserProfile) // Returns (userId, profile)
// Join streams to enrich events
val enrichedEvents = userEvents.leftOuterJoin(userProfiles)
// Maintain user session state
val userSessions = enrichedEvents.updateStateByKey[UserSession](
(events: Seq[(Event, Option[Profile])], session: Option[UserSession]) => {
val currentSession = session.getOrElse(UserSession())
Some(currentSession.addEvents(events))
},
new HashPartitioner(10)
)
// Window-based analytics
val hourlyStats = enrichedEvents
.map { case (userId, (event, profile)) => (event.eventType, 1) }
.reduceByKeyAndWindow(_ + _, _ - _, Seconds(3600), Seconds(60))
// Output results
userSessions.print()
hourlyStats.print()
ssc.start()
ssc.awaitTermination()