or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-systems.mdcore-workflow.mdfile-management.mdindex.mdjob-stores.mdprovisioning.mdutilities.mdworkflow-languages.md

batch-systems.mddocs/

# Batch System Integration

## Overview

Toil's batch system integration provides a unified interface for executing workflows across diverse computing environments. The system abstracts the complexity of different schedulers and execution environments, allowing workflows to run seamlessly on local machines, HPC clusters, cloud platforms, and container orchestration systems. Each batch system implementation handles job submission, monitoring, resource allocation, and cleanup according to the specific requirements of the target environment.

## Capabilities

### Abstract Batch System Interface

{ .api }

The `AbstractBatchSystem` provides the core interface that all batch systems implement.

```python
from toil.batchSystems.abstractBatchSystem import (
    AbstractBatchSystem,
    AbstractScalableBatchSystem,
    UpdatedBatchJobInfo,
    BatchJobExitReason
)
from toil.job import JobDescription
from typing import Optional, Dict, List

class CustomBatchSystem(AbstractBatchSystem):
    """Custom batch system implementation."""

    def issueBatchJob(self, jobNode: JobDescription) -> int:
        """Submit job to batch system and return job ID."""
        # Extract resource requirements
        memory_mb = jobNode.memory // (1024 * 1024)
        cores = jobNode.cores
        disk_mb = jobNode.disk // (1024 * 1024)

        # Submit to underlying scheduler
        batch_job_id = self.submit_to_scheduler(
            command=jobNode.command,
            memory=memory_mb,
            cores=cores,
            disk=disk_mb
        )

        return batch_job_id

    def killBatchJobs(self, jobIDs: List[int]) -> None:
        """Terminate specified jobs."""
        for job_id in jobIDs:
            self.cancel_job(job_id)

    def getIssuedBatchJobIDs(self) -> List[int]:
        """Get list of all submitted job IDs."""
        return list(self.issued_jobs.keys())

    def getRunningBatchJobIDs(self) -> Dict[int, float]:
        """Get running jobs with their runtime in seconds."""
        running_jobs = {}
        for job_id in self.issued_jobs:
            if self.is_job_running(job_id):
                runtime = self.get_job_runtime(job_id)
                running_jobs[job_id] = runtime
        return running_jobs

    def getUpdatedBatchJob(self, maxWait: int) -> Optional[UpdatedBatchJobInfo]:
        """Poll for completed job, waiting up to maxWait seconds."""
        completed_job = self.poll_for_completion(maxWait)

        if completed_job:
            return UpdatedBatchJobInfo(
                jobID=completed_job.id,
                exitReason=BatchJobExitReason.FINISHED,
                wallTime=completed_job.wall_time,
                exitCode=completed_job.exit_code
            )
        return None

    def getSchedulingStatusMessage(self) -> Optional[str]:
        """Get current scheduling status for monitoring."""
        return f"Jobs queued: {self.get_queued_count()}, Running: {self.get_running_count()}"

# Scalable batch system for cloud environments
class CloudBatchSystem(AbstractScalableBatchSystem):
    """Scalable batch system with auto-provisioning."""

    def nodeTypes(self) -> List['NodeInfo']:
        """Get available node types for scaling."""
        from toil.batchSystems import NodeInfo

        return [
            NodeInfo(
                cores=4,
                memory=8 * 1024 * 1024 * 1024, # 8GB
                disk=100 * 1024 * 1024 * 1024, # 100GB
                preemptible=True,
                nodeType="m5.large"
            ),
            NodeInfo(
                cores=16,
                memory=32 * 1024 * 1024 * 1024, # 32GB
                disk=500 * 1024 * 1024 * 1024, # 500GB
                preemptible=False,
                nodeType="m5.4xlarge"
            )
        ]

    def provisioner(self) -> Optional['AbstractProvisioner']:
        """Get associated provisioner for auto-scaling."""
        return self.cloud_provisioner
```

### Local Batch System

{ .api }

The local batch system executes jobs directly on the local machine using multiprocessing.

```python
from toil.batchSystems.singleMachine import SingleMachineBatchSystem
from toil.common import Config

# Configuration for local execution
config = Config()
config.batchSystem = "local"
config.maxCores = 8 # Maximum cores to use locally
config.maxMemory = "16G" # Maximum memory to use
config.maxDisk = "100G" # Maximum disk space

# Local batch system handles resource contention automatically
local_batch = SingleMachineBatchSystem(config)

# Jobs run as separate processes on local machine
# Resource limits enforced through process monitoring
# Automatic cleanup of failed processes
```

### Slurm Batch System

{ .api }

Integration with Slurm workload manager for HPC cluster execution.

