Pipeline management software for clusters.
Overall score: 67%
Toil provides native support for CWL (Common Workflow Language) and WDL (Workflow Description Language), enabling seamless execution of standardized workflow specifications. The integration translates workflow descriptions into Toil's internal job graph while preserving the semantic meaning and execution requirements of the original specifications. This allows users to leverage existing workflows and tools while benefiting from Toil's distributed execution capabilities, cloud provisioning, and fault tolerance.
{ .api }
Comprehensive support for CWL v1.0+ workflows with full tool and workflow execution capabilities.
from toil.cwl import cwltoil
from toil.common import Config, Toil
import os
# CWL workflow execution using command line
def run_cwl_workflow_cli():
    """Build example ``toil-cwl-runner`` command lines.

    The simplest invocation is just:
        toil-cwl-runner workflow.cwl inputs.yml

    Returns:
        tuple[list[str], list[str]]: ``(cmd, advanced_cmd)`` — a basic local
        invocation and an advanced cloud/autoscaling invocation. (Previously
        both lists were built and discarded; returning them lets callers run
        or inspect the commands.)
    """
    # Basic execution: local batch system with modest resource caps.
    cmd = [
        "toil-cwl-runner",
        "--jobStore", "file:cwl-jobstore",
        "--batchSystem", "local",
        "--maxCores", "4",
        "--maxMemory", "8G",
        "--logLevel", "INFO",
        "workflow.cwl",
        "inputs.yml",
    ]
    # Advanced options: S3 job store, Kubernetes batch system, AWS
    # autoscaling with preemptible nodes, retries, and output handling.
    advanced_cmd = [
        "toil-cwl-runner",
        "--jobStore", "aws:us-west-2:my-bucket:cwl-run",
        "--batchSystem", "kubernetes",
        "--provisioner", "aws",
        "--nodeTypes", "m5.large,m5.xlarge",
        "--maxNodes", "10",
        "--defaultPreemptible",
        "--retryCount", "3",
        "--cleanWorkDir", "onSuccess",
        "--outdir", "/results",
        "--tmp-outdir-prefix", "/tmp/cwl-",
        "complex-workflow.cwl",
        "production-inputs.json",
    ]
    return cmd, advanced_cmd
# Programmatic CWL execution
def run_cwl_workflow_programmatic():
    """Execute a CWL workflow via Toil's Python API instead of the CLI.

    Builds a ``Config`` tuned for CWL execution and drives the ``cwltoil``
    entry point directly.

    Returns:
        The value returned by ``cwltoil.main`` (exit status).
    """
    # Workflow description and its input bindings.
    cwl_file = "analysis-workflow.cwl"
    inputs_file = "sample-inputs.yml"

    # Configure Toil for CWL execution.
    config = Config()
    config.jobStore = "file:cwl-analysis-store"
    config.batchSystem = "local"
    config.maxCores = 8
    config.maxMemory = "16G"
    config.retryCount = 2

    # CWL-specific configuration.
    config.cwl = True
    config.cwlVersion = "v1.2"
    config.cwlTmpOutDir = "/tmp/cwl-tmp"
    config.cwlCachingDir = "/cache/cwl"

    # Run inside a Toil context so the job store is opened and cleaned up.
    with Toil(config) as toil:
        result = cwltoil.main([
            cwl_file,
            inputs_file,
            "--jobStore", config.jobStore,
            "--batchSystem", config.batchSystem,
        ])
    print(f"CWL workflow completed with result: {result}")
    return result
def advanced_cwl_features():
    """Demonstrate advanced CWL features and their Toil configuration.

    Shows a CWL tool requesting a Docker container and explicit resources,
    then builds a ``Config`` enabling container support, Singularity, and
    CWL result caching.

    Returns:
        Config: the configured Toil ``Config`` (previously built and
        discarded).
    """
    # Example CommandLineTool running in a container with declared minimum
    # CPU/RAM/disk (indentation reconstructed — TODO confirm against the
    # original CWL source).
    cwl_with_docker = """
cwlVersion: v1.2
class: CommandLineTool
requirements:
  DockerRequirement:
    dockerPull: ubuntu:20.04
  ResourceRequirement:
    coresMin: 2
    ramMin: 4096
    tmpdirMin: 1024
    outdirMin: 1024
inputs:
  input_file:
    type: File
    inputBinding:
      position: 1
outputs:
  output_file:
    type: File
    outputBinding:
      glob: "output.txt"
baseCommand: ["bash", "-c"]
arguments:
  - "cat $(inputs.input_file.path) | wc -l > output.txt"
"""

    # Execute with Docker support.
    config = Config()
    config.disableChaining = True  # Required for some CWL features
    config.enableCWLDockerSupport = True
    config.dockerAppliance = "ubuntu:20.04"

    # Singularity container support (alternative to Docker).
    config.cwlUseSingularity = True
    config.singularityArgs = ["--cleanenv", "--containall"]

    # CWL caching for faster reruns.
    config.cwlCaching = True
    config.cwlCachingDir = "/shared/cwl-cache"
    return config
