Python Stateful Stream Processing Framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Functions and classes for executing dataflows in various environments, from single-threaded testing to distributed production clusters. The runtime handles worker coordination, state management, and fault recovery.
Core functions for running dataflows in different execution modes.
def cli_main(flow: Dataflow, workers_per_process: int = 1, process_id: Optional[int] = None, addresses: Optional[List[str]] = None, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...
def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...
def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1): ...

cli_main Parameters:
- flow (Dataflow): Dataflow to execute
- workers_per_process (int): Number of worker threads per process
- process_id (int): Process ID in cluster (None for single process)
- addresses (List[str]): Addresses of all processes in cluster
- epoch_interval (timedelta): Duration of each processing epoch
- recovery_config (RecoveryConfig): State recovery configuration

run_main Parameters:
- flow (Dataflow): Dataflow to execute in single thread
- epoch_interval (timedelta): Duration of each processing epoch
- recovery_config (RecoveryConfig): State recovery configuration

cluster_main Parameters:
- flow (Dataflow): Dataflow to execute
- addresses (List[str]): Host:port addresses of all cluster processes
- proc_id (int): Index of this process in cluster (0-based)
- epoch_interval (timedelta): Duration of each processing epoch
- recovery_config (RecoveryConfig): State recovery configuration
- worker_count_per_proc (int): Worker threads per process

Usage Examples:
from bytewax.dataflow import Dataflow
from bytewax.recovery import RecoveryConfig
from bytewax._bytewax import cli_main, run_main, cluster_main
from datetime import timedelta
from pathlib import Path
# Single-threaded execution (for testing)
flow = Dataflow("test_flow")
# ... build dataflow ...
run_main(flow)
# Multi-worker single process
cli_main(flow, workers_per_process=4)
# Distributed cluster execution
addresses = ["worker1:9999", "worker2:9999", "worker3:9999"]
recovery = RecoveryConfig(Path("/data/recovery"), timedelta(minutes=5))
# Run on worker 0
cluster_main(
flow,
addresses=addresses,
proc_id=0,
epoch_interval=timedelta(seconds=10),
recovery_config=recovery,
worker_count_per_proc=2
)

When using python -m bytewax.run, the CLI provides additional options for dataflow execution.
CLI Usage:
# Single process execution
python -m bytewax.run my_module:flow
# Multi-worker execution
python -m bytewax.run -w4 my_module:flow
# Distributed execution
python -m bytewax.run -w2 -i0 -a worker1:9999 -a worker2:9999 my_module:flow
# With recovery
python -m bytewax.run -w2 --recovery-dir /data/recovery my_module:flow
# Custom epoch interval
python -m bytewax.run --epoch-interval 5s my_module:flow

CLI Parameters:
- -w, --workers-per-process: Number of worker threads
- -i, --process-id: Process ID in cluster
- -a, --addresses: Process addresses (repeat for each)
- --recovery-dir: Directory for recovery state
- --epoch-interval: Duration of each epoch (e.g., 10s, 2m)

The runtime can locate and import dataflows from Python modules.

def _locate_dataflow(module_name: str, dataflow_name: str): ...

Parameters:
- module_name (str): Python module containing the dataflow
- dataflow_name (str): Name of dataflow variable or function

Usage Pattern:
# my_flows.py
from bytewax.dataflow import Dataflow
import bytewax.operators as op
# Dataflow as variable
my_flow = Dataflow("my_flow")
# ... build dataflow ...
# Dataflow as function
def create_flow():
flow = Dataflow("dynamic_flow")
# ... build dataflow ...
return flow
# Run with: python -m bytewax.run my_flows:my_flow
# Or: python -m bytewax.run my_flows:create_flow

Development/Testing Pattern:
from bytewax.testing import run_main
# Simple test execution
flow = Dataflow("test")
# ... build dataflow ...
# Run synchronously in current thread
run_main(flow)

Production Single-Node Pattern:
from bytewax._bytewax import cli_main
from bytewax.recovery import RecoveryConfig
# Production single-node with recovery
recovery_config = RecoveryConfig(
db_dir=Path("/data/bytewax/recovery"),
backup_interval=timedelta(minutes=5)
)
cli_main(
flow,
workers_per_process=8, # Use all CPU cores
epoch_interval=timedelta(seconds=10),
recovery_config=recovery_config
)

