or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-types.mdencoders.mderror-handling.mdindex.mdrow-operations.mdstreaming-operations.mdutilities.md

streaming-operations.mddocs/

0

# Streaming Operations

Stateful operations for complex streaming analytics with timeout support and watermark handling. Enables sophisticated event-time processing and stateful transformations in Spark Structured Streaming.

## Capabilities

### Group State Management

Per-group state management for stateful streaming operations.

```scala { .api }
/**
 * Per-group state for streaming operations
 * @tparam S State type
 */
trait GroupState[S] extends LogicalGroupState[S] {
  /** Check if state exists for this group */
  def exists: Boolean

  /** Get state value (throws if not exists) */
  def get: S

  /** Get state as Option */
  def getOption: Option[S]

  /** Update state with new value */
  def update(newState: S): Unit

  /** Remove state for this group */
  def remove(): Unit

  /** Check if group has timed out */
  def hasTimedOut: Boolean
}
```

### Timeout Management

Processing time and event time timeout configuration for stateful operations.

```scala { .api }
trait GroupState[S] extends LogicalGroupState[S] {
  // Processing time timeouts
  /** Set processing time timeout in milliseconds */
  def setTimeoutDuration(durationMs: Long): Unit

  /** Set processing time timeout with string duration (e.g., "10 minutes") */
  def setTimeoutDuration(duration: String): Unit

  // Event time timeouts
  /** Set event time timeout timestamp in milliseconds */
  def setTimeoutTimestamp(timestampMs: Long): Unit

  /** Set event time timeout with Date */
  def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
}
```

### Time Information Access

Access to watermark and processing time information within stateful operations.

```scala { .api }
trait GroupState[S] extends LogicalGroupState[S] {
  /** Get current watermark in milliseconds since epoch */
  def getCurrentWatermarkMs(): Long

  /** Get current processing time in milliseconds since epoch */
  def getCurrentProcessingTimeMs(): Long
}
```

### Trigger Types

Different trigger types for controlling streaming query execution timing.

```scala { .api }
// Process all available data once and stop
case object OneTimeTrigger extends Trigger

// Process all available data in multiple batches
case object AvailableNowTrigger extends Trigger

/**
 * Micro-batch processing with fixed intervals
 * @param intervalMs Processing interval in milliseconds
 */
case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger

object ProcessingTimeTrigger {
  def apply(interval: String): ProcessingTimeTrigger
  def apply(interval: java.time.Duration): ProcessingTimeTrigger
  def create(interval: Long, unit: java.util.concurrent.TimeUnit): ProcessingTimeTrigger
}

/**
 * Continuous processing with low latency
 * @param intervalMs Checkpoint interval in milliseconds
 */
case class ContinuousTrigger(intervalMs: Long) extends Trigger

object ContinuousTrigger {
  def apply(interval: String): ContinuousTrigger
  def apply(interval: java.time.Duration): ContinuousTrigger
  def create(interval: Long, unit: java.util.concurrent.TimeUnit): ContinuousTrigger
}
```

## Usage Examples

**Basic stateful operations:**

```scala
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.sql.execution.streaming._

// Define state type
case class UserActivityState(
  loginCount: Int,
  lastSeenTime: Long,
  totalSessionTime: Long
)

// Stateful function for mapGroupsWithState
def updateUserActivity(
  userId: String,
  events: Iterator[UserEvent],
  state: GroupState[UserActivityState]
): UserActivityOutput = {

  val currentState = if (state.exists) {
    state.get
  } else {
    UserActivityState(0, 0L, 0L)
  }

  // Process events and update state
  val newEvents = events.toSeq
  val newLoginCount = currentState.loginCount + newEvents.count(_.eventType == "login")
  val latestTime = newEvents.map(_.timestamp).maxOption.getOrElse(currentState.lastSeenTime)

  val updatedState = currentState.copy(
    loginCount = newLoginCount,
    lastSeenTime = latestTime,
    totalSessionTime = currentState.totalSessionTime + calculateSessionTime(newEvents)
  )

  // Update state
  state.update(updatedState)

  // Return output
  UserActivityOutput(userId, updatedState.loginCount, updatedState.lastSeenTime)
}
```

**Timeout-based state expiration:**

```scala
def updateUserActivityWithTimeout(
  userId: String,
  events: Iterator[UserEvent],
  state: GroupState[UserActivityState]
): Option[UserActivityOutput] = {

  // Handle timeout
  if (state.hasTimedOut) {
    val finalState = state.get
    state.remove() // Clean up expired state
    return Some(UserActivityOutput(userId, finalState.loginCount, finalState.lastSeenTime, expired = true))
  }

  val currentState = if (state.exists) {
    state.get
  } else {
    UserActivityState(0, 0L, 0L)
  }

  // Process new events
  val newEvents = events.toSeq
  if (newEvents.nonEmpty) {
    val updatedState = processEvents(currentState, newEvents)
    state.update(updatedState)

    // Set timeout for 1 hour of inactivity
    state.setTimeoutDuration("1 hour")

    Some(UserActivityOutput(userId, updatedState.loginCount, updatedState.lastSeenTime))
  } else {
    None // No output for this batch
  }
}
```

