# State Recovery

Recovery mechanisms for fault tolerance, including state snapshotting, partition management, and resume capabilities. Persistent state management enables exactly-once processing guarantees in distributed environments.

## Capabilities

### Recovery Configuration

Configuration class for setting up state persistence and recovery behavior.
```python { .api }
class RecoveryConfig:
    def __init__(self, db_dir: Path, backup_interval: Optional[timedelta] = None): ...

    @property
    def db_dir(self) -> Path: ...

    @property
    def backup_interval(self) -> Optional[timedelta]: ...
```

**Parameters:**

- `db_dir` (`Path`): Local filesystem directory for the recovery database partitions.
- `backup_interval` (`Optional[timedelta]`): How long to wait before permanently deleting old snapshots (default: zero duration).
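As context for that default, the pruning rule `backup_interval` controls can be sketched with plain timestamps. `prunable_snapshots` below is a hypothetical helper for illustration only, not part of the bytewax API:

```python
from datetime import datetime, timedelta

def prunable_snapshots(snapshot_times, newest, backup_interval):
    """Return snapshot timestamps old enough to delete permanently:
    everything taken more than `backup_interval` before the newest one."""
    cutoff = newest - backup_interval
    return [t for t in snapshot_times if t < cutoff]

times = [
    datetime(2024, 1, 1, 10, 0),
    datetime(2024, 1, 1, 10, 30),
    datetime(2024, 1, 1, 11, 0),
]

# With a 1-hour interval nothing is prunable yet; with a zero duration
# (the default) every superseded snapshot becomes prunable immediately.
old = prunable_snapshots(times, newest=times[-1], backup_interval=timedelta(hours=1))
```

A non-zero interval is what gives an external archival job time to copy old snapshots before they disappear.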

**Usage Examples:**

```python
from pathlib import Path
from datetime import timedelta

from bytewax.recovery import RecoveryConfig

# Basic recovery configuration
recovery_config = RecoveryConfig(
    db_dir=Path("/data/bytewax/recovery")
)

# With a backup interval for archival systems
recovery_config = RecoveryConfig(
    db_dir=Path("/data/bytewax/recovery"),
    backup_interval=timedelta(hours=1)  # Keep old snapshots for 1 hour
)

# Use in dataflow execution
from bytewax._bytewax import cli_main

cli_main(
    flow,
    workers_per_process=4,
    recovery_config=recovery_config
)
```

### Recovery Database Management

Functions for initializing and managing recovery partition databases.

```python { .api }
def init_db_dir(db_dir: Path, count: int): ...
```

**Parameters:**

- `db_dir` (`Path`): Directory in which to create the recovery partitions.
- `count` (`int`): Number of partitions to create.

**Usage Examples:**

```python
from pathlib import Path

from bytewax.recovery import init_db_dir

# Initialize a recovery database with 4 partitions
recovery_dir = Path("/data/bytewax/recovery")
init_db_dir(recovery_dir, count=4)

# Equivalent command line usage:
# python -m bytewax.recovery /data/recovery 4
```
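Initialization should run only once per deployment; re-running it against a restored database would be a mistake. A guard like `ensure_initialized` below keeps setup idempotent. It is a hypothetical helper (the initializer is passed in as a callable so the sketch stays library-agnostic), not part of the bytewax API:

```python
from pathlib import Path

def ensure_initialized(db_dir: Path, count: int, init_fn) -> bool:
    """Create the recovery directory and initialize it only if it is
    still empty; return True when initialization actually ran."""
    db_dir.mkdir(parents=True, exist_ok=True)
    if any(db_dir.iterdir()):
        return False  # assume an existing (possibly restored) database
    init_fn(db_dir, count)
    return True

# In a real deployment you would pass bytewax's own initializer:
#   ensure_initialized(Path("/data/recovery"), 4, init_db_dir)
```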

### Recovery Exceptions

Exceptions that can occur during recovery operations, each indicating a distinct failure mode.

```python { .api }
class InconsistentPartitionsError(ValueError):
    """Raised when two recovery partitions are from very different times."""
    ...

class MissingPartitionsError(FileNotFoundError):
    """Raised when an incomplete set of recovery partitions is detected."""
    ...

class NoPartitionsError(FileNotFoundError):
    """Raised when no recovery partitions are found on any worker."""
    ...
```

**InconsistentPartitionsError**: Occurs when recovery partitions have timestamps that differ by more than the backup interval, indicating a backup/restore issue.

**MissingPartitionsError**: Occurs when some, but not all, expected recovery partitions are found, indicating an incomplete backup restoration.

**NoPartitionsError**: Occurs when no recovery partitions are found at all, typically due to a wrong directory path.

**Usage Examples:**

```python
from bytewax.recovery import (
    InconsistentPartitionsError,
    MissingPartitionsError,
    NoPartitionsError,
)

try:
    cli_main(flow, recovery_config=recovery_config)
except NoPartitionsError:
    print("No recovery data found - starting fresh")
    init_db_dir(recovery_config.db_dir, worker_count)
    cli_main(flow, recovery_config=recovery_config)
except MissingPartitionsError:
    print("Incomplete recovery data - check backup restoration")
    raise
except InconsistentPartitionsError:
    print("Recovery partitions are inconsistent - check backup timing")
    raise
```
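The condition behind `InconsistentPartitionsError` (partition snapshots further apart in time than the backup interval) can also be approximated before launch. `partitions_consistent` below is a hypothetical pre-flight helper built only on file modification times, and it assumes the `partition_*` naming used elsewhere in this document:

```python
from datetime import timedelta
from pathlib import Path

def partitions_consistent(db_dir: Path, backup_interval: timedelta) -> bool:
    """Heuristic pre-flight check: partition files whose modification
    times differ by more than the backup interval suggest a mixed
    backup set that will fail to resume."""
    mtimes = [p.stat().st_mtime for p in db_dir.glob("partition_*")]
    if not mtimes:
        return False  # nothing to recover from
    return (max(mtimes) - min(mtimes)) <= backup_interval.total_seconds()
```

Running a check like this before `cli_main` lets an operator restore a fresh, complete backup instead of discovering the mismatch at startup.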

### Recovery Patterns

**Production Recovery Setup:**

```python
import os
from pathlib import Path
from datetime import timedelta

from bytewax.recovery import RecoveryConfig, init_db_dir

def setup_recovery():
    """Set up recovery with environment-based configuration."""
    recovery_dir = Path(os.environ.get("BYTEWAX_RECOVERY_DIR", "/data/recovery"))
    worker_count = int(os.environ.get("BYTEWAX_WORKERS", "4"))
    backup_interval = timedelta(minutes=int(os.environ.get("BACKUP_INTERVAL_MINUTES", "10")))

    # Ensure the recovery directory exists and is initialized
    if not recovery_dir.exists():
        recovery_dir.mkdir(parents=True, exist_ok=True)
        init_db_dir(recovery_dir, worker_count)

    return RecoveryConfig(recovery_dir, backup_interval)

# Use in production
recovery_config = setup_recovery()
cli_main(flow, recovery_config=recovery_config)
```

**Kubernetes Recovery with Persistent Volumes:**

```python
import os
from pathlib import Path
from datetime import timedelta

from bytewax.recovery import RecoveryConfig, init_db_dir

def setup_k8s_recovery():
    """Recovery setup for Kubernetes with persistent volumes."""
    # Use the persistent volume mount
    recovery_dir = Path("/data/recovery")  # PVC mount point

    # Get the replica count from the StatefulSet
    replica_count = int(os.environ.get("REPLICA_COUNT", "3"))

    # Initialize only from the first pod (ordinal 0)
    pod_ordinal = int(os.environ["HOSTNAME"].split("-")[-1])
    if pod_ordinal == 0:
        # Check whether the database is already initialized
        if not any(recovery_dir.glob("partition_*")):
            print("Initializing recovery database...")
            init_db_dir(recovery_dir, replica_count)

    return RecoveryConfig(
        db_dir=recovery_dir,
        backup_interval=timedelta(minutes=5),
    )
```

**Cloud Storage Backup Integration:**

```python
import tarfile
import threading
import time
from datetime import datetime
from pathlib import Path

import boto3

class S3RecoveryManager:
    def __init__(self, bucket_name, recovery_dir):
        self.s3 = boto3.client("s3")
        self.bucket = bucket_name
        self.recovery_dir = Path(recovery_dir)

    def backup_to_s3(self):
        """Back up recovery partitions to S3."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"recovery_backup_{timestamp}.tar.gz"

        # Create a tar archive of the recovery directory
        with tarfile.open(f"/tmp/{backup_name}", "w:gz") as tar:
            tar.add(str(self.recovery_dir), arcname="recovery")

        # Upload to S3
        self.s3.upload_file(
            f"/tmp/{backup_name}",
            self.bucket,
            f"bytewax/backups/{backup_name}",
        )

        print(f"Backup uploaded: s3://{self.bucket}/bytewax/backups/{backup_name}")

    def restore_from_s3(self, backup_name):
        """Restore recovery partitions from S3."""
        # Download from S3
        self.s3.download_file(
            self.bucket,
            f"bytewax/backups/{backup_name}",
            f"/tmp/{backup_name}",
        )

        # Extract into the parent of the recovery directory
        with tarfile.open(f"/tmp/{backup_name}", "r:gz") as tar:
            tar.extractall(path=str(self.recovery_dir.parent))

        print(f"Recovery data restored from {backup_name}")

# Usage
recovery_manager = S3RecoveryManager("my-bytewax-backups", "/data/recovery")

# Schedule regular backups
def backup_scheduler():
    while True:
        time.sleep(3600)  # Back up every hour
        try:
            recovery_manager.backup_to_s3()
        except Exception as e:
            print(f"Backup failed: {e}")

backup_thread = threading.Thread(target=backup_scheduler, daemon=True)
backup_thread.start()
```

**Recovery Monitoring:**

```python
import logging
import threading
import time
from pathlib import Path

import psutil

class RecoveryMonitor:
    def __init__(self, recovery_dir):
        self.recovery_dir = Path(recovery_dir)
        self.logger = logging.getLogger("recovery_monitor")

    def check_recovery_health(self):
        """Check recovery database health."""
        try:
            # Check that the recovery directory exists
            if not self.recovery_dir.exists():
                self.logger.error("Recovery directory does not exist")
                return False

            # Check for partition files
            partitions = list(self.recovery_dir.glob("partition_*"))
            if not partitions:
                self.logger.warning("No recovery partitions found")
                return False

            # Check disk space
            disk_usage = psutil.disk_usage(str(self.recovery_dir))
            free_space_gb = disk_usage.free / (1024**3)
            if free_space_gb < 1.0:  # Less than 1 GB free
                self.logger.warning(f"Low disk space for recovery: {free_space_gb:.1f}GB free")

            # Check partition file sizes
            for partition in partitions:
                stat = partition.stat()
                size_mb = stat.st_size / (1024**2)
                self.logger.info(f"Partition {partition.name}: {size_mb:.1f}MB")

                # Warn if a partition is very large (may indicate unbounded state)
                if size_mb > 1000:  # 1 GB
                    self.logger.warning(f"Large partition file: {partition.name} ({size_mb:.1f}MB)")

            return True

        except Exception as e:
            self.logger.error(f"Recovery health check failed: {e}")
            return False

    def get_recovery_stats(self):
        """Get recovery statistics."""
        stats = {
            "partition_count": 0,
            "total_size_mb": 0,
            "oldest_partition": None,
            "newest_partition": None,
        }

        partitions = list(self.recovery_dir.glob("partition_*"))
        stats["partition_count"] = len(partitions)

        if partitions:
            oldest_time = float("inf")
            newest_time = 0

            for partition in partitions:
                stat = partition.stat()
                stats["total_size_mb"] += stat.st_size / (1024**2)

                if stat.st_mtime < oldest_time:
                    oldest_time = stat.st_mtime
                    stats["oldest_partition"] = partition.name

                if stat.st_mtime > newest_time:
                    newest_time = stat.st_mtime
                    stats["newest_partition"] = partition.name

        return stats

# Use in production monitoring
monitor = RecoveryMonitor("/data/recovery")

# Regular health checks
def monitor_recovery():
    while True:
        if monitor.check_recovery_health():
            stats = monitor.get_recovery_stats()
            logging.info(f"Recovery stats: {stats}")
        time.sleep(300)  # Check every 5 minutes

monitoring_thread = threading.Thread(target=monitor_recovery, daemon=True)
monitoring_thread.start()
```

**Recovery Testing:**

```python
import tempfile
from pathlib import Path

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.recovery import RecoveryConfig, init_db_dir
from bytewax.testing import TestingSink, TestingSource, run_main

def test_recovery_functionality():
    """Test that recovery works correctly."""
    with tempfile.TemporaryDirectory() as temp_dir:
        recovery_dir = Path(temp_dir) / "recovery"

        # Initialize the recovery database
        init_db_dir(recovery_dir, count=2)

        # Create a test dataflow with state
        flow = Dataflow("recovery_test")

        test_data = [("key1", 1), ("key1", 2), ("key2", 3)]
        stream = op.input("input", flow, TestingSource(test_data))

        # Stateful operation that accumulates values per key
        def accumulate(state, value):
            total = (state or 0) + value
            return total, total

        accumulated = op.stateful_map("accumulate", stream, accumulate)

        results = []
        op.output("output", accumulated, TestingSink(results))

        # Run with recovery
        recovery_config = RecoveryConfig(recovery_dir)
        run_main(flow, recovery_config=recovery_config)

        # Verify results
        expected = [("key1", 1), ("key1", 3), ("key2", 3)]  # Cumulative sums
        assert results == expected

        # Verify recovery files were created
        partitions = list(recovery_dir.glob("partition_*"))
        assert len(partitions) == 2  # Should have 2 partition files

        print("Recovery test passed!")

# Run the recovery test
test_recovery_functionality()
```