# Custom resource requirements mapping
def map_cwl_resources(cwl_requirements):
    """Map CWL resource requirements onto Toil resource values.

    Args:
        cwl_requirements: Mapping of CWL ``ResourceRequirement`` fields
            (``ramMin``, ``coresMin``, ``tmpdirMin``, ``outdirMin``) with
            sizes in mebibytes, per the CWL spec.

    Returns:
        dict: ``memory`` and ``disk`` in bytes, ``cores`` as a count.
        Missing fields fall back to 1024 MiB (and 1 core).
    """
    MIB = 1024 * 1024  # CWL sizes are MiB; Toil wants bytes.
    return {
        'memory': cwl_requirements.get('ramMin', 1024) * MIB,
        'cores': cwl_requirements.get('coresMin', 1),
        'disk': (cwl_requirements.get('tmpdirMin', 1024)
                 + cwl_requirements.get('outdirMin', 1024)) * MIB,
    }
Native WDL workflow execution with support for WDL 1.0+ specifications.
from toil.wdl import wdltoil
from toil.common import Config, Toil
# WDL workflow execution using command line
def run_wdl_workflow_cli():
    """Build example ``toil-wdl-runner`` command lines.

    The simplest invocation is just:
        toil-wdl-runner workflow.wdl inputs.json

    Returns:
        tuple[list[str], list[str]]: ``(cmd, cloud_cmd)`` — a Slurm-backed
        invocation and a cloud autoscaling invocation. (Previously both
        lists were built and discarded.)
    """
    # Comprehensive options against a Slurm cluster.
    cmd = [
        "toil-wdl-runner",
        "--jobStore", "file:wdl-jobstore",
        "--batchSystem", "slurm",
        "--maxCores", "16",
        "--maxMemory", "64G",
        "--defaultDisk", "10G",
        "--retryCount", "3",
        "--logLevel", "DEBUG",
        "pipeline.wdl",
        "pipeline_inputs.json",
    ]
    # Cloud execution with auto-scaling: spot-priced node types and
    # compensation for preemptible-node loss.
    cloud_cmd = [
        "toil-wdl-runner",
        "--jobStore", "aws:us-east-1:wdl-bucket:run-001",
        "--batchSystem", "mesos",
        "--provisioner", "aws",
        "--nodeTypes", "c5.large:0.50,c5.xlarge:0.75,c5.2xlarge",
        "--maxNodes", "50",
        "--defaultPreemptible",
        "--preemptibleCompensation", "1.5",
        "large-scale-analysis.wdl",
        "production-inputs.json",
    ]
    return cmd, cloud_cmd
# Programmatic WDL execution
def run_wdl_workflow_programmatic():
    """Execute a WDL workflow via Toil's Python API instead of the CLI.

    Returns:
        The value returned by ``wdltoil.main`` (exit status).
    """
    wdl_file = "genomics-pipeline.wdl"
    inputs_file = "sample-cohort-inputs.json"

    # Configure for WDL execution on Kubernetes with generous resources.
    config = Config()
    config.jobStore = "file:genomics-run"
    config.batchSystem = "kubernetes"
    config.maxCores = 32
    config.maxMemory = "128G"
    config.defaultDisk = "100G"

    # WDL-specific configuration: enable call caching to skip re-running
    # identical task invocations.
    config.wdl = True
    config.wdlVersion = "1.0"
    config.wdlCallCaching = True
    config.wdlCacheDir = "/cache/wdl-calls"

    # Execute the WDL workflow inside a Toil context.
    with Toil(config) as toil:
        result = wdltoil.main([
            wdl_file,
            inputs_file,
            "--jobStore", config.jobStore,
            "--batchSystem", config.batchSystem,
        ])
    return result
