CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-toil

Pipeline management software for clusters.

Overall
score

67%

Overview
Eval results
Files

utilities.mddocs/

Utilities and CLI Tools

Overview

Toil provides a comprehensive suite of command-line utilities and helper functions for workflow management, debugging, monitoring, and cluster operations. These tools cover the complete lifecycle from workflow development and testing to production deployment and maintenance. The utilities integrate seamlessly with Toil's core functionality and provide both interactive and scriptable interfaces for automation and integration with larger systems.

Capabilities

Main CLI Interface

{ .api }

The primary toil command provides the main interface for workflow execution and management.

from toil.utils.toilMain import main as toil_main
import sys

# Command-line workflow execution
def run_toil_workflow():
    """Execute a workflow through the primary ``toil`` CLI entry point.

    Builds the command line in ``sys.argv`` exactly as the shell would,
    then invokes the main entry point programmatically and returns its
    exit code.
    """

    # Equivalent shell invocations:
    #   toil workflow.py file:jobstore
    #   toil --batchSystem=local --maxCores=8 --maxMemory=16G workflow.py file:jobstore
    #   toil --batchSystem=slurm --maxNodes=50 --nodeTypes=compute workflow.py file:jobstore

    # Assemble argv as the CLI parser expects it: program name, options,
    # workflow script, then the script's own arguments.
    cli_args = [
        'toil',
        '--batchSystem=local',
        '--jobStore=file:test-jobstore',
        '--maxCores=4',
        '--maxMemory=8G',
        '--logLevel=INFO',
        '--stats',
        'workflow_script.py',
        'input_file.txt'
    ]
    sys.argv = cli_args

    # Run the workflow and hand back the CLI exit status.
    return toil_main()

def advanced_toil_options():
    """Demonstrate advanced toil CLI options.

    Returns a fully-populated example command line covering the job
    store, batch system, resource limits, autoscaling, fault tolerance,
    preemption, cleanup, logging, working directories, security,
    containers, and the workflow script's own arguments.
    """

    # Job store and batch system selection.
    storage_and_batch = [
        '--jobStore=aws:us-west-2:my-bucket:workflow-run',
        '--batchSystem=kubernetes',
    ]

    # Cluster-wide ceilings and per-job defaults.
    resource_limits = [
        '--maxCores=1000',
        '--maxMemory=2T',
        '--maxDisk=10T',
        '--defaultMemory=4G',
        '--defaultCores=2',
        '--defaultDisk=10G',
    ]

    # Autoscaling and node provisioning.
    autoscaling = [
        '--provisioner=aws',
        '--nodeTypes=m5.large,m5.xlarge:0.50,c5.2xlarge',
        '--maxNodes=100',
        '--minNodes=5',
        '--targetTime=1800',  # target job runtime in seconds
    ]

    # Retry and rescue behaviour.
    fault_tolerance = [
        '--retryCount=3',
        '--rescueJobsFrequency=3600',
        '--maxJobDuration=86400',  # 24 hours max per job
    ]

    # Spot/preemptible instance handling.
    preemption = [
        '--defaultPreemptible',
        '--preemptibleCompensation=1.5',
        '--preemptibleWorkerTimeout=1800',
    ]

    # Post-run cleanup policy and cluster statistics dump.
    cleanup = [
        '--clean=onSuccess',
        '--cleanWorkDir=onSuccess',
        '--clusterStats=/tmp/cluster-stats',
    ]

    # Logging and metrics collection.
    logging_opts = [
        '--logLevel=INFO',
        '--logFile=/tmp/toil-workflow.log',
        '--stats',
        '--metrics',
    ]

    # Scratch and coordination directories.
    directories = [
        '--workDir=/tmp/toil-work',
        '--coordinationDir=/shared/coordination',
    ]

    # Encryption at rest.
    security = [
        '--sseKey=alias/toil-kms-key',
        '--encryptedFileStore',
    ]

    # Auto-deployment and container runtime.
    containers = [
        '--disableAutoDeployment=False',
        '--containerEngine=docker',
    ]

    # The workflow script followed by its own arguments.
    workflow_args = [
        'complex_workflow.py',
        '--input-dir=/data/inputs',
        '--output-dir=/data/outputs',
        '--reference-genome=/data/reference.fa',
        '--threads=16',
    ]

    return (
        ['toil']
        + storage_and_batch
        + resource_limits
        + autoscaling
        + fault_tolerance
        + preemption
        + cleanup
        + logging_opts
        + directories
        + security
        + containers
        + workflow_args
    )

