CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-toil

Pipeline management software for clusters.

Overall
score

67%

Overview
Eval results
Files

docs/workflow-languages.md

Workflow Language Integration

Overview

Toil provides native support for CWL (Common Workflow Language) and WDL (Workflow Description Language), enabling seamless execution of standardized workflow specifications. The integration translates workflow descriptions into Toil's internal job graph while preserving the semantic meaning and execution requirements of the original specifications. This allows users to leverage existing workflows and tools while benefiting from Toil's distributed execution capabilities, cloud provisioning, and fault tolerance.

Capabilities

CWL (Common Workflow Language) Support

{ .api }

Comprehensive support for CWL v1.0+ workflows with full tool and workflow execution capabilities.

from toil.cwl import cwltoil
from toil.common import Config, Toil
import os

# CWL workflow execution using command line
def run_cwl_workflow_cli():
    """Build example ``toil-cwl-runner`` command lines.

    The simplest invocation is ``toil-cwl-runner workflow.cwl inputs.yml``.
    The two argument lists built here place Toil options in front of the
    workflow file and the inputs file.

    Returns:
        A ``(cmd, advanced_cmd)`` tuple of argument lists. Nothing is run
        by this function; each list is a plain ``list[str]`` suitable for
        passing to a process launcher.
    """
    # Single-machine run with explicit resource caps and a file job store.
    cmd = [
        "toil-cwl-runner",
        "--jobStore", "file:cwl-jobstore",
        "--batchSystem", "local",
        "--maxCores", "4",
        "--maxMemory", "8G",
        "--logLevel", "INFO",
        "workflow.cwl",
        "inputs.yml",
    ]

    # Autoscaled run: AWS job store and provisioner, retries, and explicit
    # output / temporary directory locations.
    advanced_cmd = [
        "toil-cwl-runner",
        "--jobStore", "aws:us-west-2:my-bucket:cwl-run",
        "--batchSystem", "kubernetes",
        "--provisioner", "aws",
        "--nodeTypes", "m5.large,m5.xlarge",
        "--maxNodes", "10",
        "--defaultPreemptible",
        "--retryCount", "3",
        "--cleanWorkDir", "onSuccess",
        "--outdir", "/results",
        "--tmp-outdir-prefix", "/tmp/cwl-",
        "complex-workflow.cwl",
        "production-inputs.json",
    ]

    # Hand the lists back so the example is usable instead of discarding them.
    return cmd, advanced_cmd

# Programmatic CWL execution
def run_cwl_workflow_programmatic():
    """Execute a CWL workflow by calling the CWL runner's entry point.

    All settings are passed as command-line style options in one argument
    list, with options placed before the workflow and inputs files (the
    same order the CLI examples use).

    No separate ``Toil`` context is opened here.
    NOTE(review): ``cwltoil.main`` is understood to parse its own argument
    list and manage its own job store, so a surrounding ``with Toil(config)``
    would initialise the same job store twice and attributes set on a
    standalone ``Config`` would never reach the runner -- confirm against
    the installed Toil version.

    Returns:
        The value returned by ``cwltoil.main`` (also printed).
    """
    # Workflow description and its inputs.
    cwl_file = "analysis-workflow.cwl"
    inputs_file = "sample-inputs.yml"

    argv = [
        "--jobStore", "file:cwl-analysis-store",
        "--batchSystem", "local",
        "--maxCores", "8",
        "--maxMemory", "16G",
        "--retryCount", "2",
        "--tmp-outdir-prefix", "/tmp/cwl-tmp",
        cwl_file,
        inputs_file,
    ]

    result = cwltoil.main(argv)

    print(f"CWL workflow completed with result: {result}")
    return result

