# Streaming Operations

Stateful operations for complex streaming analytics with timeout support and watermark handling. Enables sophisticated event-time processing and stateful transformations in Spark Structured Streaming.

## Capabilities

### Group State Management

Per-group state management for stateful streaming operations.

```scala { .api }
/**
 * Per-group state for streaming operations
 * @tparam S State type
 */
trait GroupState[S] extends LogicalGroupState[S] {
  /** Check if state exists for this group */
  def exists: Boolean

  /** Get state value (throws if not exists) */
  def get: S

  /** Get state as Option */
  def getOption: Option[S]

  /** Update state with new value */
  def update(newState: S): Unit

  /** Remove state for this group */
  def remove(): Unit

  /** Check if group has timed out */
  def hasTimedOut: Boolean
}
```

### Timeout Management

Processing time and event time timeout configuration for stateful operations.

```scala { .api }
trait GroupState[S] extends LogicalGroupState[S] {
  // Processing time timeouts
  /** Set processing time timeout in milliseconds */
  def setTimeoutDuration(durationMs: Long): Unit

  /** Set processing time timeout with string duration (e.g., "10 minutes") */
  def setTimeoutDuration(duration: String): Unit

  // Event time timeouts
  /** Set event time timeout timestamp in milliseconds */
  def setTimeoutTimestamp(timestampMs: Long): Unit

  /** Set event time timeout with Date */
  def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
}
```

### Time Information Access

Access to watermark and processing time information within stateful operations.

```scala { .api }
trait GroupState[S] extends LogicalGroupState[S] {
  /** Get current watermark in milliseconds since epoch */
  def getCurrentWatermarkMs(): Long

  /** Get current processing time in milliseconds since epoch */
  def getCurrentProcessingTimeMs(): Long
}
```

### Trigger Types

Different trigger types for controlling streaming query execution timing.

```scala { .api }
// Process all available data once and stop
case object OneTimeTrigger extends Trigger

// Process all available data in multiple batches
case object AvailableNowTrigger extends Trigger

/**
 * Micro-batch processing with fixed intervals
 * @param intervalMs Processing interval in milliseconds
 */
case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger

object ProcessingTimeTrigger {
  def apply(interval: String): ProcessingTimeTrigger
  def apply(interval: java.time.Duration): ProcessingTimeTrigger
  def create(interval: Long, unit: java.util.concurrent.TimeUnit): ProcessingTimeTrigger
}

/**
 * Continuous processing with low latency
 * @param intervalMs Checkpoint interval in milliseconds
 */
case class ContinuousTrigger(intervalMs: Long) extends Trigger

object ContinuousTrigger {
  def apply(interval: String): ContinuousTrigger
  def apply(interval: java.time.Duration): ContinuousTrigger
  def create(interval: Long, unit: java.util.concurrent.TimeUnit): ContinuousTrigger
}
```

## Usage Examples

**Basic stateful operations:**

```scala
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.sql.execution.streaming._

// Define state type
case class UserActivityState(
  loginCount: Int,
  lastSeenTime: Long,
  totalSessionTime: Long
)

// Stateful function for mapGroupsWithState
def updateUserActivity(
    userId: String,
    events: Iterator[UserEvent],
    state: GroupState[UserActivityState]
): UserActivityOutput = {

  val currentState = if (state.exists) {
    state.get
  } else {
    UserActivityState(0, 0L, 0L)
  }

  // Process events and update state
  val newEvents = events.toSeq
  val newLoginCount = currentState.loginCount + newEvents.count(_.eventType == "login")
  val latestTime = newEvents.map(_.timestamp).maxOption.getOrElse(currentState.lastSeenTime)

  val updatedState = currentState.copy(
    loginCount = newLoginCount,
    lastSeenTime = latestTime,
    totalSessionTime = currentState.totalSessionTime + calculateSessionTime(newEvents)
  )

  // Update state
  state.update(updatedState)

  // Return output
  UserActivityOutput(userId, updatedState.loginCount, updatedState.lastSeenTime)
}
```

**Timeout-based state expiration:**

```scala
def updateUserActivityWithTimeout(
    userId: String,
    events: Iterator[UserEvent],
    state: GroupState[UserActivityState]
): Option[UserActivityOutput] = {

  // Handle timeout
  if (state.hasTimedOut) {
    val finalState = state.get
    state.remove() // Clean up expired state
    return Some(UserActivityOutput(userId, finalState.loginCount, finalState.lastSeenTime, expired = true))
  }

  val currentState = if (state.exists) {
    state.get
  } else {
    UserActivityState(0, 0L, 0L)
  }

  // Process new events
  val newEvents = events.toSeq
  if (newEvents.nonEmpty) {
    val updatedState = processEvents(currentState, newEvents)
    state.update(updatedState)

    // Set timeout for 1 hour of inactivity
    state.setTimeoutDuration("1 hour")

    Some(UserActivityOutput(userId, updatedState.loginCount, updatedState.lastSeenTime))
  } else {
    None // No output for this batch
  }
}
```

