tessl/maven-org-apache-spark--spark-streaming-2-11

Scalable, high-throughput, fault-tolerant stream processing library for real-time data processing on Apache Spark


Key-Value Operations

Key-value operations are specialized transformations available on DStreams of (K, V) pairs through implicit conversion to PairDStreamFunctions. These operations provide aggregation, join, and state management capabilities essential for stream processing applications.

Capabilities

Basic Aggregations

Core aggregation operations for grouping and reducing data by key.

/**
 * Group values by key in each batch
 * @returns DStream of (key, iterable of values) pairs  
 */
def groupByKey(): DStream[(K, Iterable[V])]

/**
 * Group values by key with custom number of partitions
 * @param numPartitions - Number of partitions for the result
 * @returns DStream of (key, iterable of values) pairs
 */
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]

/**
 * Group values by key with custom partitioner
 * @param partitioner - Custom partitioner for result distribution
 * @returns DStream of (key, iterable of values) pairs
 */
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]

/**
 * Reduce values by key using associative function
 * @param reduceFunc - Associative and commutative function to combine values
 * @returns DStream of (key, reduced value) pairs
 */
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]

/**
 * Reduce values by key with custom number of partitions
 * @param reduceFunc - Associative and commutative function to combine values
 * @param numPartitions - Number of partitions for the result
 * @returns DStream of (key, reduced value) pairs
 */
def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)]

/**
 * Reduce values by key with custom partitioner
 * @param reduceFunc - Associative and commutative function to combine values
 * @param partitioner - Custom partitioner for result distribution
 * @returns DStream of (key, reduced value) pairs
 */
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]

Usage Examples:

val pairs = words.map(word => (word, 1))

// Basic aggregations
val grouped = pairs.groupByKey()
val wordCounts = pairs.reduceByKey(_ + _)

// With custom partitioning
val wordCountsPartitioned = pairs.reduceByKey(_ + _, 4)

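Within each micro-batch, `reduceByKey` is equivalent to grouping pairs by key and folding their values. A plain-Scala sketch of that per-batch semantics (no Spark required; `countBatch` is an illustrative name, not part of the API):

```scala
object ReduceByKeyDemo {
  // Per-batch semantics of reduceByKey(_ + _): group by key, then sum values
  def countBatch(batch: Seq[(String, Int)]): Map[String, Int] =
    batch.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val batch = Seq(("spark", 1), ("stream", 1), ("spark", 1))
    println(countBatch(batch).toSeq.sorted) // List((spark,2), (stream,1))
  }
}
```

Note that on a DStream this fold happens independently per batch; counts do not accumulate across batches unless you use windowing or state operations.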
Advanced Aggregations

More sophisticated aggregation patterns using combineByKey for complex data structures.

/**
 * Generic aggregation using combiner functions
 * @param createCombiner - Function to create initial combiner from value
 * @param mergeValue - Function to merge value into combiner
 * @param mergeCombiner - Function to merge two combiners
 * @param partitioner - Partitioner for result distribution
 * @param mapSideCombine - Whether to perform map-side combining (default true)
 * @returns DStream of (key, combined result) pairs
 */
def combineByKey[C: ClassTag](
  createCombiner: V => C,
  mergeValue: (C, V) => C, 
  mergeCombiner: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true
): DStream[(K, C)]

/**
 * Generic aggregation with default partitioner
 * @param createCombiner - Function to create initial combiner from value
 * @param mergeValue - Function to merge value into combiner  
 * @param mergeCombiner - Function to merge two combiners
 * @param numPartitions - Number of partitions for result
 * @returns DStream of (key, combined result) pairs
 */
def combineByKey[C: ClassTag](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiner: (C, C) => C, 
  numPartitions: Int
): DStream[(K, C)]

/**
 * Aggregate values by key with zero value and combining functions
 * @param zeroValue - Zero value for the aggregation
 * @param seqOp - Function to combine value with aggregator
 * @param combOp - Function to combine two aggregators
 * @returns DStream of (key, aggregated result) pairs  
 */
def aggregateByKey[U: ClassTag](zeroValue: U)(
  seqOp: (U, V) => U,
  combOp: (U, U) => U
): DStream[(K, U)]

Value Transformations

Operations that transform values while preserving keys.

/**
 * Transform values while keeping keys unchanged
 * @param mapValuesFunc - Function to transform each value
 * @returns DStream with same keys but transformed values
 */
def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)]

/**
 * Transform each value to multiple values while preserving keys
 * @param flatMapValuesFunc - Function returning collection of new values
 * @returns DStream with same keys but flattened values
 */
def flatMapValues[U: ClassTag](flatMapValuesFunc: V => TraversableOnce[U]): DStream[(K, U)]
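Both operations touch only the value side of each pair, which lets Spark preserve the partitioning of the parent stream. A plain-Scala sketch of their per-pair semantics (helper names are illustrative, not Spark API):

```scala
object ValueTransformDemo {
  // mapValues semantics: transform each value, keep the key unchanged
  def mapVals[K, V, U](batch: Seq[(K, V)])(f: V => U): Seq[(K, U)] =
    batch.map { case (k, v) => (k, f(v)) }

  // flatMapValues semantics: each value may expand to zero or more pairs
  def flatMapVals[K, V, U](batch: Seq[(K, V)])(f: V => Seq[U]): Seq[(K, U)] =
    batch.flatMap { case (k, v) => f(v).map((k, _)) }

  def main(args: Array[String]): Unit = {
    val batch = Seq(("a", 1), ("b", 2))
    println(mapVals(batch)(_ * 10))              // List((a,10), (b,20))
    println(flatMapVals(batch)(v => Seq(v, -v))) // List((a,1), (a,-1), (b,2), (b,-2))
  }
}
```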

Window Aggregations

Aggregation operations over sliding time windows.

/**
 * Group values by key over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window (optional)
 * @returns DStream of (key, iterable of values) over windows
 */
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]

/**
 * Group values by key over window with custom partitioning  
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for result
 * @returns DStream of (key, iterable of values) over windows
 */
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration, 
  numPartitions: Int
): DStream[(K, Iterable[V])]

/**
 * Reduce values by key over a sliding window
 * @param reduceFunc - Associative and commutative function to combine values
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window (optional)
 * @returns DStream of (key, reduced value) over windows
 */
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(
  reduceFunc: (V, V) => V, 
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, V)]

/**
 * Efficient reduce by key over window with inverse function
 * @param reduceFunc - Associative and commutative function to combine values
 * @param invReduceFunc - Inverse function to remove old values
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for result (optional)
 * @param filterFunc - Function to filter results (optional)
 * @returns DStream of (key, reduced value) over windows
 */
def reduceByKeyAndWindow(
  reduceFunc: (V, V) => V,
  invReduceFunc: (V, V) => V,
  windowDuration: Duration, 
  slideDuration: Duration,
  numPartitions: Int = ssc.sc.defaultParallelism,
  filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)]

Usage Examples:

val wordPairs = lines.flatMap(_.split(" ")).map((_, 1))

// Window aggregations
val windowedCounts = wordPairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

// Efficient windowed counting with inverse function
// (requires checkpointing to be enabled via ssc.checkpoint(...))
val efficientCounts = wordPairs.reduceByKeyAndWindow(
  _ + _,           // Add new values
  _ - _,           // Remove old values  
  Seconds(30),     // Window duration
  Seconds(10)      // Slide duration
)

// Filter low counts
val filteredCounts = wordPairs.reduceByKeyAndWindow(
  _ + _, _ - _, Seconds(30), Seconds(10), 2, _._2 > 5
)
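The inverse-function variant avoids recomputing the whole window on each slide: the new window result is the old result plus the batch sliding in, minus the batch sliding out. A plain-Scala sketch of that arithmetic for word counts (the `slide` helper is illustrative, not Spark API):

```scala
object InverseWindowDemo {
  // reduceByKeyAndWindow with an inverse: newWindow = old + entering - leaving
  def slide(old: Map[String, Int],
            entering: Map[String, Int],
            leaving: Map[String, Int]): Map[String, Int] = {
    val keys = old.keySet ++ entering.keySet
    keys.map { k =>
      k -> (old.getOrElse(k, 0) + entering.getOrElse(k, 0) - leaving.getOrElse(k, 0))
    }.toMap.filter(_._2 > 0) // like filterFunc: drop keys whose count reaches zero
  }

  def main(args: Array[String]): Unit = {
    val oldWindow = Map("spark" -> 5, "flink" -> 2) // hypothetical previous window
    val newWindow = slide(oldWindow, Map("spark" -> 1), Map("spark" -> 2, "flink" -> 2))
    println(newWindow) // Map(spark -> 4)
  }
}
```

This is why the variant scales with the slide interval rather than the window width, at the cost of requiring checkpointing and an invertible reduce function.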

State Management

Stateful operations that maintain state across batches.

/**
 * Update state by key across batches
 * @param updateFunc - Function to update state given new values and previous state
 * @returns DStream of (key, state) pairs
 */
def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)]

/**
 * Update state by key with custom partitioning
 * @param updateFunc - Function to update state given new values and previous state  
 * @param numPartitions - Number of partitions for state storage
 * @returns DStream of (key, state) pairs
 */
def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S],
  numPartitions: Int
): DStream[(K, S)]

/**
 * Update state by key with custom partitioner
 * @param updateFunc - Function to update state given new values and previous state
 * @param partitioner - Custom partitioner for state distribution
 * @returns DStream of (key, state) pairs
 */
def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S], 
  partitioner: Partitioner
): DStream[(K, S)]

/**
 * Map with state using StateSpec (experimental API)
 * @param spec - StateSpec defining state mapping behavior
 * @returns MapWithStateDStream for advanced state operations
 */
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
  spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType]

Join Operations

Operations for joining two DStreams by key.

/**
 * Inner join with another DStream by key
 * @param other - DStream to join with
 * @returns DStream of (key, (leftValue, rightValue)) pairs
 */
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]

/**
 * Inner join with custom number of partitions
 * @param other - DStream to join with  
 * @param numPartitions - Number of partitions for result
 * @returns DStream of (key, (leftValue, rightValue)) pairs
 */
def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]

/**
 * Left outer join with another DStream
 * @param other - DStream to join with
 * @returns DStream of (key, (leftValue, Option[rightValue])) pairs
 */
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]

/**
 * Right outer join with another DStream  
 * @param other - DStream to join with
 * @returns DStream of (key, (Option[leftValue], rightValue)) pairs
 */
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]

/**
 * Full outer join with another DStream
 * @param other - DStream to join with
 * @returns DStream of (key, (Option[leftValue], Option[rightValue])) pairs
 */
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]

/**
 * Cogroup (group together) with another DStream
 * @param other - DStream to cogroup with
 * @returns DStream of (key, (Iterable[leftValues], Iterable[rightValues])) pairs
 */
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
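Joins on DStreams are applied batch-by-batch: each interval, the RDD from one stream is joined with the corresponding RDD from the other. A plain-Scala sketch of the per-batch semantics of `join` and `leftOuterJoin` (helper names are illustrative, not Spark API):

```scala
object JoinDemo {
  // Per-batch inner join: keep keys present on both sides
  def innerJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    for { (k, v) <- left; (k2, w) <- right if k == k2 } yield (k, (v, w))

  // Per-batch left outer join: keep every left key, right value becomes an Option
  def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
    left.map { case (k, v) =>
      (k, (v, right.collectFirst { case (k2, w) if k2 == k => w }))
    }

  def main(args: Array[String]): Unit = {
    val left  = Seq(("k1", "l1"), ("k2", "l2"))
    val right = Seq(("k1", "r1"))
    println(innerJoin(left, right))     // List((k1,(l1,r1)))
    println(leftOuterJoin(left, right)) // List((k1,(l1,Some(r1))), (k2,(l2,None)))
  }
}
```

Note that only records landing in the same batch interval are joined; matching events across intervals requires windowing or state.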

Output Operations for Key-Value Streams

Specialized output operations for key-value data.

/**
 * Save as Hadoop files using old MapReduce API
 * @param prefix - Prefix for output file names
 * @param suffix - Suffix for output file names (optional)
 */
def saveAsHadoopFiles[F <: OutputFormat[K, V]: ClassTag](
  prefix: String,
  suffix: String = ""
): Unit

/**
 * Save as Hadoop files using new MapReduce API  
 * @param prefix - Prefix for output file names
 * @param suffix - Suffix for output file names (optional)
 */
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]: ClassTag](
  prefix: String,
  suffix: String = ""
): Unit

/**
 * Save each RDD as Hadoop file with custom configuration
 * @param prefix - Prefix for output file names
 * @param suffix - Suffix for output file names
 * @param keyClass - Key class for Hadoop
 * @param valueClass - Value class for Hadoop  
 * @param outputFormatClass - OutputFormat class
 * @param conf - Hadoop job configuration (optional)
 */
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
  prefix: String,
  suffix: String,
  keyClass: Class[_],
  valueClass: Class[_], 
  outputFormatClass: Class[F],
  conf: JobConf = new JobConf()
): Unit

Usage Examples:

val stream1 = lines1.map(line => (line.split(",")(0), line))  // (key, data)
val stream2 = lines2.map(line => (line.split(",")(0), line))

// Join operations
val innerJoined = stream1.join(stream2)
val leftJoined = stream1.leftOuterJoin(stream2)
val cogrouped = stream1.cogroup(stream2)

// State management (requires checkpointing to be enabled)
val runningCounts = wordPairs.updateStateByKey[Int] { (newCounts, currentCount) =>
  val newCount = currentCount.getOrElse(0) + newCounts.sum
  if (newCount == 0) None else Some(newCount)
}

// Advanced combineByKey for computing averages.
// Note: per the overloads above, a partitioner (or numPartitions) is required,
// and the value parameter needs a type annotation for inference to succeed.
val averages = stream.combineByKey(
  (value: Double) => (value, 1),                                       // Create combiner: (sum, count)
  (acc: (Double, Int), value: Double) => (acc._1 + value, acc._2 + 1), // Merge a value into the combiner
  (acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // Merge combiners
  new HashPartitioner(4)
).mapValues { case (sum, count) => sum / count }

Advanced State Operations

// StateSpec for mapWithState (Experimental)
object StateSpec {
  /**
   * Create StateSpec with mapping function
   * @param mappingFunction - Function to map (key, value, state) to output  
   * @returns StateSpec for use with mapWithState
   */
  def function[KeyType, ValueType, StateType, MappedType](
    mappingFunction: (KeyType, Option[ValueType], State[StateType]) => Option[MappedType]
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}

abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {
  /**
   * Set initial state RDD
   * @param rdd - RDD containing initial state for keys
   * @returns This StateSpec for method chaining
   */
  def initialState(rdd: RDD[(KeyType, StateType)]): this.type
  
  /**
   * Set number of partitions for state
   * @param numPartitions - Number of partitions
   * @returns This StateSpec for method chaining
   */
  def numPartitions(numPartitions: Int): this.type
  
  /**
   * Set timeout for inactive keys
   * @param idleDuration - Duration after which inactive keys are removed
   * @returns This StateSpec for method chaining  
   */
  def timeout(idleDuration: Duration): this.type
}
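Underneath both `updateStateByKey` and `mapWithState` is the same idea: per-key state is folded forward batch by batch. A plain-Scala sketch of a running count evolving across three micro-batches (the `updateState` helper is illustrative, not Spark API):

```scala
object StatefulCountDemo {
  // updateStateByKey semantics: fold each batch's new values into the prior state
  def updateState(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] =
    batch.groupBy(_._1).foldLeft(state) { case (s, (k, vs)) =>
      s.updated(k, s.getOrElse(k, 0) + vs.map(_._2).sum)
    }

  def main(args: Array[String]): Unit = {
    // Three successive micro-batches of (word, 1) pairs
    val batches = Seq(
      Seq(("a", 1), ("b", 1)),
      Seq(("a", 1)),
      Seq(("b", 1), ("b", 1))
    )
    val finalState = batches.foldLeft(Map.empty[String, Int])(updateState)
    println(finalState.toSeq.sorted) // List((a,2), (b,3))
  }
}
```

`mapWithState` adds to this model incremental per-record invocation, timeouts for idle keys, and an emitted record per update, which is why it generally scales better than `updateStateByKey` for large key spaces.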

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-streaming-2-11