Workflow Status and Monitoring

{ .api }

Tools for monitoring workflow progress and status in real-time.

from toil.utils.toilStatus import main as status_main
from toil.utils.toilStats import main as stats_main

def monitor_workflow_status():
    """Check the status of a running workflow.

    Drives the ``toil status`` entry point by assembling ``sys.argv``
    before each invocation; shell equivalent: ``toil status file:jobstore``.
    """

    # Quick check against a local file job store.
    basic_args = ['toil-status', 'file:my-jobstore']
    sys.argv = basic_args
    status_main()

    # Verbose, per-job breakdown against an AWS job store.
    # --failIfNotComplete makes the command fail while jobs remain.
    verbose_args = ['toil-status', '--verbose', '--failIfNotComplete']
    verbose_args.append('aws:us-west-2:my-bucket:workflow')
    sys.argv = verbose_args

    return status_main()

def analyze_workflow_statistics():
    """Analyze workflow execution statistics via ``toil stats``.

    Shell equivalent: ``toil stats file:jobstore``. Runs a detailed
    breakdown first, then a second pass that exports the statistics
    to a JSON file, and returns the first pass's result.
    """

    # Detailed breakdown: raw plus pretty output, several resource
    # categories, sorted by total time in descending order.
    detailed_args = [
        'toil-stats',
        '--raw',
        '--pretty',
        '--categories=time,clock,wait,memory,disk',
        '--sortCategory=time',
        '--sortField=total',
        '--sortReverse',
        'file:completed-jobstore'
    ]
    sys.argv = detailed_args
    report = stats_main()

    # Export pass: write the statistics as JSON for later processing.
    export_args = [
        'toil-stats',
        '--outputFile=/tmp/workflow-stats.json',
        '--format=json',
        'file:jobstore'
    ]
    sys.argv = export_args
    stats_main()

    return report

