CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-toil

Pipeline management software for clusters.

Overall
score

67%

Overview
Eval results
Files

docs/core-workflow.md

Core Workflow Management

Overview

Toil's core workflow management provides the fundamental building blocks for creating, scheduling, and executing computational pipelines. The system centers around three key concepts: Jobs (units of work), Promises (result handling), and the Workflow Context (execution environment). This enables creation of complex DAG (Directed Acyclic Graph) workflows with sophisticated resource management and error handling.

Capabilities

Job Definition and Execution

{ .api }

The Job class is the fundamental unit of work in Toil workflows. Jobs can be defined as classes or using function decorators.

from toil.job import Job
from toil.fileStores import AbstractFileStore

# Class-based job definition
class ProcessingJob(Job):
    """Class-based Toil job that upper-cases its input string."""

    # One gibibyte, the default for both memory and disk.
    _ONE_GIB = 1024 ** 3

    def __init__(self, input_data, memory=None, cores=None, disk=None,
                 accelerators=None, preemptible=True, checkpoint=False,
                 displayName=None):
        # Fill in resource defaults (memory/disk in bytes, cores as
        # int/float) before handing everything to the Job base class.
        super().__init__(
            memory=memory if memory else self._ONE_GIB,
            cores=cores if cores else 1,
            disk=disk if disk else self._ONE_GIB,
            accelerators=accelerators if accelerators else [],
            preemptible=preemptible,
            checkpoint=checkpoint,
            displayName=displayName,
        )
        self.input_data = input_data

    def run(self, fileStore: AbstractFileStore) -> str:
        """Execute job logic. Must return serializable result."""
        fileStore.logToMaster(f"Processing: {self.input_data}")
        return self.input_data.upper()

# Function-based job definition
# Function-based job definition.
#
# NOTE: Job.wrapJobFn is a factory, not a decorator. It is called at
# workflow-construction time with the function and its arguments:
#     task_job = Job.wrapJobFn(simple_task, data)
# Using it as a bare decorator would replace the function with a single
# job instance at import time, so the function is defined plainly here.
def simple_task(job, data, multiplier=2):
    """Toil job function: log the input, then return data * multiplier.

    The first parameter is the wrapping job object, injected by Toil;
    it exposes the fileStore for logging and file access.
    """
    job.fileStore.logToMaster(f"Task processing: {data}")
    return data * multiplier

# Custom resource function job
# Job function with explicit resource requirements.
#
# NOTE: Job.wrapJobFn is not a decorator; resource keyword arguments
# such as memory/cores/disk accompany the function at wrap time:
#     job = Job.wrapJobFn(resource_intensive_task, dataset,
#                         memory="2G", cores=2, disk="500M")
def resource_intensive_task(job, large_dataset):
    """Process *large_dataset*; intended to run with 2G RAM, 2 cores, 500M disk."""
    return process_large_data(large_dataset)

Resource Requirements and Accelerators

{ .api }

Toil supports detailed resource specifications including GPU/accelerator requirements.

from toil.job import Job, AcceleratorRequirement, parse_accelerator

# GPU-enabled job
class MLTrainingJob(Job):
    """GPU-enabled training job requesting two V100s plus generous CPU/RAM."""

    def __init__(self, model_config):
        # Describe the accelerators this job needs; "count" and "kind"
        # identify the hardware class, the remaining keys narrow the match.
        accelerator: AcceleratorRequirement = {
            "count": 2,                    # Number of GPUs
            "kind": "gpu",                # Accelerator type
            "model": "Tesla V100",        # Specific GPU model (optional)
            "brand": "nvidia",            # GPU brand (optional)
            "api": "cuda",                # API interface (optional)
        }

        gib = 1024 ** 3
        super().__init__(
            memory=16 * gib,              # 16GB RAM
            cores=8,                      # 8 CPU cores
            disk=100 * gib,               # 100GB disk
            accelerators=[accelerator],
            preemptible=False,            # Don't preempt GPU jobs
        )
        self.model_config = model_config

    def run(self, fileStore):
        # Delegate the actual training; GPU resources were reserved above.
        return train_model(self.model_config)

