Pipeline management software for clusters.
Overall score: 67%
Toil provides a comprehensive suite of command-line utilities and helper functions for workflow management, debugging, monitoring, and cluster operations. These tools cover the complete lifecycle from workflow development and testing to production deployment and maintenance. The utilities integrate seamlessly with Toil's core functionality and provide both interactive and scriptable interfaces for automation and integration with larger systems.
{ .api }
The primary toil command provides the main interface for workflow execution and management.
from toil.utils.toilMain import main as toil_main
import sys
# Command-line workflow execution
def run_toil_workflow():
    """Execute a workflow through Toil's main CLI entry point.

    Equivalent shell invocations:
        toil workflow.py file:jobstore
        toil --batchSystem=local --maxCores=8 --maxMemory=16G workflow.py file:jobstore
        toil --batchSystem=slurm --maxNodes=50 --nodeTypes=compute workflow.py file:jobstore

    Returns:
        The exit code produced by ``toil_main()``.
    """
    # Arguments in the shape the CLI parser expects (argv[0] is the
    # program name).
    cli_args = [
        'toil',
        '--batchSystem=local',
        '--jobStore=file:test-jobstore',
        '--maxCores=4',
        '--maxMemory=8G',
        '--logLevel=INFO',
        '--stats',
        'workflow_script.py',
        'input_file.txt'
    ]
    # Fix: save and restore sys.argv instead of clobbering it for the
    # rest of the process (the original leaked the mutation to every
    # later caller that reads sys.argv).
    saved_argv = sys.argv
    sys.argv = cli_args
    try:
        exit_code = toil_main()
    finally:
        sys.argv = saved_argv
    return exit_code
def advanced_toil_options():
    """Demonstrate advanced toil CLI options.

    Builds and returns a full example command line covering the major
    option groups of the ``toil`` CLI.
    """
    store_and_batch = [
        '--jobStore=aws:us-west-2:my-bucket:workflow-run',
        '--batchSystem=kubernetes',
    ]
    resource_limits = [
        '--maxCores=1000',
        '--maxMemory=2T',
        '--maxDisk=10T',
        '--defaultMemory=4G',
        '--defaultCores=2',
        '--defaultDisk=10G',
    ]
    scaling_and_provisioning = [
        '--provisioner=aws',
        '--nodeTypes=m5.large,m5.xlarge:0.50,c5.2xlarge',
        '--maxNodes=100',
        '--minNodes=5',
        '--targetTime=1800',  # target job runtime in seconds
    ]
    fault_tolerance = [
        '--retryCount=3',
        '--rescueJobsFrequency=3600',
        '--maxJobDuration=86400',  # 24 hours max per job
    ]
    preemption = [
        '--defaultPreemptible',
        '--preemptibleCompensation=1.5',
        '--preemptibleWorkerTimeout=1800',
    ]
    cleanup_and_management = [
        '--clean=onSuccess',
        '--cleanWorkDir=onSuccess',
        '--clusterStats=/tmp/cluster-stats',
    ]
    logging_and_monitoring = [
        '--logLevel=INFO',
        '--logFile=/tmp/toil-workflow.log',
        '--stats',
        '--metrics',
    ]
    working_dirs = [
        '--workDir=/tmp/toil-work',
        '--coordinationDir=/shared/coordination',
    ]
    security = [
        '--sseKey=alias/toil-kms-key',
        '--encryptedFileStore',
    ]
    containers = [
        '--disableAutoDeployment=False',
        '--containerEngine=docker',
    ]
    # Workflow script followed by the arguments passed through to it.
    workflow_and_args = [
        'complex_workflow.py',
        '--input-dir=/data/inputs',
        '--output-dir=/data/outputs',
        '--reference-genome=/data/reference.fa',
        '--threads=16',
    ]
    # Concatenation of the sections is identical to one flat literal.
    return (['toil'] + store_and_batch + resource_limits
            + scaling_and_provisioning + fault_tolerance + preemption
            + cleanup_and_management + logging_and_monitoring
            + working_dirs + security + containers + workflow_and_args)