**Event time timeout management:**

```scala
def updateSessionsWithEventTime(
    sessionId: String,
    events: Iterator[SessionEvent],
    state: GroupState[SessionState]
): Option[SessionOutput] = {

  if (state.hasTimedOut) {
    val expiredSession = state.get
    state.remove()
    return Some(SessionOutput(sessionId, expiredSession, expired = true))
  }

  val currentState = state.getOption.getOrElse(SessionState.empty)
  val newEvents = events.toSeq.sortBy(_.eventTime)

  if (newEvents.nonEmpty) {
    val latestEventTime = newEvents.last.eventTime
    val updatedState = processSessionEvents(currentState, newEvents)

    // Set event time timeout to 30 minutes after latest event
    val timeoutTimestamp = latestEventTime + (30 * 60 * 1000) // 30 minutes in ms
    state.setTimeoutTimestamp(timeoutTimestamp)

    state.update(updatedState)
    Some(SessionOutput(sessionId, updatedState))
  } else {
    None
  }
}
```

**Working with watermarks:**

```scala
def updateWithWatermarkAwareness(
    key: String,
    events: Iterator[Event],
    state: GroupState[EventState]
): Option[EventOutput] = {

  val currentWatermark = state.getCurrentWatermarkMs()
  val processingTime = state.getCurrentProcessingTimeMs()

  println(s"Current watermark: $currentWatermark, Processing time: $processingTime")

  // Filter out late events based on watermark
  val validEvents = events.filter(_.timestamp >= currentWatermark).toSeq

  if (validEvents.nonEmpty) {
    val currentState = state.getOption.getOrElse(EventState.empty)
    val updatedState = processValidEvents(currentState, validEvents)

    state.update(updatedState)
    Some(EventOutput(key, updatedState.count, updatedState.lastTimestamp))
  } else {
    None // All events were too late
  }
}
```

**Trigger configuration examples:**

```scala
import org.apache.spark.sql.execution.streaming._
import java.util.concurrent.TimeUnit

// One-time processing (batch-like)
val oneTimeTrigger = OneTimeTrigger

// Process all available data in batches
val availableNowTrigger = AvailableNowTrigger

// Micro-batch with fixed intervals
val processingTrigger1 = ProcessingTimeTrigger("30 seconds")
val processingTrigger2 = ProcessingTimeTrigger(java.time.Duration.ofMinutes(5))
val processingTrigger3 = ProcessingTimeTrigger.create(10, TimeUnit.SECONDS)

// Continuous processing (low latency)
val continuousTrigger1 = ContinuousTrigger("1 second")
val continuousTrigger2 = ContinuousTrigger(java.time.Duration.ofMillis(500))
val continuousTrigger3 = ContinuousTrigger.create(100, TimeUnit.MILLISECONDS)
```

**State lifecycle management:**

```scala
def manageStateLifecycle(
    groupKey: String,
    values: Iterator[DataPoint],
    state: GroupState[AggregationState]
): AggregationResult = {

  // Initialize state if first time seeing this group
  val currentState = if (state.exists) {
    state.get
  } else {
    AggregationState.initialize()
  }

  val dataPoints = values.toSeq

  if (dataPoints.isEmpty && currentState.isEmpty) {
    // No data and no existing state - remove if exists
    if (state.exists) state.remove()
    AggregationResult.empty(groupKey)
  } else if (dataPoints.nonEmpty) {
    // Update state with new data
    val updatedState = currentState.aggregate(dataPoints)

    if (updatedState.shouldKeep) {
      state.update(updatedState)
      // Set reasonable timeout
      state.setTimeoutDuration("2 hours")
    } else {
      // State no longer needed
      state.remove()
    }

    AggregationResult(groupKey, updatedState.result)
  } else {
    // No new data, return current result
    AggregationResult(groupKey, currentState.result)
  }
}
```

**Error handling in stateful operations:**

```scala
def robustStatefulUpdate(
    key: String,
    events: Iterator[Event],
    state: GroupState[MyState]
): Option[Output] = {

  try {
    // Handle timeout first
    if (state.hasTimedOut) {
      handleTimeout(key, state)
    } else {
      processEvents(key, events, state)
    }
  } catch {
    case ex: Exception =>
      // Log error but don't fail the entire stream
      logger.error(s"Error processing group $key", ex)

      // Optionally reset state on error
      if (state.exists) state.remove()

      None // Skip output for this batch
  }
}

def handleTimeout(key: String, state: GroupState[MyState]): Option[Output] = {
  val finalState = state.getOption
  state.remove() // Always clean up on timeout

  finalState.map(s => Output(key, s.finalResult, timedOut = true))
}
```