**Event time timeout management:**

```scala
def updateSessionsWithEventTime(
  sessionId: String,
  events: Iterator[SessionEvent],
  state: GroupState[SessionState]
): Option[SessionOutput] = {

  if (state.hasTimedOut) {
    val expiredSession = state.get
    state.remove()
    return Some(SessionOutput(sessionId, expiredSession, expired = true))
  }

  val currentState = state.getOption.getOrElse(SessionState.empty)
  val newEvents = events.toSeq.sortBy(_.eventTime)

  if (newEvents.nonEmpty) {
    val latestEventTime = newEvents.last.eventTime
    val updatedState = processSessionEvents(currentState, newEvents)

    // Set event time timeout to 30 minutes after latest event
    val timeoutTimestamp = latestEventTime + (30 * 60 * 1000) // 30 minutes in ms
    state.setTimeoutTimestamp(timeoutTimestamp)

    state.update(updatedState)
    Some(SessionOutput(sessionId, updatedState))
  } else {
    None
  }
}
```

**Working with watermarks:**

```scala
def updateWithWatermarkAwareness(
  key: String,
  events: Iterator[Event],
  state: GroupState[EventState]
): Option[EventOutput] = {

  val currentWatermark = state.getCurrentWatermarkMs()
  val processingTime = state.getCurrentProcessingTimeMs()

  println(s"Current watermark: $currentWatermark, Processing time: $processingTime")

  // Filter out late events based on watermark
  val validEvents = events.filter(_.timestamp >= currentWatermark).toSeq

  if (validEvents.nonEmpty) {
    val currentState = state.getOption.getOrElse(EventState.empty)
    val updatedState = processValidEvents(currentState, validEvents)

    state.update(updatedState)
    Some(EventOutput(key, updatedState.count, updatedState.lastTimestamp))
  } else {
    None // All events were too late
  }
}
```

**Trigger configuration examples:**

```scala
import org.apache.spark.sql.execution.streaming._
import java.util.concurrent.TimeUnit

// One-time processing (batch-like)
val oneTimeTrigger = OneTimeTrigger

// Process all available data in batches
val availableNowTrigger = AvailableNowTrigger

// Micro-batch with fixed intervals
val processingTrigger1 = ProcessingTimeTrigger("30 seconds")
val processingTrigger2 = ProcessingTimeTrigger(java.time.Duration.ofMinutes(5))
val processingTrigger3 = ProcessingTimeTrigger.create(10, TimeUnit.SECONDS)

// Continuous processing (low latency)
val continuousTrigger1 = ContinuousTrigger("1 second")
val continuousTrigger2 = ContinuousTrigger(java.time.Duration.ofMillis(500))
val continuousTrigger3 = ContinuousTrigger.create(100, TimeUnit.MILLISECONDS)
```

**State lifecycle management:**

```scala
def manageStateLifecycle(
  groupKey: String,
  values: Iterator[DataPoint],
  state: GroupState[AggregationState]
): AggregationResult = {

  // Initialize state if first time seeing this group
  val currentState = if (state.exists) {
    state.get
  } else {
    AggregationState.initialize()
  }

  val dataPoints = values.toSeq

  if (dataPoints.isEmpty && currentState.isEmpty) {
    // No data and no existing state - remove if exists
    if (state.exists) state.remove()
    AggregationResult.empty(groupKey)
  } else if (dataPoints.nonEmpty) {
    // Update state with new data
    val updatedState = currentState.aggregate(dataPoints)

    if (updatedState.shouldKeep) {
      state.update(updatedState)
      // Set reasonable timeout
      state.setTimeoutDuration("2 hours")
    } else {
      // State no longer needed
      state.remove()
    }

    AggregationResult(groupKey, updatedState.result)
  } else {
    // No new data, return current result
    AggregationResult(groupKey, currentState.result)
  }
}
```

**Error handling in stateful operations:**

```scala
def robustStatefulUpdate(
  key: String,
  events: Iterator[Event],
  state: GroupState[MyState]
): Option[Output] = {

  try {
    // Handle timeout first
    if (state.hasTimedOut) {
      handleTimeout(key, state)
    } else {
      processEvents(key, events, state)
    }
  } catch {
    case ex: Exception =>
      // Log error but don't fail the entire stream
      logger.error(s"Error processing group $key", ex)

      // Optionally reset state on error
      if (state.exists) state.remove()

      None // Skip output for this batch
  }
}

def handleTimeout(key: String, state: GroupState[MyState]): Option[Output] = {
  val finalState = state.getOption
  state.remove() // Always clean up on timeout

  finalState.map(s => Output(key, s.finalResult, timedOut = true))
}
```