# Runtime and Execution

Functions and classes for executing dataflows in various environments, from single-threaded testing to distributed production clusters. The runtime handles worker coordination, state management, and fault recovery.
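
For orientation, a minimal end-to-end flow and its execution might look like the sketch below, assuming the `bytewax.operators` API plus the built-in testing source and stdout sink (connector names can vary between versions):

```python
from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource, run_main
from bytewax.connectors.stdio import StdOutSink

# Build a trivial dataflow: read three numbers, double them, print them
flow = Dataflow("hello_runtime")
inp = op.input("in", flow, TestingSource([1, 2, 3]))
doubled = op.map("double", inp, lambda x: x * 2)
op.output("out", doubled, StdOutSink())

# Execute synchronously in the current thread
run_main(flow)
```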

## Capabilities

### Execution Functions

Core functions for running dataflows in different execution modes.

```python { .api }
def cli_main(flow: Dataflow, workers_per_process: int = 1, process_id: Optional[int] = None, addresses: Optional[List[str]] = None, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1): ...
```

**cli_main Parameters:**

- `flow` (Dataflow): Dataflow to execute
- `workers_per_process` (int): Number of worker threads per process
- `process_id` (Optional[int]): Process ID in the cluster (None for a single process)
- `addresses` (Optional[List[str]]): Addresses of all processes in the cluster
- `epoch_interval` (Optional[timedelta]): Duration of each processing epoch
- `recovery_config` (Optional[RecoveryConfig]): State recovery configuration

**run_main Parameters:**

- `flow` (Dataflow): Dataflow to execute in a single thread
- `epoch_interval` (Optional[timedelta]): Duration of each processing epoch
- `recovery_config` (Optional[RecoveryConfig]): State recovery configuration

**cluster_main Parameters:**

- `flow` (Dataflow): Dataflow to execute
- `addresses` (List[str]): `host:port` addresses of all cluster processes
- `proc_id` (int): Index of this process in the cluster (0-based)
- `epoch_interval` (Optional[timedelta]): Duration of each processing epoch
- `recovery_config` (Optional[RecoveryConfig]): State recovery configuration
- `worker_count_per_proc` (int): Worker threads per process

**Usage Examples:**

```python
from datetime import timedelta
from pathlib import Path

from bytewax.dataflow import Dataflow
from bytewax.recovery import RecoveryConfig
from bytewax._bytewax import cli_main, run_main, cluster_main

# Single-threaded execution (for testing)
flow = Dataflow("test_flow")
# ... build dataflow ...
run_main(flow)

# Multi-worker single process
cli_main(flow, workers_per_process=4)

# Distributed cluster execution
addresses = ["worker1:9999", "worker2:9999", "worker3:9999"]
recovery = RecoveryConfig(Path("/data/recovery"), timedelta(minutes=5))

# Run on worker 0
cluster_main(
    flow,
    addresses=addresses,
    proc_id=0,
    epoch_interval=timedelta(seconds=10),
    recovery_config=recovery,
    worker_count_per_proc=2,
)
```

### Command Line Interface

When using `python -m bytewax.run`, the CLI provides additional options for dataflow execution.

**CLI Usage:**

```bash
# Single process execution
python -m bytewax.run my_module:flow

# Multi-worker execution
python -m bytewax.run -w4 my_module:flow

# Distributed execution
python -m bytewax.run -w2 -i0 -a worker1:9999 -a worker2:9999 my_module:flow

# With recovery
python -m bytewax.run -w2 --recovery-dir /data/recovery my_module:flow

# Custom epoch interval
python -m bytewax.run --epoch-interval 5s my_module:flow
```

**CLI Parameters:**

- `-w, --workers-per-process`: Number of worker threads per process
- `-i, --process-id`: Process ID in the cluster
- `-a, --addresses`: Process addresses (repeat the flag for each process)
- `--recovery-dir`: Directory for recovery state
- `--epoch-interval`: Duration of each epoch (e.g., 10s, 2m)
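
To see how these flags compose, here is a hedged sketch that launches a small local cluster by spawning one `python -m bytewax.run` process per process ID; the ports and the `my_module:flow` path are illustrative only:

```python
import subprocess
import sys

# Illustrative two-process cluster on one machine
addresses = ["localhost:2101", "localhost:2102"]
addr_flags = []
for addr in addresses:
    addr_flags += ["-a", addr]  # every process gets the full address list

procs = []
for proc_id in range(len(addresses)):
    cmd = [
        sys.executable, "-m", "bytewax.run",
        "-w", "2",           # two worker threads per process
        "-i", str(proc_id),  # this process's ID within the cluster
        *addr_flags,
        "my_module:flow",    # hypothetical module:dataflow path
    ]
    procs.append(subprocess.Popen(cmd))

for p in procs:
    p.wait()
```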

### Dataflow Location

The runtime can locate and import dataflows from Python modules.

```python { .api }
def _locate_dataflow(module_name: str, dataflow_name: str): ...
```

**Parameters:**

- `module_name` (str): Python module containing the dataflow
- `dataflow_name` (str): Name of the dataflow variable or factory function

**Usage Pattern:**

```python
# my_flows.py
from bytewax.dataflow import Dataflow
import bytewax.operators as op

# Dataflow as a variable
my_flow = Dataflow("my_flow")
# ... build dataflow ...

# Dataflow as a factory function
def create_flow():
    flow = Dataflow("dynamic_flow")
    # ... build dataflow ...
    return flow

# Run with: python -m bytewax.run my_flows:my_flow
# Or:       python -m bytewax.run my_flows:create_flow
```
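
For programmatic loading, the same locator can be driven directly. A minimal sketch, assuming the private helper is importable from `bytewax.run` and resolves a zero-argument factory the same way the CLI does:

```python
# Hedged sketch: resolve "my_flows:create_flow" roughly the way
# `python -m bytewax.run` does, then run it in the current thread.
# The import location of this private helper is an assumption and
# may differ between versions.
from bytewax.run import _locate_dataflow
from bytewax.testing import run_main

flow = _locate_dataflow("my_flows", "create_flow")
run_main(flow)
```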

### Execution Patterns

**Development/Testing Pattern:**

```python
from bytewax.dataflow import Dataflow
from bytewax.testing import run_main

# Simple test execution
flow = Dataflow("test")
# ... build dataflow ...

# Run synchronously in the current thread
run_main(flow)
```

**Production Single-Node Pattern:**

```python
from datetime import timedelta
from pathlib import Path

from bytewax._bytewax import cli_main
from bytewax.recovery import RecoveryConfig

# Production single-node with recovery
recovery_config = RecoveryConfig(
    db_dir=Path("/data/bytewax/recovery"),
    backup_interval=timedelta(minutes=5),
)

cli_main(
    flow,
    workers_per_process=8,  # Use all CPU cores
    epoch_interval=timedelta(seconds=10),
    recovery_config=recovery_config,
)
```

**Production Distributed Pattern:**

```python
import os
from datetime import timedelta
from pathlib import Path

from bytewax._bytewax import cluster_main
from bytewax.recovery import RecoveryConfig

# Configuration from environment
addresses = os.environ["BYTEWAX_ADDRESSES"].split(",")
process_id = int(os.environ["BYTEWAX_PROCESS_ID"])
workers_per_proc = int(os.environ.get("BYTEWAX_WORKERS", "4"))

recovery_config = RecoveryConfig(
    db_dir=Path(os.environ["BYTEWAX_RECOVERY_DIR"]),
    backup_interval=timedelta(minutes=10),
)

cluster_main(
    flow,
    addresses=addresses,
    proc_id=process_id,
    worker_count_per_proc=workers_per_proc,
    epoch_interval=timedelta(seconds=5),
    recovery_config=recovery_config,
)
```

**Kubernetes Deployment Pattern:**

```python
import os
from pathlib import Path

from bytewax._bytewax import cluster_main
from bytewax.recovery import RecoveryConfig

def get_k8s_config():
    # Get configuration from the Kubernetes environment
    pod_name = os.environ["HOSTNAME"]  # Pod name
    service_name = os.environ["BYTEWAX_SERVICE_NAME"]
    replica_count = int(os.environ["BYTEWAX_REPLICAS"])

    # Build the addresses list from the headless service DNS names
    addresses = []
    for i in range(replica_count):
        hostname = f"{service_name}-{i}.{service_name}"
        addresses.append(f"{hostname}:9999")

    # Extract the process ID from the pod name's ordinal suffix
    process_id = int(pod_name.split("-")[-1])

    return addresses, process_id

# Use in a Kubernetes pod
addresses, proc_id = get_k8s_config()

cluster_main(
    flow,
    addresses=addresses,
    proc_id=proc_id,
    recovery_config=RecoveryConfig(Path("/data/recovery")),
    worker_count_per_proc=2,
)
```

### Performance Tuning

**Epoch Interval Tuning:**

```python
from datetime import timedelta

# Short epochs for low latency (higher coordination overhead)
run_main(flow, epoch_interval=timedelta(milliseconds=100))

# Long epochs for high throughput (higher latency)
run_main(flow, epoch_interval=timedelta(seconds=30))

# Adaptive based on data rate; high_volume_period() is an
# application-specific helper (not shown)
def get_epoch_interval():
    if high_volume_period():
        return timedelta(seconds=5)
    else:
        return timedelta(milliseconds=500)

run_main(flow, epoch_interval=get_epoch_interval())
```

**Worker Configuration:**

```python
import multiprocessing

# Use all CPU cores
cpu_count = multiprocessing.cpu_count()
cli_main(flow, workers_per_process=cpu_count)

# Leave some cores for the system
cli_main(flow, workers_per_process=max(1, cpu_count - 2))

# I/O-bound workloads can use more workers than cores
cli_main(flow, workers_per_process=cpu_count * 2)
```

**Memory and Resource Management:**

```python
# Monitor resource usage in production
import logging

import psutil

def log_resource_usage():
    process = psutil.Process()
    memory_mb = process.memory_info().rss / 1024 / 1024
    cpu_percent = process.cpu_percent()
    logging.info(f"Memory: {memory_mb:.1f} MB, CPU: {cpu_percent:.1f}%")

# Call periodically during execution (one approach is sketched below)
```
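
Because the execution functions block the calling thread, one way to call `log_resource_usage` periodically is a daemon thread started just before the dataflow. A minimal sketch (the 30-second interval is an arbitrary choice):

```python
import threading
import time

def start_resource_logger(interval_s: float = 30.0):
    # Log resource usage every interval_s seconds until the process exits
    def loop():
        while True:
            log_resource_usage()
            time.sleep(interval_s)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread

start_resource_logger()
cli_main(flow)
```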

### Error Handling and Monitoring

**Graceful Shutdown:**

```python
import logging
import signal
import sys

def signal_handler(signum, frame):
    logging.info("Received shutdown signal, stopping dataflow...")
    # The Bytewax runtime handles graceful shutdown automatically
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Run the dataflow
cli_main(flow)
```

**Health Checks:**

```python
# Health check endpoint for orchestrators
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"OK")
        else:
            self.send_response(404)
            self.end_headers()

def start_health_server():
    server = HTTPServer(("0.0.0.0", 8080), HealthHandler)
    thread = threading.Thread(target=server.serve_forever)
    thread.daemon = True
    thread.start()
    return server

# Start the health server before the dataflow
health_server = start_health_server()
cli_main(flow)

# Stop serving health checks once the dataflow exits
health_server.shutdown()
```