# Batch System Integration

## Overview

Toil's batch system integration provides a unified interface for executing workflows across diverse computing environments. The system abstracts the complexity of different schedulers and execution environments, allowing workflows to run seamlessly on local machines, HPC clusters, cloud platforms, and container orchestration systems. Each batch system implementation handles job submission, monitoring, resource allocation, and cleanup according to the specific requirements of the target environment.

## Capabilities

### Abstract Batch System Interface
{ .api }

The `AbstractBatchSystem` provides the core interface that all batch systems implement.
```python
from toil.batchSystems.abstractBatchSystem import (
    AbstractBatchSystem,
    AbstractScalableBatchSystem,
    UpdatedBatchJobInfo,
    BatchJobExitReason
)
from toil.job import JobDescription
from typing import Optional, Dict, List

class CustomBatchSystem(AbstractBatchSystem):
    """Custom batch system implementation."""

    def issueBatchJob(self, jobNode: JobDescription) -> int:
        """Submit job to batch system and return job ID."""
        # Extract resource requirements
        memory_mb = jobNode.memory // (1024 * 1024)
        cores = jobNode.cores
        disk_mb = jobNode.disk // (1024 * 1024)

        # Submit to underlying scheduler
        batch_job_id = self.submit_to_scheduler(
            command=jobNode.command,
            memory=memory_mb,
            cores=cores,
            disk=disk_mb
        )

        return batch_job_id

    def killBatchJobs(self, jobIDs: List[int]) -> None:
        """Terminate specified jobs."""
        for job_id in jobIDs:
            self.cancel_job(job_id)

    def getIssuedBatchJobIDs(self) -> List[int]:
        """Get list of all submitted job IDs."""
        return list(self.issued_jobs.keys())

    def getRunningBatchJobIDs(self) -> Dict[int, float]:
        """Get running jobs with their runtime in seconds."""
        running_jobs = {}
        for job_id in self.issued_jobs:
            if self.is_job_running(job_id):
                runtime = self.get_job_runtime(job_id)
                running_jobs[job_id] = runtime
        return running_jobs

    def getUpdatedBatchJob(self, maxWait: int) -> Optional[UpdatedBatchJobInfo]:
        """Poll for completed job, waiting up to maxWait seconds."""
        completed_job = self.poll_for_completion(maxWait)

        if completed_job:
            return UpdatedBatchJobInfo(
                jobID=completed_job.id,
                exitReason=BatchJobExitReason.FINISHED,
                wallTime=completed_job.wall_time,
                exitCode=completed_job.exit_code
            )
        return None

    def getSchedulingStatusMessage(self) -> Optional[str]:
        """Get current scheduling status for monitoring."""
        return f"Jobs queued: {self.get_queued_count()}, Running: {self.get_running_count()}"

# Scalable batch system for cloud environments
class CloudBatchSystem(AbstractScalableBatchSystem):
    """Scalable batch system with auto-provisioning."""

    def nodeTypes(self) -> List['NodeInfo']:
        """Get available node types for scaling."""
        from toil.batchSystems import NodeInfo

        return [
            NodeInfo(
                cores=4,
                memory=8 * 1024 * 1024 * 1024,    # 8GB
                disk=100 * 1024 * 1024 * 1024,    # 100GB
                preemptible=True,
                nodeType="m5.large"
            ),
            NodeInfo(
                cores=16,
                memory=32 * 1024 * 1024 * 1024,   # 32GB
                disk=500 * 1024 * 1024 * 1024,    # 500GB
                preemptible=False,
                nodeType="m5.4xlarge"
            )
        ]

    def provisioner(self) -> Optional['AbstractProvisioner']:
        """Get associated provisioner for auto-scaling."""
        return self.cloud_provisioner
```
### Local Batch System
{ .api }

The local batch system executes jobs directly on the local machine using multiprocessing.

```python
from toil.batchSystems.singleMachine import SingleMachineBatchSystem
from toil.common import Config

# Configuration for local execution
config = Config()
config.batchSystem = "local"
config.maxCores = 8       # Maximum cores to use locally
config.maxMemory = "16G"  # Maximum memory to use
config.maxDisk = "100G"   # Maximum disk space

# Local batch system handles resource contention automatically
local_batch = SingleMachineBatchSystem(config)

# Jobs run as separate processes on local machine
# Resource limits enforced through process monitoring
# Automatic cleanup of failed processes
```
### Slurm Batch System
{ .api }

Integration with Slurm workload manager for HPC cluster execution.

