or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-streaming.md · index.md · input-sources.md · java-api.md · monitoring-listeners.md · state-management.md · web-ui.md

docs/state-management.md

0

# State Management

1

2

Stateful processing capabilities for maintaining state across batches, including updateStateByKey and mapWithState operations for building stateful streaming applications.

3

4

## Capabilities

5

6

### State Class

7

8

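Abstract class representing the state of a single key in a `mapWithState` operation. Spark passes an instance to the mapping function, which uses it to check, get, update, and remove that key's state. `update` and `remove` must not be called while `isTimingOut()` returns `true`.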

9

10

```scala { .api }

11

/**
 * State of a single key, handed to the mapping function of a `mapWithState` operation.
 *
 * Instances are created by Spark; user code only reads and modifies them inside the
 * mapping function. In Spark itself the class is declared `sealed`.
 *
 * Contract (from the Spark `State` Scaladoc):
 *  - `update` and `remove` must not be called after `remove()` has been called.
 *  - `update` and `remove` must not be called while `isTimingOut()` is `true`; the key is
 *    being removed by the timeout and such calls fail with an exception.
 *
 * @tparam S type of the state
 */
abstract class State[S] {

  /** Whether state currently exists for the key. */
  def exists(): Boolean

  /** The current state value; throws `java.util.NoSuchElementException` if no state exists. */
  def get(): S

  /**
   * Replace the state with `newState`.
   * Must not be called after `remove()` or while the state is timing out.
   */
  def update(newState: S): Unit

  /** Remove the state for this key. Must not be called while the state is timing out. */
  def remove(): Unit

  /** Whether the key is being removed in the current batch because its idle timeout expired. */
  def isTimingOut(): Boolean

  /**
   * The state as an `Option`: `Some(state)` if it exists, otherwise `None`.
   * Implemented by the class itself in terms of `exists()` and `get()`.
   */
  final def getOption(): Option[S] = if (exists()) Some(get()) else None
}

35

```

36

37

### StateSpec

38

39

Specification class for configuring mapWithState operations, including mapping functions, initial state, partitioning, and timeout settings.

40

41

```scala { .api }

42

/**
 * Specification of a `mapWithState` operation: the mapping function plus optional
 * initial state, partitioning and idle timeout.
 *
 * Instances are created only through the `StateSpec.function` factory methods; every
 * setter returns the spec so calls can be chained.
 *
 * @tparam KeyType    type of keys in the DStream
 * @tparam ValueType  type of values in the DStream
 * @tparam StateType  type of the per-key state being maintained
 * @tparam MappedType type of the records emitted by the mapping function
 */
sealed abstract class StateSpec[KeyType, ValueType, StateType, MappedType] extends Serializable {

  /** Use the (key, state) pairs of the given RDD as the initial state. */
  def initialState(rdd: RDD[(KeyType, StateType)]): StateSpec[KeyType, ValueType, StateType, MappedType]

  /** Use the (key, state) pairs of the given JavaPairRDD as the initial state. */
  def initialState(javaPairRDD: JavaPairRDD[KeyType, StateType]): StateSpec[KeyType, ValueType, StateType, MappedType]

  /** Hash-partition the state RDDs into `numPartitions` partitions. */
  def numPartitions(numPartitions: Int): StateSpec[KeyType, ValueType, StateType, MappedType]

  /** Partition the state RDDs with the given partitioner. */
  def partitioner(partitioner: Partitioner): StateSpec[KeyType, ValueType, StateType, MappedType]

  /**
   * Time out keys that receive no new data for `idleDuration`. When a key times out, the
   * mapping function is called once more with no value and `state.isTimingOut() == true`,
   * after which the state is removed.
   */
  def timeout(idleDuration: Duration): StateSpec[KeyType, ValueType, StateType, MappedType]
}

/**
 * Factory methods for creating StateSpec instances.
 *
 * `Function3`/`Function4` are `org.apache.spark.api.java.function.Function3/Function4`, and
 * `Optional` is `org.apache.spark.api.java.Optional`.
 */
object StateSpec {

  /**
   * Scala mapping function that also receives the batch `Time`.
   * Returning `None` emits no record for that call.
   */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (Time, KeyType, Option[ValueType], State[StateType]) => Option[MappedType]
  ): StateSpec[KeyType, ValueType, StateType, MappedType]

  /** Scala mapping function; its return value is emitted for every call. */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]

  /**
   * Java mapping function that also receives the batch `Time` (not a timeout);
   * returning an empty `Optional` emits no record for that call.
   */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: Function4[Time, KeyType, Optional[ValueType], State[StateType], Optional[MappedType]]
  ): StateSpec[KeyType, ValueType, StateType, MappedType]

  /** Java mapping function; its return value is emitted for every call. */
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: Function3[KeyType, Optional[ValueType], State[StateType], MappedType]
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}

87

```

88

89

**Usage Examples:**

90

91

```scala

92

// Define the state mapping function. Because the spec below sets a timeout, Spark also
// calls this function once for every key that times out, with `value == None` and
// `state.isTimingOut() == true`. Calling `state.update` at that point throws, so the
// state is only updated for keys that are not timing out.
def updateFunction(key: String, value: Option[Int], state: State[Int]): Int = {
  val currentState = state.getOption().getOrElse(0)
  val newState = currentState + value.getOrElse(0)
  if (!state.isTimingOut()) {
    state.update(newState)
  }
  newState
}

// Create StateSpec (initialStateRDD: RDD[(String, Int)] is assumed to be defined elsewhere)
val stateSpec = StateSpec.function(updateFunction _)
  .initialState(initialStateRDD)
  .numPartitions(10)
  .timeout(Minutes(10))

// Apply to a DStream[(String, Int)]
val stateDStream = pairDStream.mapWithState(stateSpec)

108

```

109

110

### MapWithState Operations

111

112

Advanced stateful operations using StateSpec for efficient state management with configurable timeouts and partitioning.

113

114

```scala { .api }

115

import scala.reflect.ClassTag

/**
 * Extra operations available on a DStream[(K, V)] through an implicit conversion
 * (`DStream.toPairDStreamFunctions`). Only `mapWithState` is shown here.
 */
class PairDStreamFunctions[K, V](self: DStream[(K, V)]) {

  /**
   * Apply a stateful mapping function to every key-value record, keeping per-key state
   * across batches.
   *
   * The `ClassTag` context bounds are part of the real signature: callers that are generic
   * in `StateType` or `MappedType` must have `ClassTag` instances in scope.
   *
   * @param spec mapping function plus initial state, partitioning and timeout settings
   * @return a MapWithStateDStream of the mapped records, which also exposes the state
   */
  def mapWithState[StateType: ClassTag, MappedType: ClassTag](
      spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType]
}

/**
 * DStream of the records emitted by a `mapWithState` mapping function.
 * (In Spark this class is `sealed` and created only by `mapWithState`.)
 */
abstract class MapWithStateDStream[KeyType, ValueType, StateType, MappedType]
  extends DStream[MappedType] {

  /** A DStream whose RDDs are snapshots of the (key, state) pairs of all keys after each batch. */
  def stateSnapshots(): DStream[(KeyType, StateType)]
}

139

```

140

141

**Usage Examples:**

142

143

```scala

144

// Running word count with state
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")).map((_, 1))

// State update function. With a timeout configured, Spark calls it one last time for
// each idle key (count == None, state.isTimingOut() == true). Updating a timing-out
// state throws, so the update is skipped in that case.
def updateWordCount(word: String, count: Option[Int], state: State[Int]): Int = {
  val newCount = state.getOption().getOrElse(0) + count.getOrElse(0)
  if (!state.isTimingOut()) {
    state.update(newCount)
  }
  newCount
}

// Configure and apply mapWithState: words that receive no data for 5 minutes are dropped
val wordCountsState = words.mapWithState(
  StateSpec.function(updateWordCount _)
    .timeout(Minutes(5))
)

// Print a snapshot of all current (word, count) pairs after every batch
val currentCounts = wordCountsState.stateSnapshots()
currentCounts.print()

165

```

166

167

### UpdateStateByKey Operations

168

169

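The original (pre-`mapWithState`) stateful operations, which maintain per-key state across batches using an update function. The function is called in every batch for every key that has state, even if the key received no new values (the `Seq` is then empty). Returning `None` removes the key from the state.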

170

171

```scala { .api }

172

import scala.reflect.ClassTag

/**
 * Extra operations available on a DStream[(K, V)] through an implicit conversion.
 * Only the updateStateByKey overloads that take a per-key update function are shown.
 */
class PairDStreamFunctions[K, V](self: DStream[(K, V)]) {

  /**
   * Update the state of every key with `updateFunc`, using Spark's default partitioner.
   *
   * `updateFunc` receives the values that arrived for the key in the current batch
   * (possibly empty) and the previous state (`None` for a new key). It is called in every
   * batch for every key that has state, not only for keys with new data. Returning `None`
   * removes the key from the state.
   *
   * @param updateFunc function computing the new state from new values and previous state
   * @return DStream of (key, state) pairs
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)]

  /** Like the single-argument overload, with the state RDDs partitioned by `partitioner`. */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner
  ): DStream[(K, S)]

  /** Like the single-argument overload, with the state hash-partitioned into `numPartitions` partitions. */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      numPartitions: Int
  ): DStream[(K, S)]

  /**
   * Like the single-argument overload, partitioned by `partitioner` and seeded with the
   * (key, state) pairs of `initialRDD`.
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner,
      initialRDD: RDD[(K, S)]
  ): DStream[(K, S)]
}

211

```

212

213

**Usage Examples:**

214

215

```scala

216

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Simple running count
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")).map((_, 1))

// Update function for word counting: adds this batch's counts to the previous total.
// It is called for every tracked word in every batch; `values` is empty for words
// that did not appear in the batch.
def updateWordCount(values: Seq[Int], state: Option[Int]): Option[Int] =
  Some(state.getOrElse(0) + values.sum)

// Apply updateStateByKey. The explicit state type and `updateWordCount _` (the form used
// in the Spark Streaming programming guide) keep resolution of the overloaded,
// generic method unambiguous.
val wordCounts = words.updateStateByKey[Int](updateWordCount _)
wordCounts.print()

// With custom partitioner
val wordCountsPartitioned = words.updateStateByKey[Int](
  updateWordCount _,
  new HashPartitioner(4)
)

// With initial state
val initialCounts: RDD[(String, Int)] = ssc.sparkContext.parallelize(
  List(("hello", 10), ("world", 5))
)

val wordCountsWithInitial = words.updateStateByKey[Int](
  updateWordCount _,
  new HashPartitioner(4),
  initialCounts
)

247

```

248

249

### Checkpointing for State

250

251

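Stateful operations (`updateStateByKey`, `mapWithState`) require a checkpoint directory. Their state is checkpointed periodically for fault tolerance, and `start()` fails if no directory has been set.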

252

253

```scala { .api }

254

/**
 * StreamingContext members related to checkpointing.
 */
class StreamingContext {

  /**
   * Set the HDFS-compatible (fault-tolerant) directory where checkpoint data is written.
   * Required by updateStateByKey and mapWithState; without it the context fails to start.
   */
  def checkpoint(directory: String): Unit

  // NOTE: the configured directory is stored in `checkpointDir`, which is
  // `private[streaming]` in Spark and therefore not accessible from user code:
  //   private[streaming] var checkpointDir: String
}

/**
 * Factory methods for creating a StreamingContext that can recover from a checkpoint.
 */
object StreamingContext {

  /**
   * Recreate a StreamingContext from the checkpoint data in `checkpointPath` if it exists;
   * otherwise create a new one by calling `creatingFunc`.
   *
   * All DStream setup (including stateful operations) must happen inside `creatingFunc`,
   * because it is not called when the context is recovered from a checkpoint.
   *
   * @param checkpointPath directory the checkpoint data was written to
   * @param creatingFunc   builds a new context when no checkpoint data exists
   * @param hadoopConf     Hadoop configuration used to read the checkpoint files
   * @param createOnError  if true, create a new context when the checkpoint cannot be read
   *                       instead of throwing
   */
  def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext,
      hadoopConf: Configuration = SparkHadoopUtil.get.conf,
      createOnError: Boolean = false
  ): StreamingContext
}

277

```

278

279

**Usage Examples:**

280

281

```scala

282

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Option 1: enable checkpointing on an existing StreamingContext `ssc`
// before starting it with stateful operations
ssc.checkpoint("hdfs://namenode:9000/checkpoints")

// Option 2: build a context that can recover from its checkpoint after a driver restart.
val checkpointDirectory = "hdfs://namenode:9000/checkpoints"

// Running-count update function used by the stateful stream below
def updateWordCount(values: Seq[Int], state: Option[Int]): Option[Int] =
  Some(state.getOrElse(0) + values.sum)

// All DStream setup must happen inside this function: it is not called when the
// context is restored from checkpoint data.
def createStreamingContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("StatefulWordCount")
  val context = new StreamingContext(conf, Seconds(2))
  context.checkpoint(checkpointDirectory)

  // Define streaming computation with stateful operations
  val lines = context.socketTextStream("localhost", 9999)
  val words = lines.flatMap(_.split(" ")).map((_, 1))
  val wordCounts = words.updateStateByKey[Int](updateWordCount _)
  wordCounts.print()

  context
}

// Get or create from checkpoint, then start processing
val streamingContext = StreamingContext.getOrCreate(
  checkpointDirectory,
  createStreamingContext _
)
streamingContext.start()
streamingContext.awaitTermination()

304

```

305

306

## State Management Best Practices

307

308

### When to Use MapWithState vs UpdateStateByKey

309

310

**Use mapWithState when:**

311

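- Idle keys should expire automatically (`StateSpec.timeout`)
- The state is large and needs partitioning control (`numPartitions` / `partitioner` on `StateSpec`)
- Performance matters: `mapWithState` calls the mapping function only for keys that received data in the batch (or are timing out), whereas `updateStateByKey` calls its update function for every key in every batch
- The mapping function needs the batch `Time` (the four-argument `StateSpec.function` overloads)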

315

316

**Use updateStateByKey when:**

317

- Simple state updates without timeouts

318

- Legacy code compatibility

319

- Straightforward aggregation scenarios

320

321

### Memory Management

322

323

```scala

324

// Assumes a mapping function defined elsewhere, e.g.
//   def updateFunc(key: String, value: Option[Int], state: State[Int]): Int
// With a timeout set, updateFunc must not call state.update() when state.isTimingOut().

// Configure a state timeout so keys that stop receiving data do not stay in memory forever
val timedOutSpec = StateSpec.function(updateFunc _)
  .timeout(Minutes(30)) // Time out keys that receive no data for 30 minutes

// Control partitioning for memory distribution
val partitionedSpec = StateSpec.function(updateFunc _)
  .numPartitions(100) // Hash-partition the state across 100 partitions

331

```

332

333

### Performance Optimization

334

335

```scala

336

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Assumes a mapping function `updateFunc` of type
// (String, Option[Int], State[Int]) => Int defined elsewhere.

// Use an explicit partitioner to control how the state is distributed
val partitionedSpec = StateSpec.function(updateFunc _)
  .partitioner(new HashPartitioner(50))

// Provide initial state to avoid a cold start
// (loadInitialState() is a placeholder for your own loader)
val initialState: RDD[(String, Int)] = loadInitialState()
val seededSpec = StateSpec.function(updateFunc _)
  .initialState(initialState)

346

```