or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-systems.md · core-workflow.md · file-management.md · index.md · job-stores.md · provisioning.md · utilities.md · workflow-languages.md

docs/job-stores.md

0

# Job Store Management

1

2

## Overview

3

4

Toil's job store system provides persistent storage for workflow metadata, job descriptions, and intermediate files. Job stores abstract the underlying storage mechanism, allowing workflows to run with different backends including local file systems, cloud object storage (AWS S3, Google Cloud Storage), and distributed file systems. The job store maintains workflow state, enables fault tolerance through checkpointing, and facilitates workflow restart and recovery capabilities.

5

6

## Capabilities

7

8

### Abstract Job Store Interface

9

{ .api }

10

11

The `AbstractJobStore` defines the core interface that all job store implementations must provide.

12

13

```python

14

from toil.jobStores.abstractJobStore import (

15

AbstractJobStore,

16

NoSuchJobException,

17

NoSuchFileException,

18

ConcurrentFileModificationException,
NoSuchJobStoreException
)

20

from toil.job import JobDescription

21

from toil.common import Config

22

from typing import Iterator, Optional

23

24

class CustomJobStore(AbstractJobStore):

25

"""Custom job store implementation."""

26

27

def initialize(self, config: Config) -> None:

28

"""Initialize job store with configuration."""

29

self.config = config

30

self.locator = config.jobStore

31

# Set up storage backend

32

self.setup_storage()

33

34

def resume(self) -> None:

35

"""Resume from existing job store."""

36

# Verify job store exists and is accessible

37

if not self.exists():

38

raise NoSuchJobStoreException(f"Job store not found: {self.locator}")

39

# Load existing state

40

self.load_state()

41

42

def assignID(self, jobDescription: JobDescription) -> str:

43

"""Assign unique ID to job description."""

44

job_id = self.generate_unique_id()

45

jobDescription.jobStoreID = job_id

46

return job_id

47

48

def create(self, jobDescription: JobDescription) -> JobDescription:

49

"""Create and store new job."""

50

if not hasattr(jobDescription, 'jobStoreID'):

51

self.assignID(jobDescription)

52

53

# Serialize and store job description

54

job_data = self.serialize_job(jobDescription)

55

self.store_job_data(jobDescription.jobStoreID, job_data)

56

57

return jobDescription

58

59

def update(self, job: JobDescription) -> None:

60

"""Update existing job description."""

61

if not self.job_exists(job.jobStoreID):

62

raise NoSuchJobException(f"Job not found: {job.jobStoreID}")

63

64

job_data = self.serialize_job(job)

65

self.store_job_data(job.jobStoreID, job_data)

66

67

def load(self, jobStoreID: str) -> JobDescription:

68

"""Load job description by ID."""

69

if not self.job_exists(jobStoreID):

70

raise NoSuchJobException(f"Job not found: {jobStoreID}")

71

72

job_data = self.load_job_data(jobStoreID)

73

return self.deserialize_job(job_data)

74

75

def delete(self, jobStoreID: str) -> None:

76

"""Delete job and associated data."""

77

if not self.job_exists(jobStoreID):

78

raise NoSuchJobException(f"Job not found: {jobStoreID}")

79

80

# Delete job data and any associated files

81

self.delete_job_data(jobStoreID)

82

self.delete_job_files(jobStoreID)

83

84

def jobs(self) -> Iterator[JobDescription]:

85

"""Iterate over all jobs in store."""

86

for job_id in self.list_job_ids():

87

yield self.load(job_id)

88

89

def writeFile(self, localFilePath: str, jobStoreID: Optional[str] = None) -> str:

90

"""Write local file to job store."""

91

file_id = self.generate_file_id()

92

93

with open(localFilePath, 'rb') as local_file:

94

file_data = local_file.read()

95

96

self.store_file_data(file_id, file_data)

97

98

# Associate file with job if specified

99

if jobStoreID:

100

self.associate_file_with_job(file_id, jobStoreID)

101

102

return file_id

103

104

def readFile(self, jobStoreFileID: str, localFilePath: str) -> None:

105

"""Read file from job store to local path."""

106

if not self.file_exists(jobStoreFileID):

107

raise NoSuchFileException(f"File not found: {jobStoreFileID}")

108

109

file_data = self.load_file_data(jobStoreFileID)

110

111

with open(localFilePath, 'wb') as local_file:

112

local_file.write(file_data)

113

114

def deleteFile(self, jobStoreFileID: str) -> None:

115

"""Delete file from job store."""

116

if not self.file_exists(jobStoreFileID):

117

raise NoSuchFileException(f"File not found: {jobStoreFileID}")

118

119

self.delete_file_data(jobStoreFileID)

120

121

def fileExists(self, jobStoreFileID: str) -> bool:

122

"""Check if file exists in job store."""

123

return self.file_exists_impl(jobStoreFileID)

124

```