def advanced_cwl_features():
    """Demonstrate advanced CWL features and configuration.

    Builds an example CommandLineTool document, a ``Config`` carrying
    container and caching settings, and a helper that converts CWL
    resource requirements into byte-based Toil resource figures.

    NOTE(review): the ``Config`` attribute names set below are taken as-is
    from this example and are not verified against ``toil.common.Config``
    -- confirm before relying on them.

    Returns:
        A ``(config, map_cwl_resources)`` tuple so that neither the
        configuration nor the helper is silently discarded.
    """

    # Example tool: runs inside a Docker image and declares its resource
    # needs. Kept for illustration only; it is not written to disk here.
    cwl_with_docker = """
    cwlVersion: v1.2
    class: CommandLineTool
    
    requirements:
      DockerRequirement:
        dockerPull: ubuntu:20.04
      ResourceRequirement:
        coresMin: 2
        ramMin: 4096
        tmpdirMin: 1024
        outdirMin: 1024
    
    inputs:
      input_file:
        type: File
        inputBinding:
          position: 1
    
    outputs:
      output_file:
        type: File
        outputBinding:
          glob: "output.txt"
    
    baseCommand: ["bash", "-c"]
    arguments:
      - "cat $(inputs.input_file.path) | wc -l > output.txt"
    """

    # Execute with Docker support
    config = Config()
    config.disableChaining = True  # Required for some CWL features
    config.enableCWLDockerSupport = True
    config.dockerAppliance = "ubuntu:20.04"

    # Singularity container support (alternative to Docker)
    config.cwlUseSingularity = True
    config.singularityArgs = ["--cleanenv", "--containall"]

    # CWL caching for faster reruns
    config.cwlCaching = True
    config.cwlCachingDir = "/shared/cwl-cache"

    def map_cwl_resources(cwl_requirements):
        """Map CWL resource requirements to Toil resources.

        Args:
            cwl_requirements: Mapping that may contain ``ramMin``,
                ``coresMin``, ``tmpdirMin`` and ``outdirMin``. Missing keys
                fall back to 1024, 1, 1024 and 1024 respectively.

        Returns:
            Dict with ``memory`` and ``disk`` in bytes and ``cores`` as
            given. ``disk`` is the sum of the temporary and output
            directory requirements.
        """
        mib = 1024 * 1024  # the CWL figures are treated as MB and scaled to bytes

        scratch_mb = (cwl_requirements.get('tmpdirMin', 1024) +
                      cwl_requirements.get('outdirMin', 1024))

        return {
            'memory': cwl_requirements.get('ramMin', 1024) * mib,
            'cores': cwl_requirements.get('coresMin', 1),
            'disk': scratch_mb * mib,
        }

    return config, map_cwl_resources

WDL (Workflow Description Language) Support

{ .api }

Native WDL workflow execution with support for WDL 1.0+ specifications.

from toil.wdl import wdltoil
from toil.common import Config, Toil

# WDL workflow execution using command line
def run_wdl_workflow_cli():
    """Build example ``toil-wdl-runner`` command lines.

    The simplest invocation is ``toil-wdl-runner workflow.wdl inputs.json``.
    The two argument lists built here place Toil options in front of the
    workflow file and the inputs file.

    Returns:
        A ``(cmd, cloud_cmd)`` tuple of argument lists. Nothing is run by
        this function.
    """
    # Cluster run on Slurm with explicit resource caps and verbose logging.
    cmd = [
        "toil-wdl-runner",
        "--jobStore", "file:wdl-jobstore",
        "--batchSystem", "slurm",
        "--maxCores", "16",
        "--maxMemory", "64G",
        "--defaultDisk", "10G",
        "--retryCount", "3",
        "--logLevel", "DEBUG",
        "pipeline.wdl",
        "pipeline_inputs.json",
    ]

    # Cloud execution with auto-scaling. Node types written as
    # "type:price" carry a bid; the last one has none.
    cloud_cmd = [
        "toil-wdl-runner",
        "--jobStore", "aws:us-east-1:wdl-bucket:run-001",
        "--batchSystem", "mesos",
        "--provisioner", "aws",
        "--nodeTypes", "c5.large:0.50,c5.xlarge:0.75,c5.2xlarge",
        "--maxNodes", "50",
        "--defaultPreemptible",
        # Toil documents this value as a fraction between 0.0 and 1.0
        # inclusive; the previous "1.5" was out of range.
        "--preemptibleCompensation", "1.0",
        "large-scale-analysis.wdl",
        "production-inputs.json",
    ]

    # Hand the lists back so the example is usable instead of discarding them.
    return cmd, cloud_cmd

