# Stateful Processing

Advanced operators for maintaining state across events, enabling complex event processing patterns like aggregations, session tracking, and multi-stream correlations. All stateful operators work on keyed streams to ensure correct state partitioning in distributed environments.

## Capabilities

### Basic Stateful Operations

Simple stateful transformations that maintain state per key and emit results based on state updates.
```python { .api }
def stateful_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], W]]) -> KeyedStream[W]: ...

def stateful_flat_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], Iterable[W]]]) -> KeyedStream[W]: ...
```
**stateful_map Parameters:**

- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Input keyed stream
- `mapper` (Callable): Function receiving current state and value, returning `(new_state, output_value)`

**stateful_flat_map Parameters:**

- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Input keyed stream
- `mapper` (Callable): Function receiving current state and value, returning `(new_state, output_values)`
**Usage Examples:**

```python
# Running count per key
def counter(state, value):
    count = (state or 0) + 1
    return count, count

counts = op.stateful_map("count", keyed_stream, counter)

# Session tracking with multiple outputs
def session_tracker(state, event):
    if state is None:
        state = {"start": event.timestamp, "events": []}

    state["events"].append(event)

    if event.type == "session_end":
        summary = {"duration": event.timestamp - state["start"], "event_count": len(state["events"])}
        return None, [summary]  # Discard state, emit summary
    else:
        return state, []  # Keep state, no output yet

sessions = op.stateful_flat_map("sessions", keyed_events, session_tracker)
```
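Outside a running dataflow, the mapper contract can be checked by threading state through by hand. `run_stateful_map` below is an illustrative stand-in for the operator, not part of the API; it keeps one state slot per key, as the operator does, and treats a returned state of `None` as a discard.

```python
def run_stateful_map(mapper, keyed_values):
    """Illustrative driver mimicking the stateful_map contract."""
    states = {}  # one state slot per key
    out = []
    for key, value in keyed_values:
        new_state, emitted = mapper(states.get(key), value)
        if new_state is None:
            states.pop(key, None)  # returning None as the state discards it
        else:
            states[key] = new_state
        out.append((key, emitted))
    return out

def counter(state, value):
    count = (state or 0) + 1
    return count, count

print(run_stateful_map(counter, [("a", "x"), ("a", "y"), ("b", "z")]))
# [('a', 1), ('a', 2), ('b', 1)]
```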
### Aggregation Operations (Finite Streams)

Aggregation operators that work on finite streams and emit results only when the upstream completes.
```python { .api }
def reduce_final(step_id: str, up: KeyedStream[V], reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...

def fold_final(step_id: str, up: KeyedStream[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...

def count_final(step_id: str, up: Stream[X], key: Callable[[X], str]) -> KeyedStream[int]: ...

def max_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

def min_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
```
**reduce_final Parameters:**

- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Input keyed stream
- `reducer` (Callable[[V, V], V]): Function to combine two values

**fold_final Parameters:**

- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Input keyed stream
- `builder` (Callable[[], S]): Function to create the initial accumulator
- `folder` (Callable[[S, V], S]): Function to combine the accumulator with a new value
**Usage Examples:**

```python
# Sum all values per key
totals = op.reduce_final("sum", keyed_numbers, lambda acc, val: acc + val)

# Build complex aggregations
def create_stats():
    return {"sum": 0, "count": 0, "min": float('inf'), "max": float('-inf')}

def update_stats(stats, value):
    return {
        "sum": stats["sum"] + value,
        "count": stats["count"] + 1,
        "min": min(stats["min"], value),
        "max": max(stats["max"], value),
    }

stats = op.fold_final("statistics", keyed_numbers, create_stats, update_stats)

# Count occurrences
word_counts = op.count_final("word_count", words, lambda word: word.lower())
```
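The examples above leave `max_final` and `min_final` unexercised. Their `by` parameter extracts the comparison key, while the full value is what gets kept. The snippet below mirrors in plain Python what `max_final("largest_order", keyed_orders, by=lambda o: o["amount"])` would compute per key at end of stream; the data and field names are illustrative.

```python
# Per-key values as they might stand when a finite stream ends (illustrative)
orders_by_user = {
    "alice": [{"id": 1, "amount": 30}, {"id": 2, "amount": 75}],
    "bob": [{"id": 3, "amount": 50}],
}

# Keep, per key, the whole event whose extracted value is largest
largest = {
    user: max(orders, key=lambda o: o["amount"])
    for user, orders in orders_by_user.items()
}

print(largest["alice"])  # {'id': 2, 'amount': 75}
```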
### Collection Operations

Operators for collecting items into batches based on size or time constraints.
```python { .api }
def collect(step_id: str, up: KeyedStream[V], timeout: timedelta, max_size: int) -> KeyedStream[List[V]]: ...
```
**Parameters:**

- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Input keyed stream
- `timeout` (timedelta): Maximum time to wait before emitting collected items
- `max_size` (int): Maximum number of items to collect before emitting
**Usage Example:**

```python
from datetime import timedelta

# Emit a batch when 10 items arrive or 5 seconds elapse, whichever comes first
batches = op.collect("batch_events", keyed_events, timedelta(seconds=5), 10)
```
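How the two limits interact can be sketched without a dataflow. `Batcher` below is an illustrative model of `collect`'s policy, not the operator itself: a batch flushes when it reaches `max_size`, or once `timeout` has elapsed since the batch's first item.

```python
from datetime import datetime, timedelta

class Batcher:
    """Illustrative model of collect: flush on max_size or timeout."""
    def __init__(self, timeout, max_size):
        self.timeout = timeout
        self.max_size = max_size
        self.batch = []
        self.first_at = None

    def push(self, value, now):
        if not self.batch:
            self.first_at = now  # timeout is measured from the first item
        self.batch.append(value)
        if len(self.batch) >= self.max_size or now - self.first_at >= self.timeout:
            return self.flush()
        return None  # still accumulating

    def flush(self):
        out, self.batch, self.first_at = self.batch, [], None
        return out

b = Batcher(timedelta(seconds=5), max_size=3)
t0 = datetime(2024, 1, 1)
print(b.push(1, t0))                         # None - accumulating
print(b.push(2, t0 + timedelta(seconds=1)))  # None
print(b.push(3, t0 + timedelta(seconds=2)))  # [1, 2, 3] - size limit hit
print(b.push(4, t0 + timedelta(seconds=3)))  # None
print(b.push(5, t0 + timedelta(seconds=9)))  # [4, 5] - timeout elapsed
```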
### Join Operations

Operators for correlating data across multiple keyed streams.
```python { .api }
def join(step_id: str, *sides: KeyedStream[Any], insert_mode: JoinInsertMode = "last", emit_mode: JoinEmitMode = "complete") -> KeyedStream[Tuple]: ...

JoinInsertMode = Literal["first", "last", "product"]
JoinEmitMode = Literal["complete", "final", "running"]
```
**Parameters:**

- `step_id` (str): Unique identifier
- `*sides` (KeyedStream[Any]): Multiple keyed streams to join
- `insert_mode` (JoinInsertMode): How to handle multiple values from the same side
  - `"first"`: Keep only the first value per side
  - `"last"`: Keep only the last value per side
  - `"product"`: Cross-product of all values
- `emit_mode` (JoinEmitMode): When to emit join results
  - `"complete"`: Emit when all sides have values, then discard state
  - `"final"`: Emit when the upstream ends (finite streams only)
  - `"running"`: Emit every time any side updates
**Usage Examples:**

```python
# Complete join - emit once both sides have a value for a key
user_orders = op.join("user_order_join", users, orders, emit_mode="complete")

# Final join - emit at end of input, including partial matches (outer-join-like)
all_users = op.join("final_join", users, orders, emit_mode="final")

# Running join - emit updated tuples as values arrive on either side
live_correlation = op.join("live_join", stream1, stream2, emit_mode="running")
```
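`insert_mode` governs what is stored per side before a result is emitted; `"product"` is the only mode that can yield more than one tuple per key. A plain-Python sketch of the three policies (illustrative, not the operator's internals):

```python
from itertools import product

def apply_insert_mode(mode, side_values):
    """side_values: one list per join side, in arrival order."""
    if mode == "first":
        return [vals[0] for vals in side_values]   # first value per side
    if mode == "last":
        return [vals[-1] for vals in side_values]  # last value per side
    if mode == "product":
        return list(product(*side_values))         # every combination across sides

sides = [["u1", "u2"], ["o1"]]
print(apply_insert_mode("first", sides))    # ['u1', 'o1']
print(apply_insert_mode("last", sides))     # ['u2', 'o1']
print(apply_insert_mode("product", sides))  # [('u1', 'o1'), ('u2', 'o1')]
```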
### Advanced Stateful Operations

Low-level stateful operators providing maximum control over state management and lifecycle.
```python { .api }
def stateful(step_id: str, up: KeyedStream[V], builder: Callable[[Optional[S]], StatefulLogic[V, W, S]]) -> KeyedStream[W]: ...

def stateful_batch(step_id: str, up: KeyedStream[V], builder: Callable[[Optional[S]], StatefulBatchLogic[V, W, S]]) -> KeyedStream[W]: ...

class StatefulLogic[V, W, S]:
    RETAIN: bool = False
    DISCARD: bool = True

    def on_item(self, value: V) -> Tuple[Iterable[W], bool]: ...
    def on_notify(self) -> Tuple[Iterable[W], bool]: ...
    def on_eof(self) -> Tuple[Iterable[W], bool]: ...
    def notify_at(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...

class StatefulBatchLogic[V, W, S]:
    RETAIN: bool = False
    DISCARD: bool = True

    def on_batch(self, values: List[V]) -> Tuple[Iterable[W], bool]: ...
    def on_notify(self) -> Tuple[Iterable[W], bool]: ...
    def on_eof(self) -> Tuple[Iterable[W], bool]: ...
    def notify_at(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...
```
**StatefulLogic Methods:**

- `on_item`: Called for each new value; returns `(outputs, should_discard_state)`
- `on_notify`: Called when the notification time is reached
- `on_eof`: Called when the upstream ends
- `notify_at`: Returns the next notification time, or `None`
- `snapshot`: Returns state for recovery (must be pickle-able and immutable)
**Usage Example:**

```python
import copy
from datetime import datetime, timedelta

class SlidingWindowLogic(StatefulLogic):
    def __init__(self, window_size):
        self.window_size = window_size
        self.items = []

    def on_item(self, value):
        now = datetime.now()
        self.items.append((now, value))

        # Remove old items
        cutoff = now - self.window_size
        self.items = [(ts, val) for ts, val in self.items if ts > cutoff]

        # Emit current window sum
        total = sum(val for _, val in self.items)
        return [total], StatefulLogic.RETAIN

    def notify_at(self):
        if self.items:
            # Next cleanup time: when the oldest item expires
            return self.items[0][0] + self.window_size
        return None

    def on_notify(self):
        # Clean up expired items
        now = datetime.now()
        cutoff = now - self.window_size
        old_count = len(self.items)
        self.items = [(ts, val) for ts, val in self.items if ts > cutoff]

        if len(self.items) != old_count:
            total = sum(val for _, val in self.items)
            return [total], StatefulLogic.RETAIN
        return [], StatefulLogic.RETAIN

    def snapshot(self):
        return copy.deepcopy(self.items)

def build_sliding_window(resume_state):
    logic = SlidingWindowLogic(timedelta(minutes=5))
    if resume_state:
        logic.items = resume_state
    return logic

windowed = op.stateful("sliding_window", keyed_stream, build_sliding_window)
```
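`stateful_batch` follows the same lifecycle but receives values a batch at a time, which amortizes per-item overhead. Below is a minimal running-sum sketch; the `RETAIN`/`DISCARD` constants are stood in by plain booleans so the callbacks can be exercised without a dataflow. In real code the class would subclass `StatefulBatchLogic` and be wired up with `op.stateful_batch("sum", keyed_stream, BatchSummer)`.

```python
RETAIN, DISCARD = False, True  # stand-ins for StatefulBatchLogic.RETAIN / .DISCARD

class BatchSummer:
    """Sketch of a StatefulBatchLogic: running sum, emitted once per batch."""
    def __init__(self, resume_state):
        self.total = resume_state if resume_state is not None else 0

    def on_batch(self, values):
        self.total += sum(values)
        return [self.total], RETAIN  # emit the running total, keep state

    def on_eof(self):
        return [], DISCARD           # stream ended: drop the state

    def notify_at(self):
        return None                  # no timed wake-ups needed

    def snapshot(self):
        return self.total            # an int is already immutable and pickle-able

logic = BatchSummer(None)
print(logic.on_batch([1, 2, 3]))  # ([6], False)
print(logic.on_batch([4]))        # ([10], False)
print(logic.snapshot())           # 10
```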