125

126

### File-Based Job Store

127

{ .api }

128

129

The `FileJobStore` uses the local file system for storage, suitable for single-node deployments and shared file systems.

130

131

```python

132

from toil.jobStores.fileJobStore import FileJobStore

133

from toil.common import Config

134

import os

135

136

# File job store configuration

137

config = Config()

138

config.jobStore = "file:/tmp/my-workflow-jobstore" # Local directory

139

140

# Alternative: network file system

141

config.jobStore = "file:/shared/nfs/workflow-store"

142

143

# Initialize file job store

144

file_store = FileJobStore(config.jobStore)

145

file_store.initialize(config)

146

147

# File job store structure:

148

# /tmp/my-workflow-jobstore/

149

# ├── jobs/ # Job descriptions

150

# ├── files/ # Stored files

151

# ├── stats/ # Statistics files

152

# └── tmp/ # Temporary files

153

154

# Working with file job store

155

from toil.job import JobDescription

156

157

# Create job description

158

job_desc = JobDescription(

159

requirements={"memory": 1024*1024*1024, "cores": 1, "disk": 1024*1024*1024},

160

jobName="test_job",

161

unitName="test_unit"

162

)

163

164

# Store job

165

created_job = file_store.create(job_desc)

166

job_id = created_job.jobStoreID

167

168

# Update job

169

created_job.remainingTryCount = 2

170

file_store.update(created_job)

171

172

# Load job

173

loaded_job = file_store.load(job_id)

174

175

# Store file

176

test_file = "/tmp/input.txt"

177

with open(test_file, 'w') as f:

178

f.write("test data")

179

180

file_id = file_store.writeFile(test_file, jobStoreID=job_id)

181

182

# Read file back

183

output_file = "/tmp/output.txt"

184

file_store.readFile(file_id, output_file)

185

186

# Cleanup

187

file_store.deleteFile(file_id)

188

file_store.delete(job_id)

189

```

190

191

### AWS S3 Job Store

192

{ .api }

193

194

The `AWSJobStore` uses Amazon S3 for scalable, distributed storage in cloud environments.

195

196

```python

197

from toil.jobStores.aws.jobStore import AWSJobStore

198

from toil.common import Config

199

200

# AWS S3 job store configuration

201

config = Config()

202

config.jobStore = "aws:us-west-2:my-toil-jobstore"

# Format: aws:region:name (Toil derives the S3 bucket and table names from the name)

204

205

# AWS credentials configuration (multiple options)

206

# Option 1: Environment variables

207

import os

208

os.environ['AWS_ACCESS_KEY_ID'] = 'your_access_key'

209

os.environ['AWS_SECRET_ACCESS_KEY'] = 'your_secret_key'

210

211

# Option 2: AWS credentials file

212

config.awsCredentials = "~/.aws/credentials"

213

214

# Option 3: IAM roles (for EC2 instances)

215

# No explicit credentials needed

216

217

# S3-specific settings

218

config.awsRegion = "us-west-2"

219

config.sseKey = "alias/my-kms-key" # KMS encryption

220

config.sseKeyFile = "/path/to/sse-key.txt" # Local encryption key

221

222

# Initialize AWS job store

223

aws_store = AWSJobStore(config.jobStore)

224

aws_store.initialize(config)

225

226

# S3 bucket structure:

227

# my-toil-bucket/

228

# └── workflow-123/

229

# ├── jobs/ # Job descriptions as JSON

230

# ├── files/ # Binary file storage

231

# ├── stats/ # Workflow statistics

232

# └── versions/ # Versioning metadata

233

234

# Working with S3 job store

235

job_desc = JobDescription(

236

requirements={"memory": 2*1024*1024*1024, "cores": 2, "disk": 5*1024*1024*1024},

237

jobName="s3_job"

238

)

239

240

# Operations are identical to file store but backed by S3

241

job = aws_store.create(job_desc)

242

243

# Large file handling optimized for S3

244

large_file = "/tmp/large_dataset.bin"

245

file_id = aws_store.writeFile(large_file) # Automatically uses multipart upload

246

247

# Concurrent access protection

248

try:

249

aws_store.update(job)

250

except ConcurrentFileModificationException:

251

# Handle concurrent modification

252

fresh_job = aws_store.load(job.jobStoreID)

253

# Retry update with fresh data

254

```