# Programmatic WDL execution
def run_wdl_workflow_programmatic():
    """Execute a WDL workflow from Python via the ``toil-wdl-runner`` command.

    All settings are passed as command-line options, placed before the
    workflow and inputs files (the same order the CLI examples use).

    No separate ``Toil`` context is opened and no standalone ``Config`` is
    populated.
    NOTE(review): the WDL runner is understood to parse its own options and
    manage its own job store, and its Python ``main`` entry point is
    understood to read ``sys.argv`` rather than accept an argument list --
    which is why the console script is launched instead. Confirm against
    the installed Toil version.

    Returns:
        The exit status of the ``toil-wdl-runner`` process.
    """
    import subprocess  # local import: only this example launches a process

    wdl_file = "genomics-pipeline.wdl"
    inputs_file = "sample-cohort-inputs.json"

    command = [
        "toil-wdl-runner",
        "--jobStore", "file:genomics-run",
        "--batchSystem", "kubernetes",
        "--maxCores", "32",
        "--maxMemory", "128G",
        "--defaultDisk", "100G",
        wdl_file,
        inputs_file,
    ]

    # check=False: a failing workflow is reported through the return value
    # instead of raising CalledProcessError.
    completed = subprocess.run(command, check=False)
    return completed.returncode

def advanced_wdl_features():
    """Advanced WDL workflow features and configuration.

    Holds an example WDL 1.0 document (scatter over samples, then a merge
    call) and builds a ``Config`` with call caching, container,
    resource-optimisation and localisation settings.

    NOTE(review): the ``Config`` attribute names set below are taken as-is
    from this example and are not verified against ``toil.common.Config``
    -- confirm before relying on them.

    Returns:
        The populated ``Config`` object.
    """

    # Example workflow, kept for illustration only (not written to disk).
    # The heredoc-style command uses ``~{...}`` placeholders: inside
    # ``<<< >>>`` WDL 1.0 leaves ``${...}`` to the shell, where these names
    # would be unset.
    # NOTE(review): ``MergeAlignments`` is called but no task of that name
    # is defined in this snippet.
    wdl_workflow = """
    version 1.0
    
    workflow GenomicsAnalysis {
        input {
            File reference_genome
            Array[File] sample_fastqs
            String output_prefix
            Int? cpu_count = 4
            String? memory_gb = "8G"
        }
        
        # Scatter processing over samples
        scatter (fastq in sample_fastqs) {
            call AlignReads {
                input:
                    reference = reference_genome,
                    fastq_file = fastq,
                    cpu = cpu_count,
                    memory = memory_gb
            }
        }
        
        # Merge results
        call MergeAlignments {
            input:
                alignments = AlignReads.output_bam,
                output_name = output_prefix
        }
        
        output {
            File merged_bam = MergeAlignments.merged_bam
            Array[File] individual_bams = AlignReads.output_bam
        }
    }
    
    task AlignReads {
        input {
            File reference
            File fastq_file  
            Int cpu = 2
            String memory = "4G"
        }
        
        command <<<
            bwa mem -t ~{cpu} ~{reference} ~{fastq_file} | samtools sort -o output.bam
        >>>
        
        runtime {
            docker: "biocontainers/bwa:v0.7.17_cv1"
            cpu: cpu
            memory: memory
            disk: "20 GB"
        }
        
        output {
            File output_bam = "output.bam"
        }
    }
    """

    # Configure advanced WDL features
    config = Config()

    # Call caching for expensive operations
    config.wdlCallCaching = True
    config.wdlCallCachingBackend = "file"  # or "database"

    # Docker/container support
    config.enableWDLDockerSupport = True
    config.dockerAppliance = "ubuntu:20.04"

    # Resource optimization
    config.wdlOptimizeResources = True
    config.wdlResourceProfile = "high-throughput"

    # Localization strategies
    config.wdlLocalizationStrategy = "copy"  # or "symlink", "hardlink"
    config.wdlTmpDir = "/fast-tmp"

    return config

Workflow Conversion and Translation

{ .api }

Tools for inspecting and validating CWL and WDL workflow definitions before they are translated into Toil's internal representation.

from toil.cwl.cwltoil import CWLWorkflow
from toil.wdl.wdltoil import WDLWorkflow

