CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-toil

Pipeline management software for clusters.

Overall
score

67%

Overview
Eval results
Files

docs/job-stores.md

Job Store Management

Overview

Toil's job store system provides persistent storage for workflow metadata, job descriptions, and intermediate files. Job stores abstract the underlying storage mechanism, allowing workflows to run with different backends including local file systems, cloud object storage (AWS S3, Google Cloud Storage), and distributed file systems. The job store maintains workflow state, enables fault tolerance through checkpointing, and facilitates workflow restart and recovery capabilities.

Capabilities

Abstract Job Store Interface

{ .api }

The AbstractJobStore defines the core interface that all job store implementations must provide.

from typing import Iterator, Optional

from toil.common import Config
from toil.job import JobDescription
from toil.jobStores.abstractJobStore import (
    AbstractJobStore,
    ConcurrentFileModificationException,
    NoSuchFileException,
    NoSuchJobException,
    NoSuchJobStoreException,
)

class CustomJobStore(AbstractJobStore):
    """Custom job store implementation.

    Template showing the AbstractJobStore methods a backend must provide.
    Backend-specific primitives (``setup_storage``, ``generate_unique_id``,
    ``serialize_job``, ``store_job_data``, ...) are assumed to be supplied
    by the concrete storage layer; they are not defined in this example.
    """
    
    def initialize(self, config: Config) -> None:
        """Initialize a brand-new job store from *config*."""
        self.config = config
        # The locator string (e.g. "file:/path") identifies this store.
        self.locator = config.jobStore
        # Set up storage backend
        self.setup_storage()
    
    def resume(self) -> None:
        """Resume from an existing job store.

        Raises:
            NoSuchJobStoreException: if no store exists at ``self.locator``.
        """
        # Verify job store exists and is accessible
        if not self.exists():
            raise NoSuchJobStoreException(f"Job store not found: {self.locator}")
        # Load existing state
        self.load_state()
    
    def assignID(self, jobDescription: JobDescription) -> str:
        """Assign a unique ID to *jobDescription* and return it."""
        job_id = self.generate_unique_id()
        jobDescription.jobStoreID = job_id
        return job_id
    
    def create(self, jobDescription: JobDescription) -> JobDescription:
        """Create and store a new job, assigning an ID if needed."""
        # Fix: hasattr() is nearly always True because JobDescription
        # defines the attribute up front; test for an unassigned (None)
        # ID instead so assignID() actually runs for fresh descriptions.
        if getattr(jobDescription, 'jobStoreID', None) is None:
            self.assignID(jobDescription)
        
        # Serialize and store job description
        job_data = self.serialize_job(jobDescription)
        self.store_job_data(jobDescription.jobStoreID, job_data)
        
        return jobDescription
    
    def update(self, job: JobDescription) -> None:
        """Overwrite the stored description of an existing job.

        Raises:
            NoSuchJobException: if the job is not in the store.
        """
        if not self.job_exists(job.jobStoreID):
            raise NoSuchJobException(f"Job not found: {job.jobStoreID}")
        
        job_data = self.serialize_job(job)
        self.store_job_data(job.jobStoreID, job_data)
    
    def load(self, jobStoreID: str) -> JobDescription:
        """Load and deserialize the job description for *jobStoreID*.

        Raises:
            NoSuchJobException: if the job is not in the store.
        """
        if not self.job_exists(jobStoreID):
            raise NoSuchJobException(f"Job not found: {jobStoreID}")
        
        job_data = self.load_job_data(jobStoreID)
        return self.deserialize_job(job_data)
    
    def delete(self, jobStoreID: str) -> None:
        """Delete a job and all files associated with it.

        Raises:
            NoSuchJobException: if the job is not in the store.
        """
        if not self.job_exists(jobStoreID):
            raise NoSuchJobException(f"Job not found: {jobStoreID}")
        
        # Delete job data and any associated files
        self.delete_job_data(jobStoreID)
        self.delete_job_files(jobStoreID)
    
    def jobs(self) -> Iterator[JobDescription]:
        """Lazily iterate over every job currently in the store."""
        for job_id in self.list_job_ids():
            yield self.load(job_id)
    
    def writeFile(self, localFilePath: str, jobStoreID: Optional[str] = None) -> str:
        """Copy a local file into the store; return its new file ID.

        NOTE(review): reads the whole file into memory — fine for small
        files, but a streaming copy may be needed for large inputs.
        """
        file_id = self.generate_file_id()
        
        with open(localFilePath, 'rb') as local_file:
            file_data = local_file.read()
            
        self.store_file_data(file_id, file_data)
        
        # Associate file with job if specified
        if jobStoreID:
            self.associate_file_with_job(file_id, jobStoreID)
            
        return file_id
    
    def readFile(self, jobStoreFileID: str, localFilePath: str) -> None:
        """Copy a stored file out to *localFilePath*.

        Raises:
            NoSuchFileException: if the file is not in the store.
        """
        if not self.file_exists(jobStoreFileID):
            raise NoSuchFileException(f"File not found: {jobStoreFileID}")
        
        file_data = self.load_file_data(jobStoreFileID)
        
        with open(localFilePath, 'wb') as local_file:
            local_file.write(file_data)
    
    def deleteFile(self, jobStoreFileID: str) -> None:
        """Remove a file from the store.

        Raises:
            NoSuchFileException: if the file is not in the store.
        """
        if not self.file_exists(jobStoreFileID):
            raise NoSuchFileException(f"File not found: {jobStoreFileID}")
        
        self.delete_file_data(jobStoreFileID)
    
    def fileExists(self, jobStoreFileID: str) -> bool:
        """Return True if the file exists in the store."""
        return self.file_exists_impl(jobStoreFileID)

