Pipeline management software for clusters.
Overall score: 67%
Toil's job store system provides persistent storage for workflow metadata, job descriptions, and intermediate files. Job stores abstract the underlying storage mechanism, allowing workflows to run with different backends including local file systems, cloud object storage (AWS S3, Google Cloud Storage), and distributed file systems. The job store maintains workflow state, enables fault tolerance through checkpointing, and facilitates workflow restart and recovery capabilities.
{ .api }
The AbstractJobStore defines the core interface that all job store implementations must provide.
from typing import Iterator, Optional

from toil.common import Config
from toil.job import JobDescription
from toil.jobStores.abstractJobStore import (
    AbstractJobStore,
    ConcurrentFileModificationException,
    NoSuchFileException,
    NoSuchJobException,
    NoSuchJobStoreException,
)
class CustomJobStore(AbstractJobStore):
"""Custom job store implementation."""
def initialize(self, config: Config) -> None:
"""Initialize job store with configuration."""
self.config = config
self.locator = config.jobStore
# Set up storage backend
self.setup_storage()
def resume(self) -> None:
"""Resume from existing job store."""
# Verify job store exists and is accessible
if not self.exists():
raise NoSuchJobStoreException(f"Job store not found: {self.locator}")
# Load existing state
self.load_state()
def assignID(self, jobDescription: JobDescription) -> str:
"""Assign unique ID to job description."""
job_id = self.generate_unique_id()
jobDescription.jobStoreID = job_id
return job_id
def create(self, jobDescription: JobDescription) -> JobDescription:
"""Create and store new job."""
if not hasattr(jobDescription, 'jobStoreID'):
self.assignID(jobDescription)
# Serialize and store job description
job_data = self.serialize_job(jobDescription)
self.store_job_data(jobDescription.jobStoreID, job_data)
return jobDescription
def update(self, job: JobDescription) -> None:
"""Update existing job description."""
if not self.job_exists(job.jobStoreID):
raise NoSuchJobException(f"Job not found: {job.jobStoreID}")
job_data = self.serialize_job(job)
self.store_job_data(job.jobStoreID, job_data)
def load(self, jobStoreID: str) -> JobDescription:
"""Load job description by ID."""
if not self.job_exists(jobStoreID):
raise NoSuchJobException(f"Job not found: {jobStoreID}")
job_data = self.load_job_data(jobStoreID)
return self.deserialize_job(job_data)
def delete(self, jobStoreID: str) -> None:
"""Delete job and associated data."""
if not self.job_exists(jobStoreID):
raise NoSuchJobException(f"Job not found: {jobStoreID}")
# Delete job data and any associated files
self.delete_job_data(jobStoreID)
self.delete_job_files(jobStoreID)
def jobs(self) -> Iterator[JobDescription]:
"""Iterate over all jobs in store."""
for job_id in self.list_job_ids():
yield self.load(job_id)
def writeFile(self, localFilePath: str, jobStoreID: Optional[str] = None) -> str:
"""Write local file to job store."""
file_id = self.generate_file_id()
with open(localFilePath, 'rb') as local_file:
file_data = local_file.read()
self.store_file_data(file_id, file_data)
# Associate file with job if specified
if jobStoreID:
self.associate_file_with_job(file_id, jobStoreID)
return file_id
def readFile(self, jobStoreFileID: str, localFilePath: str) -> None:
"""Read file from job store to local path."""
if not self.file_exists(jobStoreFileID):
raise NoSuchFileException(f"File not found: {jobStoreFileID}")
file_data = self.load_file_data(jobStoreFileID)
with open(localFilePath, 'wb') as local_file:
local_file.write(file_data)
def deleteFile(self, jobStoreFileID: str) -> None:
"""Delete file from job store."""
if not self.file_exists(jobStoreFileID):
raise NoSuchFileException(f"File not found: {jobStoreFileID}")
self.delete_file_data(jobStoreFileID)
def fileExists(self, jobStoreFileID: str) -> bool:
"""Check if file exists in job store."""
return self.file_exists_impl(jobStoreFileID){ .api }
The FileJobStore uses the local file system for storage, suitable for single-node deployments and shared file systems.
from toil.jobStores.fileJobStore import FileJobStore
from toil.common import Config
import os
# File job store configuration
config = Config()
config.jobStore = "file:/tmp/my-workflow-jobstore" # Local directory
# Alternative: network file system
config.jobStore = "file:/shared/nfs/workflow-store"
# Initialize file job store
file_store = FileJobStore(config.jobStore)
file_store.initialize(config)
# File job store structure:
# /tmp/my-workflow-jobstore/
# ├── jobs/ # Job descriptions
# ├── files/ # Stored files
# ├── stats/ # Statistics files
# └── tmp/ # Temporary files
# Working with file job store
from toil.job import JobDescription
# Create job description
job_desc = JobDescription(
requirements={"memory": 1024*1024*1024, "cores": 1, "disk": 1024*1024*1024},
jobName="test_job",
unitName="test_unit"
)
# Store job
created_job = file_store.create(job_desc)
job_id = created_job.jobStoreID
# Update job
created_job.remainingTryCount = 2
file_store.update(created_job)
# Load job
loaded_job = file_store.load(job_id)
# Store file
test_file = "/tmp/input.txt"
with open(test_file, 'w') as f:
f.write("test data")
file_id = file_store.writeFile(test_file, jobStoreID=job_id)
# Read file back
output_file = "/tmp/output.txt"
file_store.readFile(file_id, output_file)
# Cleanup
file_store.deleteFile(file_id)
file_store.delete(job_id){ .api }
The AWSJobStore uses Amazon S3 for scalable, distributed storage in cloud environments.
from toil.jobStores.aws.jobStore import AWSJobStore
from toil.common import Config
# AWS S3 job store configuration
config = Config()
config.jobStore = "aws:us-west-2:my-toil-bucket:workflow-123"
# Format: aws:region:bucket:path_prefix
# AWS credentials configuration (multiple options)
# Option 1: Environment variables
import os
os.environ['AWS_ACCESS_KEY_ID'] = 'your_access_key'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'your_secret_key'
# Option 2: AWS credentials file
config.awsCredentials = "~/.aws/credentials"
# Option 3: IAM roles (for EC2 instances)
# No explicit credentials needed
# S3-specific settings
config.awsRegion = "us-west-2"
config.sseKey = "alias/my-kms-key" # KMS encryption
config.sseKeyFile = "/path/to/sse-key.txt" # Local encryption key
# Initialize AWS job store
aws_store = AWSJobStore(config.jobStore)
aws_store.initialize(config)
# S3 bucket structure:
# my-toil-bucket/
# └── workflow-123/
# ├── jobs/ # Job descriptions as JSON
# ├── files/ # Binary file storage
# ├── stats/ # Workflow statistics
# └── versions/ # Versioning metadata
# Working with S3 job store
job_desc = JobDescription(
requirements={"memory": 2*1024*1024*1024, "cores": 2, "disk": 5*1024*1024*1024},
jobName="s3_job"
)
# Operations are identical to file store but backed by S3
job = aws_store.create(job_desc)
# Large file handling optimized for S3
large_file = "/tmp/large_dataset.bin"
file_id = aws_store.writeFile(large_file) # Automatically uses multipart upload
# Concurrent access protection
try:
aws_store.update(job)
except ConcurrentFileModificationException:
# Handle concurrent modification
fresh_job = aws_store.load(job.jobStoreID)
# Retry update with fresh data{ .api }
The GoogleJobStore provides integration with Google Cloud Storage for Google Cloud Platform deployments.
from toil.jobStores.googleJobStore import GoogleJobStore
from toil.common import Config
# Google Cloud Storage job store configuration
config = Config()
config.jobStore = "gce:us-central1:my-gcs-bucket:workflow-path"
# Format: gce:region:bucket:path_prefix
# Google Cloud authentication
# Option 1: Service account key file
config.googleCredentials = "/path/to/service-account.json"
# Option 2: Application default credentials
# gcloud auth application-default login
# Option 3: Service account on GCE instances
# Automatic authentication
# Initialize Google job store
gcs_store = GoogleJobStore(config.jobStore)
gcs_store.initialize(config)
# GCS bucket structure similar to S3:
# my-gcs-bucket/
# └── workflow-path/
# ├── jobs/ # Job metadata
# ├── files/ # File storage
# └── stats/ # Statistics
# Google-specific features
job_desc = JobDescription(
requirements={"memory": 4*1024*1024*1024, "cores": 4, "disk": 10*1024*1024*1024},
jobName="gcs_job"
)
job = gcs_store.create(job_desc)
# Efficient handling of Google Cloud native formats
file_id = gcs_store.writeFile("/tmp/data.csv")
# Integration with Google Cloud IAM
# Automatic handling of GCS permissions and access controls{ .api }
Job stores support importing and exporting files from external sources and destinations.
from toil.jobStores.abstractJobStore import AbstractJobStore
def demonstrate_import_export(job_store: AbstractJobStore):
    """Show how files move between a job store and external URLs."""
    # --- Imports from external sources ---
    # HTTP/HTTPS URL.
    http_file_id = job_store.importFile(
        srcUrl="https://example.com/dataset.csv",
        sharedFileName="shared_dataset.csv",  # optional shared name
    )
    # FTP server.
    ftp_file_id = job_store.importFile(
        srcUrl="ftp://data.example.com/public/genome.fa",
    )
    # S3 object (e.g. belonging to a different job store).
    s3_file_id = job_store.importFile(
        srcUrl="s3://other-bucket/path/to/file.txt",
    )
    # Google Cloud Storage object.
    gcs_file_id = job_store.importFile(
        srcUrl="gs://other-bucket/data/results.json",
    )
    # Local file system path.
    local_file_id = job_store.importFile(
        srcUrl="file:///absolute/path/to/local/file.dat",
    )

    # --- Exports to external destinations ---
    # To S3.
    job_store.exportFile(
        jobStoreFileID=http_file_id,
        dstUrl="s3://output-bucket/processed/dataset.csv",
    )
    # To Google Cloud Storage.
    job_store.exportFile(
        jobStoreFileID=s3_file_id,
        dstUrl="gs://results-bucket/analysis/output.txt",
    )
    # To an HTTP endpoint (POST).
    job_store.exportFile(
        jobStoreFileID=ftp_file_id,
        dstUrl="https://api.example.com/upload/genome",
    )
    # To the local file system.
    job_store.exportFile(
        jobStoreFileID=gcs_file_id,
        dstUrl="file:///tmp/final_results.json",
    )
