Pipeline management software for clusters.
Overall score: 67%
Toil's core workflow management provides the fundamental building blocks for creating, scheduling, and executing computational pipelines. The system centers around three key concepts: Jobs (units of work), Promises (result handling), and the Workflow Context (execution environment). This enables creation of complex DAG (Directed Acyclic Graph) workflows with sophisticated resource management and error handling.
{ .api }
The Job class is the fundamental unit of work in Toil workflows. Jobs can be defined as classes or using function decorators.
from toil.job import Job
from toil.fileStores import AbstractFileStore


# Class-based job definition
class ProcessingJob(Job):
    """A Toil job that upper-cases its input string.

    Demonstrates passing explicit resource requirements through to
    Job.__init__ (memory/disk are byte counts, cores an int or float).
    """

    def __init__(self, input_data, memory=None, cores=None, disk=None,
                 accelerators=None, preemptible=True, checkpoint=False,
                 displayName=None):
        # Fall back to 1 GiB memory/disk and one core when the caller
        # supplies no explicit requirement.
        super().__init__(
            memory=memory or 1024 * 1024 * 1024,
            cores=cores or 1,
            disk=disk or 1024 * 1024 * 1024,
            accelerators=accelerators or [],  # no accelerators by default
            preemptible=preemptible,          # allow preemption
            checkpoint=checkpoint,            # checkpointing off by default
            displayName=displayName,          # human-readable job name
        )
        self.input_data = input_data

    def run(self, fileStore: AbstractFileStore) -> str:
        """Execute the job body; the return value must be serializable."""
        fileStore.logToMaster(f"Processing: {self.input_data}")
        # Perform the actual work.
        return self.input_data.upper()
# Function-based job definition
@Job.wrapJobFn
def simple_task(job, data, multiplier=2):
"""Function automatically wrapped as job with default resources."""
job.fileStore.logToMaster(f"Task processing: {data}")
return data * multiplier
# Custom resource function job
@Job.wrapJobFn(memory="2G", cores=2, disk="500M")
def resource_intensive_task(job, large_dataset):
"""Function with explicit resource requirements."""
return process_large_data(large_dataset){ .api }
Toil supports detailed resource specifications including GPU/accelerator requirements.
from toil.job import Job, AcceleratorRequirement, parse_accelerator
# GPU-enabled job
class MLTrainingJob(Job):
def __init__(self, model_config):
# Define GPU requirements
gpu_requirement: AcceleratorRequirement = {
"count": 2, # Number of GPUs
"kind": "gpu", # Accelerator type
"model": "Tesla V100", # Specific GPU model (optional)
"brand": "nvidia", # GPU brand (optional)
"api": "cuda" # API interface (optional)
}
super().__init__(
memory=16*1024*1024*1024, # 16GB RAM
cores=8, # 8 CPU cores
disk=100*1024*1024*1024, # 100GB disk
accelerators=[gpu_requirement],
preemptible=False # Don't preempt GPU jobs
)
self.model_config = model_config
def run(self, fileStore):
# Access GPU resources for training
return train_model(self.model_config)
# Parse accelerator from string specification
gpu_spec = parse_accelerator("2:nvidia:tesla_v100:cuda")
# Returns: {"count": 2, "kind": "gpu", "brand": "nvidia", "model": "tesla_v100", "api": "cuda"}{ .api }
Toil provides flexible job scheduling patterns including sequential, parallel, and conditional execution.
from toil.job import Job
class WorkflowController(Job):
def run(self, fileStore):
# Parallel execution - children run concurrently
child1 = DataPreprocessingJob("dataset1")
child2 = DataPreprocessingJob("dataset2")
child3 = DataPreprocessingJob("dataset3")
self.addChild(child1)
self.addChild(child2)
self.addChild(child3)
# Sequential execution - follow-on runs after children complete
merge_job = MergeDataJob()
self.addFollowOn(merge_job)
# Analysis runs after merge completes
analysis_job = AnalysisJob()
merge_job.addFollowOn(analysis_job)
# Service job - runs alongside other jobs
monitoring_service = MonitoringService()
self.addService(monitoring_service)
return "Workflow initiated"
# Conditional job execution based on results
@Job.wrapJobFn
def conditional_processor(job, input_file):
# Check input characteristics
if needs_preprocessing(input_file):
preprocess_job = PreprocessingJob(input_file)
job.addChild(preprocess_job)
return preprocess_job.rv() # Return preprocessed result
else:
return input_file # Return original file{ .api }
Promises enable jobs to reference results from other jobs before they complete execution.
from toil.job import Job, Promise
class PipelineJob(Job):
def run(self, fileStore):
# Create processing jobs
step1 = ProcessStep1Job("input_data")
step2 = ProcessStep2Job()
step3 = ProcessStep3Job()
# Chain jobs using promises
self.addChild(step1)
# step2 will receive result of step1 when it completes
step2_with_input = Job.wrapJobFn(step2.run, step1.rv())
step1.addFollowOn(step2_with_input)
# step3 receives results from both step1 and step2
step3_with_inputs = Job.wrapJobFn(
step3.run,
step1.rv(), # Promise for step1 result
step2_with_input.rv() # Promise for step2 result
)
step2_with_input.addFollowOn(step3_with_inputs)
return step3_with_inputs.rv() # Return final result promise
# Function using multiple promise results
@Job.wrapJobFn
def combine_results(job, result1_promise: Promise, result2_promise: Promise, result3_promise: Promise):
"""Function receives resolved promise values as arguments."""
# Promises are automatically resolved to actual values
combined = f"{result1_promise} + {result2_promise} + {result3_promise}"
job.fileStore.logToMaster(f"Combined results: {combined}")
return combined{ .api }
The Config class provides comprehensive workflow configuration options.
from toil.common import Config
from toil.lib.conversions import human2bytes


