# Stateful Processing

Advanced operators for maintaining state across events, enabling complex event processing patterns like aggregations, session tracking, and multi-stream correlations. All stateful operators work on keyed streams to ensure correct state partitioning in distributed environments.
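
A quick sketch of keying a plain stream before a stateful step (assuming the operators module is imported as `op` and a `key_on` operator is available; the `events` stream and `user_id` field are hypothetical):

```python
# Hypothetical sketch: key a plain stream by user ID so downstream stateful
# operators partition their state per user. Assumes `op` is the operators
# module and `events` is an existing stream of objects with a `user_id` field.
keyed_events = op.key_on("key_by_user", events, lambda e: str(e.user_id))
```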

## Capabilities

### Basic Stateful Operations

Simple stateful transformations that maintain state per key and emit results based on state updates.

```python { .api }
def stateful_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], W]]) -> KeyedStream[W]: ...

def stateful_flat_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], Iterable[W]]]) -> KeyedStream[W]: ...
```

**stateful_map Parameters:**

- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Input keyed stream
- `mapper` (Callable): Function receiving current state and value, returning (new_state, output_value)

**stateful_flat_map Parameters:**

- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Input keyed stream
- `mapper` (Callable): Function receiving current state and value, returning (new_state, output_values)

**Usage Examples:**

```python
# Running count per key
def counter(state, value):
    count = (state or 0) + 1
    return count, count

counts = op.stateful_map("count", keyed_stream, counter)

# Session tracking with multiple outputs
def session_tracker(state, event):
    if state is None:
        state = {"start": event.timestamp, "events": []}

    state["events"].append(event)

    if event.type == "session_end":
        summary = {"duration": event.timestamp - state["start"], "event_count": len(state["events"])}
        return None, [summary]  # Discard state, emit summary
    else:
        return state, []  # Keep state, no output yet

sessions = op.stateful_flat_map("sessions", keyed_events, session_tracker)
```

### Aggregation Operations (Finite Streams)

Aggregation operators that work on finite streams and emit results only when the upstream completes.

```python { .api }
def reduce_final(step_id: str, up: KeyedStream[V], reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...

def fold_final(step_id: str, up: KeyedStream[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...

def count_final(step_id: str, up: Stream[X], key: Callable[[X], str]) -> KeyedStream[int]: ...

def max_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

def min_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
```

**reduce_final Parameters:**

- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Input keyed stream
- `reducer` (Callable[[V, V], V]): Function to combine two values

**fold_final Parameters:**

- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Input keyed stream
- `builder` (Callable[[], S]): Function to create initial accumulator
- `folder` (Callable[[S, V], S]): Function to combine accumulator with new value

**Usage Examples:**

```python
# Sum all values per key
totals = op.reduce_final("sum", keyed_numbers, lambda acc, val: acc + val)

# Build complex aggregations
def create_stats():
    return {"sum": 0, "count": 0, "min": float('inf'), "max": float('-inf')}

def update_stats(stats, value):
    return {
        "sum": stats["sum"] + value,
        "count": stats["count"] + 1,
        "min": min(stats["min"], value),
        "max": max(stats["max"], value)
    }

stats = op.fold_final("statistics", keyed_numbers, create_stats, update_stats)

# Count occurrences
word_counts = op.count_final("word_count", words, lambda word: word.lower())
```
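
`max_final` and `min_final` follow the same pattern and accept an optional `by` key function; a short sketch (the `keyed_readings` stream and its `temperature` field are hypothetical):

```python
# Highest and lowest reading per key, compared by the temperature field.
hottest = op.max_final("max_temp", keyed_readings, by=lambda r: r.temperature)
coldest = op.min_final("min_temp", keyed_readings, by=lambda r: r.temperature)
```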

### Collection Operations

Operators for collecting items into batches based on size or time constraints.

```python { .api }
def collect(step_id: str, up: KeyedStream[V], timeout: timedelta, max_size: int) -> KeyedStream[List[V]]: ...
```

**Parameters:**

- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Input keyed stream
- `timeout` (timedelta): Maximum time to wait before emitting collected items
- `max_size` (int): Maximum number of items to collect before emitting

**Usage Example:**

```python
from datetime import timedelta

# Collect events into batches of up to 10 items or every 5 seconds
batches = op.collect("batch_events", keyed_events, timedelta(seconds=5), 10)
```

### Join Operations

Operators for correlating data across multiple keyed streams.

```python { .api }
def join(step_id: str, *sides: KeyedStream[Any], insert_mode: JoinInsertMode = "last", emit_mode: JoinEmitMode = "complete") -> KeyedStream[Tuple]: ...

JoinInsertMode = Literal["first", "last", "product"]
JoinEmitMode = Literal["complete", "final", "running"]
```

**Parameters:**

- `step_id` (str): Unique identifier
- `*sides` (KeyedStream[Any]): Multiple keyed streams to join
- `insert_mode` (JoinInsertMode): How to handle multiple values from same side
  - `"first"`: Keep only first value per side
  - `"last"`: Keep only last value per side
  - `"product"`: Cross-product of all values
- `emit_mode` (JoinEmitMode): When to emit join results
  - `"complete"`: Emit when all sides have values, then discard state
  - `"final"`: Emit when upstream ends (finite streams only)
  - `"running"`: Emit every time any side updates

**Usage Examples:**

```python
# Inner join - emit when both sides have data
user_orders = op.join("user_order_join", users, orders, emit_mode="complete")

# Left join - emit final results including partial matches
all_users = op.join("left_join", users, orders, emit_mode="final")

# Running join - emit updates as they arrive
live_correlation = op.join("live_join", stream1, stream2, emit_mode="running")
```
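
`insert_mode` controls what happens when one side sees several values before the join fires; a hedged sketch of the non-default modes (stream names are hypothetical):

```python
# Keep only the first value seen per side, emit once both sides have a value.
first_match = op.join("first_join", clicks, impressions, insert_mode="first")

# On finite streams, emit the cross-product of every value seen on each side
# once the upstream ends.
all_pairs = op.join(
    "product_join", clicks, impressions, insert_mode="product", emit_mode="final"
)
```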

### Advanced Stateful Operations

Low-level stateful operators providing maximum control over state management and lifecycle.

```python { .api }
def stateful(step_id: str, up: KeyedStream[V], builder: Callable[[Optional[S]], StatefulLogic[V, W, S]]) -> KeyedStream[W]: ...

def stateful_batch(step_id: str, up: KeyedStream[V], builder: Callable[[Optional[S]], StatefulBatchLogic[V, W, S]]) -> KeyedStream[W]: ...

class StatefulLogic[V, W, S]:
    RETAIN: bool = False
    DISCARD: bool = True

    def on_item(self, value: V) -> Tuple[Iterable[W], bool]: ...
    def on_notify(self) -> Tuple[Iterable[W], bool]: ...
    def on_eof(self) -> Tuple[Iterable[W], bool]: ...
    def notify_at(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...

class StatefulBatchLogic[V, W, S]:
    RETAIN: bool = False
    DISCARD: bool = True

    def on_batch(self, values: List[V]) -> Tuple[Iterable[W], bool]: ...
    def on_notify(self) -> Tuple[Iterable[W], bool]: ...
    def on_eof(self) -> Tuple[Iterable[W], bool]: ...
    def notify_at(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...
```

**StatefulLogic Methods:**

- `on_item`: Called for each new value, returns (outputs, should_discard_state)
- `on_notify`: Called when notification time is reached
- `on_eof`: Called when upstream ends
- `notify_at`: Returns next notification time or None
- `snapshot`: Returns state for recovery (must be pickle-able and immutable)

**Usage Example:**

```python
import copy
from datetime import datetime, timedelta

class SlidingWindowLogic(StatefulLogic):
    def __init__(self, window_size):
        self.window_size = window_size
        self.items = []

    def on_item(self, value):
        now = datetime.now()
        self.items.append((now, value))

        # Remove old items
        cutoff = now - self.window_size
        self.items = [(ts, val) for ts, val in self.items if ts > cutoff]

        # Emit current window sum
        total = sum(val for _, val in self.items)
        return [total], StatefulLogic.RETAIN

    def notify_at(self):
        if self.items:
            # Next cleanup time
            return self.items[0][0] + self.window_size
        return None

    def on_notify(self):
        # Clean up expired items
        now = datetime.now()
        cutoff = now - self.window_size
        old_count = len(self.items)
        self.items = [(ts, val) for ts, val in self.items if ts > cutoff]

        if len(self.items) != old_count:
            total = sum(val for _, val in self.items)
            return [total], StatefulLogic.RETAIN
        return [], StatefulLogic.RETAIN

    def snapshot(self):
        return copy.deepcopy(self.items)

def build_sliding_window(resume_state):
    logic = SlidingWindowLogic(timedelta(minutes=5))
    if resume_state:
        logic.items = resume_state
    return logic

windowed = op.stateful("sliding_window", keyed_stream, build_sliding_window)
```
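
`stateful_batch` mirrors `stateful` but hands the logic whole batches of values, which can cut per-item overhead. A minimal sketch of a batch-summing logic (the `keyed_numbers` stream is hypothetical; only the hooks needed here are overridden):

```python
class BatchSumLogic(StatefulBatchLogic):
    """Keep a running total per key, processing values a batch at a time."""

    def __init__(self, resume_total):
        self.total = resume_total if resume_total is not None else 0

    def on_batch(self, values):
        # Fold the whole batch into the total, then emit the updated total once.
        self.total += sum(values)
        return [self.total], StatefulBatchLogic.RETAIN

    def on_eof(self):
        # Nothing more to emit at end of stream; drop the state.
        return [], StatefulBatchLogic.DISCARD

    def notify_at(self):
        # No time-based wake-ups needed for this logic.
        return None

    def snapshot(self):
        # The total is an immutable int, so it is safe to return directly.
        return self.total

running_totals = op.stateful_batch(
    "batch_sum", keyed_numbers, lambda resume_state: BatchSumLogic(resume_state)
)
```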