File-Based Job Store

{ .api }

The FileJobStore uses the local file system for storage, suitable for single-node deployments and shared file systems.

from toil.jobStores.fileJobStore import FileJobStore
from toil.common import Config
import os

# File job store configuration
config = Config()
config.jobStore = "file:/tmp/my-workflow-jobstore"  # Local directory

# Alternative: network file system
# NOTE(review): this overwrites the assignment above — only the NFS
# locator is actually used below; confirm which location the demo intends.
config.jobStore = "file:/shared/nfs/workflow-store"

# Initialize file job store
# NOTE(review): presumably FileJobStore accepts the "file:" locator string;
# verify against the FileJobStore constructor signature.
file_store = FileJobStore(config.jobStore)
file_store.initialize(config)

# File job store structure:
# /tmp/my-workflow-jobstore/
# ├── jobs/           # Job descriptions
# ├── files/          # Stored files
# ├── stats/          # Statistics files
# └── tmp/            # Temporary files

# Working with file job store
from toil.job import JobDescription

# Create job description (memory/disk requirements are expressed in bytes)
job_desc = JobDescription(
    requirements={"memory": 1024*1024*1024, "cores": 1, "disk": 1024*1024*1024},
    jobName="test_job",
    unitName="test_unit"
)

# Store job — create() assigns the jobStoreID used by later calls
created_job = file_store.create(job_desc)
job_id = created_job.jobStoreID

# Update job
created_job.remainingTryCount = 2
file_store.update(created_job)

# Load job
loaded_job = file_store.load(job_id)

# Store file — first write a small local file to upload
test_file = "/tmp/input.txt"
with open(test_file, 'w') as f:
    f.write("test data")

file_id = file_store.writeFile(test_file, jobStoreID=job_id)

# Read file back
output_file = "/tmp/output.txt"
file_store.readFile(file_id, output_file)

# Cleanup (removes the stored file and the job from the store;
# the local /tmp/input.txt and /tmp/output.txt are left behind)
file_store.deleteFile(file_id)
file_store.delete(job_id)

AWS S3 Job Store

{ .api }

The AWSJobStore uses Amazon S3 for scalable, distributed storage in cloud environments.

