# Windowing Operations

Time-based windowing operators for processing streams in temporal buckets. Supports tumbling, sliding, and session windows with various aggregation functions for real-time analytics and temporal pattern detection.

## Capabilities

### Window Aggregation Operations

Core windowing operators that apply aggregation functions over time-based windows.

```python { .api }
def collect_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower) -> KeyedStream[List[V]]: ...

def fold_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...

def reduce_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...

def count_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, key: Callable[[V], str]) -> KeyedStream[int]: ...

def max_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

def min_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
```

**Common Parameters:**

- `step_id` (str): Unique identifier for the step
- `up` (KeyedStream[V]): Input keyed stream
- `clock` (Clock): Clock for extracting timestamps
- `windower` (Windower): Window creation strategy

**collect_window**: Collects all items in each window into a list

**fold_window**: Folds the items in each window into an accumulator; `builder` creates the initial state and `folder` combines it with each item

**reduce_window**: Combines the items in each window pairwise with `reducer`

**count_window**: Counts items in each window; `key` maps each item to the string it is counted under

**max_window/min_window**: Finds maximum/minimum in each window
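
To make the difference between these aggregation styles concrete, the following is a minimal plain-Python sketch that applies each of them to the items of a single, already-closed window. It does not use the library; `summarize_window` and the sample values are hypothetical names introduced only for illustration.

```python
from functools import reduce
from typing import Any, Callable, Dict, List


def summarize_window(
    items: List[int],
    builder: Callable[[], int],
    folder: Callable[[int, int], int],
    reducer: Callable[[int, int], int],
    by: Callable[[int], Any] = lambda v: v,
) -> Dict[str, Any]:
    """Apply each aggregation style to the items of one non-empty window."""
    return {
        "collect": list(items),                    # keep every item
        "fold": reduce(folder, items, builder()),  # start from builder()
        "reduce": reduce(reducer, items),          # no initial state
        "count": len(items),
        "max": max(items, key=by),
        "min": min(items, key=by),
    }


# Hypothetical values that landed in one window for one key.
summary = summarize_window(
    [4, 9, 2, 7],
    builder=lambda: 0,
    folder=lambda total, v: total + v,
    reducer=lambda a, b: a if a >= b else b,
)
print(summary)
```

The fold starts from the value returned by `builder`, so its result type may differ from the item type, whereas the reduce only ever combines two items into another item.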

**Usage Examples:**

```python
from datetime import timedelta
import bytewax.operators.windowing as win

# Collect events into 5-minute tumbling windows
windowed_events = win.collect_window(
    "5min_windows",
    keyed_events,
    win.EventClock(lambda e: e.timestamp),
    win.TumblingWindower(timedelta(minutes=5))
)

# Calculate running totals in sliding windows
def create_total():
    return 0

def add_to_total(total, event):
    return total + event.amount

sliding_totals = win.fold_window(
    "sliding_totals",
    keyed_transactions,
    win.EventClock(lambda t: t.timestamp),
    win.SlidingWindower(timedelta(hours=1), timedelta(minutes=15)),
    create_total,
    add_to_total
)
```

### Window Join Operations

Join multiple keyed streams within time windows.

```python { .api }
def join_window(step_id: str, *sides: KeyedStream[Any], clock: Clock, windower: Windower, insert_mode: JoinInsertMode = "last", emit_mode: JoinEmitMode = "complete") -> KeyedStream[Tuple]: ...
```

**Parameters:**

- `step_id` (str): Unique identifier
- `*sides` (KeyedStream[Any]): Multiple keyed streams to join
- `clock` (Clock): Clock for extracting timestamps
- `windower` (Windower): Window creation strategy
- `insert_mode` (JoinInsertMode): How to handle multiple values per side
- `emit_mode` (JoinEmitMode): When to emit join results
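
The idea of a windowed join can be sketched in plain Python without the library. The example below assumes that `"last"` means the most recently arrived value on each side wins, and that a result is produced only for windows in which every side has a value; this document does not spell out those semantics, so treat both as assumptions. `join_in_windows`, `EPOCH`, and the sample events are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple

Event = Tuple[str, datetime, Any]  # (key, timestamp, value)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)  # assumed window alignment
_MISSING = object()  # marks a side that has no value yet


def join_in_windows(
    sides: List[List[Event]], length: timedelta
) -> Dict[Tuple[str, int], Tuple[Any, ...]]:
    """Join per key and per tumbling window, keeping the last value per side."""
    slots: Dict[Tuple[str, int], List[Any]] = {}
    for side_index, events in enumerate(sides):
        for key, ts, value in events:
            window_index = (ts - EPOCH) // length
            row = slots.setdefault((key, window_index), [_MISSING] * len(sides))
            row[side_index] = value  # later arrivals overwrite earlier ones
    # Keep only windows where every side contributed a value.
    return {
        slot: tuple(row)
        for slot, row in slots.items()
        if all(v is not _MISSING for v in row)
    }


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 1, 1, hour, minute, tzinfo=timezone.utc)


actions = [("u1", at(10, 5), "click"), ("u1", at(10, 40), "buy"), ("u2", at(10, 10), "view")]
profiles = [("u1", at(10, 20), {"plan": "pro"}), ("u2", at(11, 30), {"plan": "free"})]

joined = join_in_windows([actions, profiles], timedelta(hours=1))
print(joined)
```

Here `u2` produces no result because its action and its profile fall into different one-hour windows.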

**Usage Example:**

```python
# Join user actions with user profiles within 1-hour windows
user_activity = win.join_window(
    "activity_join",
    user_actions, user_profiles,
    clock=win.EventClock(lambda e: e.timestamp),
    windower=win.TumblingWindower(timedelta(hours=1)),
    emit_mode="complete"
)
```

### Clock Implementations

Classes for extracting timestamps from events or using system time.

```python { .api }
class Clock:
    def extract_ts(self, value: Any) -> datetime: ...

class EventClock(Clock):
    def __init__(self, ts_getter: Callable[[Any], datetime], wait_for_system_duration: timedelta = timedelta(0)): ...

class SystemClock(Clock):
    def __init__(self): ...
```

**EventClock Parameters:**

- `ts_getter` (Callable): Function to extract timestamp from event
- `wait_for_system_duration` (timedelta): How long to wait for late events (default: no wait)
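
One way to picture the effect of a wait duration is a simplified watermark: the newest event timestamp seen so far minus the wait. The sketch below is plain Python, not the library's implementation, and it ignores any advancement of the watermark with system time, which is a deliberate simplification. `split_late` and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional, Tuple


def split_late(
    timestamps: Iterable[datetime], wait: timedelta
) -> Tuple[List[datetime], List[datetime]]:
    """Separate on-time timestamps from late ones using a simplified watermark."""
    on_time: List[datetime] = []
    late: List[datetime] = []
    newest: Optional[datetime] = None
    for ts in timestamps:  # iterate in arrival order
        if newest is None or ts > newest:
            newest = ts
        watermark = newest - wait
        if ts < watermark:
            late.append(ts)
        else:
            on_time.append(ts)
    return on_time, late


base = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
arrivals = [
    base,
    base + timedelta(seconds=60),
    base + timedelta(seconds=40),  # 20 s behind the newest event
    base + timedelta(seconds=10),  # 50 s behind the newest event
]

on_time, late = split_late(arrivals, wait=timedelta(seconds=30))
strict_on_time, strict_late = split_late(arrivals, wait=timedelta(0))
```

With a 30-second wait only the event that is 50 seconds behind counts as late; with no wait, both out-of-order events do.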

**Usage Examples:**

```python
from datetime import datetime, timedelta
import bytewax.operators.windowing as win

# Extract timestamp from event field
event_clock = win.EventClock(lambda event: event.created_at)

# Extract from nested structure with late event tolerance
complex_clock = win.EventClock(
    lambda msg: datetime.fromisoformat(msg.metadata.timestamp),
    wait_for_system_duration=timedelta(seconds=30)
)

# Use system time (processing time)
system_clock = win.SystemClock()
```

### Windower Implementations

Classes defining different window creation strategies.

```python { .api }
class Windower:
    def windows_for_time(self, timestamp: datetime) -> List[Tuple[str, datetime, datetime]]: ...

class TumblingWindower(Windower):
    def __init__(self, length: timedelta): ...

class SlidingWindower(Windower):
    def __init__(self, length: timedelta, offset: timedelta): ...

class SessionWindower(Windower):
    def __init__(self, gap: timedelta): ...
```

**TumblingWindower Parameters:**

- `length` (timedelta): Size of each window (non-overlapping)
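
As an illustration of how non-overlapping windows partition time, here is a minimal plain-Python sketch that maps a timestamp to its tumbling window. It assumes windows are counted from a fixed origin, here called `align_to`; that name and `tumbling_window` are hypothetical and do not appear in the signature above.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

ALIGN_TO = datetime(2024, 1, 1, tzinfo=timezone.utc)  # assumed origin


def tumbling_window(
    ts: datetime, length: timedelta, align_to: datetime = ALIGN_TO
) -> Tuple[int, datetime, datetime]:
    """Return (index, start, end) of the window containing ts; end is exclusive."""
    index = (ts - align_to) // length  # floor division of two timedeltas
    start = align_to + index * length
    return index, start, start + length


ts = datetime(2024, 1, 1, 0, 17, 30, tzinfo=timezone.utc)
index, start, end = tumbling_window(ts, timedelta(minutes=15))
print(index, start.time(), end.time())
```

Every timestamp belongs to exactly one window because each window's end is the next window's start.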

**SlidingWindower Parameters:**

- `length` (timedelta): Size of each window
- `offset` (timedelta): Slide distance between windows
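
When `offset` is smaller than `length`, windows overlap and one timestamp falls into several of them. The plain-Python sketch below lists those windows, assuming window starts are spaced `offset` apart from a fixed origin; `sliding_windows` and `align_to` are hypothetical names, and `offset` must be positive.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


def sliding_windows(
    ts: datetime, length: timedelta, offset: timedelta, align_to: datetime
) -> List[Tuple[datetime, datetime]]:
    """Return every (start, end) window with start <= ts < end, oldest first."""
    k = (ts - align_to) // offset  # newest window starting at or before ts
    windows: List[Tuple[datetime, datetime]] = []
    while align_to + k * offset + length > ts:
        start = align_to + k * offset
        windows.append((start, start + length))
        k -= 1  # step back to the previous window start
    windows.reverse()
    return windows


origin = datetime(2024, 1, 1, tzinfo=timezone.utc)
ts = datetime(2024, 1, 1, 1, 20, tzinfo=timezone.utc)
containing = sliding_windows(ts, timedelta(hours=1), timedelta(minutes=15), origin)
for start, end in containing:
    print(start.time(), end.time())
```

With a one-hour length and a 15-minute offset, each timestamp lands in four windows, so downstream aggregations see each item four times.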

**SessionWindower Parameters:**

- `gap` (timedelta): Maximum gap between events in same session
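
Session windows have no fixed length; they grow as long as events keep arriving close together. A minimal plain-Python sketch of that grouping follows. It assumes an event exactly `gap` after the previous one still belongs to the same session, which the description above suggests but does not state outright; `sessionize` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def sessionize(timestamps: List[datetime], gap: timedelta) -> List[List[datetime]]:
    """Group timestamps into sessions separated by more than `gap` of silence."""
    sessions: List[List[datetime]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)  # close enough: extend the current session
        else:
            sessions.append([ts])    # too far apart: start a new session
    return sessions


def at(hour: int, minute: int) -> datetime:
    return datetime(2024, 1, 1, hour, minute, tzinfo=timezone.utc)


events = [at(9, 0), at(9, 10), at(9, 35), at(10, 20), at(10, 30)]
sessions = sessionize(events, timedelta(minutes=30))
print([len(s) for s in sessions])
```

The 45-minute silence between 9:35 and 10:20 exceeds the gap, so the events split into two sessions.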

**Usage Examples:**

```python
# 15-minute tumbling windows (no overlap)
tumbling = win.TumblingWindower(timedelta(minutes=15))

# 1-hour windows sliding every 15 minutes (overlap)
sliding = win.SlidingWindower(
    length=timedelta(hours=1),
    offset=timedelta(minutes=15)
)

# Session windows with 30-minute timeout
sessions = win.SessionWindower(timedelta(minutes=30))
```

### Window Metadata Types

Types and structures for working with window information.

```python { .api }
WindowMetadata = Tuple[str, datetime, datetime]  # (window_id, start_time, end_time)

class WindowConfig:
    def __init__(self, clock: Clock, windower: Windower): ...
```

### Advanced Windowing Patterns

**Tumbling Windows Example:**

```python
# Non-overlapping 5-minute windows for real-time metrics
metrics = win.fold_window(
    "realtime_metrics",
    sensor_data,
    win.EventClock(lambda reading: reading.timestamp),
    win.TumblingWindower(timedelta(minutes=5)),
    lambda: {"sum": 0, "count": 0, "min": float('inf'), "max": float('-inf')},
    lambda acc, reading: {
        "sum": acc["sum"] + reading.value,
        "count": acc["count"] + 1,
        "min": min(acc["min"], reading.value),
        "max": max(acc["max"], reading.value)
    }
)
```

**Sliding Windows Example:**

```python
# Overlapping windows for trend analysis
trends = win.collect_window(
    "trend_analysis",
    stock_prices,
    win.EventClock(lambda price: price.timestamp),
    win.SlidingWindower(
        length=timedelta(hours=4),  # 4-hour windows
        offset=timedelta(hours=1)   # New window every hour
    )
)
```

**Session Windows Example:**

```python
# Group user interactions into sessions
user_sessions = win.collect_window(
    "user_sessions",
    user_events,
    win.EventClock(lambda event: event.timestamp),
    win.SessionWindower(timedelta(minutes=30))  # 30-minute session timeout
)
```

**Late Event Handling:**

```python
# Handle events arriving up to 1 minute late
tolerant_clock = win.EventClock(
    lambda event: event.event_time,
    wait_for_system_duration=timedelta(minutes=1)
)

late_tolerant_windows = win.collect_window(
    "late_tolerant",
    events,
    tolerant_clock,
    win.TumblingWindower(timedelta(minutes=5))
)
```