def real_time_monitoring():
    """Real-time workflow monitoring implementation.

    Defines a ``WorkflowMonitor`` class that polls a job store on a
    fixed interval, printing status and performance summaries until the
    workflow completes, fails, or monitoring is stopped.

    Fixed: the original called ``self.handle_workflow_failure`` on the
    failure path without ever defining it, raising AttributeError exactly
    when a failure needed handling; it also imported ``json`` unused.
    """

    import time

    class WorkflowMonitor:
        """Poll a job store and display workflow status and metrics."""

        def __init__(self, jobstore_locator: str):
            # Job store locator, e.g. "file:my-jobstore".
            self.jobstore = jobstore_locator
            # NOTE(review): StatsAndLogging is constructed with no
            # arguments here — confirm against the installed Toil API.
            self.stats_collector = StatsAndLogging()
            # Loop flag; cleared by stop_monitoring().
            self.monitoring = True

        def start_monitoring(self, update_interval: int = 30):
            """Poll every ``update_interval`` seconds until finished."""

            while self.monitoring:
                try:
                    # Snapshot current state, then render it.
                    status = self.get_workflow_status()
                    metrics = self.get_performance_metrics()
                    self.display_status(status, metrics)

                    if status.get('completed', False):
                        print("Workflow completed successfully!")
                        break

                    if status.get('failed', False):
                        print("Workflow failed!")
                        self.handle_workflow_failure(status)
                        break

                    time.sleep(update_interval)

                except KeyboardInterrupt:
                    print("Monitoring stopped by user")
                    break
                except Exception as e:
                    # Transient errors (e.g. job store hiccups): report,
                    # back off briefly, keep monitoring.
                    print(f"Monitoring error: {e}")
                    time.sleep(5)

        def handle_workflow_failure(self, status: dict):
            """Report failure details when the workflow fails.

            This method was referenced but never defined in the original,
            so the failure path crashed with AttributeError.
            """
            print(f"Failed jobs: {status.get('failed_jobs', 0)}")
            print("Inspect them with 'toil debug-job <jobstore> <job-id>'.")

        def get_workflow_status(self) -> dict:
            """Get current workflow status (placeholder values)."""

            status = {
                'total_jobs': 0,
                'completed_jobs': 0,
                'running_jobs': 0,
                'failed_jobs': 0,
                'queued_jobs': 0,
                'completion_percentage': 0.0,
                'estimated_time_remaining': None
            }

            # Populate status from job store
            # ... implementation details ...

            return status

        def get_performance_metrics(self) -> dict:
            """Get performance metrics (placeholder values)."""

            metrics = {
                'cpu_utilization': 0.0,
                'memory_usage': 0.0,
                'network_io': 0.0,
                'disk_io': 0.0,
                'cost_per_hour': 0.0,
                'jobs_per_minute': 0.0
            }

            # Collect from various sources
            # ... implementation details ...

            return metrics

        def display_status(self, status: dict, metrics: dict):
            """Display formatted status information."""

            print("\n" + "="*60)
            print(f"Workflow Status - {time.strftime('%Y-%m-%d %H:%M:%S')}")
            print("="*60)

            print(f"Jobs: {status['completed_jobs']}/{status['total_jobs']} completed")
            print(f"Running: {status['running_jobs']}, Queued: {status['queued_jobs']}")
            print(f"Failed: {status['failed_jobs']}")
            print(f"Progress: {status['completion_percentage']:.1f}%")

            if status['estimated_time_remaining']:
                print(f"Est. remaining: {status['estimated_time_remaining']}")

            print(f"\nPerformance:")
            print(f"CPU Utilization: {metrics['cpu_utilization']:.1f}%")
            print(f"Memory Usage: {metrics['memory_usage']:.1f}%")
            print(f"Cost/Hour: ${metrics['cost_per_hour']:.2f}")
            print(f"Jobs/Minute: {metrics['jobs_per_minute']:.1f}")

        def stop_monitoring(self):
            """Stop monitoring loop."""
            self.monitoring = False

    # Usage
    monitor = WorkflowMonitor("file:my-jobstore")
    monitor.start_monitoring(update_interval=10)

Debugging and Troubleshooting Tools

{ .api }

Comprehensive debugging utilities for workflow development and troubleshooting.

from toil.utils.toilDebugJob import main as debug_job_main  
from toil.utils.toilDebugFile import main as debug_file_main

def debug_workflow_issues():
    """Debug workflow execution issues.

    Drives the ``toil debug-job`` and ``toil debug-file`` entry points
    programmatically by populating ``sys.argv``.
    """

    # Shell equivalent: toil debug-job file:jobstore <job-id>
    job_args = ['toil-debug-job']
    job_args += [
        '--printJobInfo',      # job metadata
        '--printJobChildren',  # successor jobs
        '--printJobFiles',     # files owned by the job
        '--printJobLogging',   # captured job logs
    ]
    job_args += ['file:my-jobstore', 'job-id-12345']
    sys.argv = job_args
    debug_job_main()

    # Shell equivalent: toil debug-file file:jobstore <file-id>
    file_args = [
        'toil-debug-file',
        '--printFileInfo',             # file metadata
        '--printFileContent',          # dump contents when small enough
        '--saveFile=/tmp/debug-file',  # copy the file locally
        'file:my-jobstore',
        'file-id-67890'
    ]
    sys.argv = file_args
    debug_file_main()

