# Stream Processing Operators

Essential operators for transforming, filtering, and routing data through processing pipelines. These operators provide the building blocks for most stream processing use cases and form the core vocabulary of Bytewax dataflows.

## Capabilities

### Input/Output Operations

Operators for introducing data into dataflows and writing results to external systems.

```python { .api }
def input(step_id: str, flow: Dataflow, source: Source[X]) -> Stream[X]: ...

def output(step_id: str, up: Stream[X], sink: Sink[X]) -> None: ...
```

**input Parameters:**
- `step_id` (str): Unique identifier for this operator step
- `flow` (Dataflow): The dataflow to add input to
- `source` (Source[X]): Source to read items from

**output Parameters:**
- `step_id` (str): Unique identifier for this operator step
- `up` (Stream[X]): Stream of items to write
- `sink` (Sink[X]): Sink to write items to

**Usage Example:**

```python
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("example")

# Add input to the dataflow
stream = op.input("data_input", flow, TestingSource([1, 2, 3]))

# A processing step so there are results to write
processed_stream = op.map("double", stream, lambda x: x * 2)

# Output results
op.output("results", processed_stream, StdOutSink())
```
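
To run this flow without a cluster, Bytewax's `run_main` test helper executes a dataflow on a single in-process worker (a minimal sketch; `run_main` lives in `bytewax.testing`):

```python
from bytewax.testing import run_main

# Executes the flow above on one worker; StdOutSink prints 2, 4, 6
run_main(flow)
```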

### Transformation Operations

Core operators for transforming data items one-by-one or one-to-many.

```python { .api }
def map(step_id: str, up: Stream[X], mapper: Callable[[X], Y]) -> Stream[Y]: ...

def map_value(step_id: str, up: KeyedStream[V], mapper: Callable[[V], W]) -> KeyedStream[W]: ...

def flat_map(step_id: str, up: Stream[X], mapper: Callable[[X], Iterable[Y]]) -> Stream[Y]: ...

def flat_map_value(step_id: str, up: KeyedStream[V], mapper: Callable[[V], Iterable[W]]) -> KeyedStream[W]: ...

def flat_map_batch(step_id: str, up: Stream[X], mapper: Callable[[List[X]], Iterable[Y]]) -> Stream[Y]: ...
```

**map Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream
- `mapper` (Callable[[X], Y]): Function to transform each item

**flat_map Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream
- `mapper` (Callable[[X], Iterable[Y]]): Function returning an iterable of output items

**Usage Examples:**

```python
# Transform each item
doubled = op.map("double", numbers, lambda x: x * 2)

# Transform values in keyed stream
processed = op.map_value("process", keyed_stream, lambda val: val.upper())

# One-to-many transformation
words = op.flat_map("split", sentences, lambda s: s.split())

# Batch processing for efficiency
processed = op.flat_map_batch("batch_process", stream, lambda batch: [expensive_transform(x) for x in batch])
```
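
Because the mapper returns an iterable, `flat_map` can emit zero, one, or many items per input: returning several values fans an item out, and returning an empty iterable drops it entirely. A sketch of both behaviors (the `orders` stream and its dict shape are hypothetical):

```python
# Each order fans out into one item per line item; orders whose
# "line_items" list is empty produce nothing downstream.
line_items = op.flat_map(
    "explode_orders",
    orders,
    lambda order: order.get("line_items", []),
)
```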

### Filtering Operations

Operators for selectively keeping or transforming items based on predicates.

```python { .api }
def filter(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> Stream[X]: ...

def filter_value(step_id: str, up: KeyedStream[V], predicate: Callable[[V], bool]) -> KeyedStream[V]: ...

def filter_map(step_id: str, up: Stream[X], mapper: Callable[[X], Optional[Y]]) -> Stream[Y]: ...

def filter_map_value(step_id: str, up: KeyedStream[V], mapper: Callable[[V], Optional[W]]) -> KeyedStream[W]: ...
```

**filter Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream
- `predicate` (Callable[[X], bool]): Function returning True to keep an item

**filter_map Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream
- `mapper` (Callable[[X], Optional[Y]]): Function returning None to drop an item, or a value to emit

**Usage Examples:**

```python
# Keep only even numbers
evens = op.filter("evens", numbers, lambda x: x % 2 == 0)

# Filter and transform in one step
valid_data = op.filter_map("validate", raw_data, lambda x: x.value if x.is_valid else None)
```
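
`filter_map` is well suited to fallible parsing, where validation and transformation would otherwise be two separate steps. A sketch (the `raw_strings` stream is hypothetical):

```python
def parse_int(s):
    # Return the parsed value, or None to drop unparseable input
    try:
        return int(s)
    except ValueError:
        return None

# One step instead of an op.map followed by an op.filter
numbers = op.filter_map("parse_ints", raw_strings, parse_int)
```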

### Stream Management Operations

Operators for splitting, combining, and redistributing streams.

```python { .api }
def branch(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> BranchOut[X, X]: ...

def merge(step_id: str, *ups: Stream[Any]) -> Stream[Any]: ...

def redistribute(step_id: str, up: Stream[X]) -> Stream[X]: ...

class BranchOut[X, Y]:
    trues: Stream[X]
    falses: Stream[Y]
```

**branch Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream to split
- `predicate` (Callable[[X], bool]): Routing function; items for which it returns True go to `trues`, the rest to `falses`

**merge Parameters:**
- `step_id` (str): Unique identifier
- `*ups` (Stream[Any]): Multiple streams to combine

**Usage Examples:**

```python
# Split stream based on condition
branches = op.branch("split_data", stream, lambda x: x.priority == "high")
high_priority = branches.trues
low_priority = branches.falses

# Combine multiple streams
combined = op.merge("combine", stream1, stream2, stream3)

# Redistribute for better parallelization
redistributed = op.redistribute("rebalance", stream)
```
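
A common pattern is to branch, process each side differently, then merge the results back into a single stream. A sketch (the `orders` stream and both handler functions are hypothetical):

```python
# Send large orders through a review step; auto-approve the rest
branches = op.branch("by_size", orders, lambda o: o["total"] > 1000)
reviewed = op.map("review", branches.trues, flag_for_review)
approved = op.map("approve", branches.falses, auto_approve)

# Recombine both sides for shared downstream processing
all_orders = op.merge("rejoin", reviewed, approved)
```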

### Key Management Operations

Operators for working with keyed streams required for stateful operations.

```python { .api }
def key_on(step_id: str, up: Stream[X], key: Callable[[X], str]) -> KeyedStream[X]: ...

def key_rm(step_id: str, up: KeyedStream[X]) -> Stream[X]: ...
```

**key_on Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream
- `key` (Callable[[X], str]): Function to extract key from each item

**key_rm Parameters:**
- `step_id` (str): Unique identifier
- `up` (KeyedStream[X]): Keyed stream to remove keys from

**Usage Examples:**

```python
# Add keys for stateful operations
keyed = op.key_on("by_user", events, lambda e: e.user_id)

# Remove keys when no longer needed
unkeyed = op.key_rm("remove_keys", keyed_results)
```
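
Items in a `KeyedStream` flow as `(key, value)` 2-tuples with string keys, which is the shape downstream stateful operators expect; `key_rm` strips the tuples back to bare values. A sketch of the shapes involved (the `events` stream is hypothetical):

```python
keyed = op.key_on("by_user", events, lambda e: e.user_id)
# Items are now ("some-user-id", event) tuples
op.inspect("peek_keyed", keyed)

unkeyed = op.key_rm("drop_keys", keyed)
# Items are bare events again
op.inspect("peek_unkeyed", unkeyed)
```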

### Flattening Operations

Operators for working with nested data structures.

```python { .api }
def flatten(step_id: str, up: Stream[Iterable[X]]) -> Stream[X]: ...
```

**Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[Iterable[X]]): Stream of iterables to flatten

**Usage Example:**

```python
# Flatten nested lists
flattened = op.flatten("flatten", stream_of_lists)
```
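
`flatten` is the special case of `flat_map` where each item is already an iterable, so the two calls below are equivalent (reusing the hypothetical `stream_of_lists` from above):

```python
flattened = op.flatten("flatten", stream_of_lists)
# ...behaves the same as flat_map with an identity mapper:
flattened = op.flat_map("flatten_fm", stream_of_lists, lambda xs: xs)
```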

### Utility Operations

Operators for debugging, inspection, and error handling.

```python { .api }
def inspect(step_id: str, up: Stream[X], inspector: Callable[[str, X], None] = _default_inspector) -> Stream[X]: ...

def inspect_debug(step_id: str, up: Stream[X], inspector: Callable[[str, X, int, int], None] = _default_debug_inspector) -> Stream[X]: ...

def raises(step_id: str, up: Stream[Any]) -> None: ...
```

**inspect Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Stream to observe
- `inspector` (Callable): Function called for each item (default prints to stdout)

**inspect_debug Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Stream to observe
- `inspector` (Callable): Function called with item, epoch, and worker info

**Usage Examples:**

```python
# Debug stream contents
debugged = op.inspect("debug_point", stream)

# Detailed debugging with worker info
detailed = op.inspect_debug("detailed_debug", stream)

# Crash on any item (for testing)
op.raises("should_be_empty", error_stream)
```
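
Per the `inspect` signature above, a custom inspector receives the step id and the item, so diagnostics can be routed anywhere; a sketch using the standard `logging` module:

```python
import logging

logger = logging.getLogger("dataflow")

def log_item(step_id, item):
    # Called once for each item that flows past this step
    logger.info("%s saw %r", step_id, item)

logged = op.inspect("log_point", stream, log_item)
```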

### Enrichment Operations

Operators for joining streams with external data sources.

```python { .api }
def enrich_cached(step_id: str, up: Stream[X], getter: Callable[[DK], DV], mapper: Callable[[TTLCache[DK, DV], X], Y], ttl: timedelta = timedelta.max, _now_getter: Callable[[], datetime] = _get_system_utc) -> Stream[Y]: ...

class TTLCache[DK, DV]:
    def __init__(self, v_getter: Callable[[DK], DV], now_getter: Callable[[], datetime], ttl: timedelta): ...
    def get(self, k: DK) -> DV: ...
    def remove(self, k: DK) -> None: ...
```

**enrich_cached Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Input stream
- `getter` (Callable[[DK], DV]): Function to fetch data for cache misses
- `mapper` (Callable[[TTLCache, X], Y]): Function to enrich each item using cache
- `ttl` (timedelta): Time-to-live for cached values

**Usage Example:**

```python
from datetime import timedelta

def lookup_user_data(user_id):
    # Fetch from database or API on a cache miss
    return user_database.get(user_id)

def enrich_event(cache, event):
    user_data = cache.get(event["user_id"])
    return {**event, "user_name": user_data.name}

enriched = op.enrich_cached("enrich_users", events, lookup_user_data, enrich_event, ttl=timedelta(minutes=5))
```
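
Going by the signatures above, `cache.get(k)` invokes the getter only on a miss or once a cached entry's TTL has lapsed, while `cache.remove(k)` evicts an entry so the next access refetches. A sketch of explicit invalidation inside the mapper (the staleness flag is hypothetical):

```python
def enrich_event(cache, event):
    user_data = cache.get(event["user_id"])
    if user_data.stale:  # hypothetical flag on the cached record
        cache.remove(event["user_id"])  # force a refetch
        user_data = cache.get(event["user_id"])
    return {**event, "user_name": user_data.name}
```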

### Final Aggregation Operations

Operators for performing final aggregations on finite streams.

```python { .api }
def count_final(step_id: str, up: Stream[X], key: Callable[[X], str]) -> KeyedStream[int]: ...

def max_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

def min_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

def reduce_final(step_id: str, up: KeyedStream[V], reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...

def fold_final(step_id: str, up: KeyedStream[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...
```

**count_final Parameters:**
- `step_id` (str): Unique identifier
- `up` (Stream[X]): Stream of items to count
- `key` (Callable[[X], str]): Function to convert items to count keys

**reduce_final Parameters:**
- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Keyed stream to reduce
- `reducer` (Callable[[V, V], V]): Function to combine two values

**fold_final Parameters:**
- `step_id` (str): Unique identifier
- `up` (KeyedStream[V]): Keyed stream to fold
- `builder` (Callable[[], S]): Function to build initial accumulator
- `folder` (Callable[[S, V], S]): Function to combine accumulator with value

**Usage Examples:**

```python
# Count occurrences by key
counts = op.count_final("count_words", words, lambda w: w)

# Find maximum value per key
max_values = op.max_final("find_max", keyed_numbers)

# Find minimum with custom comparison
min_by_priority = op.min_final("min_priority", tasks, lambda t: t.priority)

# Sum all values per key
sums = op.reduce_final("sum", keyed_numbers, lambda a, b: a + b)

# Build custom accumulator
def build_list():
    return []

def add_to_list(lst, item):
    return lst + [item]

lists = op.fold_final("collect", keyed_items, build_list, add_to_list)
```
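
These `*_final` operators emit only once a finite upstream is exhausted, so they pair naturally with bounded sources. A complete word-count sketch assembled from operators in this document (the input data is made up for illustration):

```python
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("wordcount")
lines = op.input("lines", flow, TestingSource(["to be", "or not to be"]))
words = op.flat_map("split", lines, str.split)
# Emits ("to", 2), ("be", 2), ("or", 1), ("not", 1) after input is exhausted
counts = op.count_final("count", words, lambda word: word)
op.output("out", counts, StdOutSink())
```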