def workflow_introspection():
    """Print a structural summary of one CWL and one WDL workflow.

    Loads ``analysis.cwl`` and ``pipeline.wdl``, then prints version and
    size information for each. For CWL it also totals ``coresMin`` and
    ``ramMin`` over every ``ResourceRequirement``; for WDL it prints the
    runtime settings of each task. Nothing is returned.
    """
    # ---- CWL ---------------------------------------------------------
    cwl_doc = CWLWorkflow.load("analysis.cwl")

    print("CWL Workflow Analysis:")
    print(f"Version: {cwl_doc.version}")
    print(f"Class: {cwl_doc.class_}")
    print(f"Tools: {len(cwl_doc.steps)}")

    # Gather every ResourceRequirement once, then total each figure.
    resource_reqs = [
        req
        for step in cwl_doc.steps
        if hasattr(step, 'requirements')
        for req in step.requirements
        if req.class_ == 'ResourceRequirement'
    ]
    total_cpu = sum(req.get('coresMin', 1) for req in resource_reqs)
    total_memory = sum(req.get('ramMin', 1024) for req in resource_reqs)

    print(f"Total CPU cores needed: {total_cpu}")
    print(f"Total memory needed: {total_memory} MB")

    # ---- WDL ---------------------------------------------------------
    wdl_doc = WDLWorkflow.load("pipeline.wdl")

    print("\nWDL Workflow Analysis:")
    print(f"Version: {wdl_doc.version}")
    print(f"Workflow name: {wdl_doc.name}")
    print(f"Tasks: {len(wdl_doc.tasks)}")

    # Label / runtime-key pairs, reported in this order for every task.
    runtime_fields = (("CPU", 'cpu'), ("Memory", 'memory'), ("Disk", 'disk'))

    for task in wdl_doc.tasks:
        settings = task.runtime
        print(f"Task {task.name}:")
        for label, key in runtime_fields:
            print(f"  {label}: {settings.get(key, 'default')}")
        if 'docker' in settings:
            print(f"  Docker: {settings['docker']}")

def workflow_validation_and_linting():
    """Validate workflow specifications for common issues.

    Runs the CWL checks on ``workflow.cwl`` and the WDL checks on
    ``pipeline.wdl``.

    Returns:
        ``{'cwl': report, 'wdl': report}`` where each report is a dict
        with ``'errors'`` and ``'warnings'`` lists of messages.
    """

    def validate_cwl_workflow(cwl_file: str):
        """Validate CWL workflow specification.

        Any exception raised while loading or walking the workflow is
        recorded as a single error message rather than propagated.
        """
        errors = []
        warnings = []

        try:
            workflow = CWLWorkflow.load(cwl_file)

            if not hasattr(workflow, 'requirements'):
                warnings.append("No workflow-level requirements specified")

            for step in workflow.steps:
                # Resource requirements are advisory, so only warn.
                if not hasattr(step, 'requirements'):
                    warnings.append(f"Step {step.id} has no resource requirements")

                # Every step input must be wired to a source.
                for inp in step.in_:
                    if not inp.source:
                        errors.append(f"Step {step.id} input {inp.id} has no source")

                # A step without a tool to run cannot execute.
                if not step.run:
                    errors.append(f"Step {step.id} has no tool reference")

        except Exception as e:
            # Validation boundary: report the failure instead of raising.
            errors.append(f"Failed to parse CWL: {str(e)}")

        return {'errors': errors, 'warnings': warnings}

    def validate_wdl_workflow(wdl_file: str):
        """Validate WDL workflow specification.

        Any exception raised while loading or walking the workflow is
        recorded as a single error message rather than propagated.
        """
        errors = []
        warnings = []

        try:
            workflow = WDLWorkflow.load(wdl_file)

            if not workflow.version:
                errors.append("WDL version not specified")

            for task in workflow.tasks:
                if not task.command:
                    errors.append(f"Task {task.name} has no command")

                if not hasattr(task, 'runtime'):
                    warnings.append(f"Task {task.name} has no runtime requirements")

                if not task.outputs:
                    warnings.append(f"Task {task.name} has no outputs")

            # Build the set of task names once instead of rebuilding a list
            # for every call inspected below.
            defined_tasks = {task.name for task in workflow.tasks}
            for call in workflow.calls:
                if call.task_name not in defined_tasks:
                    errors.append(f"Call references undefined task: {call.task_name}")

        except Exception as e:
            # Validation boundary: report the failure instead of raising.
            errors.append(f"Failed to parse WDL: {str(e)}")

        return {'errors': errors, 'warnings': warnings}

    # Example usage
    cwl_result = validate_cwl_workflow("workflow.cwl")
    wdl_result = validate_wdl_workflow("pipeline.wdl")

    return {'cwl': cwl_result, 'wdl': wdl_result}

Workflow Execution Monitoring

{ .api }

Monitoring and debugging capabilities for workflow language execution.

from toil.cwl.utils import CWLLogger
from toil.wdl.utils import WDLLogger
import logging