def advanced_debugging_tools():
    """Advanced debugging and analysis tools.

    Returns the ``WorkflowDebugger`` class so callers can instantiate it.

    Fixed: the original called ``get_failed_jobs``, ``get_job_info`` and
    ``get_all_jobs`` without ever defining them (AttributeError at call
    time), and defined the class without exposing it (function returned
    None, so the class was unreachable dead code).
    """

    class WorkflowDebugger:
        """Comprehensive workflow debugging toolkit.

        The ``get_*`` data-access methods are placeholders that return
        empty data; override them with real job-store queries.
        """

        def __init__(self, jobstore_locator: str):
            # Locator of the job store being debugged.
            self.jobstore = jobstore_locator

        # --- data access (placeholders; previously missing entirely) ---

        def get_failed_jobs(self) -> list:
            """Return failed-job records; override with a real query."""
            return []

        def get_job_info(self, job_id: str) -> dict:
            """Return metadata for one job; override with a real query."""
            return {}

        def get_all_jobs(self) -> list:
            """Return all job records; override with a real query."""
            return []

        def analyze_failed_jobs(self):
            """Analyze failed jobs and common failure patterns.

            Returns a dict mapping failure category to job count.
            """

            failed_jobs = self.get_failed_jobs()

            failure_patterns = {
                'out_of_memory': 0,
                'timeout': 0,
                'missing_files': 0,
                'command_not_found': 0,
                'permission_denied': 0,
                'network_error': 0,
                'unknown': 0
            }

            for job in failed_jobs:
                # Classify by exit code first, then by stderr text.
                exit_code = job.get('exit_code', 0)
                stderr = job.get('stderr', '')

                if exit_code == 137:  # SIGKILL - likely OOM
                    failure_patterns['out_of_memory'] += 1
                elif exit_code == 124:  # timeout
                    failure_patterns['timeout'] += 1
                elif 'No such file' in stderr:
                    failure_patterns['missing_files'] += 1
                elif 'command not found' in stderr:
                    failure_patterns['command_not_found'] += 1
                elif 'Permission denied' in stderr:
                    failure_patterns['permission_denied'] += 1
                elif 'network' in stderr.lower():
                    failure_patterns['network_error'] += 1
                else:
                    failure_patterns['unknown'] += 1

            # Generate debugging report
            print("Failed Job Analysis:")
            print("="*50)
            for pattern, count in failure_patterns.items():
                if count > 0:
                    print(f"{pattern.replace('_', ' ').title()}: {count} jobs")
                    self.suggest_fixes(pattern, count)

            return failure_patterns

        def suggest_fixes(self, pattern: str, count: int):
            """Suggest fixes for common failure patterns."""

            suggestions = {
                'out_of_memory': [
                    "Increase memory requirements for affected jobs",
                    "Use streaming or chunked processing for large data",
                    "Check for memory leaks in job code"
                ],
                'timeout': [
                    "Increase job timeout limits",
                    "Optimize algorithm efficiency",
                    "Split large jobs into smaller chunks"
                ],
                'missing_files': [
                    "Verify input file paths and availability",
                    "Check file transfer and staging",
                    "Ensure proper job dependencies"
                ],
                'command_not_found': [
                    "Install missing software in Docker images",
                    "Check PATH environment variable",
                    "Verify tool versions and compatibility"
                ],
                'permission_denied': [
                    "Fix file and directory permissions",
                    "Check Docker volume mounts",
                    "Verify user/group settings"
                ]
            }

            if pattern in suggestions:
                print(f"  Suggested fixes:")
                for suggestion in suggestions[pattern]:
                    print(f"    • {suggestion}")

        def trace_job_execution(self, job_id: str):
            """Trace job execution history and dependencies."""

            job_info = self.get_job_info(job_id)

            print(f"Job Execution Trace: {job_id}")
            print("="*50)
            print(f"Job Name: {job_info.get('name', 'Unknown')}")
            print(f"Status: {job_info.get('status', 'Unknown')}")
            print(f"Start Time: {job_info.get('start_time', 'Unknown')}")
            print(f"End Time: {job_info.get('end_time', 'Unknown')}")
            print(f"Duration: {job_info.get('duration', 'Unknown')}")
            print(f"Resources Used: CPU={job_info.get('cpu_used', 'N/A')}, "
                  f"Memory={job_info.get('memory_used', 'N/A')}")

            # Show dependencies
            predecessors = job_info.get('predecessors', [])
            if predecessors:
                print(f"\nPredecessor Jobs:")
                for pred_id in predecessors:
                    pred_info = self.get_job_info(pred_id)
                    print(f"  {pred_id}: {pred_info.get('status', 'Unknown')}")

            # Show children
            children = job_info.get('children', [])
            if children:
                print(f"\nChild Jobs:")
                for child_id in children:
                    child_info = self.get_job_info(child_id)
                    print(f"  {child_id}: {child_info.get('status', 'Unknown')}")

            # Show files
            input_files = job_info.get('input_files', [])
            output_files = job_info.get('output_files', [])

            if input_files:
                print(f"\nInput Files:")
                for file_id in input_files:
                    print(f"  {file_id}")

            if output_files:
                print(f"\nOutput Files:")
                for file_id in output_files:
                    print(f"  {file_id}")

        def performance_analysis(self):
            """Analyze workflow performance and bottlenecks."""

            jobs = self.get_all_jobs()

            # Resource utilization analysis
            cpu_utilization = []
            memory_utilization = []
            job_durations = []

            for job in jobs:
                if job.get('completed'):
                    cpu_util = job.get('cpu_utilization', 0)
                    mem_util = job.get('memory_utilization', 0)
                    duration = job.get('duration', 0)

                    cpu_utilization.append(cpu_util)
                    memory_utilization.append(mem_util)
                    job_durations.append(duration)

            # Calculate statistics (skip when no completed jobs).
            if cpu_utilization:
                avg_cpu = sum(cpu_utilization) / len(cpu_utilization)
                avg_memory = sum(memory_utilization) / len(memory_utilization)
                avg_duration = sum(job_durations) / len(job_durations)

                print("Performance Analysis:")
                print("="*50)
                print(f"Average CPU Utilization: {avg_cpu:.1f}%")
                print(f"Average Memory Utilization: {avg_memory:.1f}%")
                print(f"Average Job Duration: {avg_duration:.2f}s")

                # Identify bottlenecks: jobs running 3x longer than average.
                long_jobs = [j for j in jobs if j.get('duration', 0) > avg_duration * 3]
                if long_jobs:
                    print(f"\nLong-running jobs ({len(long_jobs)}):")
                    for job in sorted(long_jobs, key=lambda x: x.get('duration', 0), reverse=True)[:5]:
                        print(f"  {job['id']}: {job.get('duration', 0):.2f}s")

                # Resource inefficiency: under 50% CPU utilization.
                inefficient_jobs = [j for j in jobs if j.get('cpu_utilization', 100) < 50]
                if inefficient_jobs:
                    print(f"\nResource-inefficient jobs ({len(inefficient_jobs)}):")
                    for job in inefficient_jobs[:5]:
                        print(f"  {job['id']}: {job.get('cpu_utilization', 0):.1f}% CPU")

    return WorkflowDebugger

