CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-bytewax

Python Stateful Stream Processing Framework

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

recovery.mddocs/

State Recovery

Recovery mechanisms for fault tolerance including state snapshotting, partition management, and resume capabilities. Enables exactly-once processing guarantees in distributed environments through persistent state management.

Capabilities

Recovery Configuration

Configuration class for setting up state persistence and recovery behavior.

class RecoveryConfig:
    def __init__(self, db_dir: Path, backup_interval: Optional[timedelta] = None): ...
    
    @property
    def db_dir(self) -> Path: ...
    
    @property  
    def backup_interval(self) -> Optional[timedelta]: ...

Parameters:

  • db_dir (Path): Local filesystem directory for recovery database partitions
  • backup_interval (timedelta): Time to wait before permanently deleting old snapshots (default: zero duration)

Usage Examples:

from bytewax.recovery import RecoveryConfig
from pathlib import Path
from datetime import timedelta

# Basic recovery configuration
recovery_config = RecoveryConfig(
    db_dir=Path("/data/bytewax/recovery")
)

# With backup interval for archival systems
recovery_config = RecoveryConfig(
    db_dir=Path("/data/bytewax/recovery"),
    backup_interval=timedelta(hours=1)  # Keep old snapshots for 1 hour
)

# Use in dataflow execution
from bytewax._bytewax import cli_main

cli_main(
    flow,
    workers_per_process=4,
    recovery_config=recovery_config
)

Recovery Database Management

Functions for initializing and managing recovery partition databases.

def init_db_dir(db_dir: Path, count: int): ...

Parameters:

  • db_dir (Path): Directory to create recovery partitions in
  • count (int): Number of partitions to create

Usage Examples:

from bytewax.recovery import init_db_dir
from pathlib import Path

# Initialize recovery database with 4 partitions
recovery_dir = Path("/data/bytewax/recovery")
init_db_dir(recovery_dir, count=4)

# Command line usage
# python -m bytewax.recovery /data/recovery 4

Recovery Exceptions

Exceptions that can occur during recovery operations, indicating various failure modes.

class InconsistentPartitionsError(ValueError):
    """Raised when two recovery partitions are from very different times."""
    ...

class MissingPartitionsError(FileNotFoundError):
    """Raised when an incomplete set of recovery partitions is detected."""
    ...

class NoPartitionsError(FileNotFoundError):
    """Raised when no recovery partitions are found on any worker."""
    ...

InconsistentPartitionsError: Occurs when recovery partitions have timestamps that differ beyond the backup interval, indicating a backup/restore issue.

MissingPartitionsError: Occurs when some but not all expected recovery partitions are found, indicating incomplete backup restoration.

NoPartitionsError: Occurs when no recovery partitions are found at all, typically due to wrong directory path.

Usage Examples:

from bytewax.recovery import (
    InconsistentPartitionsError, 
    MissingPartitionsError, 
    NoPartitionsError
)

try:
    cli_main(flow, recovery_config=recovery_config)
except NoPartitionsError:
    print("No recovery data found - starting fresh")
    init_db_dir(recovery_config.db_dir, worker_count)
    cli_main(flow, recovery_config=recovery_config)
except MissingPartitionsError:
    print("Incomplete recovery data - check backup restoration")
    raise
except InconsistentPartitionsError:
    print("Recovery partitions are inconsistent - check backup timing")
    raise

Recovery Patterns

Production Recovery Setup:

import os
from pathlib import Path
from datetime import timedelta
from bytewax.recovery import RecoveryConfig, init_db_dir

def setup_recovery():
    """Set up recovery with environment-based configuration."""
    recovery_dir = Path(os.environ.get("BYTEWAX_RECOVERY_DIR", "/data/recovery"))
    worker_count = int(os.environ.get("BYTEWAX_WORKERS", "4"))
    backup_interval = timedelta(minutes=int(os.environ.get("BACKUP_INTERVAL_MINUTES", "10")))
    
    # Ensure recovery directory exists and is initialized
    if not recovery_dir.exists():
        recovery_dir.mkdir(parents=True, exist_ok=True)
        init_db_dir(recovery_dir, worker_count)
    
    return RecoveryConfig(recovery_dir, backup_interval)

# Use in production
recovery_config = setup_recovery()
cli_main(flow, recovery_config=recovery_config)

Kubernetes Recovery with Persistent Volumes:

from pathlib import Path
import subprocess

def setup_k8s_recovery():
    """Recovery setup for Kubernetes with persistent volumes."""
    # Use persistent volume mount
    recovery_dir = Path("/data/recovery")  # PVC mount point
    
    # Get replica count from StatefulSet
    replica_count = int(os.environ.get("REPLICA_COUNT", "3"))
    
    # Initialize if this is the first pod (ordinal 0)
    pod_ordinal = int(os.environ["HOSTNAME"].split("-")[-1])
    if pod_ordinal == 0:
        # Check if already initialized
        if not any(recovery_dir.glob("partition_*")):
            print("Initializing recovery database...")
            init_db_dir(recovery_dir, replica_count)
    
    return RecoveryConfig(
        db_dir=recovery_dir,
        backup_interval=timedelta(minutes=5)
    )

Cloud Storage Backup Integration:

import boto3
import tarfile
from datetime import datetime

