tessl/pypi-bytewax

Python Stateful Stream Processing Framework

docs/runtime.md

Runtime and Execution

Functions and classes for executing dataflows in various environments, from single-threaded testing to distributed production clusters. The runtime handles worker coordination, state management, and fault recovery.

Capabilities

Execution Functions

Core functions for running dataflows in different execution modes.

```python
def cli_main(flow: Dataflow, workers_per_process: int = 1, process_id: Optional[int] = None, addresses: Optional[List[str]] = None, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1): ...
```

cli_main Parameters:

  • flow (Dataflow): Dataflow to execute
  • workers_per_process (int): Number of worker threads per process
  • process_id (int): Process ID in cluster (None for single process)
  • addresses (List[str]): Addresses of all processes in cluster
  • epoch_interval (timedelta): Duration of each processing epoch
  • recovery_config (RecoveryConfig): State recovery configuration

run_main Parameters:

  • flow (Dataflow): Dataflow to execute in single thread
  • epoch_interval (timedelta): Duration of each processing epoch
  • recovery_config (RecoveryConfig): State recovery configuration

cluster_main Parameters:

  • flow (Dataflow): Dataflow to execute
  • addresses (List[str]): Host:port addresses of all cluster processes
  • proc_id (int): Index of this process in cluster (0-based)
  • epoch_interval (timedelta): Duration of each processing epoch
  • recovery_config (RecoveryConfig): State recovery configuration
  • worker_count_per_proc (int): Worker threads per process

Usage Examples:

```python
from bytewax.dataflow import Dataflow
from bytewax.recovery import RecoveryConfig
from bytewax._bytewax import cli_main, run_main, cluster_main
from datetime import timedelta
from pathlib import Path

# Single-threaded execution (for testing)
flow = Dataflow("test_flow")
# ... build dataflow ...
run_main(flow)

# Multi-worker single process
cli_main(flow, workers_per_process=4)

# Distributed cluster execution
addresses = ["worker1:9999", "worker2:9999", "worker3:9999"]
recovery = RecoveryConfig(Path("/data/recovery"), timedelta(minutes=5))

# Run on worker 0
cluster_main(
    flow,
    addresses=addresses,
    proc_id=0,
    epoch_interval=timedelta(seconds=10),
    recovery_config=recovery,
    worker_count_per_proc=2,
)
```

Command Line Interface

When a dataflow is launched with python -m bytewax.run, the CLI exposes the same execution options as flags.

CLI Usage:

```shell
# Single process execution
python -m bytewax.run my_module:flow

# Multi-worker execution
python -m bytewax.run -w4 my_module:flow

# Distributed execution
python -m bytewax.run -w2 -i0 -a worker1:9999 -a worker2:9999 my_module:flow

# With recovery
python -m bytewax.run -w2 --recovery-dir /data/recovery my_module:flow

# Custom epoch interval
python -m bytewax.run --epoch-interval 5s my_module:flow
```

CLI Parameters:

  • -w, --workers-per-process: Number of worker threads
  • -i, --process-id: Process ID in cluster
  • -a, --addresses: Process addresses (repeat for each)
  • --recovery-dir: Directory for recovery state
  • --epoch-interval: Duration of each epoch (e.g., 10s, 2m)

Dataflow Location

The runtime can locate and import dataflows from Python modules.

```python
def _locate_dataflow(module_name: str, dataflow_name: str): ...
```

Parameters:

  • module_name (str): Python module containing the dataflow
  • dataflow_name (str): Name of dataflow variable or function

Usage Pattern:

```python
# my_flows.py
from bytewax.dataflow import Dataflow
import bytewax.operators as op

# Dataflow as variable
my_flow = Dataflow("my_flow")
# ... build dataflow ...

# Dataflow as function
def create_flow():
    flow = Dataflow("dynamic_flow")
    # ... build dataflow ...
    return flow

# Run with: python -m bytewax.run my_flows:my_flow
# Or: python -m bytewax.run my_flows:create_flow
```
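The module:name spec above can be resolved with the standard library's importlib. Below is a minimal sketch of what such a locator can do; the helper name locate is hypothetical and this is not the actual bytewax _locate_dataflow implementation:

```python
import importlib


def locate(spec: str):
    """Resolve a "module:name" spec to an object, calling it if callable.

    Hypothetical sketch of a dataflow locator, not bytewax's own code:
    split the spec, import the module, fetch the attribute, and call it
    with no arguments when it is a factory function.
    """
    module_name, _, attr_name = spec.partition(":")
    module = importlib.import_module(module_name)
    obj = getattr(module, attr_name)
    # A function entry point is called to build and return the flow
    return obj() if callable(obj) else obj


# Resolving a plain module attribute:
print(locate("math:pi"))  # → 3.141592653589793
```

Calling the object when it is callable is what allows both the my_flow variable and the create_flow() factory styles shown above to work with the same spec syntax.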

Execution Patterns

Development/Testing Pattern:

```python
from bytewax.dataflow import Dataflow
from bytewax.testing import run_main

# Simple test execution
flow = Dataflow("test")
# ... build dataflow ...

# Run synchronously in current thread
run_main(flow)
```

Production Single-Node Pattern:

```python
from datetime import timedelta
from pathlib import Path

from bytewax._bytewax import cli_main
from bytewax.recovery import RecoveryConfig

# Production single-node with recovery
recovery_config = RecoveryConfig(
    db_dir=Path("/data/bytewax/recovery"),
    backup_interval=timedelta(minutes=5),
)

cli_main(
    flow,
    workers_per_process=8,  # Use all CPU cores
    epoch_interval=timedelta(seconds=10),
    recovery_config=recovery_config,
)
```

Production Distributed Pattern:

```python
import os
from datetime import timedelta
from pathlib import Path

from bytewax._bytewax import cluster_main
from bytewax.recovery import RecoveryConfig

# Configuration from environment
addresses = os.environ["BYTEWAX_ADDRESSES"].split(",")
process_id = int(os.environ["BYTEWAX_PROCESS_ID"])
workers_per_proc = int(os.environ.get("BYTEWAX_WORKERS", "4"))

recovery_config = RecoveryConfig(
    db_dir=Path(os.environ["BYTEWAX_RECOVERY_DIR"]),
    backup_interval=timedelta(minutes=10),
)

cluster_main(
    flow,
    addresses=addresses,
    proc_id=process_id,
    worker_count_per_proc=workers_per_proc,
    epoch_interval=timedelta(seconds=5),
    recovery_config=recovery_config,
)
```

Kubernetes Deployment Pattern:

```python
import os
from pathlib import Path

def get_k8s_config():
    # Get configuration from the Kubernetes environment
    pod_name = os.environ["HOSTNAME"]  # Pod name
    service_name = os.environ["BYTEWAX_SERVICE_NAME"]
    replica_count = int(os.environ["BYTEWAX_REPLICAS"])

    # Build addresses list
    addresses = []
    for i in range(replica_count):
        hostname = f"{service_name}-{i}.{service_name}"
        addresses.append(f"{hostname}:9999")

    # Extract process ID from pod name
    process_id = int(pod_name.split("-")[-1])

    return addresses, process_id

# Use in a Kubernetes pod
addresses, proc_id = get_k8s_config()

cluster_main(
    flow,
    addresses=addresses,
    proc_id=proc_id,
    recovery_config=RecoveryConfig(Path("/data/recovery")),
    worker_count_per_proc=2,
)
```
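The pattern above assumes StatefulSet-style pod names ({service}-0, {service}-1, ...) resolvable via a headless service. The address construction and ordinal parsing can be checked in isolation; the helper names and the wordcount service name below are illustrative, not part of bytewax:

```python
def build_addresses(service_name: str, replicas: int, port: int = 9999):
    # Headless-service DNS names of the form pod.service (StatefulSet convention)
    return [f"{service_name}-{i}.{service_name}:{port}" for i in range(replicas)]


def process_id_from_pod(pod_name: str) -> int:
    # StatefulSet pod names end in their ordinal index
    return int(pod_name.rsplit("-", 1)[-1])


print(build_addresses("wordcount", 2))
# → ['wordcount-0.wordcount:9999', 'wordcount-1.wordcount:9999']
print(process_id_from_pod("wordcount-1"))  # → 1
```

Keeping these pure functions separate from the environment lookups makes the address scheme easy to unit-test before it is wired into cluster_main.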

Performance Tuning

Epoch Interval Tuning:

```python
# Short epochs for low latency (higher overhead)
run_main(flow, epoch_interval=timedelta(milliseconds=100))

# Long epochs for high throughput (higher latency)
run_main(flow, epoch_interval=timedelta(seconds=30))

# Adaptive based on data rate (high_volume_period is an application-defined check)
def get_epoch_interval():
    if high_volume_period():
        return timedelta(seconds=5)
    else:
        return timedelta(milliseconds=500)
```
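One way to make the adaptive idea concrete is to key the interval off a measured event rate. This is a sketch with arbitrary thresholds; the pick_epoch_interval helper is hypothetical, not a bytewax API:

```python
from datetime import timedelta


def pick_epoch_interval(events_per_sec: float) -> timedelta:
    """Illustrative policy: trade latency for throughput as the observed
    event rate grows. Thresholds here are arbitrary examples."""
    if events_per_sec > 10_000:
        return timedelta(seconds=5)       # amortize per-epoch overhead
    if events_per_sec > 1_000:
        return timedelta(seconds=1)
    return timedelta(milliseconds=500)    # keep latency low when traffic is light


print(pick_epoch_interval(50_000))  # → 0:00:05
```

The returned timedelta can then be passed as epoch_interval when (re)starting the dataflow; the interval cannot be changed while a flow is running.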

Worker Configuration:

```python
import multiprocessing

# Use all CPU cores
cpu_count = multiprocessing.cpu_count()
cli_main(flow, workers_per_process=cpu_count)

# Leave some cores for the system
cli_main(flow, workers_per_process=max(1, cpu_count - 2))

# I/O-bound workloads can use more workers than cores
cli_main(flow, workers_per_process=cpu_count * 2)
```

Memory and Resource Management:

```python
# Monitor resource usage in production (requires the psutil package)
import psutil
import logging

def log_resource_usage():
    process = psutil.Process()
    memory_mb = process.memory_info().rss / 1024 / 1024
    cpu_percent = process.cpu_percent()

    logging.info(f"Memory: {memory_mb:.1f} MB, CPU: {cpu_percent:.1f}%")

# Call periodically during execution
```
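"Call periodically" can be done with a stdlib-only background loop; a minimal sketch using threading (the run_periodically helper is an assumption of this example, not a bytewax utility):

```python
import threading


def run_periodically(fn, interval_sec: float, stop: threading.Event) -> threading.Thread:
    """Call fn every interval_sec seconds on a daemon thread until stop is set."""
    def loop():
        # Event.wait returns True once stop is set, ending the loop;
        # otherwise it times out after interval_sec and we call fn again
        while not stop.wait(interval_sec):
            fn()
    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


# Example: count ticks for a fraction of a second
import time

ticks = []
stop = threading.Event()
run_periodically(lambda: ticks.append(1), 0.01, stop)
time.sleep(0.1)
stop.set()
print(len(ticks) > 0)  # → True
```

In production the callback would be log_resource_usage and the stop event would be set during shutdown; the daemon flag ensures the thread never blocks process exit.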

Error Handling and Monitoring

Graceful Shutdown:

```python
import logging
import signal
import sys

def signal_handler(signum, frame):
    logging.info("Received shutdown signal, stopping dataflow...")
    # The Bytewax runtime handles graceful shutdown automatically
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Run dataflow
cli_main(flow)
```

Health Checks:

```python
# Health check endpoint for orchestrators
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading

class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"OK")
        else:
            self.send_response(404)
            self.end_headers()

def start_health_server():
    server = HTTPServer(("0.0.0.0", 8080), HealthHandler)
    thread = threading.Thread(target=server.serve_forever)
    thread.daemon = True
    thread.start()
    return server

# Start health server before dataflow
health_server = start_health_server()
cli_main(flow)
```
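The endpoint can be exercised end-to-end with stdlib urllib; this standalone sketch binds port 0 so the OS picks a free port, independent of any running dataflow:

```python
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"OK")
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, *args):  # keep request logging quiet
        pass


# Port 0 asks the OS for any free port; server_port reports which one
server = HTTPServer(("127.0.0.1", 0), HealthHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

url = f"http://127.0.0.1:{server.server_port}/health"
with urllib.request.urlopen(url) as resp:
    body = resp.read()
print(body)  # → b'OK'
server.shutdown()
```

A Kubernetes liveness probe pointed at /health would then restart the pod only if the process itself stops serving, which is usually the right granularity for a stream processor.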

Install with Tessl CLI

```shell
npx tessl i tessl/pypi-bytewax
```