or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connectors.md dataflow.md index.md operators.md recovery.md runtime.md sinks.md sources.md stateful.md testing.md tracing.md windowing.md

recovery.mddocs/

0

# State Recovery

1

2

Recovery mechanisms for fault tolerance including state snapshotting, partition management, and resume capabilities. Enables exactly-once processing guarantees in distributed environments through persistent state management.

3

4

## Capabilities

5

6

### Recovery Configuration

7

8

Configuration class for setting up state persistence and recovery behavior.

9

10

```python { .api }

11

class RecoveryConfig:

12

def __init__(self, db_dir: Path, backup_interval: Optional[timedelta] = None): ...

13

14

@property

15

def db_dir(self) -> Path: ...

16

17

@property

18

def backup_interval(self) -> Optional[timedelta]: ...

19

```

20

21

**Parameters:**

22

- `db_dir` (Path): Local filesystem directory for recovery database partitions

23

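- `backup_interval` (Optional[timedelta]): Time to wait before permanently deleting old snapshots, so that a backup process has a chance to archive them first (default: `None`, which is treated as a zero duration)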

24

25

**Usage Examples:**

26

```python

27

from bytewax.recovery import RecoveryConfig

28

from pathlib import Path

29

from datetime import timedelta

30

31

# Basic recovery configuration

32

recovery_config = RecoveryConfig(

33

db_dir=Path("/data/bytewax/recovery")

34

)

35

36

# With backup interval for archival systems

37

recovery_config = RecoveryConfig(

38

db_dir=Path("/data/bytewax/recovery"),

39

backup_interval=timedelta(hours=1) # Keep old snapshots for 1 hour

40

)

41

42

# Use in dataflow execution

43

from bytewax._bytewax import cli_main

44

45

cli_main(

46

flow,

47

workers_per_process=4,

48

recovery_config=recovery_config

49

)

50

```

51

52

### Recovery Database Management

53

54

Functions for initializing and managing recovery partition databases.

55

56

```python { .api }

57

def init_db_dir(db_dir: Path, count: int): ...

58

```

59

60

**Parameters:**

61

- `db_dir` (Path): Directory to create recovery partitions in

62

- `count` (int): Number of partitions to create

63

64

**Usage Examples:**

65

```python

66

from bytewax.recovery import init_db_dir

67

from pathlib import Path

68

69

# Initialize recovery database with 4 partitions

70

recovery_dir = Path("/data/bytewax/recovery")

71

init_db_dir(recovery_dir, count=4)

72

73

# Command line usage

74

# python -m bytewax.recovery /data/recovery 4

75

```

76

77

### Recovery Exceptions

78

79

Exceptions that can occur during recovery operations, indicating various failure modes.

80

81

```python { .api }

82

class InconsistentPartitionsError(ValueError):

83

"""Raised when two recovery partitions are from very different times."""

84

...

85

86

class MissingPartitionsError(FileNotFoundError):

87

"""Raised when an incomplete set of recovery partitions is detected."""

88

...

89

90

class NoPartitionsError(FileNotFoundError):

91

"""Raised when no recovery partitions are found on any worker."""

92

...

93

```

94

95

**InconsistentPartitionsError**: Occurs when recovery partitions have timestamps that differ beyond the backup interval, indicating a backup/restore issue.

96

97

**MissingPartitionsError**: Occurs when some but not all expected recovery partitions are found, indicating incomplete backup restoration.

98

99

**NoPartitionsError**: Occurs when no recovery partitions are found at all, typically due to a wrong directory path.

100

101

**Usage Examples:**

102

```python

103

from bytewax.recovery import (

104

InconsistentPartitionsError,

105

MissingPartitionsError,

106

NoPartitionsError

107

)

108

109

try:

110

cli_main(flow, recovery_config=recovery_config)

111

except NoPartitionsError:

112

print("No recovery data found - starting fresh")

113

init_db_dir(recovery_config.db_dir, worker_count)

114

cli_main(flow, recovery_config=recovery_config)

115

except MissingPartitionsError:

116

print("Incomplete recovery data - check backup restoration")

117

raise

118

except InconsistentPartitionsError:

119

print("Recovery partitions are inconsistent - check backup timing")

120

raise

121

```

122

123

### Recovery Patterns

124

125

**Production Recovery Setup:**

126

```python

127

import os

128

from pathlib import Path

129

from datetime import timedelta

130

from bytewax.recovery import RecoveryConfig, init_db_dir

131

132

def setup_recovery():
    """Build a ``RecoveryConfig`` from environment variables.

    Environment variables read:

    - ``BYTEWAX_RECOVERY_DIR``: recovery directory (default ``/data/recovery``).
    - ``BYTEWAX_WORKERS``: partition count passed to ``init_db_dir``
      (default ``4``).
    - ``BACKUP_INTERVAL_MINUTES``: backup interval in minutes (default ``10``).

    The directory is created when it is missing, and the partitions are
    initialized whenever the directory contains no files yet.

    Returns:
        A ``RecoveryConfig`` for the directory and backup interval.

    Raises:
        ValueError: If ``BYTEWAX_WORKERS`` or ``BACKUP_INTERVAL_MINUTES`` is
            not an integer.
    """
    recovery_dir = Path(os.environ.get("BYTEWAX_RECOVERY_DIR", "/data/recovery"))
    worker_count = int(os.environ.get("BYTEWAX_WORKERS", "4"))
    backup_interval = timedelta(
        minutes=int(os.environ.get("BACKUP_INTERVAL_MINUTES", "10"))
    )

    recovery_dir.mkdir(parents=True, exist_ok=True)

    # Initialize on "contains no files" rather than "does not exist": a
    # pre-created directory or a freshly mounted volume already exists but
    # still has no partitions. Sub-directories (e.g. a filesystem's
    # lost+found) are ignored on purpose.
    if not any(entry.is_file() for entry in recovery_dir.iterdir()):
        init_db_dir(recovery_dir, worker_count)

    return RecoveryConfig(recovery_dir, backup_interval)

144

145

# Use in production

146

recovery_config = setup_recovery()

147

cli_main(flow, recovery_config=recovery_config)

148

```

149

150

**Kubernetes Recovery with Persistent Volumes:**

151

```python

152

from pathlib import Path

153

import subprocess

154

155

def setup_k8s_recovery():
    """Recovery setup for Kubernetes with persistent volumes.

    Reads ``REPLICA_COUNT`` (default ``3``) and the pod ordinal from the
    trailing ``-N`` suffix of ``HOSTNAME``. The pod with ordinal 0
    initializes the recovery partitions when none are present yet.

    Returns:
        A ``RecoveryConfig`` for ``/data/recovery`` with a 5 minute backup
        interval.

    Raises:
        KeyError: If ``HOSTNAME`` is not set.
        ValueError: If ``REPLICA_COUNT`` or the ``HOSTNAME`` suffix is not an
            integer.
    """
    # Imported locally so the example is self-contained: the surrounding
    # snippet only imports ``Path`` (and an unused ``subprocess``).
    import os
    from datetime import timedelta

    # Use persistent volume mount
    recovery_dir = Path("/data/recovery")  # PVC mount point

    # Get replica count from StatefulSet
    replica_count = int(os.environ.get("REPLICA_COUNT", "3"))

    # Everything after the last "-" in the hostname is taken as the ordinal.
    pod_ordinal = int(os.environ["HOSTNAME"].rpartition("-")[2])

    # Only the first pod (ordinal 0) initializes, and only once.
    # NOTE(review): confirm "partition_*" matches the file names that
    # init_db_dir actually creates; otherwise this re-initializes every start.
    if pod_ordinal == 0 and not any(recovery_dir.glob("partition_*")):
        print("Initializing recovery database...")
        init_db_dir(recovery_dir, replica_count)

    return RecoveryConfig(
        db_dir=recovery_dir,
        backup_interval=timedelta(minutes=5),
    )

175

```

176

177

**Cloud Storage Backup Integration:**

178

```python

179

import boto3

180

import tarfile

181

from datetime import datetime

182

183

class S3RecoveryManager:
    """Archive a recovery directory to S3 and restore it again.

    Backups are gzip-compressed tarballs stored under the
    ``bytewax/backups/`` key prefix of the configured bucket.
    """

    def __init__(self, bucket_name, recovery_dir):
        """Create a manager for ``recovery_dir`` backed by ``bucket_name``."""
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name
        self.recovery_dir = Path(recovery_dir)

    def backup_to_s3(self):
        """Backup recovery partitions to S3.

        The archive is built in a private temporary directory that is
        removed afterwards, so repeated backups do not pile up on local disk.
        """
        import tempfile

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"recovery_backup_{timestamp}.tar.gz"
        key = f"bytewax/backups/{backup_name}"

        with tempfile.TemporaryDirectory() as staging:
            archive_path = str(Path(staging) / backup_name)

            # The archive root must carry the directory's own name because
            # restore_from_s3 unpacks into ``recovery_dir.parent``.
            with tarfile.open(archive_path, "w:gz") as tar:
                tar.add(str(self.recovery_dir), arcname=self.recovery_dir.name)

            self.s3.upload_file(archive_path, self.bucket, key)

        print(f"Backup uploaded: s3://{self.bucket}/{key}")

    def restore_from_s3(self, backup_name):
        """Restore recovery partitions from S3.

        Args:
            backup_name: Archive name below ``bytewax/backups/``, as printed
                by ``backup_to_s3``.
        """
        import tempfile

        with tempfile.TemporaryDirectory() as staging:
            # Keep only the final path component for the local file so a
            # name containing "/" cannot escape the staging directory.
            archive_path = str(Path(staging) / Path(backup_name).name)

            self.s3.download_file(
                self.bucket,
                f"bytewax/backups/{backup_name}",
                archive_path,
            )

            target = str(self.recovery_dir.parent)
            with tarfile.open(archive_path, "r:gz") as tar:
                if hasattr(tarfile, "data_filter"):
                    # Reject absolute paths, ".." members and links that
                    # point outside the target directory.
                    tar.extractall(path=target, filter="data")
                else:
                    # NOTE(review): this Python has no extraction filters;
                    # only restore archives from a trusted bucket.
                    tar.extractall(path=target)

        print(f"Recovery data restored from {backup_name}")

221

222

# Usage

223

recovery_manager = S3RecoveryManager("my-bytewax-backups", "/data/recovery")

224

225

# Schedule regular backups

226

import threading

227

import time

228

229

def backup_scheduler():
    """Run an S3 backup once per hour, forever.

    A failed backup is reported on stdout and does not stop the loop.
    """
    while True:
        time.sleep(3600)  # Backup every hour
        try:
            recovery_manager.backup_to_s3()
        except Exception as exc:
            # Best effort: report the failure and try again next hour.
            print(f"Backup failed: {exc}")

236

237

backup_thread = threading.Thread(target=backup_scheduler, daemon=True)

238

backup_thread.start()

239

```

240

241

**Recovery Monitoring:**

242

```python

243

import psutil

244

import logging

245

from pathlib import Path

246

247

class RecoveryMonitor:
    """Inspect a recovery directory and report on its partition files."""

    def __init__(self, recovery_dir, partition_glob="partition_*"):
        """Create a monitor.

        Args:
            recovery_dir: Directory holding the recovery partitions.
            partition_glob: Glob pattern, relative to ``recovery_dir``, that
                selects the partition files.
                NOTE(review): confirm the default matches the names the
                runtime really writes; pass a different pattern if not.
        """
        self.recovery_dir = Path(recovery_dir)
        self.partition_glob = partition_glob
        self.logger = logging.getLogger("recovery_monitor")

    def _find_partitions(self):
        """Return the partition paths currently matching the glob."""
        return list(self.recovery_dir.glob(self.partition_glob))

    def check_recovery_health(self):
        """Check recovery database health.

        Returns:
            ``False`` when the directory is missing, holds no partitions, or
            the check itself fails; ``True`` otherwise. Low disk space and
            oversized partitions are logged as warnings but do not fail the
            check.
        """
        # The standard library reports free space; no third-party package
        # is needed for this.
        import shutil

        try:
            if not self.recovery_dir.exists():
                self.logger.error("Recovery directory does not exist")
                return False

            partitions = self._find_partitions()
            if not partitions:
                self.logger.warning("No recovery partitions found")
                return False

            free_space_gb = shutil.disk_usage(str(self.recovery_dir)).free / (1024**3)
            if free_space_gb < 1.0:  # Less than 1GB free
                self.logger.warning(
                    "Low disk space for recovery: %.1fGB free", free_space_gb
                )

            for partition in partitions:
                size_mb = partition.stat().st_size / (1024**2)
                self.logger.info("Partition %s: %.1fMB", partition.name, size_mb)

                # A very large partition may point at unbounded state growth.
                if size_mb > 1000:  # ~1GB
                    self.logger.warning(
                        "Large partition file: %s (%.1fMB)", partition.name, size_mb
                    )

            return True

        except Exception as exc:
            # Deliberately broad: a monitoring probe must never take down its
            # caller. ``exception`` keeps the traceback in the log.
            self.logger.exception("Recovery health check failed: %s", exc)
            return False

    def get_recovery_stats(self):
        """Get recovery statistics.

        Returns:
            A dict with ``partition_count``, ``total_size_mb`` and the file
            names of the ``oldest_partition`` and ``newest_partition`` by
            modification time (both ``None`` when there are no partitions).
        """
        stats = {
            "partition_count": 0,
            "total_size_mb": 0,
            "oldest_partition": None,
            "newest_partition": None,
        }

        # Stat every file exactly once and reuse the result below.
        entries = [(path.name, path.stat()) for path in self._find_partitions()]
        stats["partition_count"] = len(entries)

        if entries:
            stats["total_size_mb"] = sum(
                info.st_size / (1024**2) for _, info in entries
            )
            # min/max return the first match on ties, like a strict </> scan.
            stats["oldest_partition"] = min(entries, key=lambda e: e[1].st_mtime)[0]
            stats["newest_partition"] = max(entries, key=lambda e: e[1].st_mtime)[0]

        return stats

319

320

# Use in production monitoring

321

monitor = RecoveryMonitor("/data/recovery")

322

323

# Regular health checks

324

def monitor_recovery():
    """Poll recovery health forever, logging stats after each healthy check."""
    while True:
        healthy = monitor.check_recovery_health()
        if healthy:
            stats = monitor.get_recovery_stats()
            logging.info(f"Recovery stats: {stats}")
        time.sleep(300)  # Check every 5 minutes

330

331

monitoring_thread = threading.Thread(target=monitor_recovery, daemon=True)

332

monitoring_thread.start()

333

```

334

335

**Recovery Testing:**

336

```python

337

import tempfile

338

import shutil

339

from pathlib import Path

340

341

def test_recovery_functionality():
    """Test recovery works correctly.

    Runs a small stateful dataflow with recovery enabled in a temporary
    directory, then checks the emitted running totals and the partition
    files on disk.
    """
    # Imported locally so the example is self-contained; the surrounding
    # snippet only imports tempfile, shutil and Path.
    import bytewax.operators as op
    from bytewax.dataflow import Dataflow
    from bytewax.recovery import RecoveryConfig, init_db_dir
    from bytewax.testing import TestingSink, TestingSource, run_main

    with tempfile.TemporaryDirectory() as temp_dir:
        recovery_dir = Path(temp_dir) / "recovery"
        # "recovery" does not exist inside the fresh temp dir yet.
        # NOTE(review): presumably init_db_dir expects an existing
        # directory; creating it first is harmless either way.
        recovery_dir.mkdir(parents=True, exist_ok=True)

        # Initialize recovery database
        init_db_dir(recovery_dir, count=2)

        # Create test dataflow with state
        flow = Dataflow("recovery_test")

        test_data = [("key1", 1), ("key1", 2), ("key2", 3)]
        stream = op.input("input", flow, TestingSource(test_data))

        def accumulate(state, value):
            """Add ``value`` to the running total kept for this key."""
            total = (state or 0) + value
            # First element is the new state, second is the emitted value.
            return total, total

        accumulated = op.stateful_map("accumulate", stream, accumulate)

        results = []
        op.output("output", accumulated, TestingSink(results))

        # Run with recovery
        recovery_config = RecoveryConfig(recovery_dir)
        run_main(flow, recovery_config=recovery_config)

        # Verify results
        expected = [("key1", 1), ("key1", 3), ("key2", 3)]  # Cumulative sums
        assert results == expected

        # Verify recovery files were created
        # NOTE(review): confirm "partition_*" matches the file names that
        # init_db_dir actually writes; adjust the glob if not.
        partitions = list(recovery_dir.glob("partition_*"))
        assert len(partitions) == 2  # Should have 2 partition files

        print("Recovery test passed!")

378

379

# Run recovery test

380

test_recovery_functionality()

381

```