Python Stateful Stream Processing Framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Recovery mechanisms for fault tolerance including state snapshotting, partition management, and resume capabilities. Enables exactly-once processing guarantees in distributed environments through persistent state management.
Configuration class for setting up state persistence and recovery behavior.
class RecoveryConfig:
def __init__(self, db_dir: Path, backup_interval: Optional[timedelta] = None): ...
@property
def db_dir(self) -> Path: ...
@property
def backup_interval(self) -> Optional[timedelta]: ...Parameters:
db_dir (Path): Local filesystem directory for recovery database partitionsbackup_interval (timedelta): Time to wait before permanently deleting old snapshots (default: zero duration)Usage Examples:
from bytewax.recovery import RecoveryConfig
from pathlib import Path
from datetime import timedelta
# Basic recovery configuration
recovery_config = RecoveryConfig(
db_dir=Path("/data/bytewax/recovery")
)
# With backup interval for archival systems
recovery_config = RecoveryConfig(
db_dir=Path("/data/bytewax/recovery"),
backup_interval=timedelta(hours=1) # Keep old snapshots for 1 hour
)
# Use in dataflow execution
from bytewax.run import cli_main
cli_main(
flow,
workers_per_process=4,
recovery_config=recovery_config
)

Functions for initializing and managing recovery partition databases.
def init_db_dir(db_dir: Path, count: int): ...

Parameters:
db_dir (Path): Directory to create recovery partitions in
count (int): Number of partitions to create

Usage Examples:
from bytewax.recovery import init_db_dir
from pathlib import Path
# Initialize recovery database with 4 partitions
recovery_dir = Path("/data/bytewax/recovery")
init_db_dir(recovery_dir, count=4)
# Command line usage
# python -m bytewax.recovery /data/recovery 4

Exceptions that can occur during recovery operations, indicating various failure modes.
class InconsistentPartitionsError(ValueError):
    """Two recovery partitions were written at very different times.

    Indicates the set of partitions was not backed up or restored as a
    consistent snapshot.
    """


class MissingPartitionsError(FileNotFoundError):
    """Only a subset of the expected recovery partitions was found."""


class NoPartitionsError(FileNotFoundError):
    """No recovery partitions were found on any worker."""
...

InconsistentPartitionsError: Occurs when recovery partitions have timestamps that differ beyond the backup interval, indicating a backup/restore issue.
MissingPartitionsError: Occurs when some but not all expected recovery partitions are found, indicating incomplete backup restoration.
NoPartitionsError: Occurs when no recovery partitions are found at all, typically due to wrong directory path.
Usage Examples:
from bytewax.recovery import (
InconsistentPartitionsError,
MissingPartitionsError,
NoPartitionsError
)
try:
cli_main(flow, recovery_config=recovery_config)
except NoPartitionsError:
print("No recovery data found - starting fresh")
init_db_dir(recovery_config.db_dir, worker_count)
cli_main(flow, recovery_config=recovery_config)
except MissingPartitionsError:
print("Incomplete recovery data - check backup restoration")
raise
except InconsistentPartitionsError:
print("Recovery partitions are inconsistent - check backup timing")
raise

Production Recovery Setup:
import os
from pathlib import Path
from datetime import timedelta
from bytewax.recovery import RecoveryConfig, init_db_dir
def setup_recovery():
    """Set up recovery with environment-based configuration.

    Reads BYTEWAX_RECOVERY_DIR, BYTEWAX_WORKERS, and BACKUP_INTERVAL_MINUTES
    from the environment and returns a ready-to-use RecoveryConfig.

    Returns:
        RecoveryConfig: Configuration pointing at an initialized partition
        directory.
    """
    recovery_dir = Path(os.environ.get("BYTEWAX_RECOVERY_DIR", "/data/recovery"))
    worker_count = int(os.environ.get("BYTEWAX_WORKERS", "4"))
    backup_interval = timedelta(minutes=int(os.environ.get("BACKUP_INTERVAL_MINUTES", "10")))
    # Always ensure the directory exists; mkdir is a no-op when it does.
    recovery_dir.mkdir(parents=True, exist_ok=True)
    # Bug fix: the original only called init_db_dir when the directory did not
    # exist, so an existing-but-empty directory was never initialized and
    # execution later failed with NoPartitionsError. Initialize whenever the
    # directory holds no files at all.
    if not any(recovery_dir.iterdir()):
        init_db_dir(recovery_dir, worker_count)
    return RecoveryConfig(recovery_dir, backup_interval)