from toil.jobStores.aws.jobStore import AWSJobStore
from toil.common import Config

# AWS S3 job store configuration
config = Config()
config.jobStore = "aws:us-west-2:my-toil-bucket:workflow-123"
# Format: aws:region:bucket:path_prefix
# NOTE(review): Toil's documented AWS locator is "aws:region:name" —
# confirm this bucket:path_prefix form against the installed Toil version.

# AWS credentials configuration (multiple options)
# Option 1: Environment variables
# (placeholder values — never hard-code real credentials in source)
import os
os.environ['AWS_ACCESS_KEY_ID'] = 'your_access_key'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'your_secret_key'

# Option 2: AWS credentials file
config.awsCredentials = "~/.aws/credentials"

# Option 3: IAM roles (for EC2 instances)
# No explicit credentials needed

# S3-specific settings
config.awsRegion = "us-west-2"
config.sseKey = "alias/my-kms-key"  # KMS encryption
config.sseKeyFile = "/path/to/sse-key.txt"  # Local encryption key

# Initialize AWS job store
aws_store = AWSJobStore(config.jobStore)
aws_store.initialize(config)

# S3 bucket structure:
# my-toil-bucket/
# └── workflow-123/
#     ├── jobs/           # Job descriptions as JSON
#     ├── files/          # Binary file storage  
#     ├── stats/          # Workflow statistics
#     └── versions/       # Versioning metadata

# Working with S3 job store
# NOTE(review): JobDescription is imported in an earlier snippet, not here.
job_desc = JobDescription(
    requirements={"memory": 2*1024*1024*1024, "cores": 2, "disk": 5*1024*1024*1024},
    jobName="s3_job"
)

# Operations are identical to file store but backed by S3
job = aws_store.create(job_desc)

# Large file handling optimized for S3
large_file = "/tmp/large_dataset.bin"
file_id = aws_store.writeFile(large_file)  # Automatically uses multipart upload

# Concurrent access protection
# NOTE(review): ConcurrentFileModificationException must be imported from
# toil.jobStores.abstractJobStore for this handler to resolve.
try:
    aws_store.update(job)
except ConcurrentFileModificationException:
    # Handle concurrent modification by reloading the current state
    fresh_job = aws_store.load(job.jobStoreID)
    # Retry update with fresh data (retry logic omitted in this example)

Google Cloud Storage Job Store

{ .api }

The GoogleJobStore provides integration with Google Cloud Storage for Google Cloud Platform deployments.

from toil.jobStores.googleJobStore import GoogleJobStore  
from toil.common import Config

# Google Cloud Storage job store configuration
config = Config()
config.jobStore = "gce:us-central1:my-gcs-bucket:workflow-path"
# Format: gce:region:bucket:path_prefix
# NOTE(review): Toil's documented Google locator scheme is
# "google:projectID:bucket" — confirm the "gce:" form used here against
# the installed Toil version before relying on it.

# Google Cloud authentication
# Option 1: Service account key file
config.googleCredentials = "/path/to/service-account.json"

# Option 2: Application default credentials
# gcloud auth application-default login

# Option 3: Service account on GCE instances
# Automatic authentication

# Initialize Google job store
gcs_store = GoogleJobStore(config.jobStore)
gcs_store.initialize(config)

# GCS bucket structure similar to S3:
# my-gcs-bucket/
# └── workflow-path/
#     ├── jobs/           # Job metadata
#     ├── files/          # File storage
#     └── stats/          # Statistics

# Google-specific features
# NOTE(review): JobDescription is imported in an earlier snippet, not here.
job_desc = JobDescription(
    requirements={"memory": 4*1024*1024*1024, "cores": 4, "disk": 10*1024*1024*1024},
    jobName="gcs_job"
)

job = gcs_store.create(job_desc)

# Efficient handling of Google Cloud native formats
file_id = gcs_store.writeFile("/tmp/data.csv")

# Integration with Google Cloud IAM
# Automatic handling of GCS permissions and access controls