def setup_workflow_monitoring():
    """Create and configure the CWL and WDL loggers.

    The CWL logger is set to DEBUG and writes timestamped records to
    ``cwl_execution.log``. The WDL logger is set to INFO and writes
    ``WDL:``-prefixed records to a stream handler.

    Returns:
        A ``(cwl_logger, wdl_logger)`` tuple.
    """
    cwl_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    wdl_format = 'WDL: %(levelname)s - %(message)s'

    # CWL: detailed records, persisted to a file.
    cwl_logger = CWLLogger()
    cwl_logger.setLevel(logging.DEBUG)
    cwl_file_handler = logging.FileHandler('cwl_execution.log')
    cwl_file_handler.setFormatter(logging.Formatter(cwl_format))
    cwl_logger.addHandler(cwl_file_handler)

    # WDL: shorter records on a stream handler.
    wdl_logger = WDLLogger()
    wdl_logger.setLevel(logging.INFO)
    wdl_stream_handler = logging.StreamHandler()
    wdl_stream_handler.setFormatter(logging.Formatter(wdl_format))
    wdl_logger.addHandler(wdl_stream_handler)

    return cwl_logger, wdl_logger

def workflow_progress_tracking():
    """Track workflow execution progress and performance.

    Returns:
        A fresh ``WorkflowTracker`` whose methods are called as the
        workflow and its steps start and finish.
    """
    import time  # imported once here; the methods below reach it via closure

    class WorkflowTracker:
        """Record per-step timing, status and resource usage, and print progress."""

        def __init__(self):
            self.start_time = None    # time.time() value set by start_workflow()
            self.step_times = {}      # step_id -> {'start', 'end', 'duration'}
            self.step_status = {}     # step_id -> 'running' | 'completed'
            self.resource_usage = {}  # step_id -> resources dict from step_completed()

        def start_workflow(self):
            """Mark workflow start."""
            self.start_time = time.time()
            print("Workflow execution started")

        def step_started(self, step_id: str):
            """Mark step start."""
            self.step_times[step_id] = {'start': time.time()}
            self.step_status[step_id] = 'running'
            print(f"Step started: {step_id}")

        def step_completed(self, step_id: str, resources_used: dict = None):
            """Mark step completion.

            A step that was never started is still marked completed and its
            resources are recorded, but no duration is computed for it.
            """
            end_time = time.time()

            timing = self.step_times.get(step_id)
            if timing is not None:
                timing['end'] = end_time
                timing['duration'] = end_time - timing['start']
                print(f"Step completed: {step_id} ({timing['duration']:.2f}s)")

            self.step_status[step_id] = 'completed'

            if resources_used:
                self.resource_usage[step_id] = resources_used

        def workflow_completed(self):
            """Mark workflow completion and report summary.

            Prints nothing if the workflow was never started.
            """
            if not self.start_time:
                return

            total_time = time.time() - self.start_time
            print(f"Workflow completed in {total_time:.2f}s")

            # Report step durations
            print("\nStep execution times:")
            for step_id, times in self.step_times.items():
                if 'duration' in times:
                    print(f"  {step_id}: {times['duration']:.2f}s")

            if not self.resource_usage:
                return

            # Report resource usage
            print("\nResource usage summary:")
            total_cpu_hours = 0
            total_memory_gb_hours = 0

            for step_id, resources in self.resource_usage.items():
                # Resources may exist for a step that has no timing entry
                # (completed without being started); count it as zero
                # duration instead of raising KeyError.
                duration = self.step_times.get(step_id, {}).get('duration', 0)
                duration_hours = duration / 3600
                cpu_hours = resources.get('cpu', 0) * duration_hours
                memory_gb_hours = resources.get('memory_gb', 0) * duration_hours

                total_cpu_hours += cpu_hours
                total_memory_gb_hours += memory_gb_hours

                print(f"  {step_id}: {cpu_hours:.2f} CPU-hours, {memory_gb_hours:.2f} GB-hours")

            print(f"\nTotal: {total_cpu_hours:.2f} CPU-hours, {total_memory_gb_hours:.2f} GB-hours")

    return WorkflowTracker()

