CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-toil

Pipeline management software for clusters.

Overall
score

67%

Overview
Eval results
Files

docs/batch-systems.md

Batch System Integration

Overview

Toil's batch system integration provides a unified interface for executing workflows across diverse computing environments. The system abstracts the complexity of different schedulers and execution environments, allowing workflows to run seamlessly on local machines, HPC clusters, cloud platforms, and container orchestration systems. Each batch system implementation handles job submission, monitoring, resource allocation, and cleanup according to the specific requirements of the target environment.

Capabilities

Abstract Batch System Interface

{ .api }

The AbstractBatchSystem provides the core interface that all batch systems implement.

from toil.batchSystems.abstractBatchSystem import (
    AbstractBatchSystem, 
    AbstractScalableBatchSystem,
    UpdatedBatchJobInfo,
    BatchJobExitReason
)
from toil.job import JobDescription
from typing import Optional, Dict, List

class CustomBatchSystem(AbstractBatchSystem):
    """Custom batch system implementation.

    Bridges Toil's abstract batch-system contract onto an underlying
    scheduler exposed through ``submit_to_scheduler`` / ``cancel_job`` /
    polling helpers (assumed provided by the concrete subclass).
    """

    def issueBatchJob(self, jobNode: JobDescription) -> int:
        """Submit job to batch system and return job ID."""
        # Toil expresses memory/disk in bytes; the scheduler wants MiB.
        mem_in_mb = jobNode.memory // (1024 * 1024)
        disk_in_mb = jobNode.disk // (1024 * 1024)

        return self.submit_to_scheduler(
            command=jobNode.command,
            memory=mem_in_mb,
            cores=jobNode.cores,
            disk=disk_in_mb
        )

    def killBatchJobs(self, jobIDs: List[int]) -> None:
        """Terminate specified jobs."""
        for doomed_id in jobIDs:
            self.cancel_job(doomed_id)

    def getIssuedBatchJobIDs(self) -> List[int]:
        """Get list of all submitted job IDs."""
        return [*self.issued_jobs]

    def getRunningBatchJobIDs(self) -> Dict[int, float]:
        """Get running jobs with their runtime in seconds."""
        # Only jobs the scheduler reports as running are included.
        return {
            job_id: self.get_job_runtime(job_id)
            for job_id in self.issued_jobs
            if self.is_job_running(job_id)
        }

    def getUpdatedBatchJob(self, maxWait: int) -> Optional[UpdatedBatchJobInfo]:
        """Poll for completed job, waiting up to maxWait seconds."""
        finished = self.poll_for_completion(maxWait)
        if not finished:
            # Nothing completed within the polling window.
            return None
        return UpdatedBatchJobInfo(
            jobID=finished.id,
            exitReason=BatchJobExitReason.FINISHED,
            wallTime=finished.wall_time,
            exitCode=finished.exit_code
        )

    def getSchedulingStatusMessage(self) -> Optional[str]:
        """Get current scheduling status for monitoring."""
        return f"Jobs queued: {self.get_queued_count()}, Running: {self.get_running_count()}"

# Scalable batch system for cloud environments
class CloudBatchSystem(AbstractScalableBatchSystem):
    """Scalable batch system with auto-provisioning."""

    def nodeTypes(self) -> List['NodeInfo']:
        """Get available node types for scaling."""
        from toil.batchSystems import NodeInfo

        gib = 1024 * 1024 * 1024  # bytes per GiB

        # A small preemptible node and a large on-demand node.
        small = NodeInfo(
            cores=4,
            memory=8 * gib,    # 8GB
            disk=100 * gib,    # 100GB
            preemptible=True,
            nodeType="m5.large"
        )
        large = NodeInfo(
            cores=16,
            memory=32 * gib,   # 32GB
            disk=500 * gib,    # 500GB
            preemptible=False,
            nodeType="m5.4xlarge"
        )
        return [small, large]

    def provisioner(self) -> Optional['AbstractProvisioner']:
        """Get associated provisioner for auto-scaling."""
        return self.cloud_provisioner

