# Stateful Operations

Advanced operations for maintaining state across streaming batches, including `updateStateByKey` and `mapWithState`, for building stateful streaming applications.

## UpdateStateByKey Operations

### Basic UpdateStateByKey

Maintain state across batches using an update function:

```scala { .api }
def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] // On DStream[(K, V)]
```

With custom partitioning (and, in the second overload, an initial state RDD):

```scala { .api }
def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S],
  numPartitions: Int
): DStream[(K, S)]

def updateStateByKey[S: ClassTag](
  updateFunc: (Seq[V], Option[S]) => Option[S],
  partitioner: Partitioner,
  initialRDD: RDD[(K, S)]
): DStream[(K, S)]
```

Example word count with state:

```scala
// Assumes `ssc` is an existing StreamingContext with a checkpoint directory set.
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split("\\s+")).map((_, 1))

// Running word count across all batches
val runningCounts = words.updateStateByKey[Int] { (values, state) =>
  val currentCount = values.sum
  val newCount = state.getOrElse(0) + currentCount
  Some(newCount)
}

runningCounts.print()
```
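
To make the per-batch contract concrete, here is a minimal, Spark-free sketch of how an update function is applied: once per key that has either new values or existing state, with a `None` result dropping the key. The `UpdateStateSketch` object and its `step` helper are hypothetical names used only for illustration. They model the semantics and are not how Spark implements `updateStateByKey`.

```scala
object UpdateStateSketch {
  // Same shape as the function passed to updateStateByKey[Int] above.
  val updateCount: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(state.getOrElse(0) + values.sum)

  // Hypothetical helper: applies one batch of (key, value) pairs to the state map.
  def step[K, V, S](
      state: Map[K, S],
      batch: Seq[(K, V)],
      update: (Seq[V], Option[S]) => Option[S]
  ): Map[K, S] = {
    // Group the batch values by key.
    val grouped: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

    // The update function runs for keys with new data AND for keys that
    // only have existing state (those receive an empty Seq of values).
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { k =>
      update(grouped.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
    }.toMap
  }
}
```

Because keys with existing state are visited in every batch, the cost of this style of update grows with the total number of keys, not with the size of the batch.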

### Advanced UpdateStateByKey Examples

User session tracking:

```scala
case class SessionInfo(loginTime: Long, lastActivity: Long, pageViews: Int)

val userEvents = ssc.socketTextStream("localhost", 9999)
  .map(parseUserEvent) // user-supplied parser; returns (userId, event)

val userSessions = userEvents.updateStateByKey[SessionInfo] { (events, sessionOpt) =>
  val currentTime = System.currentTimeMillis()

  if (events.contains("logout")) {
    // A logout removes the session: returning None drops the key's state
    None
  } else {
    val session = sessionOpt.getOrElse(SessionInfo(currentTime, currentTime, 0))

    val updatedSession = events.foldLeft(session) { (sess, event) =>
      event match {
        case "login"    => sess.copy(loginTime = currentTime, lastActivity = currentTime)
        case "pageview" => sess.copy(lastActivity = currentTime, pageViews = sess.pageViews + 1)
        case _          => sess.copy(lastActivity = currentTime)
      }
    }

    // Expire sessions after 30 minutes of inactivity
    if (currentTime - updatedSession.lastActivity > 30 * 60 * 1000) None
    else Some(updatedSession)
  }
}
```
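
The session rules are easier to unit test when they live in a pure function with the clock passed in. The sketch below assumes events are plain strings, as in the example above. The `SessionLogic` object, the `updateSession` name, and the injected `now` parameter are illustrative choices, not part of the Spark API.

```scala
object SessionLogic {
  final case class SessionInfo(loginTime: Long, lastActivity: Long, pageViews: Int)

  // 30 minutes of inactivity, in milliseconds.
  val TimeoutMs: Long = 30L * 60 * 1000

  // `now` is injected so tests can control time.
  def updateSession(now: Long)(
      events: Seq[String],
      state: Option[SessionInfo]
  ): Option[SessionInfo] =
    if (events.contains("logout")) None // a logout removes the session
    else {
      val start = state.getOrElse(SessionInfo(now, now, 0))
      val updated = events.foldLeft(start) {
        case (s, "login")    => s.copy(loginTime = now, lastActivity = now)
        case (s, "pageview") => s.copy(lastActivity = now, pageViews = s.pageViews + 1)
        case (s, _)          => s.copy(lastActivity = now)
      }
      // Sessions with no events in this batch keep their old lastActivity,
      // so they expire once the idle time exceeds the timeout.
      if (now - updated.lastActivity > TimeoutMs) None else Some(updated)
    }
}
```

When wiring this into a stream, read the clock inside the update closure, for example `(events, state) => SessionLogic.updateSession(System.currentTimeMillis())(events, state)`, so that the time is sampled on each invocation and not once when the stream is defined.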

Real-time analytics with state:

```scala
case class Analytics(count: Long, sum: Double, min: Double, max: Double) {
  def avg: Double = if (count > 0) sum / count else 0.0
}

val metrics = ssc.socketTextStream("localhost", 9999)
  .map { line =>
    val fields = line.split(",") // expected format: metric_name,value
    (fields(0), fields(1).toDouble)
  }

val runningAnalytics = metrics.updateStateByKey[Analytics] { (values, stateOpt) =>
  // In Scala, Double.MinValue is the most negative finite Double
  val current = stateOpt.getOrElse(Analytics(0, 0.0, Double.MaxValue, Double.MinValue))

  val updated = values.foldLeft(current) { (analytics, value) =>
    Analytics(
      count = analytics.count + 1,
      sum = analytics.sum + value,
      min = math.min(analytics.min, value),
      max = math.max(analytics.max, value)
    )
  }

  Some(updated)
}
```
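
The fold above can also be expressed as a small, Spark-free function, which makes the accumulation easy to check in isolation. This is a sketch under one deliberate change: it uses positive and negative infinity as the starting `min` and `max` in place of `Double.MaxValue` and `Double.MinValue`. The `RunningAnalytics` object, the `add` method, and `Empty` are hypothetical names.

```scala
object RunningAnalytics {
  final case class Analytics(count: Long, sum: Double, min: Double, max: Double) {
    def avg: Double = if (count > 0) sum / count else 0.0

    // Fold a single observation into the running statistics.
    def add(value: Double): Analytics =
      Analytics(count + 1, sum + value, math.min(min, value), math.max(max, value))
  }

  // Identity element: any real value replaces these min/max placeholders.
  val Empty: Analytics =
    Analytics(0L, 0.0, Double.PositiveInfinity, Double.NegativeInfinity)

  // Same shape as the function passed to updateStateByKey[Analytics].
  def update(values: Seq[Double], state: Option[Analytics]): Option[Analytics] =
    Some(values.foldLeft(state.getOrElse(Empty))(_ add _))
}
```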

## MapWithState Operations

### StateSpec Configuration

Create a `StateSpec` from a mapping function:

```scala { .api }
object StateSpec {
  def function[K, V, S, T](
    mappingFunction: (Time, K, Option[V], State[S]) => Option[T]
  ): StateSpec[K, V, S, T]

  def function[K, V, S, T](
    mappingFunction: (K, Option[V], State[S]) => T
  ): StateSpec[K, V, S, T]
}
```

StateSpec configuration methods:

```scala { .api }
abstract class StateSpec[K, V, S, T] {
  def initialState(rdd: RDD[(K, S)]): this.type
  def initialState(javaPairRDD: JavaPairRDD[K, S]): this.type
  def numPartitions(numPartitions: Int): this.type
  def partitioner(partitioner: Partitioner): this.type
  def timeout(idleDuration: Duration): this.type
}
```

### MapWithState Operation

Apply stateful mapping:

```scala { .api }
def mapWithState[S: ClassTag, T: ClassTag](
  spec: StateSpec[K, V, S, T]
): MapWithStateDStream[K, V, S, T] // On DStream[(K, V)]
```

### State Management Interface

State management in mapping functions:

```scala { .api }
abstract class State[S] {
  def exists(): Boolean
  def get(): S
  def update(newState: S): Unit
  def remove(): Unit
  def isTimingOut(): Boolean
  def getOption(): Option[S]
}
```

### Basic MapWithState Example

Simple counter with mapWithState:

```scala
import org.apache.spark.streaming.{State, StateSpec}

val words = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split("\\s+"))
  .map((_, 1))

// Called once per (word, 1) pair; adds the new value to the stored count
val mappingFunction = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption().getOrElse(0)
  val output = (word, sum)
  state.update(sum)
  output
}

val stateDStream = words.mapWithState(StateSpec.function(mappingFunction))
stateDStream.print()
```

### Advanced MapWithState Examples

User behavior analysis:

```scala
import org.apache.spark.streaming.{Minutes, State, StateSpec}

case class UserBehavior(
  totalSessions: Int,
  totalPageViews: Int,
  lastActivity: Long,
  avgSessionDuration: Double
)

val userActions = ssc.socketTextStream("localhost", 9999)
  .map(parseUserAction) // user-supplied parser; returns (userId, (action, timestamp))

val behaviorSpec = StateSpec.function(
  (userId: String, action: Option[(String, Long)], state: State[UserBehavior]) => {
    val currentTime = System.currentTimeMillis()
    val behavior = state.getOption().getOrElse(
      UserBehavior(0, 0, currentTime, 0.0)
    )

    action match {
      case Some(("session_start", timestamp)) =>
        val newBehavior = behavior.copy(
          totalSessions = behavior.totalSessions + 1,
          lastActivity = timestamp
        )
        state.update(newBehavior)
        Some((userId, "session_started", newBehavior))

      case Some(("page_view", timestamp)) =>
        val newBehavior = behavior.copy(
          totalPageViews = behavior.totalPageViews + 1,
          lastActivity = timestamp
        )
        state.update(newBehavior)
        Some((userId, "page_viewed", newBehavior))

      case None if state.isTimingOut() =>
        // State is timing out, emit final statistics (do not update state here)
        Some((userId, "user_summary", behavior))

      case _ =>
        None
    }
  }
).timeout(Minutes(30)) // Time out inactive users after 30 minutes

val userBehaviorStream = userActions.mapWithState(behaviorSpec)
```

Real-time anomaly detection:

```scala
import scala.collection.immutable.Queue

case class MetricState(
  values: Queue[Double],
  sum: Double,
  count: Int,
  windowSize: Int = 100
) {
  def mean: Double = if (count > 0) sum / count else 0.0
  def stdDev: Double =
    if (count < 2) 0.0
    else {
      val meanVal = mean
      val variance = values.map(v => math.pow(v - meanVal, 2)).sum / count
      math.sqrt(variance)
    }
}

val metrics = ssc.socketTextStream("localhost", 9999)
  .map(line => (line.split(",")(0), line.split(",")(1).toDouble))

val anomalySpec = StateSpec.function(
  (metric: String, value: Option[Double], state: State[MetricState]) => {
    value match {
      case Some(v) =>
        val currentState = state.getOption().getOrElse(
          MetricState(Queue.empty, 0.0, 0)
        )
        val windowFull = currentState.values.size >= currentState.windowSize

        // Slide the window: drop the oldest value once the window is full
        val newValues = if (windowFull) {
          val (_, remaining) = currentState.values.dequeue
          remaining.enqueue(v)
        } else {
          currentState.values.enqueue(v)
        }

        val newState = currentState.copy(
          values = newValues,
          sum = currentState.sum - (if (windowFull) currentState.values.head else 0.0) + v,
          count = math.min(currentState.count + 1, currentState.windowSize)
        )

        state.update(newState)

        // Detect anomaly (value is more than 3 standard deviations from mean)
        if (newState.count > 10) {
          // Guard against a zero standard deviation (all values identical)
          val zScore =
            if (newState.stdDev > 0) math.abs(v - newState.mean) / newState.stdDev else 0.0
          if (zScore > 3.0) {
            Some((metric, s"ANOMALY: $v (z-score: $zScore)"))
          } else {
            Some((metric, s"NORMAL: $v"))
          }
        } else {
          Some((metric, s"LEARNING: $v"))
        }

      case None => None
    }
  }
)

val anomalies = metrics.mapWithState(anomalySpec)
anomalies.print()
```
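
The sliding-window bookkeeping is the part of this example most prone to off-by-one mistakes, so it can help to isolate it. The following is a minimal, Spark-free sketch of a fixed-capacity window that tracks a running sum and computes a population standard deviation. `RollingStats`, `Window`, and `zScore` are hypothetical names, and returning `None` for a zero deviation is a design choice of this sketch.

```scala
import scala.collection.immutable.Queue

object RollingStats {
  final case class Window(values: Queue[Double], sum: Double, capacity: Int) {
    require(capacity > 0, "capacity must be positive")

    def count: Int = values.size
    def mean: Double = if (count > 0) sum / count else 0.0

    // Population standard deviation over the values currently in the window.
    def stdDev: Double =
      if (count < 2) 0.0
      else {
        val m = mean
        math.sqrt(values.iterator.map(v => (v - m) * (v - m)).sum / count)
      }

    // Append a value, evicting the oldest one when the window is full.
    def add(v: Double): Window =
      if (values.size >= capacity) {
        val (oldest, rest) = values.dequeue
        copy(values = rest.enqueue(v), sum = sum - oldest + v)
      } else copy(values = values.enqueue(v), sum = sum + v)

    // None when the deviation is zero, so callers never divide by zero.
    def zScore(v: Double): Option[Double] = {
      val sd = stdDev
      if (sd > 0.0) Some(math.abs(v - mean) / sd) else None
    }
  }

  def empty(capacity: Int): Window = Window(Queue.empty, 0.0, capacity)
}
```

Deriving `count` from the queue avoids keeping a separate counter in sync with the window contents.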

### MapWithStateDStream Operations

Access state snapshots:

```scala { .api }
class MapWithStateDStream[K, V, S, T] extends DStream[T] {
  def stateSnapshots(): DStream[(K, S)]
}
```

Example state snapshots:

```scala
val stateDStream = words.mapWithState(StateSpec.function(mappingFunction))

// Get periodic snapshots of all state
val snapshots = stateDStream.stateSnapshots()
snapshots.foreachRDD { rdd =>
  println(s"Current state count: ${rdd.count()}")
  rdd.take(10).foreach { case (key, state) =>
    println(s"$key -> $state")
  }
}
```

## State Management Best Practices

### Initial State Setup

Provide initial state from an external source:

```scala
// Load initial state from a database or file.
// loadInitialStateFromDB() is a user-supplied function returning a Seq of (key, state) pairs.
val initialState = ssc.sparkContext.parallelize(loadInitialStateFromDB())

val stateSpec = StateSpec.function(mappingFunction)
  .initialState(initialState)
  .numPartitions(10)
  .timeout(Minutes(60))
```

### Memory Management

Control state memory usage:

```scala
val memoryEfficientSpec = StateSpec.function(
  (key: String, value: Option[String], state: State[Map[String, Int]]) => {
    val currentMap = state.getOption().getOrElse(Map.empty[String, Int])

    value match {
      case Some(v) =>
        val updated = currentMap + (v -> (currentMap.getOrElse(v, 0) + 1))

        // Limit map size to prevent memory issues: keep the 800 highest counts
        val trimmed = if (updated.size > 1000) {
          updated.toSeq.sortBy(_._2).takeRight(800).toMap
        } else {
          updated
        }

        state.update(trimmed)
        Some((key, trimmed.size))

      case None if state.isTimingOut() =>
        // The state is being removed by the timeout; emit a marker record
        Some((key, -1)) // Indicate removal

      case _ => None
    }
  }
).timeout(Minutes(60)) // Spark Streaming provides Milliseconds, Seconds, and Minutes
```
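
The trimming step can be factored into a pure helper and tested without a streaming context. This is a sketch only: `BoundedCounts`, `increment`, and the `maxSize` and `keep` parameters are hypothetical names, with defaults mirroring the 1000 and 800 limits used above.

```scala
object BoundedCounts {
  // Increment the count for `item`; if the map grows past `maxSize`,
  // keep only the `keep` entries with the highest counts.
  def increment(
      counts: Map[String, Int],
      item: String,
      maxSize: Int = 1000,
      keep: Int = 800
  ): Map[String, Int] = {
    val updated = counts.updated(item, counts.getOrElse(item, 0) + 1)
    if (updated.size > maxSize)
      updated.toSeq.sortBy { case (_, n) => -n }.take(keep).toMap
    else updated
  }
}
```

Note that trimming makes the counts approximate: an evicted item that reappears starts again from 1, and ties at the cutoff are broken arbitrarily.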

### Checkpointing Requirements

Enable checkpointing for stateful operations:

```scala
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs://namenode:9000/checkpoint")

// Stateful operations require checkpointing
val statefulStream = inputStream.updateStateByKey(updateFunction)
```

### Performance Considerations

Optimize stateful operations:

```scala
import org.apache.spark.HashPartitioner

// Use appropriate partitioning
val optimizedState = keyValueStream.updateStateByKey(
  updateFunction,
  new HashPartitioner(20) // Match your cluster size
)

// Consider using mapWithState for better performance
val efficientState = keyValueStream.mapWithState(
  StateSpec.function(mappingFunction)
    .numPartitions(20)
    .timeout(Minutes(30))
)
```

### Error Handling in Stateful Operations

Handle errors in state updates:

```scala
// logError is assumed to be provided by the application's logging facility
val robustStateSpec = StateSpec.function(
  (key: String, value: Option[String], state: State[Int]) => {
    try {
      val current = state.getOption().getOrElse(0)
      val newValue = current + value.map(_.toInt).getOrElse(0)
      state.update(newValue)
      Some((key, newValue))
    } catch {
      case e: NumberFormatException =>
        // Log error and maintain previous state
        logError(s"Invalid number for key $key: ${value.getOrElse("None")}")
        Some((key, state.getOption().getOrElse(0)))
      case e: Exception =>
        // Handle other errors
        logError(s"Error updating state for $key", e)
        None
    }
  }
)
```
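
An alternative to catching exceptions inside the mapping function is to parse defensively with `scala.util.Try` and return the outcome as a value. The sketch below is Spark-free; `SafeAccumulate` and `add` are hypothetical names, and trimming whitespace before parsing is an assumption not present in the example above.

```scala
import scala.util.Try

object SafeAccumulate {
  // Returns the new total and whether the input was accepted.
  // Invalid input leaves the total unchanged.
  def add(current: Int, raw: Option[String]): (Int, Boolean) =
    raw match {
      case None => (current, true) // nothing to add
      case Some(text) =>
        Try(text.trim.toInt).toOption match {
          case Some(n) => (current + n, true)
          case None    => (current, false) // caller can log the rejected input
        }
    }
}
```

Inside a mapping function, the returned flag could decide whether to call `state.update` and whether to log the rejected input.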