# Bulk import/export operations
def bulk_file_operations(job_store: AbstractJobStore):
"""Handle multiple file operations efficiently."""
# Import multiple files
import_urls = [
"https://data.example.com/file1.csv",
"https://data.example.com/file2.csv",
"https://data.example.com/file3.csv"
]
imported_files = []
for url in import_urls:
file_id = job_store.importFile(url)
imported_files.append(file_id)
# Process files...
# Export results
export_destinations = [
"s3://results/output1.csv",
"s3://results/output2.csv",
"s3://results/output3.csv"
]
for file_id, dest_url in zip(imported_files, export_destinations):
job_store.exportFile(file_id, dest_url){ .api }
Utilities for job store management, cleanup, and maintenance operations.
from toil.jobStores.abstractJobStore import AbstractJobStore
from toil.common import Config
def job_store_utilities():
    """Demonstrate job enumeration, file inventory and orphan cleanup."""
    config = Config()
    config.jobStore = "file:/tmp/utility-demo"

    # Obtain and initialize a store from its locator.
    job_store = AbstractJobStore.createJobStore(config.jobStore)
    job_store.initialize(config)

    # --- Job statistics ---
    total_jobs = completed_jobs = failed_jobs = 0
    for job in job_store.jobs():
        total_jobs += 1
        if job.remainingTryCount == 0:
            # No retries left: counted as failed.
            failed_jobs += 1
        elif hasattr(job, 'completed') and job.completed:
            completed_jobs += 1
    print(f"Total jobs: {total_jobs}")
    print(f"Completed: {completed_jobs}")
    print(f"Failed: {failed_jobs}")

    # --- File inventory ---
    all_files = job_store.get_all_file_ids()  # Implementation specific
    total_size = 0
    for file_id in all_files:
        if job_store.fileExists(file_id):
            total_size += job_store.getFileSize(file_id)  # Implementation specific
    print(f"Total files: {len(all_files)}")
    print(f"Total size: {total_size / (1024*1024)} MB")

    # --- Orphan cleanup ---
    def cleanup_orphaned_files():
        """Remove files not associated with any live job."""
        active_job_ids = {job.jobStoreID for job in job_store.jobs()}
        for file_id in all_files:
            if job_store.get_file_job_association(file_id) not in active_job_ids:
                print(f"Cleaning up orphaned file: {file_id}")
                job_store.deleteFile(file_id)