255

256

### Google Cloud Storage Job Store

257

{ .api }

258

259

The `GoogleJobStore` provides integration with Google Cloud Storage for Google Cloud Platform deployments.

260

261

```python

262

from toil.jobStores.googleJobStore import GoogleJobStore

263

from toil.common import Config

264

265

# Google Cloud Storage job store configuration

266

config = Config()

267

config.jobStore = "google:my-project-id:my-gcs-jobstore"

# Format: google:projectID:name

269

270

# Google Cloud authentication

271

# Option 1: Service account key file

272

config.googleCredentials = "/path/to/service-account.json"

273

274

# Option 2: Application default credentials

275

# gcloud auth application-default login

276

277

# Option 3: Service account on GCE instances

278

# Automatic authentication

279

280

# Initialize Google job store

281

gcs_store = GoogleJobStore(config.jobStore)

282

gcs_store.initialize(config)

283

284

# GCS bucket structure similar to S3:

285

# my-gcs-bucket/

286

# └── workflow-path/

287

# ├── jobs/ # Job metadata

288

# ├── files/ # File storage

289

# └── stats/ # Statistics

290

291

# Google-specific features

292

job_desc = JobDescription(

293

requirements={"memory": 4*1024*1024*1024, "cores": 4, "disk": 10*1024*1024*1024},

294

jobName="gcs_job"

295

)

296

297

job = gcs_store.create(job_desc)

298

299

# Efficient handling of Google Cloud native formats

300

file_id = gcs_store.writeFile("/tmp/data.csv")

301

302

# Integration with Google Cloud IAM

303

# Automatic handling of GCS permissions and access controls

304

```

305

306

### File Import and Export

307

{ .api }

308

309

Job stores support importing and exporting files from external sources and destinations.

310

311

```python

312

from toil.jobStores.abstractJobStore import AbstractJobStore

313

314

def demonstrate_import_export(job_store: AbstractJobStore):

315

"""Demonstrate file import/export capabilities."""

316

317

# Import from various sources

318

319

# Import from HTTP/HTTPS URL

320

http_file_id = job_store.importFile(

321

srcUrl="https://example.com/dataset.csv",

322

sharedFileName="shared_dataset.csv" # Optional shared name

323

)

324

325

# Import from FTP

326

ftp_file_id = job_store.importFile(

327

srcUrl="ftp://data.example.com/public/genome.fa"

328

)

329

330

# Import from S3 (from different job store)

331

s3_file_id = job_store.importFile(

332

srcUrl="s3://other-bucket/path/to/file.txt"

333

)

334

335

# Import from Google Cloud Storage

336

gcs_file_id = job_store.importFile(

337

srcUrl="gs://other-bucket/data/results.json"

338

)

339

340

# Import from local file system

341

local_file_id = job_store.importFile(

342

srcUrl="file:///absolute/path/to/local/file.dat"

343

)

344

345

# Export to various destinations

346

347

# Export to S3

348

job_store.exportFile(

349

jobStoreFileID=http_file_id,

350

dstUrl="s3://output-bucket/processed/dataset.csv"

351

)

352

353

# Export to Google Cloud Storage

354

job_store.exportFile(

355

jobStoreFileID=s3_file_id,

356

dstUrl="gs://results-bucket/analysis/output.txt"

357

)

358

359

# Export to HTTP endpoint (POST)

360

job_store.exportFile(

361

jobStoreFileID=ftp_file_id,

362

dstUrl="https://api.example.com/upload/genome"

363

)

364

365

# Export to local file system

366

job_store.exportFile(

367

jobStoreFileID=gcs_file_id,

368

dstUrl="file:///tmp/final_results.json"

369

)

370

371

# Bulk import/export operations

372

def bulk_file_operations(job_store: AbstractJobStore):

373

"""Handle multiple file operations efficiently."""

374

375

# Import multiple files

376

import_urls = [

377

"https://data.example.com/file1.csv",

378

"https://data.example.com/file2.csv",

379

"https://data.example.com/file3.csv"

380

]

381

382

imported_files = []

383

for url in import_urls:

384

file_id = job_store.importFile(url)

385

imported_files.append(file_id)

386

387

# Process files...

388

389

# Export results

390

export_destinations = [

391

"s3://results/output1.csv",

392

"s3://results/output2.csv",

393

"s3://results/output3.csv"

394

]

395

396

for file_id, dest_url in zip(imported_files, export_destinations):

397

job_store.exportFile(file_id, dest_url)

398

```

