# Runtime and Execution

Functions and classes for executing dataflows in various environments, from single-threaded testing to distributed production clusters. The runtime handles worker coordination, state management, and fault recovery.

## Capabilities

### Execution Functions

Core functions for running dataflows in different execution modes.

```python { .api }
def cli_main(flow: Dataflow, workers_per_process: int = 1, process_id: Optional[int] = None, addresses: Optional[List[str]] = None, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1): ...
```

**cli_main Parameters:**
- `flow` (Dataflow): Dataflow to execute
- `workers_per_process` (int): Number of worker threads per process
- `process_id` (int): Process ID in cluster (None for single process)
- `addresses` (List[str]): Addresses of all processes in cluster
- `epoch_interval` (timedelta): Duration of each processing epoch
- `recovery_config` (RecoveryConfig): State recovery configuration

**run_main Parameters:**
- `flow` (Dataflow): Dataflow to execute in single thread
- `epoch_interval` (timedelta): Duration of each processing epoch
- `recovery_config` (RecoveryConfig): State recovery configuration

**cluster_main Parameters:**
- `flow` (Dataflow): Dataflow to execute
- `addresses` (List[str]): Host:port addresses of all cluster processes
- `proc_id` (int): Index of this process in cluster (0-based)
- `epoch_interval` (timedelta): Duration of each processing epoch
- `recovery_config` (RecoveryConfig): State recovery configuration
- `worker_count_per_proc` (int): Worker threads per process

**Usage Examples:**
```python
from bytewax.dataflow import Dataflow
from bytewax.recovery import RecoveryConfig
from bytewax._bytewax import cli_main, run_main, cluster_main
from datetime import timedelta
from pathlib import Path

# Single-threaded execution (for testing)
flow = Dataflow("test_flow")
# ... build dataflow ...
run_main(flow)

# Multi-worker single process
cli_main(flow, workers_per_process=4)

# Distributed cluster execution
addresses = ["worker1:9999", "worker2:9999", "worker3:9999"]
recovery = RecoveryConfig(Path("/data/recovery"), timedelta(minutes=5))

# Run on worker 0
cluster_main(
    flow,
    addresses=addresses,
    proc_id=0,
    epoch_interval=timedelta(seconds=10),
    recovery_config=recovery,
    worker_count_per_proc=2,
)
```

### Command Line Interface

When using `python -m bytewax.run`, the CLI provides additional options for dataflow execution.

**CLI Usage:**
```bash
# Single process execution
python -m bytewax.run my_module:flow

# Multi-worker execution
python -m bytewax.run -w4 my_module:flow

# Distributed execution
python -m bytewax.run -w2 -i0 -a worker1:9999 -a worker2:9999 my_module:flow

# With recovery
python -m bytewax.run -w2 --recovery-dir /data/recovery my_module:flow

# Custom epoch interval
python -m bytewax.run --epoch-interval 5s my_module:flow
```

**CLI Parameters:**
- `-w, --workers-per-process`: Number of worker threads
- `-i, --process-id`: Process ID in cluster
- `-a, --addresses`: Process addresses (repeat for each)
- `--recovery-dir`: Directory for recovery state
- `--epoch-interval`: Duration of each epoch (e.g., 10s, 2m)
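
These flags line up with the programmatic entry points documented above. As a rough sketch of the correspondence (the `my_module` import is hypothetical, mirroring the CLI examples), the distributed invocation maps onto `cluster_main` like this:

```python
from bytewax._bytewax import cluster_main

from my_module import flow  # hypothetical module from the CLI examples above

# Roughly equivalent to:
# python -m bytewax.run -w2 -i0 -a worker1:9999 -a worker2:9999 my_module:flow
cluster_main(
    flow,
    addresses=["worker1:9999", "worker2:9999"],
    proc_id=0,
    worker_count_per_proc=2,
)
```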

### Dataflow Location

The runtime can locate and import dataflows from Python modules.

```python { .api }
def _locate_dataflow(module_name: str, dataflow_name: str): ...
```

**Parameters:**
- `module_name` (str): Python module containing the dataflow
- `dataflow_name` (str): Name of dataflow variable or function

**Usage Pattern:**
```python
# my_flows.py
from bytewax.dataflow import Dataflow
import bytewax.operators as op

# Dataflow as variable
my_flow = Dataflow("my_flow")
# ... build dataflow ...

# Dataflow as function
def create_flow():
    flow = Dataflow("dynamic_flow")
    # ... build dataflow ...
    return flow

# Run with: python -m bytewax.run my_flows:my_flow
# Or: python -m bytewax.run my_flows:create_flow
```
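
Under the hood, resolving a `module:name` string amounts to an import plus an attribute lookup. The following is an illustrative sketch of that behavior using `importlib`, not the actual `_locate_dataflow` implementation:

```python
import importlib

from bytewax.dataflow import Dataflow

def locate_dataflow(module_name: str, dataflow_name: str) -> Dataflow:
    # Import the module, then resolve the named attribute
    module = importlib.import_module(module_name)
    obj = getattr(module, dataflow_name)
    # Accept either a Dataflow variable or a zero-argument factory function
    if callable(obj) and not isinstance(obj, Dataflow):
        obj = obj()
    if not isinstance(obj, Dataflow):
        raise TypeError(f"{module_name}:{dataflow_name} is not a Dataflow")
    return obj
```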

### Execution Patterns

**Development/Testing Pattern:**
```python
from bytewax.dataflow import Dataflow
from bytewax.testing import run_main

# Simple test execution
flow = Dataflow("test")
# ... build dataflow ...

# Run synchronously in current thread
run_main(flow)
```
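
To make such a test fully self-contained, the in-memory connectors from `bytewax.testing` can feed input and capture output. A minimal sketch, assuming `TestingSource` and `TestingSink` are available there:

```python
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource, TestingSink, run_main

flow = Dataflow("doubling_test")
inp = op.input("inp", flow, TestingSource([1, 2, 3]))
doubled = op.map("double", inp, lambda x: x * 2)

out = []  # TestingSink appends each emitted item here
op.output("out", doubled, TestingSink(out))

run_main(flow)
assert out == [2, 4, 6]
```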

**Production Single-Node Pattern:**
```python
from datetime import timedelta
from pathlib import Path

from bytewax._bytewax import cli_main
from bytewax.recovery import RecoveryConfig

# Production single-node with recovery
recovery_config = RecoveryConfig(
    db_dir=Path("/data/bytewax/recovery"),
    backup_interval=timedelta(minutes=5),
)

cli_main(
    flow,
    workers_per_process=8,  # Use all CPU cores
    epoch_interval=timedelta(seconds=10),
    recovery_config=recovery_config,
)
```

**Production Distributed Pattern:**
```python
import os
from datetime import timedelta
from pathlib import Path

from bytewax._bytewax import cluster_main
from bytewax.recovery import RecoveryConfig

# Configuration from environment
addresses = os.environ["BYTEWAX_ADDRESSES"].split(",")
process_id = int(os.environ["BYTEWAX_PROCESS_ID"])
workers_per_proc = int(os.environ.get("BYTEWAX_WORKERS", "4"))

recovery_config = RecoveryConfig(
    db_dir=Path(os.environ["BYTEWAX_RECOVERY_DIR"]),
    backup_interval=timedelta(minutes=10),
)

cluster_main(
    flow,
    addresses=addresses,
    proc_id=process_id,
    worker_count_per_proc=workers_per_proc,
    epoch_interval=timedelta(seconds=5),
    recovery_config=recovery_config,
)
```

**Kubernetes Deployment Pattern:**
```python
import os
from pathlib import Path

from bytewax._bytewax import cluster_main
from bytewax.recovery import RecoveryConfig

def get_k8s_config():
    # Get configuration from the Kubernetes environment
    pod_name = os.environ["HOSTNAME"]  # Pod name
    service_name = os.environ["BYTEWAX_SERVICE_NAME"]
    replica_count = int(os.environ["BYTEWAX_REPLICAS"])

    # Build addresses list
    addresses = []
    for i in range(replica_count):
        hostname = f"{service_name}-{i}.{service_name}"
        addresses.append(f"{hostname}:9999")

    # Extract process ID from pod name
    process_id = int(pod_name.split("-")[-1])

    return addresses, process_id

# Use in Kubernetes pod
addresses, proc_id = get_k8s_config()

cluster_main(
    flow,
    addresses=addresses,
    proc_id=proc_id,
    recovery_config=RecoveryConfig(Path("/data/recovery")),
    worker_count_per_proc=2,
)
```
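
Note that this pattern presumes the pods run under a StatefulSet behind a headless Service, which is what gives each replica the stable ordinal suffix and stable `pod.service` DNS name that the address list relies on.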

### Performance Tuning

**Epoch Interval Tuning:**
```python
# Short epochs for low latency (higher overhead)
run_main(flow, epoch_interval=timedelta(milliseconds=100))

# Long epochs for high throughput (higher latency)
run_main(flow, epoch_interval=timedelta(seconds=30))

# Adaptive based on data rate; high_volume_period() stands in
# for a user-supplied load predicate
def get_epoch_interval():
    if high_volume_period():
        return timedelta(seconds=5)
    else:
        return timedelta(milliseconds=500)

run_main(flow, epoch_interval=get_epoch_interval())
```

**Worker Configuration:**
```python
import multiprocessing

# Use all CPU cores
cpu_count = multiprocessing.cpu_count()
cli_main(flow, workers_per_process=cpu_count)

# Leave some cores for the system
cli_main(flow, workers_per_process=max(1, cpu_count - 2))

# I/O-bound workloads can use more workers than cores
cli_main(flow, workers_per_process=cpu_count * 2)
```

**Memory and Resource Management:**
```python
# Monitor resource usage in production
import psutil
import logging

def log_resource_usage():
    process = psutil.Process()
    memory_mb = process.memory_info().rss / 1024 / 1024
    cpu_percent = process.cpu_percent()

    logging.info(f"Memory: {memory_mb:.1f} MB, CPU: {cpu_percent:.1f}%")

# Call periodically during execution
```
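
One way to call it periodically without blocking the dataflow is a re-arming daemon timer. A minimal sketch building on `log_resource_usage` above:

```python
import threading

def start_resource_logger(interval_seconds: float = 30.0):
    # Log once, then schedule the next tick on a daemon timer
    # so the logger never blocks process shutdown
    def tick():
        log_resource_usage()
        timer = threading.Timer(interval_seconds, tick)
        timer.daemon = True
        timer.start()

    tick()

start_resource_logger()
cli_main(flow)
```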

### Error Handling and Monitoring

**Graceful Shutdown:**
```python
import logging
import signal
import sys

def signal_handler(signum, frame):
    logging.info("Received shutdown signal, stopping dataflow...")
    # Bytewax runtime handles graceful shutdown automatically
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Run dataflow
cli_main(flow)
```

**Health Checks:**
```python
# Health check endpoint for orchestrators
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading

class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"OK")
        else:
            self.send_response(404)
            self.end_headers()

def start_health_server():
    server = HTTPServer(("0.0.0.0", 8080), HealthHandler)
    thread = threading.Thread(target=server.serve_forever)
    thread.daemon = True
    thread.start()
    return server

# Start health server before dataflow
health_server = start_health_server()
cli_main(flow)
```