```python
from toil.batchSystems.slurm import SlurmBatchSystem

# Slurm-specific configuration
config = Config()
config.batchSystem = "slurm"

# Slurm partition and account settings
config.slurmPartition = "compute" # Slurm partition to use
config.slurmAccount = "my_account" # Account for billing
config.slurmQoS = "normal" # Quality of service

# Advanced Slurm options
config.slurmArgs = [
    "--constraint=cpu", # Node constraints
    "--exclusive", # Exclusive node access
    "--mail-type=FAIL", # Email on failure
    "--mail-user=user@example.com" # Email address
]

# GPU allocation in Slurm
config.slurmGres = "gpu:2" # Request 2 GPUs per job

slurm_batch = SlurmBatchSystem(config)

# Job submission generates Slurm sbatch scripts
# Automatic SLURM_JOB_ID tracking
# Integration with Slurm accounting and limits
```

### Kubernetes Batch System

{ .api }

Container-based job execution on Kubernetes clusters.

```python
from toil.batchSystems.kubernetes import KubernetesBatchSystem

# Kubernetes configuration
config = Config()
config.batchSystem = "kubernetes"

# Kubernetes namespace and service account
config.kubernetesNamespace = "toil-workflows"
config.kubernetesServiceAccount = "toil-service"

# Container configuration
config.kubernetesDefaultImage = "ubuntu:20.04"
config.kubernetesDockerImage = "my-org/toil-worker:latest"

# Resource limits and requests
config.kubernetesNodeSelector = {"nodeType": "compute"}
config.kubernetesTolerationsJson = '[{"key": "dedicated", "operator": "Equal", "value": "toil"}]'

# Persistent volume configuration
config.kubernetesPersistentVolumeSize = "10G"
config.kubernetesStorageClass = "fast-ssd"

k8s_batch = KubernetesBatchSystem(config)

# Jobs run as Kubernetes Jobs/Pods
# Automatic volume mounting for job store access
# Integration with Kubernetes RBAC and networking
```

### LSF Batch System

{ .api }

IBM LSF (Load Sharing Facility) integration for enterprise HPC environments.

```python
from toil.batchSystems.lsf import LSFBatchSystem

# LSF configuration
config = Config()
config.batchSystem = "lsf"

# LSF queue and project settings
config.lsfQueue = "normal" # LSF queue name
config.lsfProject = "research_proj" # Project for accounting

# Resource specification
config.lsfArgs = [
    "-R", "select[mem>8000]", # Memory requirements
    "-R", "span[hosts=1]", # Single host allocation
    "-W", "4:00" # Wall time limit
]

lsf_batch = LSFBatchSystem(config)

# Job submission using bsub command
# LSF job array support for parallel jobs
# Integration with LSF resource reservation
```

### AWS Batch System

{ .api }

Native integration with AWS Batch for cloud-native workflow execution.

```python
from toil.batchSystems.awsBatch import AWSBatchSystem

# AWS Batch configuration
config = Config()
config.batchSystem = "aws_batch"

# AWS Batch job queue and definition
config.awsBatchJobQueue = "toil-job-queue"
config.awsBatchJobDefinition = "toil-worker"

# AWS region and credentials
config.awsRegion = "us-west-2"
config.awsCredentials = "~/.aws/credentials"

# Container and compute environment settings
config.awsBatchComputeEnvironment = "toil-compute-env"
config.awsBatchDockerImage = "amazonlinux:2"

aws_batch = AWSBatchSystem(config)

# Jobs submitted to AWS Batch queues
# Automatic EC2 instance provisioning
# Integration with AWS IAM and VPC
# Support for Spot instances for cost optimization
```

### Mesos Batch System

{ .api }

Apache Mesos integration for distributed computing frameworks.

```python
from toil.batchSystems.mesos.batchSystem import MesosBatchSystem

# Mesos configuration
config = Config()
config.batchSystem = "mesos"

# Mesos master and framework settings
config.mesosMaster = "zk://localhost:2181/mesos" # Zookeeper URL
config.mesosFrameworkId = "toil-framework" # Framework identifier

# Resource allocation
config.mesosRole = "production" # Mesos role
config.mesosCheckpoint = True # Enable checkpointing

mesos_batch = MesosBatchSystem(config)

# Framework registration with Mesos master
# Dynamic resource allocation and deallocation
# Fault tolerance through framework checkpointing
```

### Batch System Monitoring and Status

{ .api }

Comprehensive monitoring and status reporting across all batch systems.