File Import and Export

{ .api }

Job stores support importing and exporting files from external sources and destinations.

from toil.jobStores.abstractJobStore import AbstractJobStore

def demonstrate_import_export(job_store: AbstractJobStore):
    """Demonstrate file import/export capabilities.

    Imports files from several URL schemes into the job store, then
    exports some of them back out to external destinations.
    """

    # --- Imports: (source URL, optional shared file name) pairs ---
    sources = [
        ("https://example.com/dataset.csv", "shared_dataset.csv"),  # HTTP/HTTPS
        ("ftp://data.example.com/public/genome.fa", None),          # FTP
        ("s3://other-bucket/path/to/file.txt", None),               # S3
        ("gs://other-bucket/data/results.json", None),              # GCS
        ("file:///absolute/path/to/local/file.dat", None),          # local FS
    ]

    imported_ids = []
    for src, shared in sources:
        if shared is not None:
            imported_ids.append(
                job_store.importFile(srcUrl=src, sharedFileName=shared)
            )
        else:
            imported_ids.append(job_store.importFile(srcUrl=src))

    http_file_id, ftp_file_id, s3_file_id, gcs_file_id, _local_file_id = imported_ids

    # --- Exports: (file ID, destination URL) pairs ---
    destinations = [
        (http_file_id, "s3://output-bucket/processed/dataset.csv"),  # to S3
        (s3_file_id, "gs://results-bucket/analysis/output.txt"),     # to GCS
        (ftp_file_id, "https://api.example.com/upload/genome"),      # HTTP POST
        (gcs_file_id, "file:///tmp/final_results.json"),             # local FS
    ]
    for exported_id, dst in destinations:
        job_store.exportFile(jobStoreFileID=exported_id, dstUrl=dst)

# Bulk import/export operations
def bulk_file_operations(job_store: AbstractJobStore):
    """Handle multiple file operations efficiently.

    Imports a batch of files, then exports each imported file to its
    corresponding destination URL.
    """

    # Source URLs to pull into the job store.
    import_urls = [
        "https://data.example.com/file1.csv",
        "https://data.example.com/file2.csv",
        "https://data.example.com/file3.csv"
    ]

    # One importFile call per URL, collecting the resulting file IDs.
    imported_files = [job_store.importFile(src) for src in import_urls]

    # Process files...

    # Destination URLs, positionally matched to the imports above.
    export_destinations = [
        "s3://results/output1.csv",
        "s3://results/output2.csv",
        "s3://results/output3.csv"
    ]

    for stored_id, destination in zip(imported_files, export_destinations):
        job_store.exportFile(stored_id, destination)

Job Store Utilities and Management

{ .api }

Utilities for job store management, cleanup, and maintenance operations.

from toil.jobStores.abstractJobStore import AbstractJobStore
from toil.common import Config

def job_store_utilities():
    """Demonstrate job store utility operations.

    Walks the store to report job and file statistics, then removes files
    no longer associated with any live job. Several helpers used here
    (get_all_file_ids, getFileSize, get_file_job_association) are
    implementation-specific extensions, not AbstractJobStore API.
    """
    
    config = Config()
    config.jobStore = "file:/tmp/utility-demo"
    
    # Get job store instance
    job_store = AbstractJobStore.createJobStore(config.jobStore)
    job_store.initialize(config)
    
    # Job enumeration and statistics
    total_jobs = 0
    completed_jobs = 0
    failed_jobs = 0
    
    for job in job_store.jobs():
        total_jobs += 1
        # remainingTryCount == 0 means the job has exhausted its retries.
        if job.remainingTryCount == 0:
            failed_jobs += 1
        elif hasattr(job, 'completed') and job.completed:
            completed_jobs += 1
    
    print(f"Total jobs: {total_jobs}")
    print(f"Completed: {completed_jobs}")
    print(f"Failed: {failed_jobs}")
    
    # File inventory
    all_files = job_store.get_all_file_ids()  # Implementation specific
    total_size = 0
    
    for file_id in all_files:
        if job_store.fileExists(file_id):
            file_size = job_store.getFileSize(file_id)  # Implementation specific
            total_size += file_size
    
    print(f"Total files: {len(all_files)}")
    print(f"Total size: {total_size / (1024*1024)} MB")
    
    # Cleanup orphaned files
    def cleanup_orphaned_files():
        """Remove files not associated with any job."""
        active_job_ids = {job.jobStoreID for job in job_store.jobs()}
        
        for file_id in all_files:
            associated_job = job_store.get_file_job_association(file_id)
            if associated_job not in active_job_ids:
                print(f"Cleaning up orphaned file: {file_id}")
                job_store.deleteFile(file_id)
    
    # Fix: the helper was defined but never invoked, so orphaned files
    # were never actually cleaned up.
    cleanup_orphaned_files()

