# Stream Processing Operators

Essential operators for transforming, filtering, and routing data through processing pipelines. These operators provide the building blocks for most stream processing use cases and form the core vocabulary of Bytewax dataflows.

## Capabilities

### Input/Output Operations

Operators for introducing data into dataflows and writing results to external systems.
```python { .api }
def input(step_id: str, flow: Dataflow, source: Source[X]) -> Stream[X]: ...

def output(step_id: str, up: Stream[X], sink: Sink[X]) -> None: ...
```
**input Parameters:**
- `step_id` (str): Unique identifier for this operator step
- `flow` (Dataflow): The dataflow to add input to
- `source` (Source[X]): Source to read items from

**output Parameters:**
- `step_id` (str): Unique identifier for this operator step
- `up` (Stream[X]): Stream of items to write
- `sink` (Sink[X]): Sink to write items to
**Usage Example:**
```python
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("example")

# Add input to dataflow
stream = op.input("data_input", flow, TestingSource([1, 2, 3]))

# Output results
op.output("results", stream, StdOutSink())
```
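To see the shape of a full pipeline without running Bytewax, here is a plain-Python sketch that models a source as an iterable and a sink as a list. The `run_pipeline` helper is hypothetical and only illustrates the input → transform → output flow, not the Bytewax API:

```python
def run_pipeline(source, transform, sink):
    # Model of input -> transform -> output: read each item from the
    # source, apply the transform, and append the result to the sink.
    for item in source:
        sink.append(transform(item))
    return sink

results = run_pipeline([1, 2, 3], lambda x: x * 2, [])
```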
### Transformation Operations

Core operators for transforming data items one-by-one or one-to-many.
```python { .api }
def map(step_id: str, up: Stream[X], mapper: Callable[[X], Y]) -> Stream[Y]: ...

def map_value(step_id: str, up: KeyedStream[V], mapper: Callable[[V], W]) -> KeyedStream[W]: ...

def flat_map(step_id: str, up: Stream[X], mapper: Callable[[X], Iterable[Y]]) -> Stream[Y]: ...

def flat_map_value(step_id: str, up: KeyedStream[V], mapper: Callable[[V], Iterable[W]]) -> KeyedStream[W]: ...

def flat_map_batch(step_id: str, up: Stream[X], mapper: Callable[[List[X]], Iterable[Y]]) -> Stream[Y]: ...
```
**map Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream
- `mapper` (Callable[[X], Y]): Function to transform each item

**flat_map Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream
- `mapper` (Callable[[X], Iterable[Y]]): Function returning an iterable of items
**Usage Examples:**
```python
# Transform each item
doubled = op.map("double", numbers, lambda x: x * 2)

# Transform values in keyed stream
processed = op.map_value("process", keyed_stream, lambda val: val.upper())

# One-to-many transformation
words = op.flat_map("split", sentences, lambda s: s.split())

# Batch processing for efficiency
processed = op.flat_map_batch("batch_process", stream, lambda batch: [expensive_transform(x) for x in batch])
```
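To build intuition for how `map` and `flat_map` differ, here is a plain-Python model of their per-item semantics (illustrative only; the real operators process items incrementally as they stream through):

```python
def map_model(items, mapper):
    # map: exactly one output per input
    return [mapper(x) for x in items]

def flat_map_model(items, mapper):
    # flat_map: each input may yield zero, one, or many outputs
    return [y for x in items for y in mapper(x)]

doubled = map_model([1, 2, 3], lambda x: x * 2)
words = flat_map_model(["a b", "c"], lambda s: s.split())
```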
### Filtering Operations

Operators for selectively keeping or transforming items based on predicates.
```python { .api }
def filter(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> Stream[X]: ...

def filter_value(step_id: str, up: KeyedStream[V], predicate: Callable[[V], bool]) -> KeyedStream[V]: ...

def filter_map(step_id: str, up: Stream[X], mapper: Callable[[X], Optional[Y]]) -> Stream[Y]: ...

def filter_map_value(step_id: str, up: KeyedStream[V], mapper: Callable[[V], Optional[W]]) -> KeyedStream[W]: ...
```
**filter Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream
- `predicate` (Callable[[X], bool]): Function returning True to keep an item

**filter_map Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream
- `mapper` (Callable[[X], Optional[Y]]): Function returning a value to keep, or None to filter the item out
**Usage Examples:**
```python
# Keep only even numbers
evens = op.filter("evens", numbers, lambda x: x % 2 == 0)

# Filter and transform in one step
valid_data = op.filter_map("validate", raw_data, lambda x: x.value if x.is_valid else None)
```
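`filter_map` is equivalent to a `map` followed by dropping `None` results. A plain-Python sketch of that contract, using a hypothetical `try_int` parser as the mapper:

```python
def filter_map_model(items, mapper):
    # Keep only the non-None results of the mapper
    out = []
    for x in items:
        y = mapper(x)
        if y is not None:
            out.append(y)
    return out

def try_int(s):
    # Parse a string to int, returning None on failure
    try:
        return int(s)
    except ValueError:
        return None

parsed = filter_map_model(["1", "oops", "3"], try_int)
```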
### Stream Management Operations

Operators for splitting, combining, and redistributing streams.
```python { .api }
def branch(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> BranchOut[X, X]: ...

def merge(step_id: str, *ups: Stream[Any]) -> Stream[Any]: ...

def redistribute(step_id: str, up: Stream[X]) -> Stream[X]: ...

class BranchOut[X, Y]:
    trues: Stream[X]
    falses: Stream[Y]
```
**branch Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream to split
- `predicate` (Callable[[X], bool]): Function to determine which branch

**merge Parameters:**
- `step_id` (str): Unique identifier
- `*ups` (Stream[Any]): Multiple streams to combine
**Usage Examples:**
```python
# Split stream based on condition
branches = op.branch("split_data", stream, lambda x: x.priority == "high")
high_priority = branches.trues
low_priority = branches.falses

# Combine multiple streams
combined = op.merge("combine", stream1, stream2, stream3)

# Redistribute for better parallelization
redistributed = op.redistribute("rebalance", stream)
```
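The `branch` contract is that every input item lands in exactly one of the two output streams. A plain-Python sketch of that behavior (`branch_model` is illustrative, not Bytewax code):

```python
def branch_model(items, predicate):
    # Route each item into exactly one of two lists
    trues, falses = [], []
    for x in items:
        (trues if predicate(x) else falses).append(x)
    return trues, falses

high, low = branch_model([5, 1, 9, 2], lambda x: x >= 5)
```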
### Key Management Operations

Operators for working with keyed streams required for stateful operations.
```python { .api }
def key_on(step_id: str, up: Stream[X], key: Callable[[X], str]) -> KeyedStream[X]: ...

def key_rm(step_id: str, up: KeyedStream[X]) -> Stream[X]: ...
```
**key_on Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream
- `key` (Callable[[X], str]): Function to extract key from each item

**key_rm Parameters:**
- `step_id` (str): Unique identifier
- `up` (KeyedStream[X]): Keyed stream to remove keys from
**Usage Examples:**
```python
# Add keys for stateful operations
keyed = op.key_on("by_user", events, lambda e: e.user_id)

# Remove keys when no longer needed
unkeyed = op.key_rm("remove_keys", keyed_results)
```
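A keyed stream is conceptually a stream of `(key, value)` 2-tuples, which is what lets stateful operators route all items with the same key to the same worker. A plain-Python sketch of the `key_on` / `key_rm` round trip (illustrative only):

```python
def key_on_model(items, key):
    # Pair each item with its string key
    return [(key(x), x) for x in items]

def key_rm_model(pairs):
    # Drop the keys, keeping only the values
    return [v for _, v in pairs]

events = [{"user": "a", "n": 1}, {"user": "b", "n": 2}]
keyed = key_on_model(events, lambda e: e["user"])
unkeyed = key_rm_model(keyed)
```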
### Flattening Operations

Operators for working with nested data structures.
```python { .api }
def flatten(step_id: str, up: Stream[Iterable[X]]) -> Stream[X]: ...
```
**Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[Iterable[X]]): Stream of iterables to flatten
**Usage Example:**
```python
# Flatten nested lists
flattened = op.flatten("flatten", stream_of_lists)
```
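`flatten` behaves like chaining the stream's inner iterables end to end; a minimal plain-Python model of that semantics:

```python
from itertools import chain

def flatten_model(stream_of_iterables):
    # Concatenate each inner iterable into one flat list
    return list(chain.from_iterable(stream_of_iterables))

flat = flatten_model([[1, 2], [3], []])
```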
### Utility Operations

Operators for debugging, inspection, and error handling.
```python { .api }
def inspect(step_id: str, up: Stream[X], inspector: Callable[[str, X], None] = _default_inspector) -> Stream[X]: ...

def inspect_debug(step_id: str, up: Stream[X], inspector: Callable[[str, X, int, int], None] = _default_debug_inspector) -> Stream[X]: ...

def raises(step_id: str, up: Stream[Any]) -> None: ...
```
**inspect Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Stream to observe
- `inspector` (Callable): Function called with the step ID and each item (default prints to stdout)

**inspect_debug Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Stream to observe
- `inspector` (Callable): Function called with the step ID, item, epoch, and worker index
**Usage Examples:**
```python
# Debug stream contents
debugged = op.inspect("debug_point", stream)

# Detailed debugging with worker info
detailed = op.inspect_debug("detailed_debug", stream)

# Crash on any item (for testing)
op.raises("should_be_empty", error_stream)
```
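`inspect` is a pass-through: it returns the stream unchanged and runs the inspector only for its side effect. A plain-Python sketch of that contract, collecting observations into a list instead of printing (illustrative, not the Bytewax implementation):

```python
def inspect_model(step_id, items, inspector):
    # Call the inspector for each item, then pass the item through unchanged
    out = []
    for x in items:
        inspector(step_id, x)
        out.append(x)
    return out

seen = []
passed = inspect_model("debug_point", [1, 2], lambda sid, x: seen.append((sid, x)))
```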
### Enrichment Operations

Operators for joining streams with external data sources.
```python { .api }
def enrich_cached(step_id: str, up: Stream[X], getter: Callable[[DK], DV], mapper: Callable[[TTLCache[DK, DV], X], Y], ttl: timedelta = timedelta.max, _now_getter: Callable[[], datetime] = _get_system_utc) -> Stream[Y]: ...

class TTLCache[DK, DV]:
    def __init__(self, v_getter: Callable[[DK], DV], now_getter: Callable[[], datetime], ttl: timedelta): ...
    def get(self, k: DK) -> DV: ...
    def remove(self, k: DK) -> None: ...
```
**enrich_cached Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream
- `getter` (Callable[[DK], DV]): Function to fetch data for cache misses
- `mapper` (Callable[[TTLCache, X], Y]): Function to enrich each item using cache
- `ttl` (timedelta): Time-to-live for cached values
**Usage Example:**
```python
def lookup_user_data(user_id):
    # Fetch from database or API
    return user_database.get(user_id)

def enrich_event(cache, event):
    user_data = cache.get(event["user_id"])
    return {**event, "user_name": user_data.name}

enriched = op.enrich_cached("enrich_users", events, lookup_user_data, enrich_event, ttl=timedelta(minutes=5))
```
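To make the caching behavior concrete, here is a minimal plain-Python sketch of a TTL cache with the same `get` contract (fetch on a miss or after expiry, otherwise serve the cached value). `SimpleTTLCache` is illustrative only; the real `TTLCache` is provided by Bytewax:

```python
from datetime import datetime, timedelta, timezone

class SimpleTTLCache:
    def __init__(self, v_getter, ttl):
        self._v_getter = v_getter
        self._ttl = ttl
        self._store = {}  # key -> (fetched_at, value)

    def get(self, k):
        now = datetime.now(timezone.utc)
        hit = self._store.get(k)
        if hit is None or now - hit[0] > self._ttl:
            # Miss or expired: fetch a fresh value and cache it
            self._store[k] = (now, self._v_getter(k))
        return self._store[k][1]

calls = []
cache = SimpleTTLCache(lambda k: calls.append(k) or k.upper(), timedelta(minutes=5))
a1 = cache.get("a")
a2 = cache.get("a")  # served from cache; the getter is not called again
```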
### Final Aggregation Operations

Operators for performing final aggregations on finite streams. These operators emit their results only once the upstream reaches end-of-file, so they are intended for bounded (batch-style) inputs.
```python { .api }
def count_final(step_id: str, up: Stream[X], key: Callable[[X], str]) -> KeyedStream[int]: ...

def max_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

def min_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

def reduce_final(step_id: str, up: KeyedStream[V], reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...

def fold_final(step_id: str, up: KeyedStream[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...
```
**count_final Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Stream of items to count
- `key` (Callable[[X], str]): Function to convert items to count keys

**reduce_final Parameters:**
- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Keyed stream to reduce
- `reducer` (Callable[[V, V], V]): Function to combine two values

**fold_final Parameters:**
- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Keyed stream to fold
- `builder` (Callable[[], S]): Function to build initial accumulator
- `folder` (Callable[[S, V], S]): Function to combine accumulator with value
**Usage Examples:**
```python
# Count occurrences by key
counts = op.count_final("count_words", words, lambda w: w)

# Find maximum value per key
max_values = op.max_final("find_max", keyed_numbers)

# Find minimum with custom comparison
min_by_priority = op.min_final("min_priority", tasks, lambda t: t.priority)

# Sum all values per key
sums = op.reduce_final("sum", keyed_numbers, lambda a, b: a + b)

# Build custom accumulator
def build_list():
    return []

def add_to_list(lst, item):
    return lst + [item]

lists = op.fold_final("collect", keyed_items, build_list, add_to_list)
```
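The fold semantics can be modeled in plain Python as grouping `(key, value)` pairs and folding each group, which is what `fold_final` produces once the stream ends (`fold_final_model` is illustrative, not the Bytewax implementation):

```python
def fold_final_model(keyed_items, builder, folder):
    # keyed_items: iterable of (key, value) pairs; returns key -> accumulator
    accs = {}
    for k, v in keyed_items:
        if k not in accs:
            accs[k] = builder()  # fresh accumulator for a new key
        accs[k] = folder(accs[k], v)
    return accs

# Collect values per key, mirroring the build_list/add_to_list example
lists = fold_final_model(
    [("a", 1), ("b", 2), ("a", 3)],
    builder=list,
    folder=lambda lst, item: lst + [item],
)
```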