class S3RecoveryManager:
    def __init__(self, bucket_name, recovery_dir):
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name
        self.recovery_dir = Path(recovery_dir)
    
    def backup_to_s3(self):
        """Backup recovery partitions to S3."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"recovery_backup_{timestamp}.tar.gz"
        
        # Create tar archive
        with tarfile.open(f"/tmp/{backup_name}", "w:gz") as tar:
            tar.add(str(self.recovery_dir), arcname="recovery")
        
        # Upload to S3
        self.s3.upload_file(
            f"/tmp/{backup_name}",
            self.bucket,
            f"bytewax/backups/{backup_name}"
        )
        
        print(f"Backup uploaded: s3://{self.bucket}/bytewax/backups/{backup_name}")
    
    def restore_from_s3(self, backup_name):
        """Restore recovery partitions from S3."""
        # Download from S3
        self.s3.download_file(
            self.bucket,
            f"bytewax/backups/{backup_name}",
            f"/tmp/{backup_name}"
        )
        
        # Extract to recovery directory
        with tarfile.open(f"/tmp/{backup_name}", "r:gz") as tar:
            tar.extractall(path=str(self.recovery_dir.parent))
        
        print(f"Recovery data restored from {backup_name}")

# Usage
recovery_manager = S3RecoveryManager("my-bytewax-backups", "/data/recovery")

# Schedule regular backups
import threading
import time

def backup_scheduler():
    while True:
        time.sleep(3600)  # Backup every hour
        try:
            recovery_manager.backup_to_s3()
        except Exception as e:
            print(f"Backup failed: {e}")

backup_thread = threading.Thread(target=backup_scheduler, daemon=True)
backup_thread.start()

Recovery Monitoring:

import psutil
import logging
from pathlib import Path

class RecoveryMonitor:
    def __init__(self, recovery_dir):
        self.recovery_dir = Path(recovery_dir)
        self.logger = logging.getLogger("recovery_monitor")
    
    def check_recovery_health(self):
        """Check recovery database health."""
        try:
            # Check if recovery directory exists
            if not self.recovery_dir.exists():
                self.logger.error("Recovery directory does not exist")
                return False
            
            # Check partition files
            partitions = list(self.recovery_dir.glob("partition_*"))
            if not partitions:
                self.logger.warning("No recovery partitions found")
                return False
            
            # Check disk space
            disk_usage = psutil.disk_usage(str(self.recovery_dir))
            free_space_gb = disk_usage.free / (1024**3)
            
            if free_space_gb < 1.0:  # Less than 1GB free
                self.logger.warning(f"Low disk space for recovery: {free_space_gb:.1f}GB free")
            
            # Check partition file sizes and modification times
            for partition in partitions:
                stat = partition.stat()
                size_mb = stat.st_size / (1024**2)
                
                self.logger.info(f"Partition {partition.name}: {size_mb:.1f}MB")
                
                # Warn if partitions are very large (might indicate memory leak)
                if size_mb > 1000:  # 1GB
                    self.logger.warning(f"Large partition file: {partition.name} ({size_mb:.1f}MB)")
            
            return True
            
        except Exception as e:
            self.logger.error(f"Recovery health check failed: {e}")
            return False
    
    def get_recovery_stats(self):
        """Get recovery statistics."""
        stats = {
            "partition_count": 0,
            "total_size_mb": 0,
            "oldest_partition": None,
            "newest_partition": None
        }
        
        partitions = list(self.recovery_dir.glob("partition_*"))
        stats["partition_count"] = len(partitions)
        
        if partitions:
            oldest_time = float('inf')
            newest_time = 0
            
            for partition in partitions:
                stat = partition.stat()
                stats["total_size_mb"] += stat.st_size / (1024**2)
                
                if stat.st_mtime < oldest_time:
                    oldest_time = stat.st_mtime
                    stats["oldest_partition"] = partition.name
                
                if stat.st_mtime > newest_time:
                    newest_time = stat.st_mtime
                    stats["newest_partition"] = partition.name
        
        return stats

# Use in production monitoring
monitor = RecoveryMonitor("/data/recovery")

# Regular health checks
def monitor_recovery():
    while True:
        if monitor.check_recovery_health():
            stats = monitor.get_recovery_stats()
            logging.info(f"Recovery stats: {stats}")
        time.sleep(300)  # Check every 5 minutes

monitoring_thread = threading.Thread(target=monitor_recovery, daemon=True)
monitoring_thread.start()

Recovery Testing:

import tempfile
import shutil
from pathlib import Path

def test_recovery_functionality():
    """Test recovery works correctly."""
    with tempfile.TemporaryDirectory() as temp_dir:
        recovery_dir = Path(temp_dir) / "recovery"
        
        # Initialize recovery database
        init_db_dir(recovery_dir, count=2)
        
        # Create test dataflow with state
        flow = Dataflow("recovery_test")
        
        test_data = [("key1", 1), ("key1", 2), ("key2", 3)]
        stream = op.input("input", flow, TestingSource(test_data))
        
        # Stateful operation that accumulates values
        def accumulate(state, value):
            total = (state or 0) + value
            return total, total
        
        accumulated = op.stateful_map("accumulate", stream, accumulate)
        
        results = []
        op.output("output", accumulated, TestingSink(results))
        
        # Run with recovery
        recovery_config = RecoveryConfig(recovery_dir)
        run_main(flow, recovery_config=recovery_config)
        
        # Verify results
        expected = [("key1", 1), ("key1", 3), ("key2", 3)]  # Cumulative sums
        assert results == expected
        
        # Verify recovery files were created
        partitions = list(recovery_dir.glob("partition_*"))
        assert len(partitions) == 2  # Should have 2 partition files
        
        print("Recovery test passed!")

# Run recovery test
test_recovery_functionality()

Install with Tessl CLI

npx tessl i tessl/pypi-bytewax

docs

connectors.md

dataflow.md

index.md

operators.md

recovery.md

runtime.md

sinks.md

sources.md

stateful.md

testing.md

tracing.md

windowing.md

tile.json