Tools for monitoring workflow progress and status in real-time.
from toil.utils.toilStatus import main as status_main
from toil.utils.toilStats import main as stats_main
def monitor_workflow_status():
    """Monitor running workflow status.

    Shell equivalent: ``toil status file:jobstore``.
    """
    # Quick check against a local file job store.
    sys.argv = ['toil-status', 'file:my-jobstore']
    status_main()
    # Verbose per-job breakdown against an AWS job store; the run
    # fails if the workflow has not finished.
    verbose_argv = [
        'toil-status',
        '--verbose',
        '--failIfNotComplete',
        'aws:us-west-2:my-bucket:workflow',
    ]
    sys.argv = verbose_argv
    return status_main()
def analyze_workflow_statistics():
    """Analyze workflow execution statistics.

    Shell equivalent: ``toil stats file:jobstore``.
    """
    # Detailed human-readable analysis of a completed job store,
    # sorted by total time, descending.
    analysis_argv = [
        'toil-stats',
        '--raw',
        '--pretty',
        '--categories=time,clock,wait,memory,disk',
        '--sortCategory=time',
        '--sortField=total',
        '--sortReverse',
        'file:completed-jobstore',
    ]
    # Second pass: dump statistics to a JSON file on disk.
    export_argv = [
        'toil-stats',
        '--outputFile=/tmp/workflow-stats.json',
        '--format=json',
        'file:jobstore',
    ]
    sys.argv = analysis_argv
    report = stats_main()
    sys.argv = export_argv
    stats_main()
    return report
def real_time_monitoring():
    """Real-time workflow monitoring implementation.

    Defines a ``WorkflowMonitor`` helper class, runs one bounded demo
    refresh against a local job store, and returns the monitor so the
    caller can continue (or stop) monitoring.
    """
    import time
    from toil.statsAndLogging import StatsAndLogging

    class WorkflowMonitor:
        """Poll a job store and display workflow status and metrics."""

        def __init__(self, jobstore_locator: str):
            self.jobstore = jobstore_locator
            # NOTE(review): constructed with no arguments — confirm
            # StatsAndLogging's actual constructor signature.
            self.stats_collector = StatsAndLogging()
            self.monitoring = True

        def start_monitoring(self, update_interval: int = 30, max_updates=None):
            """Run the monitoring loop.

            Args:
                update_interval: Seconds to sleep between refreshes.
                max_updates: Optional iteration cap.  ``None`` keeps the
                    original run-until-done behavior; passing a number
                    fixes the original demo, which spun forever because
                    the placeholder status never reports completion or
                    failure.
            """
            updates_done = 0
            while self.monitoring:
                try:
                    status = self.get_workflow_status()
                    metrics = self.get_performance_metrics()
                    self.display_status(status, metrics)
                    if status.get('completed', False):
                        print("Workflow completed successfully!")
                        break
                    if status.get('failed', False):
                        print("Workflow failed!")
                        self.handle_workflow_failure(status)
                        break
                    updates_done += 1
                    if max_updates is not None and updates_done >= max_updates:
                        break
                    time.sleep(update_interval)
                except KeyboardInterrupt:
                    print("Monitoring stopped by user")
                    break
                except Exception as e:
                    # Keep monitoring alive through transient errors.
                    print(f"Monitoring error: {e}")
                    time.sleep(5)

        def handle_workflow_failure(self, status: dict):
            """Hook invoked when the workflow reports failure.

            Fix: the loop called this method but it was never defined,
            so the failure path raised AttributeError.
            """
            print(f"Workflow failure details: {status}")

        def get_workflow_status(self) -> dict:
            """Return the current workflow status (placeholder values)."""
            status = {
                'total_jobs': 0,
                'completed_jobs': 0,
                'running_jobs': 0,
                'failed_jobs': 0,
                'queued_jobs': 0,
                'completion_percentage': 0.0,
                'estimated_time_remaining': None
            }
            # TODO: populate from the job store (e.g. via toil status).
            return status

        def get_performance_metrics(self) -> dict:
            """Return performance metrics (placeholder values)."""
            metrics = {
                'cpu_utilization': 0.0,
                'memory_usage': 0.0,
                'network_io': 0.0,
                'disk_io': 0.0,
                'cost_per_hour': 0.0,
                'jobs_per_minute': 0.0
            }
            # TODO: collect from batch system / provisioner sources.
            return metrics

        def display_status(self, status: dict, metrics: dict):
            """Print a formatted snapshot of status and metrics."""
            print("\n" + "="*60)
            print(f"Workflow Status - {time.strftime('%Y-%m-%d %H:%M:%S')}")
            print("="*60)
            print(f"Jobs: {status['completed_jobs']}/{status['total_jobs']} completed")
            print(f"Running: {status['running_jobs']}, Queued: {status['queued_jobs']}")
            print(f"Failed: {status['failed_jobs']}")
            print(f"Progress: {status['completion_percentage']:.1f}%")
            if status['estimated_time_remaining']:
                print(f"Est. remaining: {status['estimated_time_remaining']}")
            print(f"\nPerformance:")
            print(f"CPU Utilization: {metrics['cpu_utilization']:.1f}%")
            print(f"Memory Usage: {metrics['memory_usage']:.1f}%")
            print(f"Cost/Hour: ${metrics['cost_per_hour']:.2f}")
            print(f"Jobs/Minute: {metrics['jobs_per_minute']:.1f}")

        def stop_monitoring(self):
            """Stop the monitoring loop."""
            self.monitoring = False

    # Usage — one bounded refresh instead of blocking forever.
    monitor = WorkflowMonitor("file:my-jobstore")
    monitor.start_monitoring(update_interval=10, max_updates=1)
    return monitor
