or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-streams.md event-monitoring.md index.md input-sources.md java-api.md key-value-ops.md state-management.md streaming-context.md window-ops.md

key-value-ops.mddocs/

0

# Key-Value Operations

1

2

Key-value operations are specialized transformations available on DStreams of (K, V) pairs through implicit conversion to PairDStreamFunctions. These operations provide aggregation, join, and state management capabilities essential for stream processing applications.

3

4

## Capabilities

5

6

### Basic Aggregations

7

8

Core aggregation operations for grouping and reducing data by key.

9

10

```scala { .api }

11

/**

12

* Group values by key in each batch

13

* @returns DStream of (key, iterable of values) pairs

14

*/

15

def groupByKey(): DStream[(K, Iterable[V])]

16

17

/**

18

* Group values by key with custom number of partitions

19

* @param numPartitions - Number of partitions for the result

20

* @returns DStream of (key, iterable of values) pairs

21

*/

22

def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]

23

24

/**

25

* Group values by key with custom partitioner

26

* @param partitioner - Custom partitioner for result distribution

27

* @returns DStream of (key, iterable of values) pairs

28

*/

29

def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]

30

31

/**

32

* Reduce values by key using associative function

33

* @param reduceFunc - Associative and commutative function to combine values

34

* @returns DStream of (key, reduced value) pairs

35

*/

36

def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]

37

38

/**

39

* Reduce values by key with custom number of partitions

40

* @param reduceFunc - Associative and commutative function to combine values

41

* @param numPartitions - Number of partitions for the result

42

* @returns DStream of (key, reduced value) pairs

43

*/

44

def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)]

45

46

/**

47

* Reduce values by key with custom partitioner

48

* @param reduceFunc - Associative and commutative function to combine values

49

* @param partitioner - Custom partitioner for result distribution

50

* @returns DStream of (key, reduced value) pairs

51

*/

52

def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]

53

```

54

55

**Usage Examples:**

56

57

```scala

58

val pairs = words.map(word => (word, 1))

59

60

// Basic aggregations

61

val grouped = pairs.groupByKey()

62

val wordCounts = pairs.reduceByKey(_ + _)

63

64

// With custom partitioning

65

val wordCountsPartitioned = pairs.reduceByKey(_ + _, 4)

66

```

67

68

### Advanced Aggregations

69

70

More sophisticated aggregation patterns using combineByKey for complex data structures.

71

72

```scala { .api }

73

/**

74

* Generic aggregation using combiner functions

75

* @param createCombiner - Function to create initial combiner from value

76

* @param mergeValue - Function to merge value into combiner

77

* @param mergeCombiner - Function to merge two combiners

78

* @param partitioner - Partitioner for result distribution

79

* @param mapSideCombine - Whether to perform map-side combining (default true)

80

* @returns DStream of (key, combined result) pairs

81

*/

82

def combineByKey[C: ClassTag](

83

createCombiner: V => C,

84

mergeValue: (C, V) => C,

85

mergeCombiner: (C, C) => C,

86

partitioner: Partitioner,

87

mapSideCombine: Boolean = true

88

): DStream[(K, C)]

89

90

/**

91

* Generic aggregation with custom number of partitions

92

* @param createCombiner - Function to create initial combiner from value

93

* @param mergeValue - Function to merge value into combiner

94

* @param mergeCombiner - Function to merge two combiners

95

* @param numPartitions - Number of partitions for result

96

* @returns DStream of (key, combined result) pairs

97

*/

98

def combineByKey[C: ClassTag](

99

createCombiner: V => C,

100

mergeValue: (C, V) => C,

101

mergeCombiner: (C, C) => C,

102

numPartitions: Int

103

): DStream[(K, C)]

104

105

/**

106

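* Aggregate values by key with zero value and combining functions
* NOTE: verify availability before use - Spark's PairDStreamFunctions does not define aggregateByKey;
* the equivalent per-batch aggregation is stream.transform(_.aggregateByKey(zeroValue)(seqOp, combOp))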

107

* @param zeroValue - Zero value for the aggregation

108

* @param seqOp - Function to combine value with aggregator

109

* @param combOp - Function to combine two aggregators

110

* @returns DStream of (key, aggregated result) pairs

111

*/

112

def aggregateByKey[U: ClassTag](zeroValue: U)(

113

seqOp: (U, V) => U,

114

combOp: (U, U) => U

115

): DStream[(K, U)]

116

```

117

118

### Value Transformations

119

120

Operations that transform values while preserving keys.

121

122

```scala { .api }

123

/**

124

* Transform values while keeping keys unchanged

125

* @param mapValuesFunc - Function to transform each value

126

* @returns DStream with same keys but transformed values

127

*/

128

def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)]

129

130

/**

131

* Transform each value to multiple values while preserving keys

132

* @param flatMapValuesFunc - Function returning collection of new values

133

* @returns DStream with same keys but flattened values

134

*/

135

def flatMapValues[U: ClassTag](flatMapValuesFunc: V => TraversableOnce[U]): DStream[(K, U)]

136

```

137

138

### Window Aggregations

139

140

Aggregation operations over sliding time windows.

141

142

```scala { .api }

143

/**

144

* Group values by key over a sliding window

145

* @param windowDuration - Width of the window

146

* @param slideDuration - Sliding interval of the window (optional)

147

* @returns DStream of (key, iterable of values) over windows

148

*/

149

def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]

150

def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]

151

152

/**

153

* Group values by key over window with custom partitioning

154

* @param windowDuration - Width of the window

155

* @param slideDuration - Sliding interval of the window

156

* @param numPartitions - Number of partitions for result

157

* @returns DStream of (key, iterable of values) over windows

158

*/

159

def groupByKeyAndWindow(

160

windowDuration: Duration,

161

slideDuration: Duration,

162

numPartitions: Int

163

): DStream[(K, Iterable[V])]

164

165

/**

166

* Reduce values by key over a sliding window

167

* @param reduceFunc - Associative and commutative function to combine values

168

* @param windowDuration - Width of the window

169

* @param slideDuration - Sliding interval of the window (optional)

170

* @returns DStream of (key, reduced value) over windows

171

*/

172

def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration): DStream[(K, V)]

173

def reduceByKeyAndWindow(

174

reduceFunc: (V, V) => V,

175

windowDuration: Duration,

176

slideDuration: Duration

177

): DStream[(K, V)]

178

179

/**

180

* Efficient incremental reduce by key over window using an inverse function (requires checkpointing to be enabled)

181

* @param reduceFunc - Associative and commutative function to combine values

182

* @param invReduceFunc - Inverse function to remove old values

183

* @param windowDuration - Width of the window

184

* @param slideDuration - Sliding interval of the window

185

* @param numPartitions - Number of partitions for result (optional)

186

* @param filterFunc - Function to filter results (optional)

187

* @returns DStream of (key, reduced value) over windows

188

*/

189

def reduceByKeyAndWindow(

190

reduceFunc: (V, V) => V,

191

invReduceFunc: (V, V) => V,

192

windowDuration: Duration,

193

slideDuration: Duration,

194

numPartitions: Int = ssc.sc.defaultParallelism,

195

filterFunc: ((K, V)) => Boolean = null

196

): DStream[(K, V)]

197

```

198

199

**Usage Examples:**

200

201

```scala

202

val wordPairs = lines.flatMap(_.split(" ")).map((_, 1))

203

204

// Window aggregations

205

val windowedCounts = wordPairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

206

207

// Efficient windowed counting with inverse function

208

val efficientCounts = wordPairs.reduceByKeyAndWindow(

209

_ + _, // Add new values

210

_ - _, // Remove old values

211

Seconds(30), // Window duration

212

Seconds(10) // Slide duration

213

)

214

215

// Filter low counts

216

val filteredCounts = wordPairs.reduceByKeyAndWindow(

217

_ + _, _ - _, Seconds(30), Seconds(10), 2, _._2 > 5

218

)

219

```

220

221

### State Management

222

223

Stateful operations that maintain state across batches.

224

225

```scala { .api }

226

/**

227

* Update state by key across batches

228

* @param updateFunc - Function to update state given new values and previous state

229

* @returns DStream of (key, state) pairs

230

*/

231

def updateStateByKey[S: ClassTag](

232

updateFunc: (Seq[V], Option[S]) => Option[S]

233

): DStream[(K, S)]

234

235

/**

236

* Update state by key with custom partitioning

237

* @param updateFunc - Function to update state given new values and previous state

238

* @param numPartitions - Number of partitions for state storage

239

* @returns DStream of (key, state) pairs

240

*/

241

def updateStateByKey[S: ClassTag](

242

updateFunc: (Seq[V], Option[S]) => Option[S],

243

numPartitions: Int

244

): DStream[(K, S)]

245

246

/**

247

* Update state by key with custom partitioner

248

* @param updateFunc - Function to update state given new values and previous state

249

* @param partitioner - Custom partitioner for state distribution

250

* @returns DStream of (key, state) pairs

251

*/

252

def updateStateByKey[S: ClassTag](

253

updateFunc: (Seq[V], Option[S]) => Option[S],

254

partitioner: Partitioner

255

): DStream[(K, S)]

256

257

/**

258

* Map with state using StateSpec (experimental API)

259

* @param spec - StateSpec defining state mapping behavior

260

* @returns MapWithStateDStream for advanced state operations

261

*/

262

def mapWithState[StateType: ClassTag, MappedType: ClassTag](

263

spec: StateSpec[K, V, StateType, MappedType]

264

): MapWithStateDStream[K, V, StateType, MappedType]

265

```

266

267

### Join Operations

268

269

Operations for joining two DStreams by key.

270

271

```scala { .api }

272

/**

273

* Inner join with another DStream by key

274

* @param other - DStream to join with

275

* @returns DStream of (key, (leftValue, rightValue)) pairs

276

*/

277

def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]

278

279

/**

280

* Inner join with custom number of partitions

281

* @param other - DStream to join with

282

* @param numPartitions - Number of partitions for result

283

* @returns DStream of (key, (leftValue, rightValue)) pairs

284

*/

285

def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]

286

287

/**

288

* Left outer join with another DStream

289

* @param other - DStream to join with

290

* @returns DStream of (key, (leftValue, Option[rightValue])) pairs

291

*/

292

def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]

293

294

/**

295

* Right outer join with another DStream

296

* @param other - DStream to join with

297

* @returns DStream of (key, (Option[leftValue], rightValue)) pairs

298

*/

299

def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]

300

301

/**

302

* Full outer join with another DStream

303

* @param other - DStream to join with

304

* @returns DStream of (key, (Option[leftValue], Option[rightValue])) pairs

305

*/

306

def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]

307

308

/**

309

* Cogroup (group together) with another DStream

310

* @param other - DStream to cogroup with

311

* @returns DStream of (key, (Iterable[leftValues], Iterable[rightValues])) pairs

312

*/

313

def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]

314

```

315

316

### Output Operations for Key-Value Streams

317

318

Specialized output operations for key-value data.

319

320

```scala { .api }

321

/**

322

* Save as Hadoop files using old MapReduce API

323

* @param prefix - Prefix for output file names

324

* @param suffix - Suffix for output file names

325

*/

326

def saveAsHadoopFiles[F <: OutputFormat[K, V]: ClassTag](

327

prefix: String,

328

suffix: String

329

): Unit

330

331

/**

332

* Save as Hadoop files using new MapReduce API

333

* @param prefix - Prefix for output file names

334

* @param suffix - Suffix for output file names

335

*/

336

def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]: ClassTag](

337

prefix: String,

338

suffix: String

339

): Unit

340

341

/**

342

* Save each RDD as Hadoop file with custom configuration

343

* @param prefix - Prefix for output file names

344

* @param suffix - Suffix for output file names

345

* @param keyClass - Key class for Hadoop

346

* @param valueClass - Value class for Hadoop

347

* @param outputFormatClass - OutputFormat class

348

* @param conf - Hadoop job configuration (optional)

349

*/

350

def saveAsHadoopFiles[F <: OutputFormat[K, V]](

351

prefix: String,

352

suffix: String,

353

keyClass: Class[_],

354

valueClass: Class[_],

355

outputFormatClass: Class[F],

356

conf: JobConf = new JobConf()

357

): Unit

358

```

359

360

**Usage Examples:**

361

362

```scala

363

val stream1 = lines1.map(line => (line.split(",")(0), line)) // (key, data)

364

val stream2 = lines2.map(line => (line.split(",")(0), line))

365

366

// Join operations

367

val innerJoined = stream1.join(stream2)

368

val leftJoined = stream1.leftOuterJoin(stream2)

369

val cogrouped = stream1.cogroup(stream2)

370

371

// State management

372

val runningCounts = wordPairs.updateStateByKey[Int] { (newCounts, currentCount) =>

373

val newCount = currentCount.getOrElse(0) + newCounts.sum

374

if (newCount == 0) None else Some(newCount)

375

}

376

377

// Advanced combineByKey for computing averages

378

val averages = stream.combineByKey(

379

(value: Double) => (value, 1), // Create combiner: (sum, count)

380

(acc: (Double, Int), value: Double) => (acc._1 + value, acc._2 + 1), // Add value

381

(acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2), // Merge
new org.apache.spark.HashPartitioner(4) // Partitioner for the result (combineByKey requires one)

382

).mapValues { case (sum, count) => sum / count }

383

```

384

385

## Advanced State Operations

386

387

```scala { .api }

388

// StateSpec for mapWithState (Experimental)

389

object StateSpec {

390

/**

391

* Create StateSpec with mapping function

392

* @param mappingFunction - Function to map (key, value, state) to output

393

* @returns StateSpec for use with mapWithState

394

*/

395

def function[KeyType, ValueType, StateType, MappedType](

396

mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType

397

): StateSpec[KeyType, ValueType, StateType, MappedType]

398

}

399

400

abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {

401

/**

402

* Set initial state RDD

403

* @param rdd - RDD containing initial state for keys

404

* @returns This StateSpec for method chaining

405

*/

406

def initialState(rdd: RDD[(KeyType, StateType)]): this.type

407

408

/**

409

* Set number of partitions for state

410

* @param numPartitions - Number of partitions

411

* @returns This StateSpec for method chaining

412

*/

413

def numPartitions(numPartitions: Int): this.type

414

415

/**

416

* Set timeout for inactive keys

417

* @param idleDuration - Duration after which inactive keys are removed

418

* @returns This StateSpec for method chaining

419

*/

420

def timeout(idleDuration: Duration): this.type

421

}

422

```