# Use in production
recovery_config = setup_recovery()
cli_main(flow, recovery_config=recovery_config)

Kubernetes Recovery with Persistent Volumes:
import os
from pathlib import Path
from datetime import timedelta
from bytewax.recovery import RecoveryConfig, init_db_dir

def setup_k8s_recovery():
    """Recovery setup for Kubernetes with persistent volumes."""
    # Use persistent volume mount (PVC mount point)
    recovery_dir = Path("/data/recovery")
    # Get replica count from StatefulSet
    replica_count = int(os.environ.get("REPLICA_COUNT", "3"))
    # Initialize if this is the first pod (ordinal 0)
    pod_ordinal = int(os.environ["HOSTNAME"].split("-")[-1])
    if pod_ordinal == 0:
        # Check if already initialized
        if not any(recovery_dir.glob("partition_*")):
            print("Initializing recovery database...")
            init_db_dir(recovery_dir, replica_count)
    return RecoveryConfig(
        db_dir=recovery_dir,
        backup_interval=timedelta(minutes=5)
    )

Cloud Storage Backup Integration:
import boto3
import tarfile
from datetime import datetime
class S3RecoveryManager:
    """Back up and restore bytewax recovery partitions via S3.

    Archives the recovery directory into a tar.gz and stores it under
    bytewax/backups/ in the configured bucket.
    """

    def __init__(self, bucket_name, recovery_dir):
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name
        self.recovery_dir = Path(recovery_dir)

    def backup_to_s3(self):
        """Backup recovery partitions to S3."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"recovery_backup_{timestamp}.tar.gz"
        archive_path = Path(f"/tmp/{backup_name}")
        try:
            # Create tar archive of the whole recovery directory.
            with tarfile.open(archive_path, "w:gz") as tar:
                tar.add(str(self.recovery_dir), arcname="recovery")
            # Upload to S3.
            self.s3.upload_file(
                str(archive_path),
                self.bucket,
                f"bytewax/backups/{backup_name}"
            )
        finally:
            # Bug fix: the original leaked the temporary archive in /tmp on
            # every backup (and on failure). Always remove it.
            archive_path.unlink(missing_ok=True)
        print(f"Backup uploaded: s3://{self.bucket}/bytewax/backups/{backup_name}")

    def restore_from_s3(self, backup_name):
        """Restore recovery partitions from S3."""
        archive_path = Path(f"/tmp/{backup_name}")
        # Download from S3.
        self.s3.download_file(
            self.bucket,
            f"bytewax/backups/{backup_name}",
            str(archive_path)
        )
        try:
            # Extract to the recovery directory's parent.
            with tarfile.open(archive_path, "r:gz") as tar:
                # Security fix: filter="data" (Python 3.12, backported to
                # 3.11.4/3.10.12) rejects archive members with absolute paths
                # or ".." components, preventing tar path traversal when the
                # bucket contents are not fully trusted.
                tar.extractall(path=str(self.recovery_dir.parent), filter="data")
        finally:
            # Clean up the downloaded archive regardless of outcome.
            archive_path.unlink(missing_ok=True)
        print(f"Recovery data restored from {backup_name}")
# Usage
recovery_manager = S3RecoveryManager("my-bytewax-backups", "/data/recovery")
# Schedule regular backups
import threading
import time
def backup_scheduler():
    """Run an hourly backup loop forever; log failures and keep going."""
    while True:
        # Backup every hour
        time.sleep(3600)
        try:
            recovery_manager.backup_to_s3()
        except Exception as e:
            # Best-effort: a failed backup must not kill the scheduler thread.
            print(f"Backup failed: {e}")
backup_thread = threading.Thread(target=backup_scheduler, daemon=True)
backup_thread.start()

Recovery Monitoring:
import psutil
import logging
from pathlib import Path
class RecoveryMonitor:
    """Watches a recovery directory and reports on its health and size."""

    def __init__(self, recovery_dir):
        self.recovery_dir = Path(recovery_dir)
        self.logger = logging.getLogger("recovery_monitor")

    def check_recovery_health(self):
        """Check recovery database health."""
        try:
            # Directory must exist at all.
            if not self.recovery_dir.exists():
                self.logger.error("Recovery directory does not exist")
                return False
            # At least one partition file must be present.
            partitions = list(self.recovery_dir.glob("partition_*"))
            if not partitions:
                self.logger.warning("No recovery partitions found")
                return False
            # Flag free disk space below 1 GB.
            disk_usage = psutil.disk_usage(str(self.recovery_dir))
            free_space_gb = disk_usage.free / (1024**3)
            if free_space_gb < 1.0:
                self.logger.warning(f"Low disk space for recovery: {free_space_gb:.1f}GB free")
            # Log each partition's size; warn on files over 1 GB.
            for partition in partitions:
                size_mb = partition.stat().st_size / (1024**2)
                self.logger.info(f"Partition {partition.name}: {size_mb:.1f}MB")
                if size_mb > 1000:
                    self.logger.warning(f"Large partition file: {partition.name} ({size_mb:.1f}MB)")
            return True
        except Exception as e:
            self.logger.error(f"Recovery health check failed: {e}")
            return False

    def get_recovery_stats(self):
        """Get recovery statistics."""
        partitions = list(self.recovery_dir.glob("partition_*"))
        stats = {
            "partition_count": len(partitions),
            "total_size_mb": 0,
            "oldest_partition": None,
            "newest_partition": None,
        }
        if partitions:
            stats["total_size_mb"] = sum(p.stat().st_size for p in partitions) / (1024**2)
            # min()/max() return the first extreme, matching the original
            # strict-comparison loop's tie-breaking.
            stats["oldest_partition"] = min(partitions, key=lambda p: p.stat().st_mtime).name
            stats["newest_partition"] = max(partitions, key=lambda p: p.stat().st_mtime).name
        return stats
# Use in production monitoring
monitor = RecoveryMonitor("/data/recovery")
# Regular health checks
def monitor_recovery():
    """Poll recovery health forever, logging stats on every healthy check."""
    while True:
        healthy = monitor.check_recovery_health()
        if healthy:
            stats = monitor.get_recovery_stats()
            logging.info(f"Recovery stats: {stats}")
        # Check every 5 minutes
        time.sleep(300)
monitoring_thread = threading.Thread(target=monitor_recovery, daemon=True)
monitoring_thread.start()

Recovery Testing:
import tempfile
import shutil
from pathlib import Path
def test_recovery_functionality():
    """Test recovery works correctly."""
    with tempfile.TemporaryDirectory() as temp_dir:
        recovery_dir = Path(temp_dir) / "recovery"
        # Set up a fresh two-partition recovery database.
        init_db_dir(recovery_dir, count=2)

        # Build a tiny dataflow with keyed stateful accumulation.
        flow = Dataflow("recovery_test")
        test_data = [("key1", 1), ("key1", 2), ("key2", 3)]
        stream = op.input("input", flow, TestingSource(test_data))

        def accumulate(state, value):
            # Running total per key; state is None on the first value.
            new_total = (state or 0) + value
            return new_total, new_total

        accumulated = op.stateful_map("accumulate", stream, accumulate)
        results = []
        op.output("output", accumulated, TestingSink(results))

        # Execute with recovery enabled.
        run_main(flow, recovery_config=RecoveryConfig(recovery_dir))

        # Each emitted value is the cumulative sum for its key.
        assert results == [("key1", 1), ("key1", 3), ("key2", 3)]

        # Recovery should have produced exactly one file per partition.
        partitions = list(recovery_dir.glob("partition_*"))
        assert len(partitions) == 2

        print("Recovery test passed!")
# Run recovery test
test_recovery_functionality()

Install with Tessl CLI
npx tessl i tessl/pypi-bytewax