def advanced_wdl_features():
    """Advanced WDL workflow features and their Toil configuration.

    Shows a WDL 1.0 workflow using scatter/gather with containerized
    tasks, then builds a ``Config`` enabling call caching, containers,
    resource optimization, and localization strategies.

    Returns:
        Config: the configured Toil ``Config`` object.
    """
    # WDL workflow with scatter over samples, a merge step, and a
    # containerized alignment task (indentation reconstructed — TODO
    # confirm against the original WDL source).
    wdl_workflow = """
version 1.0

workflow GenomicsAnalysis {
    input {
        File reference_genome
        Array[File] sample_fastqs
        String output_prefix
        Int? cpu_count = 4
        String? memory_gb = "8G"
    }

    # Scatter processing over samples
    scatter (fastq in sample_fastqs) {
        call AlignReads {
            input:
                reference = reference_genome,
                fastq_file = fastq,
                cpu = cpu_count,
                memory = memory_gb
        }
    }

    # Merge results
    call MergeAlignments {
        input:
            alignments = AlignReads.output_bam,
            output_name = output_prefix
    }

    output {
        File merged_bam = MergeAlignments.merged_bam
        Array[File] individual_bams = AlignReads.output_bam
    }
}

task AlignReads {
    input {
        File reference
        File fastq_file
        Int cpu = 2
        String memory = "4G"
    }
    command <<<
        bwa mem -t ${cpu} ${reference} ${fastq_file} | samtools sort -o output.bam
    >>>
    runtime {
        docker: "biocontainers/bwa:v0.7.17_cv1"
        cpu: cpu
        memory: memory
        disk: "20 GB"
    }
    output {
        File output_bam = "output.bam"
    }
}
"""

    # Configure advanced WDL features.
    config = Config()

    # Call caching for expensive operations.
    config.wdlCallCaching = True
    config.wdlCallCachingBackend = "file"  # or "database"

    # Docker/container support.
    config.enableWDLDockerSupport = True
    config.dockerAppliance = "ubuntu:20.04"

    # Resource optimization.
    config.wdlOptimizeResources = True
    config.wdlResourceProfile = "high-throughput"

    # Localization strategies for task inputs.
    config.wdlLocalizationStrategy = "copy"  # or "symlink", "hardlink"
    config.wdlTmpDir = "/fast-tmp"
    return config
Tools for converting between workflow languages and translating to Toil's internal representation.
from toil.cwl.cwltoil import CWLWorkflow
from toil.wdl.wdltoil import WDLWorkflow
def workflow_introspection():
    """Inspect and report on CWL and WDL workflow specifications.

    Loads one workflow of each language, prints structural information,
    and summarizes declared resource requirements. Output goes to stdout;
    nothing is returned.
    """
    # Load and summarize a CWL workflow.
    cwl_workflow = CWLWorkflow.load("analysis.cwl")
    print("CWL Workflow Analysis:")
    print(f"Version: {cwl_workflow.version}")
    print(f"Class: {cwl_workflow.class_}")
    print(f"Tools: {len(cwl_workflow.steps)}")

    # Sum the declared minimum resources across all steps; steps with no
    # ResourceRequirement contribute nothing.
    total_cpu = 0
    total_memory = 0
    for step in cwl_workflow.steps:
        if hasattr(step, 'requirements'):
            for req in step.requirements:
                if req.class_ == 'ResourceRequirement':
                    total_cpu += req.get('coresMin', 1)
                    total_memory += req.get('ramMin', 1024)
    print(f"Total CPU cores needed: {total_cpu}")
    print(f"Total memory needed: {total_memory} MB")

    # Load and summarize a WDL workflow.
    wdl_workflow = WDLWorkflow.load("pipeline.wdl")
    print("\nWDL Workflow Analysis:")
    print(f"Version: {wdl_workflow.version}")
    print(f"Workflow name: {wdl_workflow.name}")
    print(f"Tasks: {len(wdl_workflow.tasks)}")

    # Report each task's runtime requirements.
    for task in wdl_workflow.tasks:
        runtime = task.runtime
        print(f"Task {task.name}:")
        print(f"  CPU: {runtime.get('cpu', 'default')}")
        print(f"  Memory: {runtime.get('memory', 'default')}")
        print(f"  Disk: {runtime.get('disk', 'default')}")
        if 'docker' in runtime:
            print(f"  Docker: {runtime['docker']}")