# Build a workflow configuration object.
config = Config()

# --- Job store location ---
config.jobStore = "file:/tmp/my-job-store"          # local file store
# config.jobStore = "aws:us-west-2:my-toil-bucket"  # AWS S3 store

# --- Batch system backend ---
config.batchSystem = "local"         # run everything on this machine
# config.batchSystem = "slurm"       # Slurm cluster
# config.batchSystem = "kubernetes"  # Kubernetes cluster

# --- Per-job resource defaults ---
config.defaultMemory = human2bytes("2G")
config.defaultCores = 1
config.defaultDisk = human2bytes("1G")

# --- Per-job resource ceilings ---
config.maxCores = 32
config.maxMemory = human2bytes("64G")
config.maxDisk = human2bytes("1T")

# --- Error handling ---
config.retryCount = 3            # attempts per failed job
config.rescueJobsFrequency = 60  # seconds between rescue-job checks

# --- Logging and monitoring ---
config.logLevel = "INFO"  # log verbosity
config.stats = True       # collect runtime statistics

# --- Job store cleanup policy ---
config.clean = "onSuccess"  # alternatives: "always", "never"

# --- Scratch space for temporary files ---
config.workDir = "/tmp/toil-work"

# --- Preemptible worker behaviour ---
config.preemptibleWorkerTimeout = 1800  # seconds before a worker is declared lost
config.defaultPreemptible = True        # jobs preemptible unless stated otherwise
The Toil context manager handles workflow lifecycle and provides execution environment.
import sys

from toil.common import Toil, Config
from toil.exceptions import FailedJobsException


def run_workflow():
    """Run (or resume) a workflow, reporting failed jobs on error."""
    config = Config()
    config.jobStore = "file:/tmp/workflow-store"
    config.batchSystem = "local"
    try:
        with Toil(config) as toil:
            root_job = MainWorkflowJob("input_parameters")
            if not toil.config.restart:
                # Fresh run: hand the root job to the leader.
                result = toil.start(root_job)
                print(f"Workflow completed: {result}")
            else:
                # Resume a previously failed run from its job store.
                result = toil.restart()
                print(f"Workflow restarted: {result}")
            return result
    except FailedJobsException as e:
        print(f"Workflow failed with {e.numberOfFailedJobs} failed jobs")
        print(f"Job store: {e.jobStoreLocator}")
        return None