# Parse an AcceleratorRequirement from a compact string specification.
gpu_spec = parse_accelerator("2:nvidia:tesla_v100:cuda")
# NOTE(review): expected result is roughly
# {"count": 2, "kind": "gpu", "brand": "nvidia", "model": "tesla_v100", "api": "cuda"},
# but the exact colon-separated field order accepted by parse_accelerator
# should be confirmed against the installed Toil version.

Job Scheduling and Dependencies

{ .api }

Toil provides flexible job scheduling patterns including sequential, parallel, and conditional execution.

from toil.job import Job

class WorkflowController(Job):
    """Root job: fan out preprocessing, then merge, then analyse."""

    def run(self, fileStore):
        # Fan-out: all three preprocessing children run concurrently.
        for dataset in ("dataset1", "dataset2", "dataset3"):
            self.addChild(DataPreprocessingJob(dataset))

        # Fan-in: a follow-on only starts after every child has finished.
        merge_job = MergeDataJob()
        self.addFollowOn(merge_job)

        # Analysis is chained to run once the merge completes.
        merge_job.addFollowOn(AnalysisJob())

        # Service job - runs alongside the other jobs in the workflow.
        self.addService(MonitoringService())

        return "Workflow initiated"

# Conditional job execution based on results
# Conditional job execution based on input characteristics.
#
# NOTE: Job.wrapJobFn is not a decorator — wrap at construction time:
#     job = Job.wrapJobFn(conditional_processor, input_file)
def conditional_processor(job, input_file):
    """Add a preprocessing child only when the input needs it.

    Returns a promise for the preprocessed result when preprocessing is
    scheduled, otherwise passes the original file through unchanged.
    """
    if needs_preprocessing(input_file):
        preprocess_job = PreprocessingJob(input_file)
        job.addChild(preprocess_job)
        return preprocess_job.rv()  # Promise for the preprocessed result
    # Input is already in the required form; no extra job needed.
    return input_file

Promise-Based Result Handling

{ .api }

Promises enable jobs to reference results from other jobs before they complete execution.

from toil.job import Job, Promise

class PipelineJob(Job):
    """Root job wiring three steps into a linear pipeline, forwarding
    results between steps via promises (.rv())."""

    def run(self, fileStore):
        # Create processing jobs
        step1 = ProcessStep1Job("input_data")
        step2 = ProcessStep2Job()  
        step3 = ProcessStep3Job()
        
        # Chain jobs using promises
        self.addChild(step1)
        
        # step2 will receive result of step1 when it completes.
        # NOTE(review): wrapping a bound method of an existing Job
        # instance (step2.run) in Job.wrapJobFn is unusual — the
        # idiomatic Toil pattern is step1.addFollowOnJobFn(fn, step1.rv())
        # or passing the promise to the next job's constructor; confirm
        # this works with the installed Toil version.
        step2_with_input = Job.wrapJobFn(step2.run, step1.rv())
        step1.addFollowOn(step2_with_input)
        
        # step3 receives results from both step1 and step2  
        step3_with_inputs = Job.wrapJobFn(
            step3.run, 
            step1.rv(),           # Promise for step1 result
            step2_with_input.rv() # Promise for step2 result
        )
        step2_with_input.addFollowOn(step3_with_inputs)
        
        return step3_with_inputs.rv()  # Return final result promise

# Function using multiple promise results
# Function combining results from several upstream jobs.
#
# NOTE: Job.wrapJobFn is not a decorator; pass the promises when wrapping:
#     job = Job.wrapJobFn(combine_results, j1.rv(), j2.rv(), j3.rv())
# Toil resolves each Promise to its concrete value before invoking the
# function, so the parameters below are plain values — the original
# `: Promise` annotations were misleading and have been dropped.
def combine_results(job, result1_promise, result2_promise, result3_promise):
    """Concatenate three upstream results into one summary string and log it."""
    combined = f"{result1_promise} + {result2_promise} + {result3_promise}"
    job.fileStore.logToMaster(f"Combined results: {combined}")
    return combined

Workflow Configuration

{ .api }

The Config class provides comprehensive workflow configuration options.

from toil.common import Config
from toil.lib.conversions import human2bytes