Local Batch System

{ .api }

The local batch system executes jobs directly on the local machine using multiprocessing.

from toil.batchSystems.singleMachine import SingleMachineBatchSystem
from toil.common import Config

# Configuration for local execution
# Local execution: jobs run as separate processes on this machine,
# capped by the resource ceilings configured below.
config = Config()
config.batchSystem = "local"
config.maxCores = 8        # upper bound on concurrently used cores
config.maxMemory = "16G"   # memory ceiling across all local jobs
config.maxDisk = "100G"    # disk ceiling across all local jobs

local_batch = SingleMachineBatchSystem(config)

# The single-machine backend enforces these limits by monitoring the
# worker processes it spawns, and it cleans up failed processes itself.

Slurm Batch System

{ .api }

Integration with Slurm workload manager for HPC cluster execution.

from toil.batchSystems.slurm import SlurmBatchSystem

# Slurm-specific configuration
config = Config()
config.batchSystem = "slurm"

# Where jobs land in the cluster and whose account they are billed to.
config.slurmPartition = "compute"
config.slurmAccount = "my_account"
config.slurmQoS = "normal"

# Extra sbatch flags passed through verbatim: CPU node constraint,
# exclusive node access, and failure e-mail notification.
config.slurmArgs = [
    "--constraint=cpu",
    "--exclusive",
    "--mail-type=FAIL",
    "--mail-user=user@example.com"
]

# GPU allocation via Slurm generic resources: two GPUs per job.
config.slurmGres = "gpu:2"

slurm_batch = SlurmBatchSystem(config)

# Submission happens through generated sbatch scripts; jobs are tracked
# via SLURM_JOB_ID and respect Slurm accounting and limits.

Kubernetes Batch System

{ .api }

Container-based job execution on Kubernetes clusters.

from toil.batchSystems.kubernetes import KubernetesBatchSystem

# Kubernetes configuration
config = Config()
config.batchSystem = "kubernetes"

# Namespace and service account that worker pods run under.
config.kubernetesNamespace = "toil-workflows"
config.kubernetesServiceAccount = "toil-service"

# Container images: a default base image plus the Toil worker image.
config.kubernetesDefaultImage = "ubuntu:20.04"
config.kubernetesDockerImage = "my-org/toil-worker:latest"

# Pod scheduling: pin pods to "compute" nodes and tolerate the
# dedicated=toil taint.
config.kubernetesNodeSelector = {"nodeType": "compute"}
config.kubernetesTolerationsJson = '[{"key": "dedicated", "operator": "Equal", "value": "toil"}]'

# Persistent storage backing the job store volume.
config.kubernetesPersistentVolumeSize = "10G"
config.kubernetesStorageClass = "fast-ssd"

k8s_batch = KubernetesBatchSystem(config)

# Each Toil job becomes a Kubernetes Job/Pod with the job store volume
# mounted automatically; cluster RBAC and networking apply as usual.

LSF Batch System

{ .api }

IBM LSF (Load Sharing Facility) integration for enterprise HPC environments.

from toil.batchSystems.lsf import LSFBatchSystem

# LSF configuration
config = Config()
config.batchSystem = "lsf"

# Queue and accounting project for submitted jobs.
config.lsfQueue = "normal"
config.lsfProject = "research_proj"

# Raw bsub options: >8000 MB memory, single-host placement,
# and a 4-hour wall-clock limit.
config.lsfArgs = [
    "-R", "select[mem>8000]",
    "-R", "span[hosts=1]",
    "-W", "4:00"
]

lsf_batch = LSFBatchSystem(config)

# Jobs are submitted through bsub; LSF job arrays and resource
# reservation are available for parallel workloads.

AWS Batch System

{ .api }

Native integration with AWS Batch for cloud-native workflow execution.

from toil.batchSystems.awsBatch import AWSBatchSystem

# AWS Batch configuration
config = Config()
config.batchSystem = "aws_batch"