# Job store migration between backends
def migrate_job_store(source_locator: str, dest_locator: str):
    """Migrate all jobs and files from one job store backend to another.

    Args:
        source_locator: locator of the existing store (must be resumable).
        dest_locator: locator of the new store (will be initialized).
    """
    import tempfile

    source_config = Config()
    source_config.jobStore = source_locator
    source_store = AbstractJobStore.createJobStore(source_locator)
    source_store.resume()

    dest_config = Config()
    dest_config.jobStore = dest_locator
    dest_store = AbstractJobStore.createJobStore(dest_locator)
    dest_store.initialize(dest_config)

    # Migrate all jobs.
    for job in source_store.jobs():
        dest_store.create(job)

    # Migrate all files, staging each through a secure temporary file.
    # Bug fix: the original used a predictable /tmp/migration_<id> path,
    # which is race-prone and insecure on shared hosts, and leaked the
    # staging file if a copy step raised.
    for file_id in source_store.get_all_file_ids():
        if source_store.fileExists(file_id):
            fd, temp_file = tempfile.mkstemp(prefix="toil_migration_")
            os.close(fd)  # only the path is needed; readFile reopens it
            try:
                # Read from source, then write to destination.
                source_store.readFile(file_id, temp_file)
                dest_store.writeFile(temp_file, jobStoreID=None)
            finally:
                # Always remove the staging file, even on failure.
                os.unlink(temp_file)

    print(f"Migration complete: {source_locator} -> {dest_locator}")