# Create and configure workflow
# NOTE(review): the attribute names below mirror Toil's CLI options; a
# few (e.g. rescueJobsFrequency, preemptibleWorkerTimeout) vary across
# Toil versions — confirm against the installed toil.common.Config.
config = Config()

# Job store configuration  
config.jobStore = "file:/tmp/my-job-store"  # Local file store
# config.jobStore = "aws:us-west-2:my-toil-bucket"  # AWS S3 store

# Batch system configuration
config.batchSystem = "local"     # Local execution
# config.batchSystem = "slurm"   # Slurm cluster
# config.batchSystem = "kubernetes"  # Kubernetes cluster

# Per-job resource defaults, used when a job does not specify its own
config.defaultMemory = human2bytes("2G")    # Default job memory
config.defaultCores = 1                     # Default CPU cores
config.defaultDisk = human2bytes("1G")      # Default disk space

# Resource limits  
config.maxCores = 32                        # Maximum cores per job
config.maxMemory = human2bytes("64G")       # Maximum memory per job
config.maxDisk = human2bytes("1T")          # Maximum disk per job

# Error handling
config.retryCount = 3                       # Job retry attempts
config.rescueJobsFrequency = 60            # Interval between lost-job checks (presumably seconds — confirm)

# Logging and monitoring
config.logLevel = "INFO"                   # Log verbosity
config.stats = True                        # Enable statistics collection

# Cleanup configuration
config.clean = "onSuccess"                 # Clean on workflow success
# config.clean = "always"                  # Always clean
# config.clean = "never"                   # Never clean

# Working directory
config.workDir = "/tmp/toil-work"          # Temporary file location

# Preemptible job configuration
config.preemptibleWorkerTimeout = 1800    # Preemptible timeout (seconds)
config.defaultPreemptible = True          # Jobs preemptible by default

Workflow Execution Context

{ .api }

The Toil context manager handles workflow lifecycle and provides execution environment.

from toil.common import Toil, Config
from toil.exceptions import FailedJobsException

def run_workflow():
    """Run (or resume) the workflow; return its result, or None on failure."""
    config = Config()
    config.jobStore = "file:/tmp/workflow-store"
    config.batchSystem = "local"

    try:
        with Toil(config) as toil:
            # Create root job
            root_job = MainWorkflowJob("input_parameters")

            if toil.config.restart:
                # Resume a previously failed run from its job store.
                result = toil.restart()
                print(f"Workflow restarted: {result}")
            else:
                # No prior run to resume: start fresh.
                result = toil.start(root_job)
                print(f"Workflow completed: {result}")

            return result

    except FailedJobsException as e:
        print(f"Workflow failed with {e.numberOfFailedJobs} failed jobs")
        print(f"Job store: {e.jobStoreLocator}")
        return None

# Alternative: manual context management
# Alternative: manual context management
def manual_workflow_execution():
    """Demonstrate driving Toil without a `with` statement.

    BUG FIX: the original called toil.__enter__() inside the try block,
    so a failure during setup would run toil.__exit__() on a context that
    was never entered. __enter__ now happens before the try, matching the
    semantics of a `with` statement.
    """
    config = Config()
    config.jobStore = "file:/tmp/manual-store"

    toil = Toil(config)
    # Initialize the workflow environment (what `with` would do on entry).
    toil.__enter__()
    try:
        # Execute workflow
        root_job = SimpleJob("data")
        result = toil.start(root_job)

        return result
    finally:
        # Cleanup. NOTE(review): passing (None, None, None) reports a
        # clean exit even when an exception is propagating; forwarding
        # sys.exc_info() would be more faithful to `with` semantics.
        toil.__exit__(None, None, None)

Advanced Job Patterns

{ .api }

Sophisticated job patterns for complex workflow requirements.

from toil.job import Job, PromisedRequirement

