
# State Management


State management in Spark Streaming allows applications to maintain state information across streaming batches. This is essential for use cases like session tracking, running aggregations, and maintaining counters that persist beyond individual batch boundaries.


## Capabilities


### UpdateStateByKey Operations


Traditional stateful operations that maintain state across batches using update functions.


```scala { .api }
/**
 * Update state by key across batches using an update function
 * @param updateFunc - Function that takes new values and current state, returns updated state
 * @returns DStream of (key, state) pairs
 */
def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)]

/**
 * Update state by key with custom number of partitions
 * @param updateFunc - Function that takes new values and current state, returns updated state
 * @param numPartitions - Number of partitions for state storage
 * @returns DStream of (key, state) pairs
 */
def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S],
  numPartitions: Int
): DStream[(K, S)]

/**
 * Update state by key with custom partitioner
 * @param updateFunc - Function that takes new values and current state, returns updated state
 * @param partitioner - Custom partitioner for state distribution
 * @returns DStream of (key, state) pairs
 */
def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S],
  partitioner: Partitioner
): DStream[(K, S)]

/**
 * Update state by key with initial state RDD
 * @param updateFunc - State update function
 * @param partitioner - Partitioner for state distribution
 * @param initialRDD - RDD containing initial state for keys
 * @returns DStream of (key, state) pairs
 */
def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S],
  partitioner: Partitioner,
  initialRDD: RDD[(K, S)]
): DStream[(K, S)]
```


**Usage Examples:**


```scala
// Assumes `lines: DStream[String]` and `events` (a DStream of records with
// `userId` and `timestamp: Long` fields) are defined elsewhere.
val wordPairs = lines.flatMap(_.split(" ")).map((_, 1))

// Running word count
val runningCounts = wordPairs.updateStateByKey[Int] { (values, state) =>
  val currentCount = state.getOrElse(0)
  val newCount = currentCount + values.sum
  if (newCount == 0) None else Some(newCount)
}

// Session tracking with timeout
val sessionStream = events.map(event => (event.userId, event.timestamp))
val sessions = sessionStream.updateStateByKey[Long] { (timestamps, lastSeen) =>
  val now = System.currentTimeMillis()
  // `timestamps` is empty for keys with no events in this batch, so include
  // the stored value; at least one of the two is always present.
  val latest = (lastSeen.toSeq ++ timestamps).max

  // Timeout sessions after 30 minutes of inactivity
  if (now - latest > 30 * 60 * 1000) {
    None // Remove inactive session
  } else {
    Some(latest) // Update last seen time
  }
}
```
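The batch-to-batch behaviour of an update function can be checked locally before wiring it into a stream. The following is a minimal sketch in plain Scala, with no Spark dependency: `updateCount` has the same shape as the function passed to `updateStateByKey[Int]`, while `step` is a hypothetical helper (not a Spark API) that imitates one batch by calling the function for every key that has either stored state or new values.

```scala
object UpdateStateSketch {
  // Same shape as the function passed to updateStateByKey[Int].
  val updateCount: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => {
      val total = state.getOrElse(0) + values.sum
      if (total == 0) None else Some(total)
    }

  // Hypothetical local stand-in for one batch: the update function is applied
  // to every key that has stored state or new values; None drops the key.
  def step[K, V, S](
      state: Map[K, S],
      batch: Seq[(K, V)],
      update: (Seq[V], Option[S]) => Option[S]): Map[K, S] = {
    val grouped: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val keys = (state.keySet ++ grouped.keySet).toList
    keys.flatMap { k =>
      update(grouped.getOrElse(k, Seq.empty), state.get(k)).map(s => k -> s).toList
    }.toMap
  }
}
```

Note that a key with no new values still reaches the update function with an empty `Seq`, which is why update functions should not assume `values` is non-empty.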


### MapWithState Operations (Experimental)


Advanced stateful operations that manage state more efficiently than `updateStateByKey` and add features such as idle timeouts, initial state, and control over the emitted records.


```scala { .api }
/**
 * Transform stream with state using StateSpec configuration
 * @param spec - StateSpec defining state mapping behavior
 * @returns MapWithStateDStream for advanced state operations
 */
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
  spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType]
```


### StateSpec Configuration


Configuration object for mapWithState operations providing fine-grained control over state behavior.


```scala { .api }
/**
 * StateSpec factory methods and configuration
 */
object StateSpec {
  /**
   * Create StateSpec with mapping function
   * @param mappingFunction - Function to map (key, value, state) to output
   * @returns StateSpec for use with mapWithState
   */
  def function[KeyType, ValueType, StateType, MappedType](
    mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]

  /**
   * Create StateSpec with mapping function that also receives the batch time
   * @param mappingFunction - Function to map (time, key, value, state) to an optional output
   * @returns StateSpec for use with mapWithState
   */
  def function[KeyType, ValueType, StateType, MappedType](
    mappingFunction: (Time, KeyType, Option[ValueType], State[StateType]) => Option[MappedType]
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}

abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {
  /**
   * Set initial state from RDD
   * @param rdd - RDD containing initial (key, state) pairs
   * @returns This StateSpec for method chaining
   */
  def initialState(rdd: RDD[(KeyType, StateType)]): this.type

  /**
   * Set initial state from Java PairRDD
   * @param javaPairRDD - Java PairRDD containing initial state
   * @returns This StateSpec for method chaining
   */
  def initialState(javaPairRDD: JavaPairRDD[KeyType, StateType]): this.type

  /**
   * Set number of partitions for state storage
   * @param numPartitions - Number of partitions
   * @returns This StateSpec for method chaining
   */
  def numPartitions(numPartitions: Int): this.type

  /**
   * Set custom partitioner for state distribution
   * @param partitioner - Custom partitioner
   * @returns This StateSpec for method chaining
   */
  def partitioner(partitioner: Partitioner): this.type

  /**
   * Set timeout for inactive keys
   * @param idleDuration - Duration after which inactive keys are timed out
   * @returns This StateSpec for method chaining
   */
  def timeout(idleDuration: Duration): this.type
}
```


### State Object Interface


Interface for accessing and modifying state within mapWithState operations.


```scala { .api }
/**
 * State access object for mapWithState operations
 */
abstract class State[S] {
  /**
   * Check if state exists for current key
   * @returns true if state exists, false otherwise
   */
  def exists(): Boolean

  /**
   * Get current state value
   * @returns Current state value (throws exception if state doesn't exist)
   */
  def get(): S

  /**
   * Get current state as Option
   * @returns Some(state) if exists, None otherwise
   */
  def getOption(): Option[S]

  /**
   * Update state with new value
   * @param newState - New state value to set
   */
  def update(newState: S): Unit

  /**
   * Remove state for current key
   */
  def remove(): Unit

  /**
   * Check if this key is timing out in current batch
   * @returns true if key is timing out, false otherwise
   */
  def isTimingOut(): Boolean
}
```


**Usage Examples:**


```scala
// Advanced word counting with mapWithState
val wordCounts = wordPairs.mapWithState(
  StateSpec.function((word: String, count: Option[Int], state: State[Int]) => {
    val currentCount = state.getOption().getOrElse(0)
    val newCount = currentCount + count.getOrElse(0)

    state.update(newCount)
    Some((word, newCount)) // Output current word and count
  })
)

// Session tracking with timeout
val userSessions = userEvents.mapWithState(
  StateSpec
    .function((userId: String, event: Option[UserEvent], state: State[SessionInfo]) => {
      if (state.isTimingOut()) {
        // Session is timing out, emit final session info.
        // Spark removes the state itself; calling update() or remove()
        // on a timing-out state throws an exception.
        val sessionInfo = state.get()
        Some(SessionEnded(userId, sessionInfo))
      } else {
        event match {
          case Some(evt) =>
            val sessionInfo = state.getOption().getOrElse(SessionInfo.empty)
            val updatedSession = sessionInfo.addEvent(evt)
            state.update(updatedSession)
            Some(SessionUpdate(userId, updatedSession))
          case None => None
        }
      }
    })
    .timeout(Minutes(30)) // Timeout inactive sessions after 30 minutes
)

// Complex aggregation state
case class AggregateState(count: Long, sum: Double, max: Double, min: Double)

// Assumes `initialAggregates: Seq[(String, AggregateState)]` is defined elsewhere
val aggregates = measurements.mapWithState(
  StateSpec
    .function((sensorId: String, value: Option[Double], state: State[AggregateState]) => {
      value match {
        case Some(v) =>
          val current = state.getOption().getOrElse(AggregateState(0, 0.0, Double.MinValue, Double.MaxValue))
          val updated = AggregateState(
            count = current.count + 1,
            sum = current.sum + v,
            max = math.max(current.max, v),
            min = math.min(current.min, v)
          )
          state.update(updated)
          // Emit the aggregate together with the running average
          Some((sensorId, updated, updated.sum / updated.count))
        case None => None
      }
    })
    .initialState(ssc.sparkContext.parallelize(initialAggregates))
    .numPartitions(4)
)
```
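The session-tracking example refers to `UserEvent`, `SessionInfo`, `SessionUpdate`, and `SessionEnded` without defining them. One possible shape for these types is sketched below in plain Scala. All field names and the `durationMs` helper are assumptions made for illustration; only the names `SessionInfo.empty` and `addEvent` come from the example itself.

```scala
object SessionTypesSketch {
  // Hypothetical event type; the example above only names `UserEvent`.
  final case class UserEvent(userId: String, timestamp: Long, action: String)

  // Hypothetical per-user session state, suitable for State[SessionInfo].
  final case class SessionInfo(firstSeen: Long, lastSeen: Long, eventCount: Int) {
    // Returns a new immutable value; the caller passes it to state.update(...).
    def addEvent(evt: UserEvent): SessionInfo =
      if (eventCount == 0) SessionInfo(evt.timestamp, evt.timestamp, 1)
      else copy(
        firstSeen = math.min(firstSeen, evt.timestamp),
        lastSeen = math.max(lastSeen, evt.timestamp),
        eventCount = eventCount + 1)

    def durationMs: Long = lastSeen - firstSeen
  }

  object SessionInfo {
    val empty: SessionInfo = SessionInfo(0L, 0L, 0)
  }

  // Hypothetical output records emitted by the mapping function.
  sealed trait SessionOutput
  final case class SessionUpdate(userId: String, session: SessionInfo) extends SessionOutput
  final case class SessionEnded(userId: String, session: SessionInfo) extends SessionOutput
}
```

Keeping the state type immutable means each batch produces a fresh value for `state.update`, which avoids accidentally sharing mutable objects across batches.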


### MapWithStateDStream Operations


Additional operations available on the result of mapWithState.


```scala { .api }
abstract class MapWithStateDStream[K, V, S, E] extends DStream[E] {
  /**
   * Get stream of state snapshots (all current key-state pairs)
   * @returns DStream of (key, state) pairs representing current state
   */
  def stateSnapshots(): DStream[(K, S)]
}
```


**Usage Examples:**


```scala
val statefulStream = wordPairs.mapWithState(/* StateSpec */)

// Get periodic snapshots of all state
val stateSnapshots = statefulStream.stateSnapshots()

// Save state snapshots periodically
stateSnapshots.foreachRDD { rdd =>
  rdd.saveAsTextFile(s"hdfs://state-backup/${System.currentTimeMillis()}")
}

// Monitor state size
stateSnapshots.foreachRDD { rdd =>
  val stateSize = rdd.count()
  println(s"Current state contains $stateSize keys")
}
```


## State Management Best Practices


### Checkpointing Requirements


State operations require checkpointing for fault tolerance:


```scala
// State operations require checkpointing: set a checkpoint directory
// before starting the context
ssc.checkpoint("hdfs://checkpoint-dir")

// This will fail without checkpoint directory
val statefulStream = keyValueStream.updateStateByKey(updateFunction)
```


### Memory Management


State operations can consume significant memory:


```scala
// Efficient state cleanup
val cleanupState = keyValueStream.updateStateByKey[MyState] { (values, state) =>
  val current = state.getOrElse(MyState.empty)
  val updated = current.update(values)

  // Remove state for inactive keys to save memory
  if (updated.shouldRemove()) {
    None
  } else {
    Some(updated)
  }
}

// Use timeout with mapWithState for automatic cleanup
val timeoutState = keyValueStream.mapWithState(
  StateSpec
    .function(mappingFunction)
    // Spark Streaming provides Milliseconds, Seconds and Minutes helpers (no Hours)
    .timeout(Minutes(24 * 60)) // Automatically remove state after 24 hours of inactivity
)
```
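The cleanup example above relies on a `MyState` type that is not defined in this document. A minimal sketch of what it might look like follows, in plain Scala. The fields, the idle-batch counter, and the `MaxIdleBatches` threshold are all assumptions chosen for illustration; the approach depends on `updateStateByKey` calling the update function with an empty `Seq` for keys that received no new values in a batch.

```scala
object MyStateSketch {
  // Hypothetical state: a running total plus the number of consecutive
  // batches in which the key received no new values.
  final case class MyState(total: Long, idleBatches: Int) {
    def update(values: Seq[Int]): MyState =
      if (values.isEmpty) copy(idleBatches = idleBatches + 1)
      else MyState(total + values.sum, 0)

    def shouldRemove(): Boolean = idleBatches >= MyState.MaxIdleBatches
  }

  object MyState {
    val MaxIdleBatches = 3 // assumed threshold, tune per workload
    val empty: MyState = MyState(0L, 0)
  }

  // Same shape as the function passed to updateStateByKey[MyState].
  val updateFunction: (Seq[Int], Option[MyState]) => Option[MyState] =
    (values, state) => {
      val updated = state.getOrElse(MyState.empty).update(values)
      if (updated.shouldRemove()) None else Some(updated)
    }
}
```

Counting idle batches rather than comparing wall-clock times keeps the function deterministic, which makes it easier to test and to replay after recovery from a checkpoint.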


### Performance Optimization


```scala
// Optimize partitioning for state operations
// (HashPartitioner takes the partition count as its only constructor argument)
val optimizedState = keyValueStream.updateStateByKey(
  updateFunction,
  new HashPartitioner(ssc.sparkContext.defaultParallelism * 2)
)

// Use mapWithState for better performance
val efficientState = keyValueStream.mapWithState(
  StateSpec
    .function(mappingFunction)
    .numPartitions(ssc.sparkContext.defaultParallelism * 2)
)
```
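To reason about how keys spread across state partitions, it can help to reproduce the hash-partitioning rule locally. The sketch below is plain Scala and does not call Spark; `partitionFor` is a hypothetical helper that follows the usual rule of taking the key's `hashCode` modulo the partition count and keeping the result non-negative, with `null` keys mapped to partition 0.

```scala
object HashPartitionSketch {
  // Maps a key to a partition index in [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      // The JVM remainder can be negative for negative hash codes.
      if (raw < 0) raw + numPartitions else raw
    }
  }
}
```

Running a sample of real keys through such a function and counting keys per partition is a quick way to spot skew before choosing a partition count.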


### Initial State Setup


```scala
// Set up initial state from historical data
val historicalData: RDD[(String, Int)] = ssc.sparkContext.textFile("hdfs://historical")
  .map(line => {
    val parts = line.split(",")
    (parts(0), parts(1).toInt)
  })

val statefulStream = currentStream.updateStateByKey(
  updateFunction,
  new HashPartitioner(4),
  historicalData
)

// With mapWithState
val mapWithStateStream = currentStream.mapWithState(
  StateSpec
    .function(mappingFunction)
    .initialState(historicalData)
    .numPartitions(4)
)
```
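The parsing step above throws on malformed lines (a missing comma or a non-numeric count), which would fail the job while it loads the initial state. A more defensive parser might look like the following sketch, assuming the same `key,count` line format; `parseLine` is a hypothetical helper, not part of Spark.

```scala
import scala.util.Try

object InitialStateParsing {
  // Parses "key,count"; returns None for malformed lines instead of throwing.
  def parseLine(line: String): Option[(String, Int)] =
    line.split(",", -1).map(_.trim) match {
      case Array(key, count) if key.nonEmpty =>
        Try(count.toInt).toOption.map(n => key -> n)
      case _ => None
    }
}
```

It could then replace the `map` call with something like `.flatMap(line => InitialStateParsing.parseLine(line).toSeq)`, so that bad records are skipped rather than aborting the load.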


## Comparing State Management Approaches


### UpdateStateByKey vs MapWithState


```scala
// updateStateByKey - traditional approach
val updateStateApproach = stream.updateStateByKey[Int] { (values, state) =>
  val sum = values.sum + state.getOrElse(0)
  if (sum == 0) None else Some(sum)
}

// mapWithState - more efficient and flexible
val mapWithStateApproach = stream.mapWithState(
  StateSpec.function((key: String, value: Option[Int], state: State[Int]) => {
    val currentSum = state.getOption().getOrElse(0)
    val newSum = currentSum + value.getOrElse(0)

    if (state.isTimingOut()) {
      // Expired key: Spark removes the state itself, and calling
      // update() or remove() here would throw an exception
      None
    } else if (newSum == 0) {
      state.remove()
      None
    } else {
      state.update(newSum)
      Some((key, newSum))
    }
  }).timeout(Minutes(30))
)
```


**Key Differences:**

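- **Performance**: mapWithState is generally more efficient because it invokes the mapping function only for records in the current batch (and for keys that are timing out), whereas updateStateByKey invokes the update function for every key with stored state on every batch
- **Flexibility**: mapWithState provides more control over output and timeouts: the emitted records are independent of the stored state, and idle keys can expire through `timeout`
- **Memory**: mapWithState allows more precise memory management through explicit `state.remove()` calls and idle timeouts
- **API**: updateStateByKey is simpler but less powerful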
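The performance difference can be illustrated with a rough invocation-count model. The sketch below is plain Scala and only models how often the user function would be called per batch under the behaviour described above. It is not a benchmark and does not reflect Spark internals such as shuffles or checkpointing, and both helper names are hypothetical.

```scala
object InvocationModel {
  // updateStateByKey: the update function runs once for every key that has
  // stored state or new data in the batch.
  def updateStateByKeyCalls[K](stateKeys: Set[K], batchKeys: Seq[K]): Int =
    (stateKeys ++ batchKeys).size

  // mapWithState: the mapping function runs once per incoming record.
  // Timeouts, if configured, add one call per expiring key (not modelled here).
  def mapWithStateCalls[K](batchKeys: Seq[K]): Int = batchKeys.size
}
```

With 1,000 keys in state and a batch of three records (one of them a new key), this model gives 1,001 calls for the `updateStateByKey` style and 3 for the `mapWithState` style, which is why the gap widens as the number of mostly idle keys grows.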