# Target AWS Batch job queue and registered job definition.
config.awsBatchJobQueue = "toil-job-queue"
config.awsBatchJobDefinition = "toil-worker"

# Region and credentials file used for AWS API calls.
config.awsRegion = "us-west-2"
config.awsCredentials = "~/.aws/credentials"

# Compute environment and container image for worker tasks.
config.awsBatchComputeEnvironment = "toil-compute-env"
config.awsBatchDockerImage = "amazonlinux:2"

aws_batch = AWSBatchSystem(config)

# AWS Batch provisions EC2 capacity on demand (including Spot instances
# for cost optimization) and integrates with IAM and VPC networking.

Mesos Batch System

{ .api }

Apache Mesos integration for distributed computing frameworks.

from toil.batchSystems.mesos.batchSystem import MesosBatchSystem

# Mesos configuration  
config = Config()
config.batchSystem = "mesos"

# Master discovery through Zookeeper, plus a stable framework identity.
config.mesosMaster = "zk://localhost:2181/mesos"
config.mesosFrameworkId = "toil-framework"

# Run under the "production" role with framework checkpointing enabled
# so the framework can recover after a Mesos master failover.
config.mesosRole = "production"
config.mesosCheckpoint = True

mesos_batch = MesosBatchSystem(config)

# Toil registers as a Mesos framework, allocates and releases resources
# dynamically, and relies on checkpointing for fault tolerance.

Batch System Monitoring and Status

{ .api }

Comprehensive monitoring and status reporting across all batch systems.

from toil.batchSystems.abstractBatchSystem import BatchJobExitReason
import logging

def monitor_batch_system(batch_system: AbstractBatchSystem):
    """Monitor batch system status and job progress.

    Prints a one-shot summary of issued and running jobs, drains and
    reports every job that completes, then prints the scheduler's
    status message.

    Args:
        batch_system: Any AbstractBatchSystem implementation to inspect.
    """

    # One-shot snapshot of the current queue state.
    issued_jobs = batch_system.getIssuedBatchJobIDs()
    running_jobs = batch_system.getRunningBatchJobIDs()

    print(f"Issued jobs: {len(issued_jobs)}")
    print(f"Running jobs: {len(running_jobs)}")

    # Drain completed jobs. getUpdatedBatchJob returns None when nothing
    # finished within maxWait seconds; stop polling then. (The previous
    # `continue` on None spun this loop forever and made the status
    # report below unreachable.)
    while True:
        updated_job = batch_system.getUpdatedBatchJob(maxWait=10)

        if updated_job is None:
            break

        job_id = updated_job.jobID
        exit_reason = updated_job.exitReason
        wall_time = updated_job.wallTime
        exit_code = updated_job.exitCode

        if exit_reason == BatchJobExitReason.FINISHED:
            # FINISHED means the job ran to completion; the exit code
            # still distinguishes success from failure.
            if exit_code == 0:
                print(f"Job {job_id} completed successfully in {wall_time}s")
            else:
                print(f"Job {job_id} failed with exit code {exit_code}")

        elif exit_reason == BatchJobExitReason.FAILED:
            print(f"Job {job_id} failed due to batch system error")

        elif exit_reason == BatchJobExitReason.KILLED:
            print(f"Job {job_id} was killed")

        elif exit_reason == BatchJobExitReason.ERROR:
            print(f"Job {job_id} encountered an error")

    # Get scheduling status message (reachable now that polling stops).
    status_msg = batch_system.getSchedulingStatusMessage()
    if status_msg:
        print(f"Scheduler status: {status_msg}")

Resource Management and Node Information

{ .api }

Advanced resource management and node type specification for scalable batch systems.

from toil.batchSystems import NodeInfo
from toil.provisioners.abstractProvisioner import AbstractProvisioner