Comprehensive error handling for job store operations and data integrity.
from toil.jobStores.abstractJobStore import (
NoSuchJobException,
NoSuchFileException,
ConcurrentFileModificationException,
JobStoreExistsException,
NoSuchJobStoreException
)
import logging
import time
def robust_job_store_operations(job_store: AbstractJobStore):
    """Demonstrate robust error handling for job store operations."""

    def safe_job_update(job: JobDescription, max_retries: int = 3):
        """Update a job, retrying on concurrent-modification conflicts."""
        for attempt in range(max_retries):
            try:
                job_store.update(job)
                return True
            except ConcurrentFileModificationException as e:
                logging.warning(f"Concurrent modification attempt {attempt + 1}: {e}")
                if attempt >= max_retries - 1:
                    # Out of retries: surface the conflict to the caller.
                    logging.error("Failed to update job after max retries")
                    raise
                # Exponential backoff, then reload the fresh job state.
                time.sleep(0.1 * (2 ** attempt))
                fresh_job = job_store.load(job.jobStoreID)
                # Merge changes if possible
                job = merge_job_changes(job, fresh_job)
            except NoSuchJobException as e:
                logging.error(f"Job no longer exists: {e}")
                return False
        return False

    def safe_file_operations(file_operations: list):
        """Execute file operations, rolling back completed ones on failure."""
        completed_operations = []
        for operation in file_operations:
            try:
                op_kind = operation['type']
                if op_kind == 'write':
                    new_id = job_store.writeFile(
                        operation['local_path'],
                        jobStoreID=operation.get('job_id'),
                    )
                    completed_operations.append(('write', new_id))
                elif op_kind == 'read':
                    job_store.readFile(
                        operation['file_id'],
                        operation['local_path'],
                    )
                    completed_operations.append(('read', operation['file_id']))
                elif op_kind == 'import':
                    new_id = job_store.importFile(
                        operation['src_url'],
                        sharedFileName=operation.get('shared_name'),
                    )
                    completed_operations.append(('import', new_id))
            except NoSuchFileException as e:
                # Missing file: skip this operation, continue with the rest.
                logging.error(f"File operation failed - file not found: {e}")
            except Exception as e:
                logging.error(f"File operation failed: {e}")
                # Undo everything that succeeded so far, then re-raise.
                rollback_file_operations(completed_operations)
                raise
        return completed_operations

    def rollback_file_operations(operations: list):
        """Best-effort rollback of previously completed file operations."""
        for op_type, file_id in reversed(operations):
            try:
                if op_type in ('write', 'import'):
                    # Only writes and imports created files that need deleting.
                    job_store.deleteFile(file_id)
                    logging.info(f"Rolled back {op_type} operation for file {file_id}")
            except Exception as e:
                logging.warning(f"Failed to rollback {op_type} for {file_id}: {e}")
def handle_job_store_initialization_errors():
"""Handle errors during job store initialization."""
config = Config()
config.jobStore = "aws:us-west-2:my-bucket:workflow-123"
try:
job_store = AbstractJobStore.createJobStore(config.jobStore)
job_store.initialize(config)
except JobStoreExistsException as e:
logging.info(f"Job store already exists, resuming: {e}")
job_store.resume()
except NoSuchJobStoreException as e:
logging.error(f"Job store not found, cannot resume: {e}")
# Create new job store
job_store.initialize(config)
except Exception as e:
logging.error(f"Failed to initialize job store: {e}")
# Try alternative job store location
config.jobStore = "file:/tmp/fallback-jobstore"
job_store = AbstractJobStore.createJobStore(config.jobStore)
job_store.initialize(config)
return job_storeThis job store management system provides robust, scalable storage for workflow metadata and files across diverse storage backends with comprehensive error handling and recovery capabilities.
Install with Tessl CLI
npx tessl i tessl/pypi-toildocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10