Cleanup and Maintenance Tools

{ .api }

Tools for cleaning up job stores and maintaining workflow environments.

from toil.utils.toilClean import main as clean_main
from toil.utils.toilKill import main as kill_main

def cleanup_workflows():
    """Clean up workflow artifacts and job stores.

    Demonstrates ``toil clean`` and ``toil kill`` invoked through their
    programmatic entry points.
    """

    # Shell equivalent: toil clean file:jobstore
    sys.argv = ['toil-clean', 'file:completed-jobstore']
    clean_main()

    # Deeper cleanup: remove working directories and the job store itself.
    full_clean_args = [
        'toil-clean',
        '--cleanWorkDir',      # wipe working directories
        '--cleanJobStore',     # wipe the job store completely
        'aws:us-west-2:my-bucket:old-workflow'
    ]
    sys.argv = full_clean_args
    clean_main()

    # Shell equivalent: toil kill file:jobstore — stop a running workflow.
    sys.argv = ['toil-kill', 'file:running-jobstore']
    kill_main()

def maintenance_utilities():
    """Workflow maintenance and housekeeping utilities.

    Returns the ``WorkflowMaintenance`` class so callers can use it.

    Fixed: ``os`` (and friends) were imported only inside one method's
    local scope, so ``archive_completed_workflows`` and
    ``monitor_disk_usage`` raised NameError on ``os``; the
    ``find_completed_workflows`` helper was called but never defined;
    the class was defined and then discarded.
    """

    # Hoisted here so every method (closing over this scope) can use them.
    import json
    import os
    import shutil
    import sys
    import time

    class WorkflowMaintenance:
        """Workflow maintenance toolkit."""

        def find_completed_workflows(self) -> list:
            """Return completed-workflow records.

            Placeholder returning no workflows; override with a real
            job-store query. Each record is expected to carry 'id',
            'completion_date', 'output_files', 'statistics' and
            optionally 'log_files' keys (see archive_completed_workflows).
            """
            return []

        def cleanup_old_jobstores(self, days_old: int = 30):
            """Clean up job stores older than specified days.

            Returns the number of job stores that were targeted.
            """

            # Find old job stores (implementation depends on storage type)
            current_time = time.time()
            cutoff_time = current_time - (days_old * 86400)

            old_jobstores = []

            # For file-based job stores
            jobstore_dir = "/tmp/toil-jobstores"
            if os.path.exists(jobstore_dir):
                for item in os.listdir(jobstore_dir):
                    item_path = os.path.join(jobstore_dir, item)
                    if os.path.isdir(item_path):
                        mtime = os.path.getmtime(item_path)
                        if mtime < cutoff_time:
                            old_jobstores.append(f"file:{item_path}")

            # Clean up old job stores; best-effort, keep going on failure.
            for jobstore in old_jobstores:
                try:
                    print(f"Cleaning old jobstore: {jobstore}")
                    sys.argv = ['toil-clean', jobstore]
                    clean_main()
                except Exception as e:
                    print(f"Failed to clean {jobstore}: {e}")

            return len(old_jobstores)

        def archive_completed_workflows(self, archive_dir: str):
            """Archive completed workflow results under ``archive_dir``."""

            completed_workflows = self.find_completed_workflows()

            for workflow in completed_workflows:
                try:
                    # Create archive directory
                    workflow_archive = os.path.join(
                        archive_dir,
                        f"workflow_{workflow['id']}_{workflow['completion_date']}"
                    )
                    os.makedirs(workflow_archive, exist_ok=True)

                    # Archive workflow results
                    if workflow['output_files']:
                        output_archive = os.path.join(workflow_archive, "outputs")
                        os.makedirs(output_archive, exist_ok=True)

                        for output_file in workflow['output_files']:
                            shutil.copy2(output_file, output_archive)

                    # Archive statistics
                    stats_file = os.path.join(workflow_archive, "statistics.json")
                    with open(stats_file, 'w') as f:
                        json.dump(workflow['statistics'], f, indent=2)

                    # Archive logs
                    if workflow.get('log_files'):
                        log_archive = os.path.join(workflow_archive, "logs")
                        os.makedirs(log_archive, exist_ok=True)

                        for log_file in workflow['log_files']:
                            shutil.copy2(log_file, log_archive)

                    print(f"Archived workflow {workflow['id']} to {workflow_archive}")

                except Exception as e:
                    print(f"Failed to archive workflow {workflow['id']}: {e}")

        def monitor_disk_usage(self, threshold_percent: float = 85.0):
            """Monitor and alert on high disk usage.

            Returns a list of alert dicts for directories exceeding the
            threshold.
            """

            # Check various Toil directories
            directories_to_check = [
                "/tmp/toil-work",
                "/var/tmp/toil",
                "/tmp/toil-jobstores"
            ]

            alerts = []

            for directory in directories_to_check:
                if os.path.exists(directory):
                    total, used, free = shutil.disk_usage(directory)
                    usage_percent = (used / total) * 100

                    if usage_percent > threshold_percent:
                        alerts.append({
                            'directory': directory,
                            'usage_percent': usage_percent,
                            'free_gb': free / (1024**3),
                            'total_gb': total / (1024**3)
                        })

            if alerts:
                print("DISK USAGE ALERTS:")
                print("="*50)
                for alert in alerts:
                    print(f"Directory: {alert['directory']}")
                    print(f"Usage: {alert['usage_percent']:.1f}%")
                    print(f"Free: {alert['free_gb']:.1f} GB")
                    print(f"Total: {alert['total_gb']:.1f} GB")
                    print("-" * 30)

                # Suggest cleanup actions
                print("Suggested Actions:")
                print("• Run 'toil clean' on completed job stores")
                print("• Archive old workflow outputs")
                print("• Clear temporary directories")
                print("• Check for large log files")

            return alerts

    return WorkflowMaintenance

