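# Key-Value Operations

Key-value operations are transformations available on DStreams of `(K, V)` pairs. They are defined in `PairDStreamFunctions`, and Scala applies them automatically through an implicit conversion in the `DStream` companion object. They cover per-key aggregation, windowed aggregation, stateful processing across batches, joins, and Hadoop-compatible output.

## Capabilities

### Basic Aggregations

These operations group or reduce the values for each key within a single batch; each batch interval is processed independently. When you only need a combined value per key, prefer `reduceByKey` over `groupByKey`: `reduceByKey` combines values inside each partition before the shuffle, while `groupByKey` sends every individual value across the network.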
```scala { .api }
/**
 * Group values by key in each batch
 * @returns DStream of (key, iterable of values) pairs
 */
def groupByKey(): DStream[(K, Iterable[V])]

/**
 * Group values by key with custom number of partitions
 * @param numPartitions - Number of partitions for the result
 * @returns DStream of (key, iterable of values) pairs
 */
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]

/**
 * Group values by key with custom partitioner
 * @param partitioner - Custom partitioner for result distribution
 * @returns DStream of (key, iterable of values) pairs
 */
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]

/**
 * Reduce values by key using associative function
 * @param reduceFunc - Associative and commutative function to combine values
 * @returns DStream of (key, reduced value) pairs
 */
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]

/**
 * Reduce values by key with custom number of partitions
 * @param reduceFunc - Associative and commutative function to combine values
 * @param numPartitions - Number of partitions for the result
 * @returns DStream of (key, reduced value) pairs
 */
def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)]

/**
 * Reduce values by key with custom partitioner
 * @param reduceFunc - Associative and commutative function to combine values
 * @param partitioner - Custom partitioner for result distribution
 * @returns DStream of (key, reduced value) pairs
 */
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
```
**Usage Examples:**
```scala
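// words: DStream[String], e.g. lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))

// Basic aggregations
val grouped = pairs.groupByKey()          // DStream[(String, Iterable[Int])]
val wordCounts = pairs.reduceByKey(_ + _) // DStream[(String, Int)]

// With custom partitioning: each batch's result has 4 partitions
val wordCountsPartitioned = pairs.reduceByKey(_ + _, 4)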
```
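To see why `reduceFunc` must be associative and commutative, the sketch below models the two phases of `reduceByKey` on plain Scala collections, with no Spark dependency. Each inner `Seq` stands in for one partition of a batch: values are first combined inside each partition, and the partial results are then merged per key. `ReduceByKeyModel` and its methods are hypothetical names; this illustrates the semantics, not Spark's implementation.

```scala
// Plain-Scala model of reduceByKey on one batch; no Spark dependency.
// Each inner Seq stands in for one partition of the batch's RDD.
object ReduceByKeyModel {
  // Fold one (key, value) pair into a map of per-key results.
  private def mergeInto[K, V](acc: Map[K, V], k: K, v: V)(f: (V, V) => V): Map[K, V] =
    acc.updated(k, acc.get(k).fold(v)(prev => f(prev, v)))

  // Phase 1: combine values inside a single partition (the map-side combine).
  def combineLocally[K, V](partition: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    partition.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) => mergeInto(acc, k, v)(f) }

  // Phase 2: merge the per-partition partial results for each key.
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]])(f: (V, V) => V): Map[K, V] =
    partitions
      .map(p => combineLocally(p)(f))
      .foldLeft(Map.empty[K, V]) { (acc, partial) =>
        partial.foldLeft(acc) { case (m, (k, v)) => mergeInto(m, k, v)(f) }
      }
}
```

Because partial results may be merged in any grouping and order, only a function whose result does not depend on that grouping or order (such as `_ + _` or `max`) gives a well-defined answer.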
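### Advanced Aggregations

`combineByKey` is the general form that `groupByKey` and `reduceByKey` are built on. It produces a per-key result of type `C` that can differ from the value type `V`, such as a (sum, count) pair or a set of distinct values, using three functions: `createCombiner` turns the first value seen for a key into a combiner, `mergeValue` folds further values into it, and `mergeCombiner` merges combiners built in different partitions.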
```scala { .api }
/**
 * Generic aggregation using combiner functions
 * @param createCombiner - Function to create initial combiner from value
 * @param mergeValue - Function to merge value into combiner
 * @param mergeCombiner - Function to merge two combiners
 * @param partitioner - Partitioner for result distribution
 * @param mapSideCombine - Whether to perform map-side combining (default true)
 * @returns DStream of (key, combined result) pairs
 */
def combineByKey[C: ClassTag](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiner: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true
): DStream[(K, C)]

// Per-batch aggregateByKey semantics can be obtained by applying the RDD
// operation to each batch through transform, for example:
// pairs.transform(_.aggregateByKey(zeroValue)(seqOp, combOp))
```
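The three functions passed to `combineByKey` play distinct roles, which the plain-Scala sketch below makes explicit. It models the per-batch semantics without Spark (`CombineByKeyModel` is a hypothetical name, not Spark's code): within each simulated partition, the first value for a key goes through `createCombiner` and later values through `mergeValue`, and combiners for the same key from different partitions are then joined with `mergeCombiner`. The example collects the distinct pages each user visited, so the combiner type `Set[String]` differs from the value type `String`.

```scala
// Plain-Scala model of combineByKey on one batch; no Spark dependency.
// Each inner Seq stands in for one partition of the batch's RDD.
object CombineByKeyModel {
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiner: (C, C) => C): Map[K, C] = {
    // Within a partition: the first value creates the combiner, later values are merged in.
    val partials: Seq[Map[K, C]] = partitions.map { partition =>
      partition.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).fold(createCombiner(v))(c => mergeValue(c, v)))
      }
    }
    // Across partitions: combiners for the same key are merged pairwise.
    partials.foldLeft(Map.empty[K, C]) { (acc, partial) =>
      partial.foldLeft(acc) { case (m, (k, c)) =>
        m.updated(k, m.get(k).fold(c)(prev => mergeCombiner(prev, c)))
      }
    }
  }

  // Example: distinct pages per user, split across two partitions.
  val visits: Seq[Seq[(String, String)]] = Seq(
    Seq(("u1", "home"), ("u2", "cart")),
    Seq(("u1", "cart"), ("u1", "home"))
  )

  val pagesPerUser: Map[String, Set[String]] = combineByKey(visits)(
    (page: String) => Set(page),                         // createCombiner
    (pages: Set[String], page: String) => pages + page, // mergeValue
    (a: Set[String], b: Set[String]) => a ++ b          // mergeCombiner
  )
}
```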
### Value Transformations

Operations that transform values while preserving keys.
```scala { .api }
/**
 * Transform values while keeping keys unchanged
 * @param mapValuesFunc - Function to transform each value
 * @returns DStream with same keys but transformed values
 */
def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)]

/**
 * Transform each value to multiple values while preserving keys
 * @param flatMapValuesFunc - Function returning collection of new values
 * @returns DStream with same keys but flattened values
 */
def flatMapValues[U: ClassTag](flatMapValuesFunc: V => TraversableOnce[U]): DStream[(K, U)]
```
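The plain-Scala sketch below models what these two operations do to the records of one batch (no Spark dependency; `ValueTransformModel` is a hypothetical name). It shows that every key is carried through untouched and that `flatMapValues` emits one `(key, value)` pair per element returned by the function, which means zero pairs when the function returns an empty collection.

```scala
// Plain-Scala model of mapValues / flatMapValues on one batch; no Spark dependency.
object ValueTransformModel {
  // Apply f to every value; keys pass through unchanged.
  def mapValues[K, V, U](batch: Seq[(K, V)])(f: V => U): Seq[(K, U)] =
    batch.map { case (k, v) => (k, f(v)) }

  // Expand every value into zero or more values, each paired with the original key.
  def flatMapValues[K, V, U](batch: Seq[(K, V)])(f: V => Iterable[U]): Seq[(K, U)] =
    batch.flatMap { case (k, v) => f(v).map(u => (k, u)) }
}
```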
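### Window Aggregations

These operations aggregate values by key over a sliding window that spans several batches. `windowDuration` is the width of the window and `slideDuration` is how often a new result is produced; both must be multiples of the source DStream's batch interval. When `slideDuration` is omitted, it defaults to the parent DStream's slide duration (the batch interval for an input stream).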
```scala { .api }
/**
 * Group values by key over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window (optional)
 * @returns DStream of (key, iterable of values) over windows
 */
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]

/**
 * Group values by key over window with custom partitioning
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for result
 * @returns DStream of (key, iterable of values) over windows
 */
def groupByKeyAndWindow(
    windowDuration: Duration,
    slideDuration: Duration,
    numPartitions: Int
): DStream[(K, Iterable[V])]

/**
 * Reduce values by key over a sliding window
 * @param reduceFunc - Associative and commutative function to combine values
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window (optional)
 * @returns DStream of (key, reduced value) over windows
 */
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration
): DStream[(K, V)]

/**
 * Incremental reduce by key over a sliding window: each window's result is
 * computed from the previous one by reducing in the batches that entered the
 * window and applying invReduceFunc to the batches that left it.
 * Requires checkpointing (ssc.checkpoint) to be enabled.
 * @param reduceFunc - Associative and commutative function to combine values
 * @param invReduceFunc - Inverse of reduceFunc, so that invReduceFunc(reduceFunc(x, y), x) == y
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for result (optional)
 * @param filterFunc - Predicate on (key, value); only pairs for which it returns true are retained (optional)
 * @returns DStream of (key, reduced value) over windows
 */
def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration,
    numPartitions: Int = ssc.sc.defaultParallelism,
    filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)]
```
**Usage Examples:**
```scala
// lines: DStream[String]. The inverse-function variants require checkpointing,
// e.g. ssc.checkpoint("hdfs:///checkpoints/word-count")
val wordPairs = lines.flatMap(_.split(" ")).map((_, 1))

// Counts over the last 30 seconds, recomputed every 10 seconds
val windowedCounts = wordPairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

// Incremental windowed counting with an inverse function
val efficientCounts = wordPairs.reduceByKeyAndWindow(
  _ + _,       // Add counts from batches entering the window
  _ - _,       // Subtract counts from batches leaving the window
  Seconds(30), // Window duration
  Seconds(10)  // Slide duration
)

// Same, with 2 partitions, keeping only words counted more than 5 times in the window
val filteredCounts = wordPairs.reduceByKeyAndWindow(
  _ + _, _ - _, Seconds(30), Seconds(10), 2, _._2 > 5
)
```
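The inverse-function variant yields the same counts as recomputing the whole window but does less work per slide: it starts from the previous window's result, adds the batch that entered, and applies the inverse to the batch that left. The plain-Scala sketch below (no Spark dependency; `WindowModel` and its methods are hypothetical) checks that equivalence for word counts, assuming a window of `window` batches that slides by one batch. It also shows why `filterFunc` exists: the incremental result keeps keys whose count has dropped to zero, and those must be filtered out to match the full recomputation.

```scala
// Plain-Scala model of windowed word counts; no Spark dependency.
// Each element of `batches` holds one batch's per-word counts; the window
// spans `window` consecutive batches and slides by one batch.
object WindowModel {
  type Counts = Map[String, Int]

  private def add(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (m, (k, v)) => m.updated(k, m.getOrElse(k, 0) + v) }

  private def subtract(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (m, (k, v)) => m.updated(k, m.getOrElse(k, 0) - v) }

  // Full recomputation: reduce every batch that is inside the window.
  def recompute(batches: Vector[Counts], window: Int): Vector[Counts] =
    batches.indices.toVector.map { i =>
      batches.slice(math.max(0, i - window + 1), i + 1).foldLeft(Map.empty[String, Int])(add)
    }

  // Incremental: previous window + entering batch, then inverse-reduce the leaving batch.
  def incremental(batches: Vector[Counts], window: Int): Vector[Counts] =
    batches.indices.toVector
      .scanLeft(Map.empty[String, Int]) { (prev, i) =>
        val leaving = if (i >= window) batches(i - window) else Map.empty[String, Int]
        subtract(add(prev, batches(i)), leaving)
      }
      .tail // drop the empty seed value

  // Example: four batches of word counts.
  val exampleBatches: Vector[Counts] =
    Vector(Map("a" -> 1), Map("a" -> 2, "b" -> 1), Map("b" -> 3), Map("c" -> 1))
}
```

With a window of two batches, the last incremental result still contains `"a" -> 0`; dropping zero counts, as a `filterFunc` such as `_._2 > 0` would on a Spark word-count stream, makes it equal to the recomputed window.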
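### State Management

These operations carry per-key state from one batch to the next, such as a running total for every key seen so far. Because the state outlives any single batch, both `updateStateByKey` and `mapWithState` require checkpointing to be enabled with `ssc.checkpoint(directory)`.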
```scala { .api }
/**
 * Update state by key across batches
 * @param updateFunc - Function to update state given new values and previous state
 * @returns DStream of (key, state) pairs
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)]

/**
 * Update state by key with custom partitioning
 * @param updateFunc - Function to update state given new values and previous state
 * @param numPartitions - Number of partitions for state storage
 * @returns DStream of (key, state) pairs
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S],
    numPartitions: Int
): DStream[(K, S)]

/**
 * Update state by key with custom partitioner
 * @param updateFunc - Function to update state given new values and previous state
 * @param partitioner - Custom partitioner for state distribution
 * @returns DStream of (key, state) pairs
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner
): DStream[(K, S)]

/**
 * Map with state using StateSpec (experimental API)
 * @param spec - StateSpec defining state mapping behavior
 * @returns MapWithStateDStream for advanced state operations
 */
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType]
```
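The contract of `updateStateByKey` is easiest to see in a small model. The plain-Scala sketch below (no Spark dependency; `UpdateStateModel` and `expireIdle` are hypothetical names) computes one batch's new state. The update function is called for every key that already has state or received new values, with an empty `Seq` for keys that had no data in the batch, and returning `None` drops the key. Because of that per-key call, the cost of each batch grows with the total number of keys held in state, not just the keys that received data.

```scala
// Plain-Scala model of one updateStateByKey step; no Spark dependency.
object UpdateStateModel {
  def step[K, V, S](state: Map[K, S], batch: Seq[(K, V)])(
      updateFunc: (Seq[V], Option[S]) => Option[S]): Map[K, S] = {
    // New values for each key that appears in this batch.
    val newValues: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Every key with existing state OR new data is passed to updateFunc;
    // keys for which it returns None are dropped from the state.
    (state.keySet ++ newValues.keySet).toSeq.flatMap { k =>
      updateFunc(newValues.getOrElse(k, Seq.empty[V]), state.get(k)).map(s => k -> s)
    }.toMap
  }

  // Example update function: add new counts, but expire keys with no data in this batch.
  val expireIdle: (Seq[Int], Option[Int]) => Option[Int] =
    (newCounts, current) =>
      if (newCounts.isEmpty) None else Some(current.getOrElse(0) + newCounts.sum)
}
```

Applying `expireIdle` to the state `Map("a" -> 2, "b" -> 1)` with a batch containing only `("b", 1)` calls the function for `"a"` with an empty `Seq`, so `"a"` is removed and only `"b" -> 2` remains.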
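### Join Operations

These operations join two DStreams by key. Joins are evaluated batch by batch: in each interval, the RDD from this stream is joined with the RDD the other stream produced for the same interval, so both streams must have the same slide duration. To match records that arrive in different batches, window one or both streams before joining.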
```scala { .api }
/**
 * Inner join with another DStream by key
 * @param other - DStream to join with
 * @returns DStream of (key, (leftValue, rightValue)) pairs
 */
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]

/**
 * Inner join with custom number of partitions
 * @param other - DStream to join with
 * @param numPartitions - Number of partitions for result
 * @returns DStream of (key, (leftValue, rightValue)) pairs
 */
def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]

/**
 * Left outer join with another DStream
 * @param other - DStream to join with
 * @returns DStream of (key, (leftValue, Option[rightValue])) pairs
 */
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]

/**
 * Right outer join with another DStream
 * @param other - DStream to join with
 * @returns DStream of (key, (Option[leftValue], rightValue)) pairs
 */
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]

/**
 * Full outer join with another DStream
 * @param other - DStream to join with
 * @returns DStream of (key, (Option[leftValue], Option[rightValue])) pairs
 */
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]

/**
 * Cogroup (group together) with another DStream
 * @param other - DStream to cogroup with
 * @returns DStream of (key, (Iterable[leftValues], Iterable[rightValues])) pairs
 */
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
```
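Within one batch, each join behaves like the corresponding relational join on that batch's records. The plain-Scala sketch below (no Spark dependency; `JoinModel` is a hypothetical name) models `join` and `leftOuterJoin` for a single batch: a key with several matches on the other side produces one output pair per combination, and a left-side key with no match is kept with `None` in the outer join.

```scala
// Plain-Scala model of per-batch join and leftOuterJoin; no Spark dependency.
object JoinModel {
  // Index one side of the join: all values for each key, in arrival order.
  private def index[K, W](batch: Seq[(K, W)]): Map[K, Seq[W]] =
    batch.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }

  // Inner join: one output pair for every (left value, right value) combination per key.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val rightByKey = index(right)
    for {
      (k, v) <- left
      w <- rightByKey.getOrElse(k, Seq.empty[W])
    } yield (k, (v, w))
  }

  // Left outer join: left records without a match are kept, paired with None.
  def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
    val rightByKey = index(right)
    left.flatMap { case (k, v) =>
      val matches: Seq[Option[W]] = rightByKey.get(k) match {
        case Some(ws) => ws.map(w => Some(w))
        case None     => Seq(None)
      }
      matches.map(w => (k, (v, w)))
    }
  }
}
```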
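### Output Operations for Key-Value Streams

These output operations write every batch of a key-value stream to Hadoop-compatible storage. Each batch is saved under its own path, named `prefix-TIME_IN_MS[.suffix]`, where `TIME_IN_MS` is the batch time in milliseconds.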
```scala { .api }
/**
 * Save each batch as Hadoop files using the old MapReduce API (org.apache.hadoop.mapred)
 * @param prefix - Prefix for output file names
 * @param suffix - Suffix for output file names (optional)
 */
def saveAsHadoopFiles[F <: OutputFormat[K, V]: ClassTag](
    prefix: String,
    suffix: String = ""
): Unit

/**
 * Save each batch as Hadoop files using the new MapReduce API (org.apache.hadoop.mapreduce)
 * @param prefix - Prefix for output file names
 * @param suffix - Suffix for output file names (optional)
 */
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]: ClassTag](
    prefix: String,
    suffix: String = ""
): Unit

/**
 * Save each batch with explicit key, value, and OutputFormat classes (old MapReduce API)
 * @param prefix - Prefix for output file names
 * @param suffix - Suffix for output file names
 * @param keyClass - Key class for Hadoop
 * @param valueClass - Value class for Hadoop
 * @param outputFormatClass - OutputFormat class
 * @param conf - Hadoop job configuration (optional)
 */
def saveAsHadoopFiles(
    prefix: String,
    suffix: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
): Unit
```
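Each batch lands in its own output path built from `prefix`, `suffix`, and the batch time, following the `prefix-TIME_IN_MS[.suffix]` pattern. The small helper below is a hypothetical illustration of that naming pattern, not a Spark API; it can help predict where a given batch will be written, for example when a downstream job needs to locate it. It assumes a non-empty prefix.

```scala
// Hypothetical helper mirroring the documented "prefix-TIME_IN_MS[.suffix]" pattern.
// Assumes a non-empty prefix; an empty suffix means no extension is appended.
object BatchOutputPath {
  def forBatch(prefix: String, suffix: String, batchTimeMs: Long): String = {
    val base = s"$prefix-$batchTimeMs"
    if (suffix.isEmpty) base else s"$base.$suffix"
  }
}
```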
**Usage Examples (joins, state management, and combineByKey):**
```scala
import org.apache.spark.HashPartitioner

// lines1, lines2: DStream[String] of CSV records keyed by their first field
val stream1 = lines1.map(line => (line.split(",")(0), line)) // (key, record)
val stream2 = lines2.map(line => (line.split(",")(0), line))

// Join operations (each batch of stream1 is joined with the same batch of stream2)
val innerJoined = stream1.join(stream2)
val leftJoined = stream1.leftOuterJoin(stream2)
val cogrouped = stream1.cogroup(stream2)

// State management: running counts per word (requires ssc.checkpoint(...));
// wordPairs: DStream[(String, Int)]
val runningCounts = wordPairs.updateStateByKey[Int] { (newCounts: Seq[Int], currentCount: Option[Int]) =>
  val newCount = currentCount.getOrElse(0) + newCounts.sum
  if (newCount == 0) None else Some(newCount)
}

// combineByKey for per-key averages; stream: DStream[(String, Double)]
val averages = stream.combineByKey[(Double, Int)](
  (value: Double) => (value, 1),                                       // Create combiner: (sum, count)
  (acc: (Double, Int), value: Double) => (acc._1 + value, acc._2 + 1), // Add a value
  (acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // Merge combiners
  new HashPartitioner(4)                                               // Partitioner is required
).mapValues { case (sum, count) => sum / count }
```
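## Advanced State Operations

`mapWithState` is configured through a `StateSpec`: create one from a mapping function with `StateSpec.function`, then optionally chain `initialState`, `numPartitions`, and `timeout`.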
```scala { .api }
// StateSpec for mapWithState (Experimental)
object StateSpec {
  /**
   * Create StateSpec with mapping function
   * @param mappingFunction - Called once per record with (key, value, state); the value
   *   is None only when the key is timing out. Returns the mapped record.
   * @returns StateSpec for use with mapWithState
   */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]

  /**
   * Create StateSpec with a mapping function that also receives the batch time
   * @param mappingFunction - Called with (batch time, key, value, state); returning
   *   None emits no record for that input
   * @returns StateSpec for use with mapWithState
   */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (Time, KeyType, Option[ValueType], State[StateType]) => Option[MappedType]
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}

abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {
  /**
   * Set initial state RDD
   * @param rdd - RDD containing initial state for keys
   * @returns This StateSpec for method chaining
   */
  def initialState(rdd: RDD[(KeyType, StateType)]): this.type

  /**
   * Set number of partitions for state
   * @param numPartitions - Number of partitions
   * @returns This StateSpec for method chaining
   */
  def numPartitions(numPartitions: Int): this.type

  /**
   * Set timeout for inactive keys
   * @param idleDuration - Keys that receive no data for this long time out: the mapping
   *   function is called one last time with a None value, then the key's state is removed
   * @returns This StateSpec for method chaining
   */
  def timeout(idleDuration: Duration): this.type
}
```
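The mapping function given to `StateSpec.function` runs once per input record: it reads and updates that key's `State` and returns one mapped record. The plain-Scala sketch below models one batch of that behavior without Spark. `ModelState` is a hypothetical stand-in for Spark's `State` that mirrors only `getOption`, `update`, and `remove`, and timeouts are not modeled. The mapping function follows a running-count pattern: add the new value to the stored sum, store the result, and emit `(word, sum)`.

```scala
// Plain-Scala model of one mapWithState batch; no Spark dependency.
// ModelState is a hypothetical stand-in for org.apache.spark.streaming.State.
final class ModelState[S](initial: Option[S]) {
  private var current: Option[S] = initial
  def getOption(): Option[S] = current
  def update(newState: S): Unit = { current = Some(newState) }
  def remove(): Unit = { current = None }
}

object MapWithStateModel {
  // Calls f once per record, in order, and returns (mapped records, new state).
  def step[K, V, S, M](state: Map[K, S], batch: Seq[(K, V)])(
      f: (K, Option[V], ModelState[S]) => M): (Seq[M], Map[K, S]) = {
    var store = state
    val mapped = batch.map { case (k, v) =>
      val st = new ModelState(store.get(k))
      val out = f(k, Some(v), st)
      // Persist whatever the function left in the state, or drop the key if removed.
      store = st.getOption() match {
        case Some(s) => store.updated(k, s)
        case None    => store - k
      }
      out
    }
    (mapped, store)
  }

  // Running word count: add the new value to the stored sum and emit (word, sum).
  val runningCount: (String, Option[Int], ModelState[Int]) => (String, Int) =
    (word, one, state) => {
      val sum = one.getOrElse(0) + state.getOption().getOrElse(0)
      state.update(sum)
      (word, sum)
    }
}
```

Unlike `updateStateByKey`, the output contains one record per input record, and keys that received no data in the batch are not visited (apart from timeouts, which this sketch leaves out).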