```python
from toil.batchSystems.slurm import SlurmBatchSystem

# Slurm-specific configuration
config = Config()
config.batchSystem = "slurm"

# Slurm partition and account settings
config.slurmPartition = "compute"   # Slurm partition to use
config.slurmAccount = "my_account"  # Account for billing
config.slurmQoS = "normal"          # Quality of service

# Advanced Slurm options
config.slurmArgs = [
    "--constraint=cpu",             # Node constraints
    "--exclusive",                  # Exclusive node access
    "--mail-type=FAIL",             # Email on failure
    "--mail-user=user@example.com"  # Email address
]

# GPU allocation in Slurm
config.slurmGres = "gpu:2"  # Request 2 GPUs per job

slurm_batch = SlurmBatchSystem(config)

# Job submission generates Slurm sbatch scripts
# Automatic SLURM_JOB_ID tracking
# Integration with Slurm accounting and limits
```
### Kubernetes Batch System
{ .api }

Container-based job execution on Kubernetes clusters.

```python
from toil.batchSystems.kubernetes import KubernetesBatchSystem

# Kubernetes configuration
config = Config()
config.batchSystem = "kubernetes"

# Kubernetes namespace and service account
config.kubernetesNamespace = "toil-workflows"
config.kubernetesServiceAccount = "toil-service"

# Container configuration
config.kubernetesDefaultImage = "ubuntu:20.04"
config.kubernetesDockerImage = "my-org/toil-worker:latest"

# Resource limits and requests
config.kubernetesNodeSelector = {"nodeType": "compute"}
config.kubernetesTolerationsJson = '[{"key": "dedicated", "operator": "Equal", "value": "toil"}]'

# Persistent volume configuration
config.kubernetesPersistentVolumeSize = "10G"
config.kubernetesStorageClass = "fast-ssd"

k8s_batch = KubernetesBatchSystem(config)

# Jobs run as Kubernetes Jobs/Pods
# Automatic volume mounting for job store access
# Integration with Kubernetes RBAC and networking
```
### LSF Batch System
{ .api }

IBM LSF (Load Sharing Facility) integration for enterprise HPC environments.

```python
from toil.batchSystems.lsf import LSFBatchSystem

# LSF configuration
config = Config()
config.batchSystem = "lsf"

# LSF queue and project settings
config.lsfQueue = "normal"           # LSF queue name
config.lsfProject = "research_proj"  # Project for accounting

# Resource specification
config.lsfArgs = [
    "-R", "select[mem>8000]",  # Memory requirements
    "-R", "span[hosts=1]",     # Single host allocation
    "-W", "4:00"               # Wall time limit
]

lsf_batch = LSFBatchSystem(config)

# Job submission using bsub command
# LSF job array support for parallel jobs
# Integration with LSF resource reservation
```
### AWS Batch System
{ .api }

Native integration with AWS Batch for cloud-native workflow execution.

```python
from toil.batchSystems.awsBatch import AWSBatchSystem

# AWS Batch configuration
config = Config()
config.batchSystem = "aws_batch"

# AWS Batch job queue and definition
config.awsBatchJobQueue = "toil-job-queue"
config.awsBatchJobDefinition = "toil-worker"

# AWS region and credentials
config.awsRegion = "us-west-2"
config.awsCredentials = "~/.aws/credentials"

# Container and compute environment settings
config.awsBatchComputeEnvironment = "toil-compute-env"
config.awsBatchDockerImage = "amazonlinux:2"

aws_batch = AWSBatchSystem(config)

# Jobs submitted to AWS Batch queues
# Automatic EC2 instance provisioning
# Integration with AWS IAM and VPC
# Support for Spot instances for cost optimization
```
### Mesos Batch System
{ .api }

Apache Mesos integration for distributed computing frameworks.

```python
from toil.batchSystems.mesos.batchSystem import MesosBatchSystem

# Mesos configuration
config = Config()
config.batchSystem = "mesos"

# Mesos master and framework settings
config.mesosMaster = "zk://localhost:2181/mesos"  # Zookeeper URL
config.mesosFrameworkId = "toil-framework"        # Framework identifier

# Resource allocation
config.mesosRole = "production"  # Mesos role
config.mesosCheckpoint = True    # Enable checkpointing

mesos_batch = MesosBatchSystem(config)

# Framework registration with Mesos master
# Dynamic resource allocation and deallocation
# Fault tolerance through framework checkpointing
```
### Batch System Monitoring and Status
{ .api }

Comprehensive monitoring and status reporting across all batch systems.