# Job store migration between backends
def migrate_job_store(source_locator: str, dest_locator: str):
    """Migrate job store from one backend to another.

    Copies every job description and every file from the store at
    *source_locator* into a freshly initialized store at *dest_locator*.

    NOTE(review): writeFile() assigns *new* file IDs in the destination
    store, so migrated jobs still reference the old IDs — confirm whether
    an ID-remapping pass is required for a usable migration.
    """
    # Local imports keep this snippet self-contained (the original relied
    # on `os` imported by an unrelated snippet, and used an unsafe
    # /tmp/migration_{file_id} path: IDs may contain path separators, and
    # fixed names race/collide between runs).
    import os
    import tempfile
    
    source_config = Config()
    source_config.jobStore = source_locator
    source_store = AbstractJobStore.createJobStore(source_locator)
    source_store.resume()
    
    dest_config = Config()
    dest_config.jobStore = dest_locator
    dest_store = AbstractJobStore.createJobStore(dest_locator)
    dest_store.initialize(dest_config)
    
    # Migrate all jobs
    for job in source_store.jobs():
        dest_store.create(job)
    
    # Migrate all files via a secure temporary file on local disk
    for file_id in source_store.get_all_file_ids():
        if source_store.fileExists(file_id):
            fd, temp_file = tempfile.mkstemp(prefix="toil_migration_")
            os.close(fd)  # readFile reopens the path itself
            try:
                # Read from source
                source_store.readFile(file_id, temp_file)
                # Write to destination
                dest_store.writeFile(temp_file, jobStoreID=None)
            finally:
                # Always remove the temp file, even if a copy step fails
                os.unlink(temp_file)
    
    print(f"Migration complete: {source_locator} -> {dest_locator}")

Error Handling and Recovery

{ .api }

Comprehensive error handling for job store operations and data integrity.

from toil.jobStores.abstractJobStore import (
    NoSuchJobException,
    NoSuchFileException,
    ConcurrentFileModificationException,
    JobStoreExistsException,
    NoSuchJobStoreException
)
import logging
import time