class DynamicResourceJob(Job):
    """Job with resource requirements determined at runtime."""
    
    def __init__(self, size_calculator_promise):
        # PromisedRequirement defers the memory computation until the
        # upstream promise resolves; the lambda maps a size in MB to bytes.
        dynamic_memory = PromisedRequirement(
            lambda size: size * 1024 * 1024,  # Convert MB to bytes
            size_calculator_promise
        )
        
        super().__init__(
            memory=dynamic_memory,
            cores=1,
            disk="1G"
        )
        self.size_promise = size_calculator_promise
        
    def run(self, fileStore):
        # NOTE(review): this assumes a Promise stored as an instance
        # attribute has been replaced by its resolved value by the time
        # run() executes — confirm against Toil's promise-resolution rules.
        actual_size = self.size_promise  # Automatically resolved
        fileStore.logToMaster(f"Processing size: {actual_size}MB")
        return f"Processed {actual_size}MB of data"

class ServiceJob(Job):
    """Long-running service job.

    NOTE(review): Toil's native service mechanism subclasses Job.Service
    (with start/check/stop methods) and is attached via addService();
    modelling a service as a plain Job that loops inside run(), as shown
    here, should be confirmed against the Toil documentation.
    """
    
    def __init__(self):
        super().__init__(memory="512M", cores=1, disk="100M")
        
    def run(self, fileStore):
        # Start service process
        service_process = start_monitoring_service()
        
        # Poll until an external condition says to stop; assumes the
        # `time` module is imported at file scope.
        try:
            while True:
                time.sleep(10)
                if should_stop_service():
                    break
        finally:
            # Always tear the service down, even if polling raises.
            service_process.stop()
            
        return "Service completed"

# Checkpointing for fault tolerance
class CheckpointedJob(Job):
    """Job flagged checkpoint=True so Toil can rerun it after a failure."""

    def __init__(self, data):
        super().__init__(
            memory="1G", 
            cores=1, 
            disk="1G",
            checkpoint=True  # Enable checkpointing
        )
        self.data = data
        
    def run(self, fileStore):
        # Job can be restarted from checkpoint on failure.
        # NOTE(review): getLocalTempFile() is node-local scratch that is
        # discarded when the job ends — state written here does not
        # survive a restart; fileStore.writeGlobalFile would be needed
        # to persist it. Confirm the intended checkpointing pattern.
        checkpoint_file = fileStore.getLocalTempFile()
        
        # Save intermediate state (assumes `json` is imported at file scope).
        with open(checkpoint_file, 'w') as f:
            json.dump(self.data, f)
            
        # Long-running computation
        result = expensive_computation(self.data)
        
        return result

Error Handling and Debugging

{ .api }

Comprehensive error handling and debugging capabilities for workflow development.

from toil.job import Job, JobException
from toil.exceptions import FailedJobsException

class RobustJob(Job):
    """Job with comprehensive error handling.

    NOTE(review): the level constants below require `import logging`
    at file scope, which this snippet does not show.
    """
    
    def run(self, fileStore):
        try:
            # Potentially failing operation
            result = risky_operation()
            
        except ValueError as e:
            # Recoverable: log at WARNING and fall back to a default.
            fileStore.logToMaster(f"Handled ValueError: {e}", level=logging.WARNING)
            result = default_result()
            
        except Exception as e:
            # Unrecoverable: log, then re-raise as JobException so the
            # leader records a failure and the job is retried.
            fileStore.logToMaster(f"Job failed: {e}", level=logging.ERROR)
            raise JobException(f"Unrecoverable error: {e}")
            
        return result

# Workflow-level error handling
def resilient_workflow():
    """Run the workflow with per-job retries, restarting once on failure."""
    config = Config()
    config.retryCount = 5  # Retry failed jobs 5 times

    try:
        with Toil(config) as toil:
            result = toil.start(MainJob("input"))
    except FailedJobsException as e:
        # Handle workflow failure
        print(f"Workflow failed: {e.numberOfFailedJobs} jobs failed")

        # Optionally restart from failure point
        config.restart = True
        with Toil(config) as toil:
            result = toil.restart()

    return result

This core workflow management system provides the foundation for building scalable, fault-tolerant computational pipelines with sophisticated resource management and flexible job scheduling patterns.

Install with Tessl CLI

npx tessl i tessl/pypi-toil

docs

batch-systems.md

core-workflow.md

file-management.md

index.md

job-stores.md

provisioning.md

utilities.md

workflow-languages.md

tile.json