```python
from toil.batchSystems.abstractBatchSystem import BatchJobExitReason
import logging

def monitor_batch_system(batch_system: AbstractBatchSystem):
    """Monitor batch system status and job progress."""

    # Get current job status
    issued_jobs = batch_system.getIssuedBatchJobIDs()
    running_jobs = batch_system.getRunningBatchJobIDs()

    print(f"Issued jobs: {len(issued_jobs)}")
    print(f"Running jobs: {len(running_jobs)}")

    # Check for completed jobs
    while True:
        updated_job = batch_system.getUpdatedBatchJob(maxWait=10)

        if updated_job is None:
            # No completed job within the wait window; stop polling
            break

        job_id = updated_job.jobID
        exit_reason = updated_job.exitReason
        wall_time = updated_job.wallTime
        exit_code = updated_job.exitCode

        if exit_reason == BatchJobExitReason.FINISHED:
            if exit_code == 0:
                print(f"Job {job_id} completed successfully in {wall_time}s")
            else:
                print(f"Job {job_id} failed with exit code {exit_code}")

        elif exit_reason == BatchJobExitReason.FAILED:
            print(f"Job {job_id} failed due to batch system error")

        elif exit_reason == BatchJobExitReason.KILLED:
            print(f"Job {job_id} was killed")

        elif exit_reason == BatchJobExitReason.ERROR:
            print(f"Job {job_id} encountered an error")

    # Get scheduling status message
    status_msg = batch_system.getSchedulingStatusMessage()
    if status_msg:
        print(f"Scheduler status: {status_msg}")
```
### Resource Management and Node Information
{ .api }

Advanced resource management and node type specification for scalable batch systems.

```python
from toil.batchSystems import NodeInfo
from toil.provisioners.abstractProvisioner import AbstractProvisioner

class NodeInfo:
    """Information about available compute nodes."""

    def __init__(self, cores: int, memory: int, disk: int,
                 preemptible: bool, nodeType: str):
        self.cores = cores              # CPU cores available
        self.memory = memory            # Memory in bytes
        self.disk = disk                # Disk space in bytes
        self.preemptible = preemptible  # Whether node can be preempted
        self.nodeType = nodeType        # Cloud provider node type

# Define available node types
def get_cloud_node_types():
    """Define node types for cloud auto-scaling."""
    return [
        NodeInfo(
            cores=2,
            memory=4 * 1024**3,    # 4GB
            disk=50 * 1024**3,     # 50GB
            preemptible=True,
            nodeType="t3.small"
        ),
        NodeInfo(
            cores=8,
            memory=32 * 1024**3,   # 32GB
            disk=200 * 1024**3,    # 200GB
            preemptible=False,
            nodeType="m5.2xlarge"
        ),
        NodeInfo(
            cores=32,
            memory=128 * 1024**3,  # 128GB
            disk=1000 * 1024**3,   # 1TB
            preemptible=False,
            nodeType="m5.8xlarge"
        )
    ]

# Node filtering for job placement
def custom_node_filter(batch_system: AbstractScalableBatchSystem):
    """Filter nodes based on custom criteria."""

    def node_filtering_func(node_info: dict) -> bool:
        # Only use nodes with sufficient resources
        min_memory = 8 * 1024**3  # 8GB minimum
        min_cores = 4             # 4 cores minimum

        return (node_info.get('memory', 0) >= min_memory and
                node_info.get('cores', 0) >= min_cores)

    # Apply custom filtering
    batch_system.nodeFiltering = node_filtering_func
```
### Exception Handling and Error Recovery
{ .api }

Robust error handling and recovery mechanisms for batch system failures.

```python
import logging
import time

from toil.batchSystems.abstractBatchSystem import (
    InsufficientSystemResources,
    AcquisitionTimeoutException,
    DeadlockException
)

def handle_batch_system_errors():
    """Handle common batch system errors."""

    try:
        # Submit job to batch system
        job_id = batch_system.issueBatchJob(job_description)

    except InsufficientSystemResources as e:
        # Handle resource shortage
        logging.warning(f"Insufficient resources: {e}")
        # Reduce resource requirements or wait
        time.sleep(60)

    except AcquisitionTimeoutException as e:
        # Handle timeout acquiring resources
        logging.error(f"Resource acquisition timeout: {e}")
        # Retry with different parameters

    except DeadlockException as e:
        # Handle batch system deadlock
        logging.critical(f"Batch system deadlock: {e}")
        # May require manual intervention

def robust_job_submission(batch_system, job_description, max_retries=3):
    """Submit job with automatic retry on failure."""

    for attempt in range(max_retries):
        try:
            return batch_system.issueBatchJob(job_description)

        except Exception as e:
            logging.warning(f"Job submission attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
```
This batch system integration enables Toil workflows to execute seamlessly across diverse computing environments while maintaining consistent interfaces and robust error handling capabilities.