399

400

### Job Store Utilities and Management

401

{ .api }

402

403

Utilities for job store management, cleanup, and maintenance operations.

404

405

```python

406

from toil.jobStores.abstractJobStore import AbstractJobStore
from toil.common import Config
import os  # needed for os.unlink in migrate_job_store

408

409

def job_store_utilities():

410

"""Demonstrate job store utility operations."""

411

412

config = Config()

413

config.jobStore = "file:/tmp/utility-demo"

414

415

# Get job store instance

416

job_store = AbstractJobStore.createJobStore(config.jobStore)

417

job_store.initialize(config)

418

419

# Job enumeration and statistics

420

total_jobs = 0

421

completed_jobs = 0

422

failed_jobs = 0

423

424

for job in job_store.jobs():

425

total_jobs += 1

426

if job.remainingTryCount == 0:

427

failed_jobs += 1

428

elif hasattr(job, 'completed') and job.completed:

429

completed_jobs += 1

430

431

print(f"Total jobs: {total_jobs}")

432

print(f"Completed: {completed_jobs}")

433

print(f"Failed: {failed_jobs}")

434

435

# File inventory

436

all_files = job_store.get_all_file_ids() # Implementation specific

437

total_size = 0

438

439

for file_id in all_files:

440

if job_store.fileExists(file_id):

441

file_size = job_store.getFileSize(file_id) # Implementation specific

442

total_size += file_size

443

444

print(f"Total files: {len(all_files)}")

445

print(f"Total size: {total_size / (1024*1024)} MB")

446

447

# Cleanup orphaned files

448

def cleanup_orphaned_files():

449

"""Remove files not associated with any job."""

450

active_job_ids = {job.jobStoreID for job in job_store.jobs()}

451

452

for file_id in all_files:

453

associated_job = job_store.get_file_job_association(file_id)

454

if associated_job not in active_job_ids:

455

print(f"Cleaning up orphaned file: {file_id}")

456

job_store.deleteFile(file_id)

457

458

# Job store migration between backends

459

def migrate_job_store(source_locator: str, dest_locator: str):

460

"""Migrate job store from one backend to another."""

461

462

source_config = Config()

463

source_config.jobStore = source_locator

464

source_store = AbstractJobStore.createJobStore(source_locator)

465

source_store.resume()

466

467

dest_config = Config()

468

dest_config.jobStore = dest_locator

469

dest_store = AbstractJobStore.createJobStore(dest_locator)

470

dest_store.initialize(dest_config)

471

472

# Migrate all jobs

473

for job in source_store.jobs():

474

dest_store.create(job)

475

476

# Migrate all files

477

for file_id in source_store.get_all_file_ids():

478

if source_store.fileExists(file_id):

479

# Read from source

480

temp_file = f"/tmp/migration_{file_id}"

481

source_store.readFile(file_id, temp_file)

482

483

# Write to destination

484

dest_store.writeFile(temp_file, jobStoreID=None)

485

486

# Cleanup temp file

487

os.unlink(temp_file)

488

489

print(f"Migration complete: {source_locator} -> {dest_locator}")

490

```