Cluster Management Tools

{ .api }

Command-line tools for managing cloud clusters and distributed environments.

from toil.utils.toilLaunchCluster import main as launch_cluster_main
from toil.utils.toilDestroyCluster import main as destroy_cluster_main
from toil.utils.toilSshCluster import main as ssh_cluster_main
from toil.utils.toilRsyncCluster import main as rsync_cluster_main

def cluster_management():
    """Manage cloud clusters for distributed execution.

    Walks the full cluster lifecycle — launch, SSH access, file sync,
    and teardown — via the programmatic CLI entry points.
    """

    # Shell: toil launch-cluster my-cluster --nodeTypes=m5.large,m5.xlarge --maxNodes=50
    launch_args = [
        'toil-launch-cluster',
        'research-cluster',
        '--provisioner=aws',
        '--nodeTypes=m5.large,m5.xlarge:0.50,c5.2xlarge',
        '--maxNodes=100',
        '--zone=us-west-2a',
        '--keyPairName=research-keypair',
        '--leaderNodeType=m5.large',
        '--nodeStorage=100',
        '--preemptibleWorkers',
        '--logLevel=INFO'
    ]
    sys.argv = launch_args
    launch_cluster_main()

    # Shell: toil ssh-cluster my-cluster — open a shell on the leader.
    sys.argv = ['toil-ssh-cluster', 'research-cluster']
    ssh_cluster_main()

    # Shell: toil rsync-cluster -r local-dir my-cluster:remote-dir
    sync_args = [
        'toil-rsync-cluster',
        '--recursive',
        '/local/data/',
        'research-cluster:/shared/data/'
    ]
    sys.argv = sync_args
    rsync_cluster_main()

    # Shell: toil destroy-cluster my-cluster — tear everything down.
    sys.argv = ['toil-destroy-cluster', 'research-cluster']
    destroy_cluster_main()