Comprehensive debugging utilities for workflow development and troubleshooting.
from toil.utils.toilDebugJob import main as debug_job_main
from toil.utils.toilDebugFile import main as debug_file_main
def debug_workflow_issues():
    """Debug workflow execution issues.

    Shell equivalents: ``toil debug-job file:jobstore <job-id>`` and
    ``toil debug-file file:jobstore <file-id>``.
    """
    # Inspect one job: metadata, children, files, and captured logs.
    job_argv = [
        'toil-debug-job',
        '--printJobInfo',
        '--printJobChildren',
        '--printJobFiles',
        '--printJobLogging',
        'file:my-jobstore',
        'job-id-12345',
    ]
    sys.argv = job_argv
    debug_job_main()
    # Inspect one file: metadata, contents, and save a local copy.
    file_argv = [
        'toil-debug-file',
        '--printFileInfo',
        '--printFileContent',
        '--saveFile=/tmp/debug-file',
        'file:my-jobstore',
        'file-id-67890',
    ]
    sys.argv = file_argv
    debug_file_main()
def advanced_debugging_tools():
    """Advanced debugging and analysis tools.

    Returns:
        The ``WorkflowDebugger`` class (the original built it and threw
        it away), so callers can instantiate it against a job store.
    """

    class WorkflowDebugger:
        """Comprehensive workflow debugging toolkit."""

        def __init__(self, jobstore_locator: str):
            self.jobstore = jobstore_locator

        # --- data-access stubs --------------------------------------
        # Fix: the analysis methods below called these accessors, but
        # they were never defined, so every call raised AttributeError.
        # They return empty placeholders until wired to the job store.

        def get_failed_jobs(self) -> list:
            """Return failed job records from the job store (stub)."""
            return []

        def get_all_jobs(self) -> list:
            """Return all job records from the job store (stub)."""
            return []

        def get_job_info(self, job_id: str) -> dict:
            """Return metadata for one job (stub)."""
            return {}

        def analyze_failed_jobs(self):
            """Classify failed jobs into common failure patterns."""
            failed_jobs = self.get_failed_jobs()
            failure_patterns = {
                'out_of_memory': 0,
                'timeout': 0,
                'missing_files': 0,
                'command_not_found': 0,
                'permission_denied': 0,
                'network_error': 0,
                'unknown': 0
            }
            for job in failed_jobs:
                # Classify by exit code first, then by stderr content.
                exit_code = job.get('exit_code', 0)
                stderr = job.get('stderr', '')
                if exit_code == 137:  # SIGKILL - likely OOM
                    failure_patterns['out_of_memory'] += 1
                elif exit_code == 124:  # timeout
                    failure_patterns['timeout'] += 1
                elif 'No such file' in stderr:
                    failure_patterns['missing_files'] += 1
                elif 'command not found' in stderr:
                    failure_patterns['command_not_found'] += 1
                elif 'Permission denied' in stderr:
                    failure_patterns['permission_denied'] += 1
                elif 'network' in stderr.lower():
                    failure_patterns['network_error'] += 1
                else:
                    failure_patterns['unknown'] += 1
            # Generate debugging report
            print("Failed Job Analysis:")
            print("="*50)
            for pattern, count in failure_patterns.items():
                if count > 0:
                    print(f"{pattern.replace('_', ' ').title()}: {count} jobs")
                    self.suggest_fixes(pattern, count)
            return failure_patterns

        def suggest_fixes(self, pattern: str, count: int):
            """Print suggested fixes for a common failure pattern."""
            suggestions = {
                'out_of_memory': [
                    "Increase memory requirements for affected jobs",
                    "Use streaming or chunked processing for large data",
                    "Check for memory leaks in job code"
                ],
                'timeout': [
                    "Increase job timeout limits",
                    "Optimize algorithm efficiency",
                    "Split large jobs into smaller chunks"
                ],
                'missing_files': [
                    "Verify input file paths and availability",
                    "Check file transfer and staging",
                    "Ensure proper job dependencies"
                ],
                'command_not_found': [
                    "Install missing software in Docker images",
                    "Check PATH environment variable",
                    "Verify tool versions and compatibility"
                ],
                'permission_denied': [
                    "Fix file and directory permissions",
                    "Check Docker volume mounts",
                    "Verify user/group settings"
                ]
            }
            if pattern in suggestions:
                print(f" Suggested fixes:")
                for suggestion in suggestions[pattern]:
                    print(f" • {suggestion}")

        def trace_job_execution(self, job_id: str):
            """Print a job's execution history, dependencies, and files."""
            job_info = self.get_job_info(job_id)
            print(f"Job Execution Trace: {job_id}")
            print("="*50)
            print(f"Job Name: {job_info.get('name', 'Unknown')}")
            print(f"Status: {job_info.get('status', 'Unknown')}")
            print(f"Start Time: {job_info.get('start_time', 'Unknown')}")
            print(f"End Time: {job_info.get('end_time', 'Unknown')}")
            print(f"Duration: {job_info.get('duration', 'Unknown')}")
            print(f"Resources Used: CPU={job_info.get('cpu_used', 'N/A')}, "
                  f"Memory={job_info.get('memory_used', 'N/A')}")
            # Upstream dependencies.
            predecessors = job_info.get('predecessors', [])
            if predecessors:
                print(f"\nPredecessor Jobs:")
                for pred_id in predecessors:
                    pred_info = self.get_job_info(pred_id)
                    print(f" {pred_id}: {pred_info.get('status', 'Unknown')}")
            # Downstream children.
            children = job_info.get('children', [])
            if children:
                print(f"\nChild Jobs:")
                for child_id in children:
                    child_info = self.get_job_info(child_id)
                    print(f" {child_id}: {child_info.get('status', 'Unknown')}")
            # Associated files.
            input_files = job_info.get('input_files', [])
            output_files = job_info.get('output_files', [])
            if input_files:
                print(f"\nInput Files:")
                for file_id in input_files:
                    print(f" {file_id}")
            if output_files:
                print(f"\nOutput Files:")
                for file_id in output_files:
                    print(f" {file_id}")

        def performance_analysis(self):
            """Summarize resource utilization and flag bottleneck jobs."""
            jobs = self.get_all_jobs()
            cpu_utilization = []
            memory_utilization = []
            job_durations = []
            for job in jobs:
                if job.get('completed'):
                    cpu_utilization.append(job.get('cpu_utilization', 0))
                    memory_utilization.append(job.get('memory_utilization', 0))
                    job_durations.append(job.get('duration', 0))
            # Guard against an empty job list before averaging.
            if cpu_utilization:
                avg_cpu = sum(cpu_utilization) / len(cpu_utilization)
                avg_memory = sum(memory_utilization) / len(memory_utilization)
                avg_duration = sum(job_durations) / len(job_durations)
                print("Performance Analysis:")
                print("="*50)
                print(f"Average CPU Utilization: {avg_cpu:.1f}%")
                print(f"Average Memory Utilization: {avg_memory:.1f}%")
                print(f"Average Job Duration: {avg_duration:.2f}s")
                # Bottlenecks: jobs running 3x longer than average.
                long_jobs = [j for j in jobs if j.get('duration', 0) > avg_duration * 3]
                if long_jobs:
                    print(f"\nLong-running jobs ({len(long_jobs)}):")
                    for job in sorted(long_jobs, key=lambda x: x.get('duration', 0), reverse=True)[:5]:
                        print(f" {job['id']}: {job.get('duration', 0):.2f}s")
                # Jobs using less than half their requested CPU.
                inefficient_jobs = [j for j in jobs if j.get('cpu_utilization', 100) < 50]
                if inefficient_jobs:
                    print(f"\nResource-inefficient jobs ({len(inefficient_jobs)}):")
                    for job in inefficient_jobs[:5]:
                        print(f" {job['id']}: {job.get('cpu_utilization', 0):.1f}% CPU")

    return WorkflowDebugger