def workflow_validation_and_linting():
    """Validate CWL and WDL workflow specifications for common issues.

    Returns:
        dict: ``{'cwl': {...}, 'wdl': {...}}`` where each value holds
        ``errors`` and ``warnings`` lists. Parse/load failures are reported
        as errors rather than raised.
    """

    def validate_cwl_workflow(cwl_file: str):
        """Validate a CWL workflow specification."""
        errors = []
        warnings = []
        try:
            workflow = CWLWorkflow.load(cwl_file)
            # Workflow-level requirements are recommended but optional.
            if not hasattr(workflow, 'requirements'):
                warnings.append("No workflow-level requirements specified")
            for step in workflow.steps:
                # Missing resource requirements is a warning, not an error.
                if not hasattr(step, 'requirements'):
                    warnings.append(f"Step {step.id} has no resource requirements")
                # Every step input must be wired to a source.
                for inp in step.in_:
                    if not inp.source:
                        errors.append(f"Step {step.id} input {inp.id} has no source")
                # Every step must reference a tool to run.
                if not step.run:
                    errors.append(f"Step {step.id} has no tool reference")
        except Exception as e:
            errors.append(f"Failed to parse CWL: {str(e)}")
        return {'errors': errors, 'warnings': warnings}

    def validate_wdl_workflow(wdl_file: str):
        """Validate a WDL workflow specification."""
        errors = []
        warnings = []
        try:
            workflow = WDLWorkflow.load(wdl_file)
            if not workflow.version:
                errors.append("WDL version not specified")
            # Validate task definitions.
            for task in workflow.tasks:
                if not task.command:
                    errors.append(f"Task {task.name} has no command")
                if not hasattr(task, 'runtime'):
                    warnings.append(f"Task {task.name} has no runtime requirements")
                if not task.outputs:
                    warnings.append(f"Task {task.name} has no outputs")
            # Check calls against defined tasks (name set built once,
            # instead of rebuilding a list per call).
            task_names = {t.name for t in workflow.tasks}
            for call in workflow.calls:
                if call.task_name not in task_names:
                    errors.append(f"Call references undefined task: {call.task_name}")
        except Exception as e:
            errors.append(f"Failed to parse WDL: {str(e)}")
        return {'errors': errors, 'warnings': warnings}

    # Example usage
    cwl_result = validate_cwl_workflow("workflow.cwl")
    wdl_result = validate_wdl_workflow("pipeline.wdl")
    return {'cwl': cwl_result, 'wdl': wdl_result}
Monitoring and debugging capabilities for workflow language execution.
from toil.cwl.utils import CWLLogger
from toil.wdl.utils import WDLLogger
import logging
def setup_workflow_monitoring():
    """Set up logging for CWL and WDL workflow execution.

    CWL logs go verbosely to a file; WDL logs go at INFO level to the
    console.

    Returns:
        tuple: ``(cwl_logger, wdl_logger)``.
    """
    # CWL-specific logging: everything, to a file with timestamps.
    cwl_logger = CWLLogger()
    cwl_logger.setLevel(logging.DEBUG)
    file_handler = logging.FileHandler('cwl_execution.log')
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    cwl_logger.addHandler(file_handler)

    # WDL-specific logging: informational, to the console.
    wdl_logger = WDLLogger()
    wdl_logger.setLevel(logging.INFO)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(
        logging.Formatter('WDL: %(levelname)s - %(message)s')
    )
    wdl_logger.addHandler(console_handler)
    return cwl_logger, wdl_logger
