Python Stateful Stream Processing Framework
npx @tessl/cli install tessl/pypi-bytewax@0.21.00
# Bytewax

A Python framework that brings the stream- and event-processing capabilities of systems like Flink, Spark, and Kafka Streams to a familiar Python interface. Built on a Rust distributed processing engine exposed through PyO3 bindings, it provides parallelizable stream processing through a dataflow computational model that supports stateful transformations, windowing operations, joins, and real-time aggregations.

## Package Information

- **Package Name**: bytewax
- **Language**: Python
- **Installation**: `pip install bytewax`
- **Optional Dependencies**: `pip install bytewax[kafka]` for Kafka connectors

## Core Imports

```python
# Core dataflow construction
from bytewax.dataflow import Dataflow

# Built-in operators for stream processing
import bytewax.operators as op

# Windowing operators for time-based processing
import bytewax.operators.windowing as win

# Runtime execution
import bytewax.run

# Testing utilities
import bytewax.testing

# Input and output interfaces
from bytewax.inputs import Source
from bytewax.outputs import Sink

# Built-in connectors
from bytewax.connectors.stdio import StdOutSink, StdInSource
from bytewax.connectors.files import FileSource, FileSink, CSVSource, DirSource, DirSink
from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSourceMessage, KafkaSinkMessage
from bytewax.connectors.kafka import operators as kop
from bytewax.connectors.demo import RandomMetricSource
```

## Basic Usage

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

# Create a dataflow
flow = Dataflow("basic_example")

# Add input data
input_data = [1, 2, 3, 4, 5]
stream = op.input("input", flow, TestingSource(input_data))

# Transform the data
doubled = op.map("double", stream, lambda x: x * 2)

# Keep only doubled values that are multiples of 4
evens = op.filter("evens", doubled, lambda x: x % 4 == 0)

# Output results
op.output("output", evens, StdOutSink())

# Run the dataflow
run_main(flow)
```

## Architecture

Bytewax's architecture consists of several key components that work together to provide scalable stream processing:

- **Dataflow**: The main container that defines the processing graph and topology
- **Operators**: Processing primitives that transform, filter, aggregate, and route data
- **Streams**: Typed data flows between operators that can be keyed for stateful operations
- **Sources/Sinks**: Input and output connectors for external systems
- **Runtime**: Rust-based execution engine that handles distribution, state management, and recovery
- **Recovery System**: Built-in state snapshotting and recovery mechanisms for fault tolerance

This design enables seamless scaling from single-process development to multi-worker distributed deployments across multiple hosts, with support for exactly-once processing guarantees through state recovery mechanisms.

## Capabilities

### Dataflow Construction

Core functionality for building and configuring stream processing dataflows. Includes the main Dataflow class and Stream abstractions that form the foundation of all processing topologies.

```python { .api }
class Dataflow:
    def __init__(self, name: str): ...

class Stream[X]:
    def __init__(self, id: str, scope): ...
```
[Dataflow Construction](./dataflow.md)
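
As a minimal illustration of these two classes: operators attach nodes to the `Dataflow` graph and return typed `Stream` handles (the operators themselves are covered in the next section).

```python
from bytewax.dataflow import Dataflow, Stream
import bytewax.operators as op
from bytewax.testing import TestingSource

# The Dataflow holds the topology; each operator takes a step_id that
# is unique within the flow and returns a Stream for downstream steps.
flow = Dataflow("topology")
names: Stream[str] = op.input("names", flow, TestingSource(["ada", "grace"]))
upper: Stream[str] = op.map("upper", names, str.upper)
```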

### Stream Processing Operators

Essential operators for transforming, filtering, and routing data through processing pipelines. These operators provide the building blocks for most stream processing use cases including map, filter, flat_map, branch, merge, and more.

```python { .api }
def input(step_id: str, flow: Dataflow, source: Source[X]) -> Stream[X]: ...
def output(step_id: str, up: Stream[X], sink: Sink[X]) -> None: ...
def map(step_id: str, up: Stream[X], mapper: Callable[[X], Y]) -> Stream[Y]: ...
def filter(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> Stream[X]: ...
def flat_map(step_id: str, up: Stream[X], mapper: Callable[[X], Iterable[Y]]) -> Stream[Y]: ...
def branch(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> BranchOut[X, X]: ...
def merge(step_id: str, *ups: Stream[Any]) -> Stream[Any]: ...
```
[Stream Processing Operators](./operators.md)
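
A short sketch combining several of these: `branch` splits a stream on a predicate and `merge` recombines the results. It assumes `BranchOut` exposes the two resulting streams as `.trues` and `.falses`, as in current Bytewax releases.

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("branch_merge")
nums = op.input("nums", flow, TestingSource(list(range(10))))

# Split into even and odd sub-streams.
split = op.branch("split", nums, lambda x: x % 2 == 0)
evens = op.map("tag_even", split.trues, lambda x: ("even", x))
odds = op.map("tag_odd", split.falses, lambda x: ("odd", x))

# Recombine into a single stream and print.
tagged = op.merge("rejoin", evens, odds)
op.output("out", tagged, StdOutSink())
run_main(flow)
```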

### Stateful Processing

Advanced operators for maintaining state across events, including reduce, fold, join, and custom stateful transformations. These operators enable complex event processing patterns like aggregations, session tracking, and multi-stream correlations.

```python { .api }
def stateful_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], W]]) -> KeyedStream[W]: ...
def stateful_flat_map(step_id: str, up: KeyedStream[V], mapper: Callable[[Optional[S], V], Tuple[Optional[S], Iterable[W]]]) -> KeyedStream[W]: ...
def reduce_final(step_id: str, up: KeyedStream[V], reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...
def fold_final(step_id: str, up: KeyedStream[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...
def join(step_id: str, *sides: KeyedStream[Any], insert_mode: JoinInsertMode = "last", emit_mode: JoinEmitMode = "complete") -> KeyedStream[Tuple]: ...
def collect(step_id: str, up: KeyedStream[V], timeout: timedelta, max_size: int) -> KeyedStream[List[V]]: ...

class StatefulLogic[V, W, S]:
    def on_item(self, value: V) -> Tuple[Iterable[W], bool]: ...
    def on_notify(self) -> Tuple[Iterable[W], bool]: ...
    def on_eof(self) -> Tuple[Iterable[W], bool]: ...
    def notify_at(self) -> Optional[datetime]: ...
    def snapshot(self) -> S: ...

JoinInsertMode = Literal["first", "last", "product"]
JoinEmitMode = Literal["complete", "final", "running"]
```
[Stateful Processing](./stateful.md)
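
For example, a per-key running total with `stateful_map`; the state argument is `None` the first time a key is seen. A minimal sketch (`op.key_on` keys the stream first, since stateful operators require a `KeyedStream`):

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("running_total")
nums = op.input("nums", flow, TestingSource([1, 2, 3, 4]))

# Stateful operators work on keyed streams; use a single key here.
keyed = op.key_on("key", nums, lambda _x: "ALL")

def add(total, x):
    total = (total or 0) + x  # state is None on the first value
    return total, total       # (new state, value to emit)

totals = op.stateful_map("sum", keyed, add)
op.output("out", totals, StdOutSink())
run_main(flow)  # emits ("ALL", 1), ("ALL", 3), ("ALL", 6), ("ALL", 10)
```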

### Windowing Operations

Time-based windowing operators for processing streams in temporal buckets. Supports tumbling, sliding, and session windows with various aggregation functions for real-time analytics and temporal pattern detection.

```python { .api }
def collect_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower) -> KeyedStream[List[V]]: ...
def fold_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...
def reduce_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...
def count_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, key: Callable[[V], str]) -> KeyedStream[int]: ...
def max_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...
def min_window(step_id: str, up: KeyedStream[V], clock: Clock, windower: Windower, by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

class SystemClock:
    def __init__(self): ...

class EventClock[V]:
    def __init__(self, ts_getter: Callable[[V], datetime], wait_for_system_duration: timedelta = timedelta(0)): ...

class TumblingWindower:
    def __init__(self, length: timedelta, align_to: datetime): ...

class SlidingWindower:
    def __init__(self, length: timedelta, offset: timedelta, align_to: datetime): ...

class SessionWindower:
    def __init__(self, gap: timedelta): ...

class WindowMetadata:
    open_time: datetime
    close_time: datetime
```
[Windowing Operations](./windowing.md)
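
A sketch of a per-user tumbling-window count, written against the signatures above. Note that windowing signatures have shifted between Bytewax releases: some versions require an `align_to` datetime on the windowers (shown here) and return a `WindowOut` whose `.down` stream carries `(window_id, value)` pairs rather than a bare `KeyedStream`, so adjust the final steps to your installed version.

```python
from datetime import datetime, timedelta, timezone

from bytewax.dataflow import Dataflow
import bytewax.operators as op
import bytewax.operators.windowing as win
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

ALIGN = datetime(2024, 1, 1, tzinfo=timezone.utc)
events = [
    {"user": "a", "ts": ALIGN + timedelta(seconds=1)},
    {"user": "b", "ts": ALIGN + timedelta(seconds=3)},
    {"user": "a", "ts": ALIGN + timedelta(seconds=7)},
]

flow = Dataflow("windowed_count")
inp = op.input("inp", flow, TestingSource(events))
keyed = op.key_on("by_user", inp, lambda e: e["user"])

# Event-time clock: timestamps come from the records themselves.
clock = win.EventClock(lambda e: e["ts"], wait_for_system_duration=timedelta(0))
# Non-overlapping 5-second buckets aligned to a fixed instant.
windower = win.TumblingWindower(length=timedelta(seconds=5), align_to=ALIGN)

# Count events per user per window.
counts = win.fold_window(
    "count", keyed, clock, windower,
    builder=lambda: 0,
    folder=lambda acc, _event: acc + 1,
)
op.output("out", counts, StdOutSink())  # on WindowOut versions: counts.down
run_main(flow)
```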

### Input Sources

Interfaces and implementations for reading data from external systems. Includes abstract base classes for building custom sources and built-in sources for common data systems.

```python { .api }
class Source[X]: ...
class StatefulSourcePartition[X, S]: ...
class StatelessSourcePartition[X]: ...
class FixedPartitionedSource[X, S]: ...
class DynamicSource[X]: ...
```
[Input Sources](./sources.md)
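
A minimal sketch of a custom source, assuming the `DynamicSource` / `StatelessSourcePartition` interfaces in which `build` receives `(step_id, worker_index, worker_count)` and `next_batch` raises `StopIteration` once the partition is exhausted:

```python
from typing import List

from bytewax.inputs import DynamicSource, StatelessSourcePartition


class CountingPartition(StatelessSourcePartition):
    """Emits the integers 0..limit-1, one per batch."""

    def __init__(self, limit: int):
        self._it = iter(range(limit))

    def next_batch(self) -> List[int]:
        # next() raises StopIteration when exhausted, signalling EOF.
        return [next(self._it)]


class CountingSource(DynamicSource):
    """Each worker builds one independent partition."""

    def __init__(self, limit: int):
        self._limit = limit

    def build(self, step_id: str, worker_index: int, worker_count: int):
        return CountingPartition(self._limit)
```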

### Output Sinks

Interfaces and implementations for writing data to external systems. Includes abstract base classes for building custom sinks and patterns for exactly-once output delivery.

```python { .api }
class Sink[X]: ...
class StatefulSinkPartition[X, S]: ...
class StatelessSinkPartition[X]: ...
class FixedPartitionedSink[X, S]: ...
class DynamicSink[X]: ...
```
[Output Sinks](./sinks.md)
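
Symmetrically, a minimal sketch of a custom sink, assuming `DynamicSink.build` receives `(step_id, worker_index, worker_count)` and partitions implement `write_batch`:

```python
from typing import List

from bytewax.outputs import DynamicSink, StatelessSinkPartition


class PrintPartition(StatelessSinkPartition):
    """Writes each batch of items to stdout."""

    def write_batch(self, items: List[str]) -> None:
        for item in items:
            print(item, flush=True)


class PrintSink(DynamicSink):
    """Each worker writes independently; no coordination needed."""

    def build(self, step_id: str, worker_index: int, worker_count: int):
        return PrintPartition()
```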

### Built-in Connectors

Pre-built connectors for common external systems including Kafka, files, stdio, and demo sources. These connectors provide production-ready integration with popular data systems.

```python { .api }
# Kafka connectors
class KafkaSource:
    def __init__(self, brokers: List[str], topics: List[str], **kwargs): ...

class KafkaSink:
    def __init__(self, brokers: List[str], topic: str, **kwargs): ...

class KafkaSourceMessage[K, V]:
    key: K
    value: V
    timestamp: datetime
    partition: int
    offset: int

class KafkaSinkMessage[K, V]:
    key: K
    value: V

# File connectors
class FileSource:
    def __init__(self, path: str, **kwargs): ...

class FileSink:
    def __init__(self, path: str, **kwargs): ...

class CSVSource:
    def __init__(self, path: str, **kwargs): ...

class DirSource:
    def __init__(self, dir_path: str, **kwargs): ...

class DirSink:
    def __init__(self, dir_path: str, **kwargs): ...

# Stdio connectors
class StdOutSink: ...
class StdInSource: ...

# Demo connectors
class RandomMetricSource:
    def __init__(self, **kwargs): ...
```
[Built-in Connectors](./connectors.md)
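
For example, a pass-through Kafka pipeline (the broker address and topic names are placeholders): `KafkaSource` yields `KafkaSourceMessage` items, which are re-wrapped as `KafkaSinkMessage` for the sink.

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.connectors.kafka import KafkaSource, KafkaSink, KafkaSinkMessage

BROKERS = ["localhost:9092"]  # placeholder broker address

flow = Dataflow("kafka_passthrough")
msgs = op.input("in", flow, KafkaSource(BROKERS, ["input-topic"]))

# Re-wrap each consumed message's key/value for the producer.
out = op.map("rewrap", msgs, lambda m: KafkaSinkMessage(key=m.key, value=m.value))
op.output("out", out, KafkaSink(BROKERS, "output-topic"))
```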

### Runtime and Execution

Functions and classes for executing dataflows in various environments, from single-threaded testing to distributed production clusters. Includes configuration for worker processes, recovery, and distributed coordination.

```python { .api }
def cli_main(flow: Dataflow, workers_per_process: int = 1, process_id: Optional[int] = None, addresses: Optional[List[str]] = None, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1): ...
```
[Runtime and Execution](./runtime.md)
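
A sketch of in-process execution with recovery enabled (the recovery path is a placeholder and must be initialized first; see State Recovery below):

```python
from datetime import timedelta
from pathlib import Path

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.recovery import RecoveryConfig
from bytewax.testing import TestingSource, run_main

flow = Dataflow("runtime_example")
nums = op.input("in", flow, TestingSource([1, 2, 3]))
op.output("out", nums, StdOutSink())

# In-process execution with snapshots every 10 s; the recovery
# directory must already contain initialized partitions.
run_main(
    flow,
    epoch_interval=timedelta(seconds=10),
    recovery_config=RecoveryConfig(Path("recovery_db")),
)
```

For production deployments the same flow is typically launched with `python -m bytewax.run mymodule:flow`, which drives `cli_main` and accepts worker-count and cluster-address options.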

### Testing Utilities

Tools and utilities for testing dataflows, including test sources, sinks, and execution helpers. Provides deterministic testing environments and data capture utilities for validating stream processing logic.

```python { .api }
class TestingSource[X]:
    def __init__(self, data: Iterable[X]): ...
    class EOF: ...
    class ABORT: ...
    class PAUSE:
        def __init__(self, duration: timedelta): ...

class TestingSink[X]:
    def __init__(self, ls: List[X]): ...

class TimeTestingGetter:
    def __init__(self, now: datetime): ...
    def advance(self, td: timedelta) -> None: ...
    def now(self) -> datetime: ...

def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None) -> None: ...
def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1) -> None: ...
def poll_next_batch(partition, timeout: timedelta) -> List[Any]: ...
```
[Testing Utilities](./testing.md)
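
A typical unit test captures output in a plain list via `TestingSink` and runs the flow to completion:

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, TestingSink, run_main

flow = Dataflow("unit_test")
inp = op.input("in", flow, TestingSource([1, 2, 3]))
doubled = op.map("double", inp, lambda x: x * 2)

out = []  # TestingSink appends everything it receives to this list
op.output("out", doubled, TestingSink(out))

run_main(flow)
assert out == [2, 4, 6]
```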

### State Recovery

Recovery mechanisms for fault tolerance including state snapshotting, partition management, and resume capabilities. Enables exactly-once processing guarantees in distributed environments.

```python { .api }
class RecoveryConfig:
    def __init__(self, db_dir: Path, backup_interval: Optional[timedelta] = None): ...

def init_db_dir(db_dir: Path, count: int): ...

class InconsistentPartitionsError(ValueError): ...
class MissingPartitionsError(FileNotFoundError): ...
class NoPartitionsError(FileNotFoundError): ...
```
[State Recovery](./recovery.md)
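
A sketch of the one-time setup: create a fixed number of recovery partitions on disk, then point a `RecoveryConfig` at them on every run (the path and partition count are placeholders):

```python
from pathlib import Path

from bytewax.recovery import RecoveryConfig, init_db_dir

db_dir = Path("recovery_db")  # placeholder location
db_dir.mkdir(parents=True, exist_ok=True)

# Create the recovery partition files once, before the first run.
init_db_dir(db_dir, 4)

# On every execution, pass this config to run_main / cli_main / cluster_main.
config = RecoveryConfig(db_dir)
```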

### Tracing and Monitoring

Logging, tracing, and monitoring capabilities for observing dataflow execution. Supports integration with Jaeger, OpenTelemetry, and other observability platforms for production monitoring.

```python { .api }
class TracingConfig: ...

class JaegerConfig(TracingConfig):
    def __init__(self, service_name: str, endpoint: Optional[str] = None, sampling_ratio: float = 1.0): ...

class OtlpTracingConfig(TracingConfig):
    def __init__(self, service_name: str, url: Optional[str] = None, sampling_ratio: float = 1.0): ...

def setup_tracing(tracing_config: Optional[TracingConfig] = None, log_level: Optional[str] = None): ...
```
[Tracing and Monitoring](./tracing.md)
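
For example, exporting traces to an OTLP collector (the endpoint URL is a placeholder; keep a reference to the returned object for the lifetime of the process):

```python
from bytewax.tracing import setup_tracing, OtlpTracingConfig

# Keep the returned object alive; dropping it shuts tracing down.
tracer = setup_tracing(
    tracing_config=OtlpTracingConfig(
        service_name="my-dataflow",
        url="grpc://127.0.0.1:4317",  # placeholder OTLP endpoint
    ),
    log_level="ERROR",
)
```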