Tools for cleaning up job stores and maintaining workflow environments.
from toil.utils.toilClean import main as clean_main
from toil.utils.toilKill import main as kill_main
def cleanup_workflows():
    """Clean up workflow artifacts and job stores.

    Shell equivalents: ``toil clean file:jobstore`` and
    ``toil kill file:jobstore``.
    """
    # (argv, entry point) pairs, executed in order: simple clean of a
    # finished local store, full clean of an old AWS store, then a kill
    # of a still-running workflow.
    operations = [
        (['toil-clean', 'file:completed-jobstore'], clean_main),
        (['toil-clean',
          '--cleanWorkDir',
          '--cleanJobStore',
          'aws:us-west-2:my-bucket:old-workflow'], clean_main),
        (['toil-kill', 'file:running-jobstore'], kill_main),
    ]
    for argv, entry_point in operations:
        sys.argv = argv
        entry_point()
def maintenance_utilities():
    """Workflow maintenance and housekeeping utilities.

    Returns:
        The ``WorkflowMaintenance`` class (the original defined it and
        discarded it).
    """
    # Fix: os/time/shutil/json were imported inside a single method (or
    # not at all), so archive_completed_workflows and monitor_disk_usage
    # raised NameError on `os`/`shutil`.  Import once for all methods.
    import os
    import time
    import json
    import shutil

    class WorkflowMaintenance:
        """Workflow maintenance toolkit."""

        def find_completed_workflows(self) -> list:
            """Return records of completed workflows (stub).

            Fix: ``archive_completed_workflows`` called this, but it was
            never defined.  Wire to the job store as needed.
            """
            return []

        def cleanup_old_jobstores(self, days_old: int = 30):
            """Clean file-based job stores older than ``days_old`` days.

            Returns:
                The number of job stores selected for cleanup.
            """
            cutoff_time = time.time() - (days_old * 86400)
            old_jobstores = []
            # For file-based job stores under the conventional directory.
            jobstore_dir = "/tmp/toil-jobstores"
            if os.path.exists(jobstore_dir):
                for item in os.listdir(jobstore_dir):
                    item_path = os.path.join(jobstore_dir, item)
                    if os.path.isdir(item_path):
                        if os.path.getmtime(item_path) < cutoff_time:
                            old_jobstores.append(f"file:{item_path}")
            for jobstore in old_jobstores:
                try:
                    print(f"Cleaning old jobstore: {jobstore}")
                    sys.argv = ['toil-clean', jobstore]
                    clean_main()
                except Exception as e:
                    print(f"Failed to clean {jobstore}: {e}")
            return len(old_jobstores)

        def archive_completed_workflows(self, archive_dir: str):
            """Archive outputs, statistics, and logs of completed workflows."""
            for workflow in self.find_completed_workflows():
                try:
                    # One archive directory per workflow run.
                    workflow_archive = os.path.join(
                        archive_dir,
                        f"workflow_{workflow['id']}_{workflow['completion_date']}"
                    )
                    os.makedirs(workflow_archive, exist_ok=True)
                    # Archive workflow results.
                    if workflow['output_files']:
                        output_archive = os.path.join(workflow_archive, "outputs")
                        os.makedirs(output_archive, exist_ok=True)
                        for output_file in workflow['output_files']:
                            shutil.copy2(output_file, output_archive)
                    # Archive statistics.
                    stats_file = os.path.join(workflow_archive, "statistics.json")
                    with open(stats_file, 'w') as f:
                        json.dump(workflow['statistics'], f, indent=2)
                    # Archive logs.
                    if workflow.get('log_files'):
                        log_archive = os.path.join(workflow_archive, "logs")
                        os.makedirs(log_archive, exist_ok=True)
                        for log_file in workflow['log_files']:
                            shutil.copy2(log_file, log_archive)
                    print(f"Archived workflow {workflow['id']} to {workflow_archive}")
                except Exception as e:
                    print(f"Failed to archive workflow {workflow['id']}: {e}")

        def monitor_disk_usage(self, threshold_percent: float = 85.0):
            """Alert on Toil directories above the usage threshold.

            Returns:
                A list of alert dicts for over-threshold directories.
            """
            directories_to_check = [
                "/tmp/toil-work",
                "/var/tmp/toil",
                "/tmp/toil-jobstores"
            ]
            alerts = []
            for directory in directories_to_check:
                if os.path.exists(directory):
                    total, used, free = shutil.disk_usage(directory)
                    usage_percent = (used / total) * 100
                    if usage_percent > threshold_percent:
                        alerts.append({
                            'directory': directory,
                            'usage_percent': usage_percent,
                            'free_gb': free / (1024**3),
                            'total_gb': total / (1024**3)
                        })
            if alerts:
                print("DISK USAGE ALERTS:")
                print("="*50)
                for alert in alerts:
                    print(f"Directory: {alert['directory']}")
                    print(f"Usage: {alert['usage_percent']:.1f}%")
                    print(f"Free: {alert['free_gb']:.1f} GB")
                    print(f"Total: {alert['total_gb']:.1f} GB")
                    print("-" * 30)
                # Suggest cleanup actions.
                print("Suggested Actions:")
                print("• Run 'toil clean' on completed job stores")
                print("• Archive old workflow outputs")
                print("• Clear temporary directories")
                print("• Check for large log files")
            return alerts

    return WorkflowMaintenance