def debug_workflow_execution():
    """Build the two failure-diagnosis helpers.

    Returns:
        A ``(debug_cwl_step_failure, debug_wdl_task_failure)`` tuple. Both
        helpers print their findings and return nothing.
    """

    def debug_cwl_step_failure(step_id: str, exit_code: int, stderr_log: str):
        """Print a diagnosis for a failed CWL step.

        At most one exit-code hint and at most one stderr hint are printed
        (the first match in each table), followed by a fixed checklist.
        """
        exit_code_hints = (
            (127, "Issue: Command not found",
                  "Check: Docker image contains required tools"),
            (137, "Issue: Process killed (likely OOM)",
                  "Check: Increase memory requirements"),
            (139, "Issue: Segmentation fault",
                  "Check: Input data format or tool version"),
        )
        stderr_hints = (
            ("No space left on device",
             "Issue: Insufficient disk space",
             "Check: Increase disk requirements or clean temp files"),
            ("Permission denied",
             "Issue: File permission problems",
             "Check: File ownership and Docker volume mounts"),
        )

        print(f"CWL Step Failed: {step_id}")
        print(f"Exit code: {exit_code}")

        # Exit-code table: the first matching code wins.
        for code, issue, check in exit_code_hints:
            if exit_code == code:
                print(issue)
                print(check)
                break

        # Stderr table: earlier entries take priority over later ones.
        for needle, issue, check in stderr_hints:
            if needle in stderr_log:
                print(issue)
                print(check)
                break

        print("\nDebugging suggestions:")
        for suggestion in (
            "1. Check input file formats and sizes",
            "2. Verify resource requirements (CPU, memory, disk)",
            "3. Test command manually with sample data",
            "4. Check Docker image and tool versions",
        ):
            print(suggestion)

    def debug_wdl_task_failure(task_name: str, error_msg: str):
        """Print a diagnosis for a failed WDL task.

        The error message is matched case-insensitively against a keyword
        table; only the first matching keyword produces a hint.
        """
        keyword_hints = (
            ("localization",
             "Issue: File localization failure",
             "Check: Input file paths and access permissions"),
            ("runtime",
             "Issue: Runtime requirement problem",
             "Check: Resource specifications in task runtime block"),
            ("output",
             "Issue: Output file collection failure",
             "Check: Output glob patterns and file generation"),
        )

        print(f"WDL Task Failed: {task_name}")
        print(f"Error: {error_msg}")

        for keyword, issue, check in keyword_hints:
            if keyword in error_msg.lower():
                print(issue)
                print(check)
                break

        print("\nDebugging steps:")
        for step_text in (
            "1. Verify all input files exist and are accessible",
            "2. Check runtime resource specifications",
            "3. Validate output glob patterns",
            "4. Test task command independently",
        ):
            print(step_text)

    return debug_cwl_step_failure, debug_wdl_task_failure

Cross-Platform Workflow Compatibility

{ .api }

Tools and utilities for ensuring workflow compatibility across different execution environments.