def workflow_progress_tracking():
    """Create a tracker that records workflow progress and performance.

    Returns:
        WorkflowTracker: a fresh tracker; callers drive it via
        ``start_workflow``/``step_started``/``step_completed``/
        ``workflow_completed``.
    """
    import time  # hoisted once; previously re-imported inside every method

    class WorkflowTracker:
        """Collects per-step timings, statuses and resource usage."""

        def __init__(self):
            self.start_time = None    # wall-clock workflow start (epoch seconds)
            self.step_times = {}      # step_id -> {'start', 'end', 'duration'}
            self.step_status = {}     # step_id -> 'running' | 'completed'
            self.resource_usage = {}  # step_id -> resources dict from caller

        def start_workflow(self):
            """Mark workflow start."""
            self.start_time = time.time()
            print("Workflow execution started")

        def step_started(self, step_id: str):
            """Mark step start."""
            self.step_times[step_id] = {'start': time.time()}
            self.step_status[step_id] = 'running'
            print(f"Step started: {step_id}")

        def step_completed(self, step_id: str, resources_used: dict = None):
            """Mark step completion; record duration and optional resources."""
            end_time = time.time()
            # Tolerate completion events for steps never marked as started.
            if step_id in self.step_times:
                self.step_times[step_id]['end'] = end_time
                duration = end_time - self.step_times[step_id]['start']
                self.step_times[step_id]['duration'] = duration
                print(f"Step completed: {step_id} ({duration:.2f}s)")
            self.step_status[step_id] = 'completed'
            if resources_used:
                self.resource_usage[step_id] = resources_used

        def workflow_completed(self):
            """Mark workflow completion and print a summary report."""
            if self.start_time:
                total_time = time.time() - self.start_time
                print(f"Workflow completed in {total_time:.2f}s")
            # Report step durations.
            print("\nStep execution times:")
            for step_id, times in self.step_times.items():
                if 'duration' in times:
                    print(f"  {step_id}: {times['duration']:.2f}s")
            # Report aggregate resource usage, if any was recorded.
            if self.resource_usage:
                print("\nResource usage summary:")
                total_cpu_hours = 0
                total_memory_gb_hours = 0
                for step_id, resources in self.resource_usage.items():
                    duration_hours = self.step_times[step_id].get('duration', 0) / 3600
                    cpu_hours = resources.get('cpu', 0) * duration_hours
                    memory_gb_hours = resources.get('memory_gb', 0) * duration_hours
                    total_cpu_hours += cpu_hours
                    total_memory_gb_hours += memory_gb_hours
                    print(f"  {step_id}: {cpu_hours:.2f} CPU-hours, {memory_gb_hours:.2f} GB-hours")
                print(f"\nTotal: {total_cpu_hours:.2f} CPU-hours, {total_memory_gb_hours:.2f} GB-hours")

    return WorkflowTracker()
def debug_workflow_execution():
    """Build helpers for diagnosing CWL step and WDL task failures.

    Returns:
        tuple: ``(debug_cwl_step_failure, debug_wdl_task_failure)``.
    """

    def debug_cwl_step_failure(step_id: str, exit_code: int, stderr_log: str):
        """Print diagnostics for a failed CWL step based on exit code and stderr."""
        print(f"CWL Step Failed: {step_id}")
        print(f"Exit code: {exit_code}")
        # Well-known exit codes map to common root causes.
        if exit_code == 127:
            print("Issue: Command not found")
            print("Check: Docker image contains required tools")
        elif exit_code == 137:
            print("Issue: Process killed (likely OOM)")
            print("Check: Increase memory requirements")
        elif exit_code == 139:
            print("Issue: Segmentation fault")
            print("Check: Input data format or tool version")
        # Parse stderr for specific error signatures.
        if "No space left on device" in stderr_log:
            print("Issue: Insufficient disk space")
            print("Check: Increase disk requirements or clean temp files")
        elif "Permission denied" in stderr_log:
            print("Issue: File permission problems")
            print("Check: File ownership and Docker volume mounts")
        # General debugging checklist.
        print("\nDebugging suggestions:")
        print("1. Check input file formats and sizes")
        print("2. Verify resource requirements (CPU, memory, disk)")
        print("3. Test command manually with sample data")
        print("4. Check Docker image and tool versions")

    def debug_wdl_task_failure(task_name: str, error_msg: str):
        """Print diagnostics for a failed WDL task based on its error message."""
        print(f"WDL Task Failed: {task_name}")
        print(f"Error: {error_msg}")
        # Common WDL error patterns (matched case-insensitively).
        if "localization" in error_msg.lower():
            print("Issue: File localization failure")
            print("Check: Input file paths and access permissions")
        elif "runtime" in error_msg.lower():
            print("Issue: Runtime requirement problem")
            print("Check: Resource specifications in task runtime block")
        elif "output" in error_msg.lower():
            print("Issue: Output file collection failure")
            print("Check: Output glob patterns and file generation")
        print("\nDebugging steps:")
        print("1. Verify all input files exist and are accessible")
        print("2. Check runtime resource specifications")
        print("3. Validate output glob patterns")
        print("4. Test task command independently")

    return debug_cwl_step_failure, debug_wdl_task_failure
