Pipeline management software for clusters.
Overall score: 67%
Toil's batch system integration provides a unified interface for executing workflows across diverse computing environments. The system abstracts the complexity of different schedulers and execution environments, allowing workflows to run seamlessly on local machines, HPC clusters, cloud platforms, and container orchestration systems. Each batch system implementation handles job submission, monitoring, resource allocation, and cleanup according to the specific requirements of the target environment.
{ .api }
The AbstractBatchSystem provides the core interface that all batch systems implement.
from toil.batchSystems.abstractBatchSystem import (
AbstractBatchSystem,
AbstractScalableBatchSystem,
UpdatedBatchJobInfo,
BatchJobExitReason
)
from toil.job import JobDescription
from typing import Optional, Dict, List
class CustomBatchSystem(AbstractBatchSystem):
    """Example batch system that delegates to an external scheduler."""

    def issueBatchJob(self, jobNode: JobDescription) -> int:
        """Submit job to batch system and return job ID."""
        # Toil tracks memory/disk in bytes; the scheduler expects MiB.
        submitted_id = self.submit_to_scheduler(
            command=jobNode.command,
            memory=jobNode.memory // (1024 * 1024),
            cores=jobNode.cores,
            disk=jobNode.disk // (1024 * 1024)
        )
        return submitted_id

    def killBatchJobs(self, jobIDs: List[int]) -> None:
        """Terminate specified jobs."""
        for identifier in jobIDs:
            self.cancel_job(identifier)

    def getIssuedBatchJobIDs(self) -> List[int]:
        """Get list of all submitted job IDs."""
        # Iterating the mapping yields its keys.
        return list(self.issued_jobs)

    def getRunningBatchJobIDs(self) -> Dict[int, float]:
        """Get running jobs with their runtime in seconds."""
        return {
            job_id: self.get_job_runtime(job_id)
            for job_id in self.issued_jobs
            if self.is_job_running(job_id)
        }

    def getUpdatedBatchJob(self, maxWait: int) -> Optional[UpdatedBatchJobInfo]:
        """Poll for completed job, waiting up to maxWait seconds."""
        finished = self.poll_for_completion(maxWait)
        if not finished:
            return None
        return UpdatedBatchJobInfo(
            jobID=finished.id,
            exitReason=BatchJobExitReason.FINISHED,
            wallTime=finished.wall_time,
            exitCode=finished.exit_code
        )

    def getSchedulingStatusMessage(self) -> Optional[str]:
        """Get current scheduling status for monitoring."""
        return f"Jobs queued: {self.get_queued_count()}, Running: {self.get_running_count()}"
# Scalable batch system for cloud environments
class CloudBatchSystem(AbstractScalableBatchSystem):
    """Scalable batch system with auto-provisioning."""

    def nodeTypes(self) -> List['NodeInfo']:
        """Get available node types for scaling.

        Returns a small/preemptible flavor and a large/on-demand flavor;
        memory and disk are expressed in bytes.
        """
        from toil.batchSystems import NodeInfo
        return [
            NodeInfo(
                cores=4,
                memory=8 * 1024 * 1024 * 1024,    # 8 GB
                disk=100 * 1024 * 1024 * 1024,    # 100 GB
                preemptible=True,
                nodeType="m5.large"
            ),
            NodeInfo(
                cores=16,
                memory=32 * 1024 * 1024 * 1024,   # 32 GB
                disk=500 * 1024 * 1024 * 1024,    # 500 GB
                preemptible=False,
                nodeType="m5.4xlarge"
            )
        ]

    def provisioner(self) -> Optional['AbstractProvisioner']:
        """Get associated provisioner for auto-scaling."""
        # Fixed: the original had documentation markup fused onto this
        # return statement, making it a syntax error.
        return self.cloud_provisioner
The local batch system executes jobs directly on the local machine using multiprocessing.
from toil.batchSystems.singleMachine import SingleMachineBatchSystem
from toil.common import Config

# Local execution: cap the resources Toil may consume on this host.
config = Config()
config.batchSystem = "local"
config.maxCores = 8        # cores available to jobs on this machine
config.maxMemory = "16G"   # memory ceiling for all local jobs
config.maxDisk = "100G"    # disk-space ceiling for all local jobs

# Jobs run as separate processes; the limits above are enforced through
# process monitoring, and failed processes are cleaned up automatically.
local_batch = SingleMachineBatchSystem(config)
Integration with Slurm workload manager for HPC cluster execution.
from toil.batchSystems.slurm import SlurmBatchSystem

# Route jobs through the Slurm workload manager.
config = Config()
config.batchSystem = "slurm"

# Where and on whose account jobs run.
config.slurmPartition = "compute"
config.slurmAccount = "my_account"
config.slurmQoS = "normal"

# Extra sbatch flags passed through verbatim.
config.slurmArgs = [
    "--constraint=cpu",               # node feature constraint
    "--exclusive",                    # whole-node allocation
    "--mail-type=FAIL",               # notify only on failure
    "--mail-user=user@example.com"    # notification recipient
]