Command-line tools for managing cloud clusters and distributed environments.
from toil.utils.toilLaunchCluster import main as launch_cluster_main
from toil.utils.toilDestroyCluster import main as destroy_cluster_main
from toil.utils.toilSshCluster import main as ssh_cluster_main
from toil.utils.toilRsyncCluster import main as rsync_cluster_main
def cluster_management():
    """Manage cloud clusters for distributed execution.

    Shell equivalents: ``toil launch-cluster``, ``toil ssh-cluster``,
    ``toil rsync-cluster``, and ``toil destroy-cluster``.
    """
    # Provision a new AWS cluster.
    launch_argv = [
        'toil-launch-cluster',
        'research-cluster',
        '--provisioner=aws',
        '--nodeTypes=m5.large,m5.xlarge:0.50,c5.2xlarge',
        '--maxNodes=100',
        '--zone=us-west-2a',
        '--keyPairName=research-keypair',
        '--leaderNodeType=m5.large',
        '--nodeStorage=100',
        '--preemptibleWorkers',
        '--logLevel=INFO',
    ]
    sys.argv = launch_argv
    launch_cluster_main()
    # Open a shell on the cluster leader.
    sys.argv = ['toil-ssh-cluster', 'research-cluster']
    ssh_cluster_main()
    # Push local data onto shared cluster storage.
    sync_argv = [
        'toil-rsync-cluster',
        '--recursive',
        '/local/data/',
        'research-cluster:/shared/data/',
    ]
    sys.argv = sync_argv
    rsync_cluster_main()
    # Tear the cluster down.
    sys.argv = ['toil-destroy-cluster', 'research-cluster']
    destroy_cluster_main()