def ensure_workflow_portability():
    """Ensure workflows are portable across different environments.

    Returns:
        A ``(normalize_cwl_for_portability, validate_wdl_portability)``
        tuple of helper callables, so callers can actually use them.
    """
    import re  # local import: only the command scan below needs it

    def normalize_cwl_for_portability(cwl_workflow):
        """Modify CWL workflow for cross-platform compatibility.

        Mutates ``cwl_workflow`` in place and returns it. Every
        ``DockerRequirement`` whose image name contains one of the known
        keys is pointed at the matching standard image, and a
        ``SoftwareRequirement`` hint is appended to every step's tool.
        """
        # Use standard Docker images
        standard_images = {
            'ubuntu': 'ubuntu:20.04',
            'python': 'python:3.9-slim',
            'r-base': 'r-base:4.1.0',
            'bioconductor': 'bioconductor/bioconductor_docker:RELEASE_3_13'
        }

        # Replace custom images with standard ones where possible
        for step in cwl_workflow.steps:
            if not hasattr(step.run, 'requirements'):
                continue
            for req in step.run.requirements:
                if req.class_ != 'DockerRequirement':
                    continue
                image = req.dockerPull.lower()
                # The first key (in dict order) found in the image name wins.
                replacement = next(
                    (standard for key, standard in standard_images.items()
                     if key in image),
                    None,
                )
                if replacement is not None:
                    req.dockerPull = replacement

        # Add software requirements as hints
        for step in cwl_workflow.steps:
            if not hasattr(step.run, 'hints'):
                step.run.hints = []

            # A fresh dict per step so the steps never share one hint object.
            step.run.hints.append({
                'class': 'SoftwareRequirement',
                'packages': [
                    {'package': 'bash', 'version': ['>=4.0']},
                    {'package': 'coreutils', 'version': ['>=8.0']}
                ]
            })

        return cwl_workflow

    def validate_wdl_portability(wdl_workflow):
        """Validate WDL workflow for portability issues.

        Returns:
            A list of messages, one per issue found: hardcoded
            ``/usr/local`` or ``/opt/`` paths, Docker images that look
            private, and platform-specific commands.
        """
        platform_commands = ['sudo', 'yum', 'apt-get', 'brew']
        # Match whole command tokens only, so that e.g. "yummy_tool" is not
        # reported as using "yum". '-' counts as part of a token.
        command_patterns = [
            (cmd, re.compile(rf"(?<![\w-]){re.escape(cmd)}(?![\w-])"))
            for cmd in platform_commands
        ]

        portability_issues = []

        for task in wdl_workflow.tasks:
            # Check for hardcoded paths
            command = task.command
            if '/usr/local' in command or '/opt/' in command:
                portability_issues.append(
                    f"Task {task.name}: Hardcoded paths in command"
                )

            # Check Docker image availability
            runtime = task.runtime
            if 'docker' in runtime:
                docker_image = runtime['docker']
                if 'localhost' in docker_image or 'private-registry' in docker_image:
                    portability_issues.append(
                        f"Task {task.name}: Uses private Docker registry"
                    )

            # Check for platform-specific commands
            for cmd, pattern in command_patterns:
                if pattern.search(command):
                    portability_issues.append(
                        f"Task {task.name}: Uses platform-specific command '{cmd}'"
                    )

        return portability_issues

    return normalize_cwl_for_portability, validate_wdl_portability

def create_portable_workflow_template():
    """Create template for portable workflow development.

    Returns:
        ``{'cwl': ..., 'wdl': ...}`` holding a CWL v1.2 workflow template
        and a WDL 1.0 workflow template as strings. Both scatter one
        processing step over an array of input files.
    """

    cwl_template = """
    cwlVersion: v1.2
    class: Workflow
    
    requirements:
      - class: ScatterFeatureRequirement
      - class: MultipleInputFeatureRequirement
      - class: StepInputExpressionRequirement
    
    hints:
      - class: ResourceRequirement
        coresMin: 1
        ramMin: 1024
        tmpdirMin: 1024
        outdirMin: 1024
      - class: DockerRequirement
        dockerPull: ubuntu:20.04
    
    inputs:
      input_files:
        type: File[]
        doc: "Array of input files to process"
    
    outputs:
      processed_files:
        type: File[]
        outputSource: process_step/output_files
    
    steps:
      process_step:
        run: process_tool.cwl
        in:
          inputs: input_files
        out: [output_files]
        scatter: inputs
    """

    # In the task output below, a plain string is treated as a literal file
    # name, so the wildcard has to go through glob(); the first match is the
    # single file the command creates.
    wdl_template = """
    version 1.0
    
    workflow PortableWorkflow {
        input {
            Array[File] input_files
            String output_prefix = "processed"
        }
        
        scatter (input_file in input_files) {
            call ProcessFile {
                input:
                    input_file = input_file,
                    prefix = output_prefix
            }
        }
        
        output {
            Array[File] processed_files = ProcessFile.output_file
        }
    }
    
    task ProcessFile {
        input {
            File input_file
            String prefix
            Int cpu = 1
            String memory = "2 GB"
            String disk = "10 GB"
        }
        
        command <<<
            # Use standard POSIX commands only
            basename_file=$(basename ~{input_file})
            cp ~{input_file} ~{prefix}_${basename_file}
        >>>
        
        runtime {
            docker: "ubuntu:20.04"
            cpu: cpu
            memory: memory
            disks: "local-disk " + disk + " SSD"
        }
        
        output {
            File output_file = glob("~{prefix}_*")[0]
        }
    }
    """

    return {'cwl': cwl_template, 'wdl': wdl_template}

This workflow language integration provides comprehensive support for executing standardized workflows with full compatibility across different computing environments while leveraging Toil's advanced execution and scaling capabilities.

Install with Tessl CLI

npx tessl i tessl/pypi-toil

docs

batch-systems.md

core-workflow.md

file-management.md

index.md

job-stores.md

provisioning.md

utilities.md

workflow-languages.md

tile.json