Production Distributed Pattern:
import os
from bytewax._bytewax import cluster_main
# Configuration from environment
addresses = os.environ["BYTEWAX_ADDRESSES"].split(",")
process_id = int(os.environ["BYTEWAX_PROCESS_ID"])
workers_per_proc = int(os.environ.get("BYTEWAX_WORKERS", "4"))
recovery_config = RecoveryConfig(
db_dir=Path(os.environ["BYTEWAX_RECOVERY_DIR"]),
backup_interval=timedelta(minutes=10)
)
cluster_main(
flow,
addresses=addresses,
proc_id=process_id,
worker_count_per_proc=workers_per_proc,
epoch_interval=timedelta(seconds=5),
recovery_config=recovery_config
)

Kubernetes Deployment Pattern:
import socket
from pathlib import Path
def get_k8s_config():
    """Derive the cluster address list and this pod's process ID from K8s env vars.

    Returns:
        tuple[list[str], int]: (addresses of all replicas, 0-based process ID).
    """
    pod_name = os.environ["HOSTNAME"]  # StatefulSet pods are named <service>-<ordinal>
    service_name = os.environ["BYTEWAX_SERVICE_NAME"]
    replica_count = int(os.environ["BYTEWAX_REPLICAS"])
    # Headless-service DNS names, one per replica: <service>-<i>.<service>:9999
    addresses = [
        f"{service_name}-{idx}.{service_name}:9999"
        for idx in range(replica_count)
    ]
    # The trailing ordinal in the pod name is this process's cluster index.
    process_id = int(pod_name.split("-")[-1])
    return addresses, process_id
# Use in Kubernetes pod
addresses, proc_id = get_k8s_config()
cluster_main(
flow,
addresses=addresses,
proc_id=proc_id,
recovery_config=RecoveryConfig(Path("/data/recovery")),
worker_count_per_proc=2
)

Epoch Interval Tuning:
# Short epochs for low latency (higher overhead)
run_main(flow, epoch_interval=timedelta(milliseconds=100))
# Long epochs for high throughput (higher latency)
run_main(flow, epoch_interval=timedelta(seconds=30))
# Adaptive based on data rate
def get_epoch_interval():
    if high_volume_period():
        return timedelta(seconds=5)
    else:
        return timedelta(milliseconds=500)

Worker Configuration:
import multiprocessing
# Use all CPU cores
cpu_count = multiprocessing.cpu_count()
cli_main(flow, workers_per_process=cpu_count)
# Leave some cores for system
cli_main(flow, workers_per_process=max(1, cpu_count - 2))
# I/O bound workloads can use more workers than cores
cli_main(flow, workers_per_process=cpu_count * 2)

Memory and Resource Management:
# Monitor resource usage in production
import psutil
import logging
def log_resource_usage():
    """Log the current process's resident memory (MB) and CPU utilisation.

    Intended to be called periodically from monitoring code while a
    dataflow runs; requires the third-party ``psutil`` package.
    """
    current = psutil.Process()
    # rss is in bytes; convert to mebibytes for readable logs.
    memory_mb = current.memory_info().rss / 1024 / 1024
    cpu_percent = current.cpu_percent()
    logging.info(f"Memory: {memory_mb:.1f} MB, CPU: {cpu_percent:.1f}%")
# Call periodically during execution

Graceful Shutdown:
import signal
import sys
def signal_handler(signum, frame):
    """Turn SIGINT/SIGTERM into a clean interpreter exit.

    Args:
        signum: Signal number delivered by the OS (unused beyond logging intent).
        frame: Current stack frame at delivery time (unused).
    """
    logging.info("Received shutdown signal, stopping dataflow...")
    # Bytewax runtime handles graceful shutdown automatically; exiting
    # with status 0 is equivalent to sys.exit(0).
    raise SystemExit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Run dataflow
cli_main(flow)

Health Checks:
# Health check endpoint for orchestrators
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
class HealthHandler(BaseHTTPRequestHandler):
    """Minimal HTTP handler exposing GET /health for orchestrator liveness probes."""

    def do_GET(self):
        # Everything except the health endpoint is unknown: reply 404.
        if self.path != "/health":
            self.send_response(404)
            self.end_headers()
            return
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"OK")
def start_health_server():
    """Serve the health endpoint on 0.0.0.0:8080 from a background daemon thread.

    Returns:
        HTTPServer: The running server, so the caller can shut it down if needed.
    """
    health_srv = HTTPServer(("0.0.0.0", 8080), HealthHandler)
    # Daemon thread: does not block interpreter exit when the dataflow stops.
    worker = threading.Thread(target=health_srv.serve_forever, daemon=True)
    worker.start()
    return health_srv
# Start health server before dataflow
health_server = start_health_server()
cli_main(flow)

Install with Tessl CLI
npx tessl i tessl/pypi-bytewax