def advanced_cluster_operations():
    """Advanced cluster management operations.

    Returns the ``ClusterOperations`` class so callers can instantiate it.

    Fixed: the original used ``os`` (path manipulation in every method)
    without importing it anywhere in this section, raising NameError at
    call time; the class was also defined and then discarded.
    """

    # Needed by the methods below; imported here so they close over them.
    import os
    import sys

    class ClusterOperations:
        """Advanced cluster operation utilities."""

        def deploy_workflow_to_cluster(self, cluster_name: str,
                                     workflow_script: str,
                                     input_data: str):
            """Deploy and run workflow on cluster."""

            # Copy workflow and data to cluster
            print(f"Deploying to cluster: {cluster_name}")

            # Sync workflow files
            sys.argv = [
                'toil-rsync-cluster',
                '--recursive',
                os.path.dirname(workflow_script),
                f'{cluster_name}:/toil/workflows/'
            ]
            rsync_cluster_main()

            # Sync input data
            sys.argv = [
                'toil-rsync-cluster',
                '--recursive',
                input_data,
                f'{cluster_name}:/toil/data/'
            ]
            rsync_cluster_main()

            # Execute workflow on cluster
            remote_command = f"""
                cd /toil &&
                toil --batchSystem=mesos \\
                     --jobStore=aws:us-west-2:results:workflow-run \\
                     --provisioner=aws \\
                     --nodeTypes=m5.large,m5.xlarge \\
                     --maxNodes=50 \\
                     --stats \\
                     workflows/{os.path.basename(workflow_script)} \\
                     data/
            """

            # SSH and execute
            sys.argv = [
                'toil-ssh-cluster',
                cluster_name,
                remote_command
            ]

            ssh_cluster_main()

        def monitor_cluster_resources(self, cluster_name: str):
            """Monitor cluster resource usage."""

            monitoring_script = """
                # Get node information
                kubectl get nodes -o wide
                
                # Check resource usage
                kubectl top nodes
                
                # Check running pods
                kubectl get pods --all-namespaces
                
                # System resource usage
                free -h
                df -h
                
                # Toil-specific monitoring
                ps aux | grep toil
                
                # Check logs
                tail -n 50 /var/log/toil/*.log
            """

            sys.argv = [
                'toil-ssh-cluster',
                cluster_name,
                monitoring_script
            ]

            ssh_cluster_main()

        def backup_cluster_data(self, cluster_name: str, backup_location: str):
            """Backup important cluster data."""

            # Sync results back to local
            sys.argv = [
                'toil-rsync-cluster',
                '--recursive',
                f'{cluster_name}:/toil/results/',
                os.path.join(backup_location, 'results')
            ]
            rsync_cluster_main()

            # Sync logs
            sys.argv = [
                'toil-rsync-cluster',
                '--recursive',
                f'{cluster_name}:/var/log/toil/',
                os.path.join(backup_location, 'logs')
            ]
            rsync_cluster_main()

            # Sync job store (if file-based)
            sys.argv = [
                'toil-rsync-cluster',
                '--recursive',
                f'{cluster_name}:/toil/jobstore/',
                os.path.join(backup_location, 'jobstore')
            ]
            rsync_cluster_main()

    return ClusterOperations

