0
# Job Store Management
1
2
## Overview
3
4
Toil's job store system provides persistent storage for workflow metadata, job descriptions, and intermediate files. Job stores abstract the underlying storage mechanism, allowing workflows to run with different backends including local file systems, cloud object storage (AWS S3, Google Cloud Storage), and distributed file systems. The job store maintains workflow state, enables fault tolerance through checkpointing, and supports workflow restart and recovery.
5
6
## Capabilities
7
8
### Abstract Job Store Interface
9
{ .api }
10
11
The `AbstractJobStore` defines the core interface that all job store implementations must provide.
12
13
```python
14
from toil.jobStores.abstractJobStore import (
    AbstractJobStore,
    NoSuchJobException,
    NoSuchFileException,
    ConcurrentFileModificationException,
    NoSuchJobStoreException,
)
20
from toil.job import JobDescription
21
from toil.common import Config
22
from typing import Iterator, Optional
23
24
class CustomJobStore(AbstractJobStore):
25
"""Custom job store implementation."""
26
27
def initialize(self, config: Config) -> None:
28
"""Initialize job store with configuration."""
29
self.config = config
30
self.locator = config.jobStore
31
# Set up storage backend
32
self.setup_storage()
33
34
def resume(self) -> None:
35
"""Resume from existing job store."""
36
# Verify job store exists and is accessible
37
if not self.exists():
38
raise NoSuchJobStoreException(f"Job store not found: {self.locator}")
39
# Load existing state
40
self.load_state()
41
42
def assignID(self, jobDescription: JobDescription) -> str:
43
"""Assign unique ID to job description."""
44
job_id = self.generate_unique_id()
45
jobDescription.jobStoreID = job_id
46
return job_id
47
48
def create(self, jobDescription: JobDescription) -> JobDescription:
49
"""Create and store new job."""
50
        if getattr(jobDescription, 'jobStoreID', None) is None:
51
self.assignID(jobDescription)
52
53
# Serialize and store job description
54
job_data = self.serialize_job(jobDescription)
55
self.store_job_data(jobDescription.jobStoreID, job_data)
56
57
return jobDescription
58
59
def update(self, job: JobDescription) -> None:
60
"""Update existing job description."""
61
if not self.job_exists(job.jobStoreID):
62
raise NoSuchJobException(f"Job not found: {job.jobStoreID}")
63
64
job_data = self.serialize_job(job)
65
self.store_job_data(job.jobStoreID, job_data)
66
67
def load(self, jobStoreID: str) -> JobDescription:
68
"""Load job description by ID."""
69
if not self.job_exists(jobStoreID):
70
raise NoSuchJobException(f"Job not found: {jobStoreID}")
71
72
job_data = self.load_job_data(jobStoreID)
73
return self.deserialize_job(job_data)
74
75
def delete(self, jobStoreID: str) -> None:
76
"""Delete job and associated data."""
77
if not self.job_exists(jobStoreID):
78
raise NoSuchJobException(f"Job not found: {jobStoreID}")
79
80
# Delete job data and any associated files
81
self.delete_job_data(jobStoreID)
82
self.delete_job_files(jobStoreID)
83
84
def jobs(self) -> Iterator[JobDescription]:
85
"""Iterate over all jobs in store."""
86
for job_id in self.list_job_ids():
87
yield self.load(job_id)
88
89
def writeFile(self, localFilePath: str, jobStoreID: Optional[str] = None) -> str:
90
"""Write local file to job store."""
91
file_id = self.generate_file_id()
92
93
with open(localFilePath, 'rb') as local_file:
94
file_data = local_file.read()
95
96
self.store_file_data(file_id, file_data)
97
98
# Associate file with job if specified
99
if jobStoreID:
100
self.associate_file_with_job(file_id, jobStoreID)
101
102
return file_id
103
104
def readFile(self, jobStoreFileID: str, localFilePath: str) -> None:
105
"""Read file from job store to local path."""
106
if not self.file_exists(jobStoreFileID):
107
raise NoSuchFileException(f"File not found: {jobStoreFileID}")
108
109
file_data = self.load_file_data(jobStoreFileID)
110
111
with open(localFilePath, 'wb') as local_file:
112
local_file.write(file_data)
113
114
def deleteFile(self, jobStoreFileID: str) -> None:
115
"""Delete file from job store."""
116
if not self.file_exists(jobStoreFileID):
117
raise NoSuchFileException(f"File not found: {jobStoreFileID}")
118
119
self.delete_file_data(jobStoreFileID)
120
121
def fileExists(self, jobStoreFileID: str) -> bool:
122
"""Check if file exists in job store."""
123
return self.file_exists_impl(jobStoreFileID)
124
```
125
126
### File-Based Job Store
127
{ .api }
128
129
The `FileJobStore` uses the local file system for storage, suitable for single-node deployments and shared file systems.
130
131
```python
132
from toil.jobStores.fileJobStore import FileJobStore
133
from toil.common import Config
134
import os
135
136
# File job store configuration
137
config = Config()
138
config.jobStore = "file:/tmp/my-workflow-jobstore" # Local directory
139
140
# Alternative: network file system
141
config.jobStore = "file:/shared/nfs/workflow-store"
142
143
# Initialize file job store
144
file_store = FileJobStore(config.jobStore)
145
file_store.initialize(config)
146
147
# File job store structure:
148
# /tmp/my-workflow-jobstore/
149
# ├── jobs/ # Job descriptions
150
# ├── files/ # Stored files
151
# ├── stats/ # Statistics files
152
# └── tmp/ # Temporary files
153
154
# Working with file job store
155
from toil.job import JobDescription
156
157
# Create job description
158
job_desc = JobDescription(
159
requirements={"memory": 1024*1024*1024, "cores": 1, "disk": 1024*1024*1024},
160
jobName="test_job",
161
unitName="test_unit"
162
)
163
164
# Store job
165
created_job = file_store.create(job_desc)
166
job_id = created_job.jobStoreID
167
168
# Update job
169
created_job.remainingTryCount = 2
170
file_store.update(created_job)
171
172
# Load job
173
loaded_job = file_store.load(job_id)
174
175
# Store file
176
test_file = "/tmp/input.txt"
177
with open(test_file, 'w') as f:
178
f.write("test data")
179
180
file_id = file_store.writeFile(test_file, jobStoreID=job_id)
181
182
# Read file back
183
output_file = "/tmp/output.txt"
184
file_store.readFile(file_id, output_file)
185
186
# Cleanup
187
file_store.deleteFile(file_id)
188
file_store.delete(job_id)
189
```
190
191
### AWS S3 Job Store
192
{ .api }
193
194
The `AWSJobStore` uses Amazon S3 for scalable, distributed storage in cloud environments.
195
196
```python
197
from toil.jobStores.aws.jobStore import AWSJobStore
198
from toil.common import Config
199
200
# AWS S3 job store configuration
201
config = Config()
202
config.jobStore = "aws:us-west-2:my-toil-bucket:workflow-123"
203
# Format: aws:region:bucket:path_prefix
204
205
# AWS credentials configuration (multiple options)
206
# Option 1: Environment variables
207
import os
208
os.environ['AWS_ACCESS_KEY_ID'] = 'your_access_key'
209
os.environ['AWS_SECRET_ACCESS_KEY'] = 'your_secret_key'
210
211
# Option 2: AWS credentials file
212
config.awsCredentials = "~/.aws/credentials"
213
214
# Option 3: IAM roles (for EC2 instances)
215
# No explicit credentials needed
216
217
# S3-specific settings
218
config.awsRegion = "us-west-2"
219
config.sseKey = "alias/my-kms-key" # KMS encryption
220
config.sseKeyFile = "/path/to/sse-key.txt" # Local encryption key
221
222
# Initialize AWS job store
223
aws_store = AWSJobStore(config.jobStore)
224
aws_store.initialize(config)
225
226
# S3 bucket structure:
227
# my-toil-bucket/
228
# └── workflow-123/
229
# ├── jobs/ # Job descriptions as JSON
230
# ├── files/ # Binary file storage
231
# ├── stats/ # Workflow statistics
232
# └── versions/ # Versioning metadata
233
234
# Working with S3 job store
235
job_desc = JobDescription(
236
requirements={"memory": 2*1024*1024*1024, "cores": 2, "disk": 5*1024*1024*1024},
237
jobName="s3_job"
238
)
239
240
# Operations are identical to file store but backed by S3
241
job = aws_store.create(job_desc)
242
243
# Large file handling optimized for S3
244
large_file = "/tmp/large_dataset.bin"
245
file_id = aws_store.writeFile(large_file) # Automatically uses multipart upload
246
247
# Concurrent access protection
248
try:
249
aws_store.update(job)
250
except ConcurrentFileModificationException:
251
# Handle concurrent modification
252
fresh_job = aws_store.load(job.jobStoreID)
253
# Retry update with fresh data
254
```
255
256
### Google Cloud Storage Job Store
257
{ .api }
258
259
The `GoogleJobStore` provides integration with Google Cloud Storage for Google Cloud Platform deployments.
260
261
```python
262
from toil.jobStores.googleJobStore import GoogleJobStore
263
from toil.common import Config
264
265
# Google Cloud Storage job store configuration
266
config = Config()
267
config.jobStore = "google:my-project-id:my-gcs-bucket"
# Format: google:project_id:bucket_name
269
270
# Google Cloud authentication
271
# Option 1: Service account key file
272
config.googleCredentials = "/path/to/service-account.json"
273
274
# Option 2: Application default credentials
275
# gcloud auth application-default login
276
277
# Option 3: Service account on GCE instances
278
# Automatic authentication
279
280
# Initialize Google job store
281
gcs_store = GoogleJobStore(config.jobStore)
282
gcs_store.initialize(config)
283
284
# GCS bucket structure similar to S3:
285
# my-gcs-bucket/
286
# └── workflow-path/
287
# ├── jobs/ # Job metadata
288
# ├── files/ # File storage
289
# └── stats/ # Statistics
290
291
# Google-specific features
292
job_desc = JobDescription(
293
requirements={"memory": 4*1024*1024*1024, "cores": 4, "disk": 10*1024*1024*1024},
294
jobName="gcs_job"
295
)
296
297
job = gcs_store.create(job_desc)
298
299
# Efficient handling of Google Cloud native formats
300
file_id = gcs_store.writeFile("/tmp/data.csv")
301
302
# Integration with Google Cloud IAM
303
# Automatic handling of GCS permissions and access controls
304
```
305
306
### File Import and Export
307
{ .api }
308
309
Job stores support importing and exporting files from external sources and destinations.
310
311
```python
312
from toil.jobStores.abstractJobStore import AbstractJobStore
313
314
def demonstrate_import_export(job_store: AbstractJobStore):
315
"""Demonstrate file import/export capabilities."""
316
317
# Import from various sources
318
319
# Import from HTTP/HTTPS URL
320
http_file_id = job_store.importFile(
321
srcUrl="https://example.com/dataset.csv",
322
sharedFileName="shared_dataset.csv" # Optional shared name
323
)
324
325
# Import from FTP
326
ftp_file_id = job_store.importFile(
327
srcUrl="ftp://data.example.com/public/genome.fa"
328
)
329
330
# Import from S3 (from different job store)
331
s3_file_id = job_store.importFile(
332
srcUrl="s3://other-bucket/path/to/file.txt"
333
)
334
335
# Import from Google Cloud Storage
336
gcs_file_id = job_store.importFile(
337
srcUrl="gs://other-bucket/data/results.json"
338
)
339
340
# Import from local file system
341
local_file_id = job_store.importFile(
342
srcUrl="file:///absolute/path/to/local/file.dat"
343
)
344
345
# Export to various destinations
346
347
# Export to S3
348
job_store.exportFile(
349
jobStoreFileID=http_file_id,
350
dstUrl="s3://output-bucket/processed/dataset.csv"
351
)
352
353
# Export to Google Cloud Storage
354
job_store.exportFile(
355
jobStoreFileID=s3_file_id,
356
dstUrl="gs://results-bucket/analysis/output.txt"
357
)
358
359
# Export to HTTP endpoint (POST)
360
job_store.exportFile(
361
jobStoreFileID=ftp_file_id,
362
dstUrl="https://api.example.com/upload/genome"
363
)
364
365
# Export to local file system
366
job_store.exportFile(
367
jobStoreFileID=gcs_file_id,
368
dstUrl="file:///tmp/final_results.json"
369
)
370
371
# Bulk import/export operations
372
def bulk_file_operations(job_store: AbstractJobStore):
373
"""Handle multiple file operations efficiently."""
374
375
# Import multiple files
376
import_urls = [
377
"https://data.example.com/file1.csv",
378
"https://data.example.com/file2.csv",
379
"https://data.example.com/file3.csv"
380
]
381
382
imported_files = []
383
for url in import_urls:
384
file_id = job_store.importFile(url)
385
imported_files.append(file_id)
386
387
# Process files...
388
389
# Export results
390
export_destinations = [
391
"s3://results/output1.csv",
392
"s3://results/output2.csv",
393
"s3://results/output3.csv"
394
]
395
396
for file_id, dest_url in zip(imported_files, export_destinations):
397
job_store.exportFile(file_id, dest_url)
398
```
399
400
### Job Store Utilities and Management
401
{ .api }
402
403
Utilities for job store management, cleanup, and maintenance operations.
404
405
```python
406
from toil.jobStores.abstractJobStore import AbstractJobStore
407
from toil.common import Config
408
409
def job_store_utilities():
410
"""Demonstrate job store utility operations."""
411
412
config = Config()
413
config.jobStore = "file:/tmp/utility-demo"
414
415
# Get job store instance
416
job_store = AbstractJobStore.createJobStore(config.jobStore)
417
job_store.initialize(config)
418
419
# Job enumeration and statistics
420
total_jobs = 0
421
completed_jobs = 0
422
failed_jobs = 0
423
424
for job in job_store.jobs():
425
total_jobs += 1
426
if job.remainingTryCount == 0:
427
failed_jobs += 1
428
elif hasattr(job, 'completed') and job.completed:
429
completed_jobs += 1
430
431
print(f"Total jobs: {total_jobs}")
432
print(f"Completed: {completed_jobs}")
433
print(f"Failed: {failed_jobs}")
434
435
# File inventory
436
all_files = job_store.get_all_file_ids() # Implementation specific
437
total_size = 0
438
439
for file_id in all_files:
440
if job_store.fileExists(file_id):
441
file_size = job_store.getFileSize(file_id) # Implementation specific
442
total_size += file_size
443
444
print(f"Total files: {len(all_files)}")
445
print(f"Total size: {total_size / (1024*1024)} MB")
446
447
# Cleanup orphaned files
448
def cleanup_orphaned_files():
449
"""Remove files not associated with any job."""
450
active_job_ids = {job.jobStoreID for job in job_store.jobs()}
451
452
for file_id in all_files:
453
associated_job = job_store.get_file_job_association(file_id)
454
if associated_job not in active_job_ids:
455
print(f"Cleaning up orphaned file: {file_id}")
456
job_store.deleteFile(file_id)
457
458
# Job store migration between backends
459
def migrate_job_store(source_locator: str, dest_locator: str):
460
"""Migrate job store from one backend to another."""
461
462
source_config = Config()
463
source_config.jobStore = source_locator
464
source_store = AbstractJobStore.createJobStore(source_locator)
465
source_store.resume()
466
467
dest_config = Config()
468
dest_config.jobStore = dest_locator
469
dest_store = AbstractJobStore.createJobStore(dest_locator)
470
dest_store.initialize(dest_config)
471
472
# Migrate all jobs
473
for job in source_store.jobs():
474
dest_store.create(job)
475
476
# Migrate all files
477
for file_id in source_store.get_all_file_ids():
478
if source_store.fileExists(file_id):
479
# Read from source
480
temp_file = f"/tmp/migration_{file_id}"
481
source_store.readFile(file_id, temp_file)
482
483
# Write to destination
484
dest_store.writeFile(temp_file, jobStoreID=None)
485
486
# Cleanup temp file
487
os.unlink(temp_file)
488
489
print(f"Migration complete: {source_locator} -> {dest_locator}")
490
```
491
492
### Error Handling and Recovery
493
{ .api }
494
495
Comprehensive error handling for job store operations and data integrity.
496
497
```python
498
from toil.jobStores.abstractJobStore import (
499
NoSuchJobException,
500
NoSuchFileException,
501
ConcurrentFileModificationException,
502
JobStoreExistsException,
503
NoSuchJobStoreException
504
)
505
import logging
506
import time
507
508
def robust_job_store_operations(job_store: AbstractJobStore):
509
"""Demonstrate robust error handling for job store operations."""
510
511
def safe_job_update(job: JobDescription, max_retries: int = 3):
512
"""Update job with retry logic for concurrent modifications."""
513
514
for attempt in range(max_retries):
515
try:
516
job_store.update(job)
517
return True
518
519
except ConcurrentFileModificationException as e:
520
logging.warning(f"Concurrent modification attempt {attempt + 1}: {e}")
521
522
if attempt < max_retries - 1:
523
# Wait and reload fresh job state
524
time.sleep(0.1 * (2 ** attempt)) # Exponential backoff
525
fresh_job = job_store.load(job.jobStoreID)
526
# Merge changes if possible
527
job = merge_job_changes(job, fresh_job)
528
else:
529
logging.error("Failed to update job after max retries")
530
raise
531
532
except NoSuchJobException as e:
533
logging.error(f"Job no longer exists: {e}")
534
return False
535
536
return False
537
538
def safe_file_operations(file_operations: list):
539
"""Execute file operations with error recovery."""
540
541
completed_operations = []
542
543
for operation in file_operations:
544
try:
545
if operation['type'] == 'write':
546
file_id = job_store.writeFile(
547
operation['local_path'],
548
jobStoreID=operation.get('job_id')
549
)
550
completed_operations.append(('write', file_id))
551
552
elif operation['type'] == 'read':
553
job_store.readFile(
554
operation['file_id'],
555
operation['local_path']
556
)
557
completed_operations.append(('read', operation['file_id']))
558
559
elif operation['type'] == 'import':
560
file_id = job_store.importFile(
561
operation['src_url'],
562
sharedFileName=operation.get('shared_name')
563
)
564
completed_operations.append(('import', file_id))
565
566
except NoSuchFileException as e:
567
logging.error(f"File operation failed - file not found: {e}")
568
# Skip this operation, continue with others
569
570
except Exception as e:
571
logging.error(f"File operation failed: {e}")
572
# Rollback completed operations
573
rollback_file_operations(completed_operations)
574
raise
575
576
return completed_operations
577
578
def rollback_file_operations(operations: list):
579
"""Rollback completed file operations on error."""
580
581
for op_type, file_id in reversed(operations):
582
try:
583
if op_type in ('write', 'import'):
584
job_store.deleteFile(file_id)
585
logging.info(f"Rolled back {op_type} operation for file {file_id}")
586
except Exception as e:
587
logging.warning(f"Failed to rollback {op_type} for {file_id}: {e}")
588
589
def handle_job_store_initialization_errors():
590
"""Handle errors during job store initialization."""
591
592
config = Config()
593
config.jobStore = "aws:us-west-2:my-bucket:workflow-123"
594
595
try:
596
job_store = AbstractJobStore.createJobStore(config.jobStore)
597
job_store.initialize(config)
598
599
except JobStoreExistsException as e:
600
logging.info(f"Job store already exists, resuming: {e}")
601
job_store.resume()
602
603
except NoSuchJobStoreException as e:
604
logging.error(f"Job store not found, cannot resume: {e}")
605
# Create new job store
606
job_store.initialize(config)
607
608
except Exception as e:
609
logging.error(f"Failed to initialize job store: {e}")
610
# Try alternative job store location
611
config.jobStore = "file:/tmp/fallback-jobstore"
612
job_store = AbstractJobStore.createJobStore(config.jobStore)
613
job_store.initialize(config)
614
615
return job_store
616
```
617
618
This job store management system provides robust, scalable storage for workflow metadata and files across diverse storage backends with comprehensive error handling and recovery capabilities.