491

492

### Error Handling and Recovery

493

{ .api }

494

495

Comprehensive error handling for job store operations and data integrity.

496

497

```python

498

from toil.jobStores.abstractJobStore import (

499

NoSuchJobException,

500

NoSuchFileException,

501

ConcurrentFileModificationException,

502

JobStoreExistsException,

503

NoSuchJobStoreException,
AbstractJobStore
)
import logging
import time
from toil.job import JobDescription
from toil.common import Config

507

508

def robust_job_store_operations(job_store: AbstractJobStore):

509

"""Demonstrate robust error handling for job store operations."""

510

511

def safe_job_update(job: JobDescription, max_retries: int = 3):

512

"""Update job with retry logic for concurrent modifications."""

513

514

for attempt in range(max_retries):

515

try:

516

job_store.update(job)

517

return True

518

519

except ConcurrentFileModificationException as e:

520

logging.warning(f"Concurrent modification attempt {attempt + 1}: {e}")

521

522

if attempt < max_retries - 1:

523

# Wait and reload fresh job state

524

time.sleep(0.1 * (2 ** attempt)) # Exponential backoff

525

fresh_job = job_store.load(job.jobStoreID)

526

# Merge changes if possible

527

job = merge_job_changes(job, fresh_job)

528

else:

529

logging.error("Failed to update job after max retries")

530

raise

531

532

except NoSuchJobException as e:

533

logging.error(f"Job no longer exists: {e}")

534

return False

535

536

return False

537

538

def safe_file_operations(file_operations: list):

539

"""Execute file operations with error recovery."""

540

541

completed_operations = []

542

543

for operation in file_operations:

544

try:

545

if operation['type'] == 'write':

546

file_id = job_store.writeFile(

547

operation['local_path'],

548

jobStoreID=operation.get('job_id')

549

)

550

completed_operations.append(('write', file_id))

551

552

elif operation['type'] == 'read':

553

job_store.readFile(

554

operation['file_id'],

555

operation['local_path']

556

)

557

completed_operations.append(('read', operation['file_id']))

558

559

elif operation['type'] == 'import':

560

file_id = job_store.importFile(

561

operation['src_url'],

562

sharedFileName=operation.get('shared_name')

563

)

564

completed_operations.append(('import', file_id))

565

566

except NoSuchFileException as e:

567

logging.error(f"File operation failed - file not found: {e}")

568

# Skip this operation, continue with others

569

570

except Exception as e:

571

logging.error(f"File operation failed: {e}")

572

# Rollback completed operations

573

rollback_file_operations(completed_operations)

574

raise

575

576

return completed_operations

577

578

def rollback_file_operations(operations: list):

579

"""Rollback completed file operations on error."""

580

581

for op_type, file_id in reversed(operations):

582

try:

583

if op_type in ('write', 'import'):

584

job_store.deleteFile(file_id)

585

logging.info(f"Rolled back {op_type} operation for file {file_id}")

586

except Exception as e:

587

logging.warning(f"Failed to rollback {op_type} for {file_id}: {e}")

588

589

def handle_job_store_initialization_errors():

590

"""Handle errors during job store initialization."""

591

592

config = Config()

593

config.jobStore = "aws:us-west-2:my-bucket:workflow-123"

594

595

try:

596

job_store = AbstractJobStore.createJobStore(config.jobStore)

597

job_store.initialize(config)

598

599

except JobStoreExistsException as e:

600

logging.info(f"Job store already exists, resuming: {e}")

601

job_store.resume()

602

603

except NoSuchJobStoreException as e:

604

logging.error(f"Job store not found, cannot resume: {e}")

605

# Create new job store

606

job_store.initialize(config)

607

608

except Exception as e:

609

logging.error(f"Failed to initialize job store: {e}")

610

# Try alternative job store location

611

config.jobStore = "file:/tmp/fallback-jobstore"

612

job_store = AbstractJobStore.createJobStore(config.jobStore)

613

job_store.initialize(config)

614

615

return job_store

616

```

617

618

This job store management system provides robust, scalable storage for workflow metadata and files across diverse storage backends with comprehensive error handling and recovery capabilities.