# Alternative: manual context management (prefer the 'with' form above).
def manual_workflow_execution():
    """Drive the Toil context manager by hand."""
    config = Config()
    config.jobStore = "file:/tmp/manual-store"
    toil = Toil(config)
    # Enter outside the try: if __enter__ itself fails, __exit__ must
    # not be called.
    toil.__enter__()
    try:
        root_job = SimpleJob("data")
        return toil.start(root_job)
    finally:
        # Forward any in-flight exception so __exit__ sees the real
        # error rather than (None, None, None).
        toil.__exit__(*sys.exc_info())
Sophisticated job patterns for complex workflow requirements.
from toil.job import Job, PromisedRequirement
class DynamicResourceJob(Job):
"""Job with resource requirements determined at runtime."""
def __init__(self, size_calculator_promise):
# Use promise to determine resources dynamically
dynamic_memory = PromisedRequirement(
lambda size: size * 1024 * 1024, # Convert MB to bytes
size_calculator_promise
)
super().__init__(
memory=dynamic_memory,
cores=1,
disk="1G"
)
self.size_promise = size_calculator_promise
def run(self, fileStore):
# Access resolved size value
actual_size = self.size_promise # Automatically resolved
fileStore.logToMaster(f"Processing size: {actual_size}MB")
return f"Processed {actual_size}MB of data"
class ServiceJob(Job):
"""Long-running service job."""
def __init__(self):
super().__init__(memory="512M", cores=1, disk="100M")
def run(self, fileStore):
# Start service process
service_process = start_monitoring_service()
# Service runs until workflow completes
try:
while True:
time.sleep(10)
if should_stop_service():
break
finally:
service_process.stop()
return "Service completed"
# Checkpointing for fault tolerance
class CheckpointedJob(Job):
def __init__(self, data):
super().__init__(
memory="1G",
cores=1,
disk="1G",
checkpoint=True # Enable checkpointing
)
self.data = data
def run(self, fileStore):
# Job can be restarted from checkpoint on failure
checkpoint_file = fileStore.getLocalTempFile()
# Save intermediate state
with open(checkpoint_file, 'w') as f:
json.dump(self.data, f)
# Long-running computation
result = expensive_computation(self.data)
return result{ .api }
Comprehensive error handling and debugging capabilities for workflow development.
from toil.job import Job, JobException
from toil.exceptions import FailedJobsException
class RobustJob(Job):
"""Job with comprehensive error handling."""
def run(self, fileStore):
try:
# Potentially failing operation
result = risky_operation()
except ValueError as e:
# Log error and continue
fileStore.logToMaster(f"Handled ValueError: {e}", level=logging.WARNING)
result = default_result()
except Exception as e:
# Fatal error - job will be retried
fileStore.logToMaster(f"Job failed: {e}", level=logging.ERROR)
raise JobException(f"Unrecoverable error: {e}")
return result
# Workflow-level error handling
def resilient_workflow():
config = Config()
config.retryCount = 5 # Retry failed jobs 5 times
try:
with Toil(config) as toil:
root_job = MainJob("input")
result = toil.start(root_job)
except FailedJobsException as e:
# Handle workflow failure
print(f"Workflow failed: {e.numberOfFailedJobs} jobs failed")
# Optionally restart from failure point
config.restart = True
with Toil(config) as toil:
result = toil.restart()
return resultThis core workflow management system provides the foundation for building scalable, fault-tolerant computational pipelines with sophisticated resource management and flexible job scheduling patterns.
Install with Tessl CLI
npx tessl i tessl/pypi-toildocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10