# Generic resources: two GPUs per job.
config.slurmGres = "gpu:2"

# Submission generates sbatch scripts; SLURM_JOB_ID is tracked
# automatically and Slurm accounting/limits apply.
slurm_batch = SlurmBatchSystem(config)
Container-based job execution on Kubernetes clusters.
from toil.batchSystems.kubernetes import KubernetesBatchSystem

# Container-based execution on a Kubernetes cluster.
config = Config()
config.batchSystem = "kubernetes"

# Namespace and identity the worker pods run under.
config.kubernetesNamespace = "toil-workflows"
config.kubernetesServiceAccount = "toil-service"

# Images: a default base image plus the Toil worker image.
config.kubernetesDefaultImage = "ubuntu:20.04"
config.kubernetesDockerImage = "my-org/toil-worker:latest"

# Placement: pin pods to compute nodes and tolerate the dedicated taint.
config.kubernetesNodeSelector = {"nodeType": "compute"}
config.kubernetesTolerationsJson = '[{"key": "dedicated", "operator": "Equal", "value": "toil"}]'

# Persistent storage used for job store access.
config.kubernetesPersistentVolumeSize = "10G"
config.kubernetesStorageClass = "fast-ssd"

# Jobs become Kubernetes Jobs/Pods with volumes mounted automatically,
# integrating with cluster RBAC and networking.
k8s_batch = KubernetesBatchSystem(config)
IBM LSF (Load Sharing Facility) integration for enterprise HPC environments.
from toil.batchSystems.lsf import LSFBatchSystem

# IBM LSF (Load Sharing Facility) for enterprise HPC clusters.
config = Config()
config.batchSystem = "lsf"

# Queue and accounting project.
config.lsfQueue = "normal"
config.lsfProject = "research_proj"

# Raw bsub arguments for resource selection.
config.lsfArgs = [
    "-R", "select[mem>8000]",   # nodes with >8000 MB memory
    "-R", "span[hosts=1]",      # keep the job on a single host
    "-W", "4:00"                # 4-hour wall-time limit
]

# Submission uses bsub; LSF job arrays and resource reservation
# are supported.
lsf_batch = LSFBatchSystem(config)
Native integration with AWS Batch for cloud-native workflow execution.
from toil.batchSystems.awsBatch import AWSBatchSystem

# Cloud-native execution via AWS Batch.
config = Config()
config.batchSystem = "aws_batch"

# AWS Batch queue and registered job definition to submit against.
config.awsBatchJobQueue = "toil-job-queue"
config.awsBatchJobDefinition = "toil-worker"

# Region and credential source.
config.awsRegion = "us-west-2"
config.awsCredentials = "~/.aws/credentials"

# Compute environment and worker container image.
config.awsBatchComputeEnvironment = "toil-compute-env"
config.awsBatchDockerImage = "amazonlinux:2"

# Jobs go to AWS Batch queues with automatic EC2 provisioning,
# IAM/VPC integration, and Spot-instance support for cost savings.
aws_batch = AWSBatchSystem(config)
Apache Mesos integration for distributed computing frameworks.
from toil.batchSystems.mesos.batchSystem import MesosBatchSystem

# Distributed execution through an Apache Mesos framework.
config = Config()
config.batchSystem = "mesos"

# Master discovery (via ZooKeeper) and framework identity.
config.mesosMaster = "zk://localhost:2181/mesos"
config.mesosFrameworkId = "toil-framework"

# Role for resource offers; checkpointing gives fault tolerance
# across framework restarts.
config.mesosRole = "production"
config.mesosCheckpoint = True

# Registers with the Mesos master and allocates/deallocates
# resources dynamically.
mesos_batch = MesosBatchSystem(config)
Comprehensive monitoring and status reporting across all batch systems.
from toil.batchSystems.abstractBatchSystem import BatchJobExitReason
import logging
def monitor_batch_system(batch_system: 'AbstractBatchSystem'):
    """Monitor batch system status and job progress.

    Prints counts of issued and running jobs, drains completed-job
    updates until a 10-second poll returns nothing, then reports the
    scheduler's status message (if any).

    Args:
        batch_system: the batch system to observe.
    """
    issued_jobs = batch_system.getIssuedBatchJobIDs()
    running_jobs = batch_system.getRunningBatchJobIDs()
    print(f"Issued jobs: {len(issued_jobs)}")
    print(f"Running jobs: {len(running_jobs)}")

    # Consume completed-job updates. Fixed: the original looped with
    # `continue` when no update arrived, busy-polling forever with no
    # exit path; we now stop after one empty polling window.
    while True:
        updated_job = batch_system.getUpdatedBatchJob(maxWait=10)
        if updated_job is None:
            break
        job_id = updated_job.jobID
        exit_reason = updated_job.exitReason
        wall_time = updated_job.wallTime
        exit_code = updated_job.exitCode
        if exit_reason == BatchJobExitReason.FINISHED:
            # FINISHED means the process ran to completion; the exit code
            # still distinguishes success from failure.
            if exit_code == 0:
                print(f"Job {job_id} completed successfully in {wall_time}s")
            else:
                print(f"Job {job_id} failed with exit code {exit_code}")
        elif exit_reason == BatchJobExitReason.FAILED:
            print(f"Job {job_id} failed due to batch system error")
        elif exit_reason == BatchJobExitReason.KILLED:
            print(f"Job {job_id} was killed")
        elif exit_reason == BatchJobExitReason.ERROR:
            print(f"Job {job_id} encountered an error")

    # Surface the scheduler's own status line for operators.
    status_msg = batch_system.getSchedulingStatusMessage()
    if status_msg:
        print(f"Scheduler status: {status_msg}")