def advanced_cluster_operations():
    """Advanced cluster management operations.

    Returns:
        The ``ClusterOperations`` class (the original defined it and
        discarded it).
    """
    # Fix: every method used os.path.* but `os` was never imported in
    # any enclosing scope, so each call raised NameError.
    import os

    class ClusterOperations:
        """Advanced cluster operation utilities."""

        def deploy_workflow_to_cluster(self, cluster_name: str,
                                       workflow_script: str,
                                       input_data: str):
            """Copy a workflow and its inputs to a cluster, then run it."""
            print(f"Deploying to cluster: {cluster_name}")
            # Sync workflow files.
            sys.argv = [
                'toil-rsync-cluster',
                '--recursive',
                os.path.dirname(workflow_script),
                f'{cluster_name}:/toil/workflows/'
            ]
            rsync_cluster_main()
            # Sync input data.
            sys.argv = [
                'toil-rsync-cluster',
                '--recursive',
                input_data,
                f'{cluster_name}:/toil/data/'
            ]
            rsync_cluster_main()
            # Execute the workflow on the cluster leader over SSH.
            remote_command = f"""
            cd /toil &&
            toil --batchSystem=mesos \\
                --jobStore=aws:us-west-2:results:workflow-run \\
                --provisioner=aws \\
                --nodeTypes=m5.large,m5.xlarge \\
                --maxNodes=50 \\
                --stats \\
                workflows/{os.path.basename(workflow_script)} \\
                data/
            """
            sys.argv = [
                'toil-ssh-cluster',
                cluster_name,
                remote_command
            ]
            ssh_cluster_main()

        def monitor_cluster_resources(self, cluster_name: str):
            """Run a resource-monitoring script on the cluster leader."""
            monitoring_script = """
            # Get node information
            kubectl get nodes -o wide
            # Check resource usage
            kubectl top nodes
            # Check running pods
            kubectl get pods --all-namespaces
            # System resource usage
            free -h
            df -h
            # Toil-specific monitoring
            ps aux | grep toil
            # Check logs
            tail -n 50 /var/log/toil/*.log
            """
            sys.argv = [
                'toil-ssh-cluster',
                cluster_name,
                monitoring_script
            ]
            ssh_cluster_main()

        def backup_cluster_data(self, cluster_name: str, backup_location: str):
            """Pull results, logs, and the job store back from the cluster."""
            # Sync results back to local storage.
            sys.argv = [
                'toil-rsync-cluster',
                '--recursive',
                f'{cluster_name}:/toil/results/',
                os.path.join(backup_location, 'results')
            ]
            rsync_cluster_main()
            # Sync logs.
            sys.argv = [
                'toil-rsync-cluster',
                '--recursive',
                f'{cluster_name}:/var/log/toil/',
                os.path.join(backup_location, 'logs')
            ]
            rsync_cluster_main()
            # Sync the job store (if file-based).
            sys.argv = [
                'toil-rsync-cluster',
                '--recursive',
                f'{cluster_name}:/toil/jobstore/',
                os.path.join(backup_location, 'jobstore')
            ]
            rsync_cluster_main()

    return ClusterOperations