Tools and utilities for ensuring workflow compatibility across different execution environments.
def ensure_workflow_portability():
"""Ensure workflows are portable across different environments."""
def normalize_cwl_for_portability(cwl_workflow):
"""Modify CWL workflow for cross-platform compatibility."""
# Use standard Docker images
standard_images = {
'ubuntu': 'ubuntu:20.04',
'python': 'python:3.9-slim',
'r-base': 'r-base:4.1.0',
'bioconductor': 'bioconductor/bioconductor_docker:RELEASE_3_13'
}
# Replace custom images with standard ones where possible
for step in cwl_workflow.steps:
if hasattr(step.run, 'requirements'):
for req in step.run.requirements:
if req.class_ == 'DockerRequirement':
image = req.dockerPull
for key, standard_image in standard_images.items():
if key in image.lower():
req.dockerPull = standard_image
break
# Add software requirements as hints
for step in cwl_workflow.steps:
if not hasattr(step.run, 'hints'):
step.run.hints = []
# Add software requirements hint
software_hint = {
'class': 'SoftwareRequirement',
'packages': [
{'package': 'bash', 'version': ['>=4.0']},
{'package': 'coreutils', 'version': ['>=8.0']}
]
}
step.run.hints.append(software_hint)
return cwl_workflow
def validate_wdl_portability(wdl_workflow):
"""Validate WDL workflow for portability issues."""
portability_issues = []
for task in wdl_workflow.tasks:
# Check for hardcoded paths
command = task.command
if '/usr/local' in command or '/opt/' in command:
portability_issues.append(
f"Task {task.name}: Hardcoded paths in command"
)
# Check Docker image availability
runtime = task.runtime
if 'docker' in runtime:
docker_image = runtime['docker']
if 'localhost' in docker_image or 'private-registry' in docker_image:
portability_issues.append(
f"Task {task.name}: Uses private Docker registry"
)
# Check for platform-specific commands
platform_commands = ['sudo', 'yum', 'apt-get', 'brew']
for cmd in platform_commands:
if cmd in command:
portability_issues.append(
f"Task {task.name}: Uses platform-specific command '{cmd}'"
)
return portability_issues
def create_portable_workflow_template():
"""Create template for portable workflow development."""
cwl_template = """
cwlVersion: v1.2
class: Workflow
requirements:
- class: ScatterFeatureRequirement
- class: MultipleInputFeatureRequirement
- class: StepInputExpressionRequirement
hints:
- class: ResourceRequirement
coresMin: 1
ramMin: 1024
tmpdirMin: 1024
outdirMin: 1024
- class: DockerRequirement
dockerPull: ubuntu:20.04
inputs:
input_files:
type: File[]
doc: "Array of input files to process"
outputs:
processed_files:
type: File[]
outputSource: process_step/output_files
steps:
process_step:
run: process_tool.cwl
in:
inputs: input_files
out: [output_files]
scatter: inputs
"""
wdl_template = """
version 1.0
workflow PortableWorkflow {
input {
Array[File] input_files
String output_prefix = "processed"
}
scatter (input_file in input_files) {
call ProcessFile {
input:
input_file = input_file,
prefix = output_prefix
}
}
output {
Array[File] processed_files = ProcessFile.output_file
}
}
task ProcessFile {
input {
File input_file
String prefix
Int cpu = 1
String memory = "2 GB"
String disk = "10 GB"
}
command <<<
# Use standard POSIX commands only
basename_file=$(basename ~{input_file})
cp ~{input_file} ~{prefix}_${basename_file}
>>>
runtime {
docker: "ubuntu:20.04"
cpu: cpu
memory: memory
disks: "local-disk " + disk + " SSD"
}
output {
File output_file = "~{prefix}_*"
}
}
"""
return {'cwl': cwl_template, 'wdl': wdl_template}This workflow language integration provides comprehensive support for executing standardized workflows with full compatibility across different computing environments while leveraging Toil's advanced execution and scaling capabilities.
Install with Tessl CLI
npx tessl i tessl/pypi-toildocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10