def robust_job_store_operations(job_store: AbstractJobStore):
    """Demonstrate robust error handling for job store operations.

    Defines three closures over *job_store*: a retrying job updater, a
    batch file-operation runner, and a rollback helper. The closures are
    defined but not invoked here — this function is illustrative.

    NOTE(review): the AbstractJobStore / JobDescription annotations rely
    on imports from earlier snippets, and merge_job_changes is referenced
    but never defined in this document — confirm it exists before running.
    """
    
    def safe_job_update(job: JobDescription, max_retries: int = 3):
        """Update job with retry logic for concurrent modifications.

        Returns True on success, False if the job vanished or all
        retries were exhausted without raising.
        """
        
        for attempt in range(max_retries):
            try:
                job_store.update(job)
                return True
                
            except ConcurrentFileModificationException as e:
                logging.warning(f"Concurrent modification attempt {attempt + 1}: {e}")
                
                if attempt < max_retries - 1:
                    # Wait and reload fresh job state
                    time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
                    fresh_job = job_store.load(job.jobStoreID)
                    # Merge changes if possible
                    # NOTE(review): merge_job_changes is undefined here —
                    # supply it (or simply retry with fresh_job) before use.
                    job = merge_job_changes(job, fresh_job)
                else:
                    # Out of retries: surface the conflict to the caller.
                    logging.error("Failed to update job after max retries")
                    raise
                    
            except NoSuchJobException as e:
                # Job was deleted out from under us; nothing to update.
                logging.error(f"Job no longer exists: {e}")
                return False
                
        return False
    
    def safe_file_operations(file_operations: list):
        """Execute file operations with error recovery.

        Each operation is a dict with a 'type' key ('write', 'read' or
        'import') plus type-specific keys. On an unexpected error, every
        operation completed so far is rolled back and the error re-raised;
        a missing file only skips that one operation.
        """
        
        completed_operations = []
        
        for operation in file_operations:
            try:
                if operation['type'] == 'write':
                    file_id = job_store.writeFile(
                        operation['local_path'],
                        jobStoreID=operation.get('job_id')
                    )
                    completed_operations.append(('write', file_id))
                    
                elif operation['type'] == 'read':
                    job_store.readFile(
                        operation['file_id'],
                        operation['local_path']
                    )
                    completed_operations.append(('read', operation['file_id']))
                    
                elif operation['type'] == 'import':
                    file_id = job_store.importFile(
                        operation['src_url'],
                        sharedFileName=operation.get('shared_name')
                    )
                    completed_operations.append(('import', file_id))
                    
            except NoSuchFileException as e:
                logging.error(f"File operation failed - file not found: {e}")
                # Skip this operation, continue with others
                
            except Exception as e:
                logging.error(f"File operation failed: {e}")
                # Rollback completed operations
                rollback_file_operations(completed_operations)
                raise
                
        return completed_operations
    
    def rollback_file_operations(operations: list):
        """Rollback completed file operations on error.

        Deletes files created by 'write'/'import' ops, newest first;
        'read' ops have nothing to undo. Rollback failures are logged,
        not raised, so remaining entries are still attempted.
        """
        
        for op_type, file_id in reversed(operations):
            try:
                if op_type in ('write', 'import'):
                    job_store.deleteFile(file_id)
                    logging.info(f"Rolled back {op_type} operation for file {file_id}")
            except Exception as e:
                logging.warning(f"Failed to rollback {op_type} for {file_id}: {e}")

def handle_job_store_initialization_errors():
    """Create a job store, handling initialization errors.

    Tries to initialize a new store at the configured locator; if one
    already exists it resumes it, and on any other failure it falls back
    to a local file job store.

    Returns the initialized (or resumed) job store instance.

    Fix vs. original: `job_store` is now always bound before the handlers
    reference it (the original could hit a NameError when createJobStore
    itself raised), and the incoherent NoSuchJobStoreException branch —
    which re-called the very initialize() that had just failed — is gone;
    such failures now take the generic fallback path.
    """
    
    config = Config()
    config.jobStore = "aws:us-west-2:my-bucket:workflow-123"
    
    try:
        job_store = AbstractJobStore.createJobStore(config.jobStore)
        job_store.initialize(config)
        
    except JobStoreExistsException as e:
        # A store already lives at this locator: attach to it instead.
        logging.info(f"Job store already exists, resuming: {e}")
        job_store.resume()
        
    except Exception as e:
        logging.error(f"Failed to initialize job store: {e}")
        # Try alternative job store location
        config.jobStore = "file:/tmp/fallback-jobstore"
        job_store = AbstractJobStore.createJobStore(config.jobStore)
        job_store.initialize(config)
        
    return job_store

This job store management system provides robust, scalable storage for workflow metadata and files across diverse storage backends with comprehensive error handling and recovery capabilities.

Install with Tessl CLI

npx tessl i tessl/pypi-toil

docs

batch-systems.md

core-workflow.md

file-management.md

index.md

job-stores.md

provisioning.md

utilities.md

workflow-languages.md

tile.json