```python
from toil.batchSystems.abstractBatchSystem import BatchJobExitReason
import logging

def monitor_batch_system(batch_system: AbstractBatchSystem):
    """Monitor batch system status and job progress."""

    # Get current job status
    issued_jobs = batch_system.getIssuedBatchJobIDs()
    running_jobs = batch_system.getRunningBatchJobIDs()

    print(f"Issued jobs: {len(issued_jobs)}")
    print(f"Running jobs: {len(running_jobs)}")

    # Check for completed jobs
    while True:
        updated_job = batch_system.getUpdatedBatchJob(maxWait=10)

        if updated_job is None:
            continue

        job_id = updated_job.jobID
        exit_reason = updated_job.exitReason
        wall_time = updated_job.wallTime
        exit_code = updated_job.exitCode

        if exit_reason == BatchJobExitReason.FINISHED:
            if exit_code == 0:
                print(f"Job {job_id} completed successfully in {wall_time}s")
            else:
                print(f"Job {job_id} failed with exit code {exit_code}")

        elif exit_reason == BatchJobExitReason.FAILED:
            print(f"Job {job_id} failed due to batch system error")

        elif exit_reason == BatchJobExitReason.KILLED:
            print(f"Job {job_id} was killed")

        elif exit_reason == BatchJobExitReason.ERROR:
            print(f"Job {job_id} encountered an error")

        # Get scheduling status message
        status_msg = batch_system.getSchedulingStatusMessage()
        if status_msg:
            print(f"Scheduler status: {status_msg}")
```

### Resource Management and Node Information

{ .api }

Advanced resource management and node type specification for scalable batch systems.

```python
from toil.batchSystems import NodeInfo
from toil.provisioners.abstractProvisioner import AbstractProvisioner

class NodeInfo:
    """Information about available compute nodes."""

    def __init__(self, cores: int, memory: int, disk: int,
                 preemptible: bool, nodeType: str):
        self.cores = cores # CPU cores available
        self.memory = memory # Memory in bytes
        self.disk = disk # Disk space in bytes
        self.preemptible = preemptible # Whether node can be preempted
        self.nodeType = nodeType # Cloud provider node type

# Define available node types
def get_cloud_node_types():
    """Define node types for cloud auto-scaling."""
    return [
        NodeInfo(
            cores=2,
            memory=4 * 1024**3, # 4GB
            disk=50 * 1024**3, # 50GB
            preemptible=True,
            nodeType="t3.small"
        ),
        NodeInfo(
            cores=8,
            memory=32 * 1024**3, # 32GB
            disk=200 * 1024**3, # 200GB
            preemptible=False,
            nodeType="m5.2xlarge"
        ),
        NodeInfo(
            cores=32,
            memory=128 * 1024**3, # 128GB
            disk=1000 * 1024**3, # 1TB
            preemptible=False,
            nodeType="m5.8xlarge"
        )
    ]

# Node filtering for job placement
def custom_node_filter(batch_system: AbstractScalableBatchSystem):
    """Filter nodes based on custom criteria."""

    def node_filtering_func(node_info: dict) -> bool:
        # Only use nodes with sufficient resources
        min_memory = 8 * 1024**3 # 8GB minimum
        min_cores = 4 # 4 cores minimum

        return (node_info.get('memory', 0) >= min_memory and
                node_info.get('cores', 0) >= min_cores)

    # Apply custom filtering
    batch_system.nodeFiltering = node_filtering_func
```

### Exception Handling and Error Recovery

{ .api }

Robust error handling and recovery mechanisms for batch system failures.

```python
from toil.batchSystems.abstractBatchSystem import (
    InsufficientSystemResources,
    AcquisitionTimeoutException,
    DeadlockException
)

def handle_batch_system_errors():
    """Handle common batch system errors."""

    try:
        # Submit job to batch system
        job_id = batch_system.issueBatchJob(job_description)

    except InsufficientSystemResources as e:
        # Handle resource shortage
        logging.warning(f"Insufficient resources: {e}")
        # Reduce resource requirements or wait
        time.sleep(60)

    except AcquisitionTimeoutException as e:
        # Handle timeout acquiring resources
        logging.error(f"Resource acquisition timeout: {e}")
        # Retry with different parameters

    except DeadlockException as e:
        # Handle batch system deadlock
        logging.critical(f"Batch system deadlock: {e}")
        # May require manual intervention

def robust_job_submission(batch_system, job_description, max_retries=3):
    """Submit job with automatic retry on failure."""

    for attempt in range(max_retries):
        try:
            return batch_system.issueBatchJob(job_description)

        except Exception as e:
            logging.warning(f"Job submission attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt) # Exponential backoff
```

This batch system integration enables Toil workflows to execute seamlessly across diverse computing environments while maintaining consistent interfaces and robust error handling capabilities.