Tools for managing Toil configuration files and settings.
from toil.utils.toilConfig import main as config_main
from toil.common import get_default_config_path, ensure_config, generate_config, update_config
def configuration_management():
    """Manage Toil configuration files and settings."""
    # Make sure the default configuration file exists.
    ensure_config(get_default_config_path())
    # Write a fresh production configuration.
    production_conf = "/etc/toil/production.conf"
    generate_config(production_conf)
    # Apply overrides one key at a time.
    for key, value in (("batchSystem", "slurm"),
                       ("maxCores", "64"),
                       ("defaultMemory", "8G")):
        update_config(production_conf, key, value)
    # The same idea through the CLI front end.
    sys.argv = [
        'toil-config',
        '--set', 'jobStore=aws:us-west-2:my-bucket',
        '--set', 'batchSystem=kubernetes',
        '--set', 'maxNodes=100',
        '--config-file', production_conf,
    ]
    config_main()
def configuration_templates():
    """Provide configuration templates for different use cases.

    Returns:
        A factory function that writes a config file from a named
        template.  (Fix: the original's final line was garbled —
        ``return create_config_from_template`` was fused with a trailing
        prose sentence, breaking the syntax.)
    """
    templates = {
        'local_development': {
            'batchSystem': 'local',
            'maxCores': 4,
            'maxMemory': '8G',
            'jobStore': 'file:/tmp/toil-dev',
            'logLevel': 'DEBUG',
            'stats': True
        },
        'hpc_cluster': {
            'batchSystem': 'slurm',
            'maxCores': 1000,
            'maxMemory': '2T',
            'jobStore': 'file:/shared/toil-jobs',
            'workDir': '/tmp/toil-work',
            'logLevel': 'INFO',
            'retryCount': 3,
            'rescueJobsFrequency': 3600
        },
        'cloud_production': {
            'batchSystem': 'kubernetes',
            'provisioner': 'aws',
            'jobStore': 'aws:us-west-2:production-bucket:workflows',
            'nodeTypes': ['m5.large', 'm5.xlarge', 'c5.2xlarge'],
            'maxNodes': 500,
            'defaultPreemptible': True,
            'preemptibleCompensation': 1.5,
            'sseKey': 'alias/toil-production-key',
            'clean': 'onSuccess',
            'stats': True,
            'logLevel': 'INFO'
        }
    }

    def create_config_from_template(template_name: str, output_file: str):
        """Create a configuration file from a named template.

        Raises:
            ValueError: If ``template_name`` is not a known template.
        """
        if template_name not in templates:
            raise ValueError(f"Unknown template: {template_name}")
        template = templates[template_name]
        # Generate the base config, then layer template values on top.
        generate_config(output_file)
        # NOTE(review): some template values are lists/bools/ints —
        # confirm update_config accepts non-string values.
        for key, value in template.items():
            update_config(output_file, key, value)
        print(f"Configuration created: {output_file}")
        print(f"Template used: {template_name}")

    return create_config_from_template

# This comprehensive utilities and CLI tools suite provides complete
# workflow lifecycle management from development through production
# deployment with extensive monitoring, debugging, and maintenance
# capabilities.
Install with Tessl CLI
npx tessl i tessl/pypi-toildocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10