class NodeInfo:
    """Information about available compute nodes."""

    def __init__(self, cores: int, memory: int, disk: int,
                 preemptible: bool, nodeType: str):
        """Record the capacity and identity of one node type.

        Args:
            cores: CPU cores available on the node.
            memory: Memory in bytes.
            disk: Disk space in bytes.
            preemptible: Whether the provider may reclaim the node mid-run.
            nodeType: Cloud provider node type, e.g. "m5.large".
        """
        self.cores, self.memory, self.disk = cores, memory, disk
        self.preemptible, self.nodeType = preemptible, nodeType

# Define available node types
def get_cloud_node_types():
    """Define node types for cloud auto-scaling.

    Returns a small/medium/large ladder of instance shapes; only the
    smallest is preemptible.
    """
    gib = 1024 ** 3  # bytes per GiB

    # (cores, memory GiB, disk GiB, preemptible, instance type)
    shapes = [
        (2, 4, 50, True, "t3.small"),
        (8, 32, 200, False, "m5.2xlarge"),
        (32, 128, 1000, False, "m5.8xlarge"),
    ]
    return [
        NodeInfo(cores=cores, memory=mem * gib, disk=disk * gib,
                 preemptible=preempt, nodeType=kind)
        for cores, mem, disk, preempt, kind in shapes
    ]

# Node filtering for job placement
def custom_node_filter(batch_system: AbstractScalableBatchSystem):
    """Filter nodes based on custom criteria.

    Installs a predicate on the batch system that admits only nodes
    meeting minimum memory and core thresholds.
    """
    min_memory = 8 * 1024 ** 3  # 8GB minimum
    min_cores = 4               # 4 cores minimum

    def node_filtering_func(node_info: dict) -> bool:
        # Missing keys count as zero capacity, so such nodes are rejected.
        has_memory = node_info.get('memory', 0) >= min_memory
        has_cores = node_info.get('cores', 0) >= min_cores
        return has_memory and has_cores

    # Apply custom filtering.
    batch_system.nodeFiltering = node_filtering_func

Exception Handling and Error Recovery

{ .api }

Robust error handling and recovery mechanisms for batch system failures.

from toil.batchSystems.abstractBatchSystem import (
    InsufficientSystemResources,
    AcquisitionTimeoutException, 
    DeadlockException
)

def handle_batch_system_errors():
    """Handle common batch system errors.

    Demonstrates catching Toil's batch-system exception hierarchy around
    a single job submission.

    NOTE(review): this example relies on module-level `batch_system`,
    `job_description`, `logging`, and `time` being in scope — `time` is
    not imported in this snippet; confirm imports where it is used.
    """
    
    try:
        # Submit job to batch system
        job_id = batch_system.issueBatchJob(job_description)
        
    except InsufficientSystemResources as e:
        # Handle resource shortage
        logging.warning(f"Insufficient resources: {e}")
        # Reduce resource requirements or wait
        time.sleep(60)
        
    except AcquisitionTimeoutException as e:
        # Handle timeout acquiring resources
        logging.error(f"Resource acquisition timeout: {e}")
        # Retry with different parameters
        
    except DeadlockException as e:
        # Handle batch system deadlock
        logging.critical(f"Batch system deadlock: {e}")
        # May require manual intervention
        
def robust_job_submission(batch_system, job_description, max_retries=3):
    """Submit job with automatic retry on failure.

    Retries failed submissions with exponential backoff (1s, 2s, 4s, ...);
    the final failure is re-raised to the caller.
    """
    final_attempt = max_retries - 1

    for attempt in range(max_retries):
        try:
            return batch_system.issueBatchJob(job_description)

        except Exception as e:
            logging.warning(f"Job submission attempt {attempt + 1} failed: {e}")
            if attempt == final_attempt:
                # Out of retries: surface the last error to the caller.
                raise
            # Exponential backoff before trying again.
            time.sleep(2 ** attempt)

This batch system integration enables Toil workflows to execute seamlessly across diverse computing environments while maintaining consistent interfaces and robust error handling capabilities.

Install with Tessl CLI

npx tessl i tessl/pypi-toil

docs

batch-systems.md

core-workflow.md

file-management.md

index.md

job-stores.md

provisioning.md

utilities.md

workflow-languages.md

tile.json