Configuration Management

{ .api }

Tools for managing Toil configuration files and settings.

from toil.utils.toilConfig import main as config_main
from toil.common import get_default_config_path, ensure_config, generate_config, update_config

def configuration_management():
    """Manage Toil configuration files and settings.

    Ensures the default config exists, writes and tunes a custom config
    file, then applies further settings via the config CLI entry point.
    """

    # Make sure the default configuration exists, creating it if needed.
    ensure_config(get_default_config_path())

    # Write a fresh config for a production deployment.
    production_conf = "/etc/toil/production.conf"
    generate_config(production_conf)

    # Tune individual settings in place.
    for option, value in (("batchSystem", "slurm"),
                          ("maxCores", "64"),
                          ("defaultMemory", "8G")):
        update_config(production_conf, option, value)

    # The same idea driven through the config CLI.
    sys.argv = [
        'toil-config',
        '--set', 'jobStore=aws:us-west-2:my-bucket',
        '--set', 'batchSystem=kubernetes',
        '--set', 'maxNodes=100',
        '--config-file', production_conf
    ]
    config_main()

def configuration_templates():
    """Provide configuration templates for different use cases.

    Returns a factory function that writes a config file based on one
    of three named templates: local development, HPC cluster, or cloud
    production.
    """

    templates = {}

    # Small local runs with verbose logging.
    templates['local_development'] = {
        'batchSystem': 'local',
        'maxCores': 4,
        'maxMemory': '8G',
        'jobStore': 'file:/tmp/toil-dev',
        'logLevel': 'DEBUG',
        'stats': True
    }

    # Shared HPC cluster driven through Slurm.
    templates['hpc_cluster'] = {
        'batchSystem': 'slurm',
        'maxCores': 1000,
        'maxMemory': '2T',
        'jobStore': 'file:/shared/toil-jobs',
        'workDir': '/tmp/toil-work',
        'logLevel': 'INFO',
        'retryCount': 3,
        'rescueJobsFrequency': 3600
    }

    # Autoscaling cloud deployment on AWS + Kubernetes.
    templates['cloud_production'] = {
        'batchSystem': 'kubernetes',
        'provisioner': 'aws',
        'jobStore': 'aws:us-west-2:production-bucket:workflows',
        'nodeTypes': ['m5.large', 'm5.xlarge', 'c5.2xlarge'],
        'maxNodes': 500,
        'defaultPreemptible': True,
        'preemptibleCompensation': 1.5,
        'sseKey': 'alias/toil-production-key',
        'clean': 'onSuccess',
        'stats': True,
        'logLevel': 'INFO'
    }

    def create_config_from_template(template_name: str, output_file: str):
        """Write ``output_file`` from the named template.

        Raises ValueError for an unrecognized template name.
        """

        if template_name not in templates:
            raise ValueError(f"Unknown template: {template_name}")

        settings = templates[template_name]

        # Start from a complete default config, then overlay the template.
        generate_config(output_file)
        for option, value in settings.items():
            update_config(output_file, option, value)

        print(f"Configuration created: {output_file}")
        print(f"Template used: {template_name}")

    return create_config_from_template

This comprehensive suite of utilities and CLI tools provides complete workflow lifecycle management, from development through production deployment, with extensive monitoring, debugging, and maintenance capabilities.

Install with Tessl CLI

npx tessl i tessl/pypi-toil

docs

batch-systems.md

core-workflow.md

file-management.md

index.md

job-stores.md

provisioning.md

utilities.md

workflow-languages.md

tile.json