Advanced resource management and node type specification for scalable batch systems.
from toil.batchSystems import NodeInfo
from toil.provisioners.abstractProvisioner import AbstractProvisioner
class NodeInfo:
    """Information about available compute nodes."""

    def __init__(self, cores: int, memory: int, disk: int,
                 preemptible: bool, nodeType: str):
        # Capacity figures: memory and disk are in bytes.
        self.cores = cores
        self.memory = memory
        self.disk = disk
        # True when the cloud provider may reclaim the node at any time.
        self.preemptible = preemptible
        # Provider-specific instance type name (e.g. "m5.large").
        self.nodeType = nodeType
# Define available node types
def get_cloud_node_types():
    """Define node types for cloud auto-scaling.

    Returns small (preemptible), medium, and large on-demand flavors;
    memory and disk are in bytes.
    """
    gib = 1024**3
    # (cores, memory, disk, preemptible, instance type)
    specs = [
        (2, 4 * gib, 50 * gib, True, "t3.small"),
        (8, 32 * gib, 200 * gib, False, "m5.2xlarge"),
        (32, 128 * gib, 1000 * gib, False, "m5.8xlarge"),
    ]
    return [
        NodeInfo(cores=c, memory=m, disk=d, preemptible=p, nodeType=t)
        for c, m, d, p, t in specs
    ]
# Node filtering for job placement
def custom_node_filter(batch_system: 'AbstractScalableBatchSystem'):
    """Restrict job placement to nodes with sufficient resources.

    Installs a predicate on ``batch_system.nodeFiltering`` that accepts
    only nodes offering at least 8 GiB of memory and 4 cores.

    Args:
        batch_system: scalable batch system to configure.
    """
    def node_filtering_func(node_info: dict) -> bool:
        # Missing keys count as zero and therefore fail the check.
        min_memory = 8 * 1024**3  # 8 GiB minimum
        min_cores = 4             # 4 cores minimum
        return (node_info.get('memory', 0) >= min_memory and
                node_info.get('cores', 0) >= min_cores)

    # Fixed: the original had documentation markup fused onto this
    # assignment, making it a syntax error.
    batch_system.nodeFiltering = node_filtering_func
Robust error handling and recovery mechanisms for batch system failures.
from toil.batchSystems.abstractBatchSystem import (
InsufficientSystemResources,
AcquisitionTimeoutException,
DeadlockException
)
def handle_batch_system_errors(batch_system=None, job_description=None):
    """Handle common batch system errors during job submission.

    Generalized from the original, which read ``batch_system`` and
    ``job_description`` from undefined globals; both are now parameters
    (defaulting to None for backward compatibility).

    Args:
        batch_system: batch system to submit through.
        job_description: job to submit.

    Returns:
        The issued batch job ID on success, or None if a handled
        error occurred.
    """
    # `time` was used but never imported in the original.
    import logging
    import time

    try:
        return batch_system.issueBatchJob(job_description)
    except InsufficientSystemResources as e:
        # Resource shortage: back off so the caller can retry with
        # reduced requirements.
        logging.warning(f"Insufficient resources: {e}")
        time.sleep(60)
    except AcquisitionTimeoutException as e:
        # Timed out acquiring resources; retry with different parameters.
        logging.error(f"Resource acquisition timeout: {e}")
    except DeadlockException as e:
        # Batch system deadlock; may require manual intervention.
        logging.critical(f"Batch system deadlock: {e}")
    return None
def robust_job_submission(batch_system, job_description, max_retries=3):
    """Submit a job, retrying with exponential backoff on failure.

    Args:
        batch_system: batch system to submit through.
        job_description: job to submit.
        max_retries: maximum number of submission attempts.

    Returns:
        The batch job ID from the first successful submission.

    Raises:
        Exception: re-raises the last submission error once all
            retries are exhausted.
    """
    # `time` was referenced but never imported in the original.
    import logging
    import time

    for attempt in range(max_retries):
        try:
            return batch_system.issueBatchJob(job_description)
        except Exception as e:
            logging.warning(f"Job submission attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...

# This batch system integration enables Toil workflows to execute seamlessly
# across diverse computing environments while maintaining consistent
# interfaces and robust error handling capabilities.
Install with Tessl CLI
npx tessl i tessl/pypi-toildocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10