# Core Workflow Management

## Overview

Toil's core workflow management provides the fundamental building blocks for creating, scheduling, and executing computational pipelines. The system centers around three key concepts: Jobs (units of work), Promises (result handling), and the Workflow Context (execution environment). This enables creation of complex DAG (Directed Acyclic Graph) workflows with sophisticated resource management and error handling.

## Capabilities

### Job Definition and Execution
{ .api }

The `Job` class is the fundamental unit of work in Toil workflows. Jobs can be defined as classes or by wrapping plain functions.

```python
from toil.job import Job
from toil.fileStores import AbstractFileStore

# Class-based job definition
class ProcessingJob(Job):
    def __init__(self, input_data, memory=None, cores=None, disk=None,
                 accelerators=None, preemptible=True, checkpoint=False,
                 displayName=None):
        # Resource requirements: memory/disk in bytes, cores as int/float
        super().__init__(
            memory=memory or 1024*1024*1024,  # 1GB default
            cores=cores or 1,                 # 1 core default
            disk=disk or 1024*1024*1024,      # 1GB default
            accelerators=accelerators or [],  # No accelerators default
            preemptible=preemptible,          # Allow preemption
            checkpoint=checkpoint,            # Checkpointing disabled by default
            displayName=displayName           # Job display name
        )
        self.input_data = input_data

    def run(self, fileStore: AbstractFileStore) -> str:
        """Execute job logic. Must return serializable result."""
        fileStore.logToMaster(f"Processing: {self.input_data}")
        # Perform actual work
        result = self.input_data.upper()
        return result

# Function-based job definition: wrap a plain job function at
# workflow-construction time (Job.wrapJobFn is not a decorator)
def simple_task(job, data, multiplier=2):
    """Job function wrapped with default resources."""
    job.fileStore.logToMaster(f"Task processing: {data}")
    return data * multiplier

simple_job = Job.wrapJobFn(simple_task, "input", multiplier=3)

# Wrapping a job function with explicit resource requirements
def resource_intensive_task(job, large_dataset):
    """Job function run with explicit resource requirements."""
    return process_large_data(large_dataset)

resource_job = Job.wrapJobFn(resource_intensive_task, large_dataset,
                             memory="2G", cores=2, disk="500M")
```

### Resource Requirements and Accelerators
{ .api }

Toil supports detailed resource specifications including GPU/accelerator requirements.

```python
from toil.job import Job, AcceleratorRequirement, parse_accelerator

# GPU-enabled job
class MLTrainingJob(Job):
    def __init__(self, model_config):
        # Define GPU requirements
        gpu_requirement: AcceleratorRequirement = {
            "count": 2,             # Number of GPUs
            "kind": "gpu",          # Accelerator type
            "model": "Tesla V100",  # Specific GPU model (optional)
            "brand": "nvidia",      # GPU brand (optional)
            "api": "cuda"           # API interface (optional)
        }

        super().__init__(
            memory=16*1024*1024*1024,   # 16GB RAM
            cores=8,                    # 8 CPU cores
            disk=100*1024*1024*1024,    # 100GB disk
            accelerators=[gpu_requirement],
            preemptible=False           # Don't preempt GPU jobs
        )
        self.model_config = model_config

    def run(self, fileStore):
        # Access GPU resources for training
        return train_model(self.model_config)

# Parse accelerator from string specification
gpu_spec = parse_accelerator("2:nvidia:tesla_v100:cuda")
# Returns: {"count": 2, "kind": "gpu", "brand": "nvidia", "model": "tesla_v100", "api": "cuda"}
```

### Job Scheduling and Dependencies
{ .api }

Toil provides flexible job scheduling patterns including sequential, parallel, and conditional execution.

```python
from toil.job import Job

class WorkflowController(Job):
    def run(self, fileStore):
        # Parallel execution - children run concurrently
        child1 = DataPreprocessingJob("dataset1")
        child2 = DataPreprocessingJob("dataset2")
        child3 = DataPreprocessingJob("dataset3")

        self.addChild(child1)
        self.addChild(child2)
        self.addChild(child3)

        # Sequential execution - follow-on runs after children complete
        merge_job = MergeDataJob()
        self.addFollowOn(merge_job)

        # Analysis runs after merge completes
        analysis_job = AnalysisJob()
        merge_job.addFollowOn(analysis_job)

        # Service job - runs alongside other jobs
        monitoring_service = MonitoringService()
        self.addService(monitoring_service)

        return "Workflow initiated"

# Conditional job execution based on results
def conditional_processor(job, input_file):
    # Check input characteristics
    if needs_preprocessing(input_file):
        preprocess_job = PreprocessingJob(input_file)
        job.addChild(preprocess_job)
        return preprocess_job.rv()  # Return preprocessed result
    else:
        return input_file  # Return original file

conditional_job = Job.wrapJobFn(conditional_processor, "input_file.dat")
```

### Promise-Based Result Handling
{ .api }

Promises enable jobs to reference results from other jobs before they complete execution.

```python
from toil.job import Job, Promise

class PipelineJob(Job):
    def run(self, fileStore):
        # Create processing jobs
        step1 = ProcessStep1Job("input_data")
        step2 = ProcessStep2Job()
        step3 = ProcessStep3Job()

        # Chain jobs using promises
        self.addChild(step1)

        # step2 will receive result of step1 when it completes
        step2_with_input = Job.wrapJobFn(step2.run, step1.rv())
        step1.addFollowOn(step2_with_input)

        # step3 receives results from both step1 and step2
        step3_with_inputs = Job.wrapJobFn(
            step3.run,
            step1.rv(),            # Promise for step1 result
            step2_with_input.rv()  # Promise for step2 result
        )
        step2_with_input.addFollowOn(step3_with_inputs)

        return step3_with_inputs.rv()  # Return final result promise

# Function using multiple promise results
def combine_results(job, result1, result2, result3):
    """Promise arguments arrive resolved to their actual values."""
    combined = f"{result1} + {result2} + {result3}"
    job.fileStore.logToMaster(f"Combined results: {combined}")
    return combined

# Pass promises when wrapping; each resolves before the function runs
combine_job = Job.wrapJobFn(combine_results,
                            step1.rv(), step2.rv(), step3.rv())
```

### Workflow Configuration
{ .api }

The `Config` class provides comprehensive workflow configuration options.

```python
from toil.common import Config
from toil.lib.conversions import human2bytes

# Create and configure workflow
config = Config()

# Job store configuration
config.jobStore = "file:/tmp/my-job-store"          # Local file store
# config.jobStore = "aws:us-west-2:my-toil-bucket"  # AWS S3 store

# Batch system configuration
config.batchSystem = "local"         # Local execution
# config.batchSystem = "slurm"       # Slurm cluster
# config.batchSystem = "kubernetes"  # Kubernetes cluster

# Resource defaults
config.defaultMemory = human2bytes("2G")  # Default job memory
config.defaultCores = 1                   # Default CPU cores
config.defaultDisk = human2bytes("1G")    # Default disk space

# Resource limits
config.maxCores = 32                   # Maximum cores per job
config.maxMemory = human2bytes("64G")  # Maximum memory per job
config.maxDisk = human2bytes("1T")     # Maximum disk per job

# Error handling
config.retryCount = 3            # Job retry attempts
config.rescueJobsFrequency = 60  # Rescue job check interval

# Logging and monitoring
config.logLevel = "INFO"  # Log verbosity
config.stats = True       # Enable statistics collection

# Cleanup configuration
config.clean = "onSuccess"  # Clean on workflow success
# config.clean = "always"   # Always clean
# config.clean = "never"    # Never clean

# Working directory
config.workDir = "/tmp/toil-work"  # Temporary file location

# Preemptible job configuration
config.preemptibleWorkerTimeout = 1800  # Preemptible timeout (seconds)
config.defaultPreemptible = True        # Jobs preemptible by default
```

### Workflow Execution Context
{ .api }

The `Toil` context manager handles workflow lifecycle and provides execution environment.

```python
from toil.common import Toil, Config
from toil.exceptions import FailedJobsException

def run_workflow():
    config = Config()
    config.jobStore = "file:/tmp/workflow-store"
    config.batchSystem = "local"

    try:
        with Toil(config) as toil:
            # Create root job
            root_job = MainWorkflowJob("input_parameters")

            # Start fresh workflow
            if not toil.config.restart:
                result = toil.start(root_job)
                print(f"Workflow completed: {result}")
            else:
                # Restart failed workflow
                result = toil.restart()
                print(f"Workflow restarted: {result}")

            return result

    except FailedJobsException as e:
        print(f"Workflow failed with {e.numberOfFailedJobs} failed jobs")
        print(f"Job store: {e.jobStoreLocator}")
        return None

# Alternative: manual context management
def manual_workflow_execution():
    config = Config()
    config.jobStore = "file:/tmp/manual-store"

    toil = Toil(config)
    try:
        # Initialize workflow
        toil.__enter__()

        # Execute workflow
        root_job = SimpleJob("data")
        result = toil.start(root_job)

        return result
    finally:
        # Cleanup
        toil.__exit__(None, None, None)
```

### Advanced Job Patterns
{ .api }

Sophisticated job patterns for complex workflow requirements.

```python
from toil.job import Job, PromisedRequirement

class DynamicResourceJob(Job):
    """Job with resource requirements determined at runtime."""

    def __init__(self, size_calculator_promise):
        # Use promise to determine resources dynamically
        dynamic_memory = PromisedRequirement(
            lambda size: size * 1024 * 1024,  # Convert MB to bytes
            size_calculator_promise
        )

        super().__init__(
            memory=dynamic_memory,
            cores=1,
            disk="1G"
        )
        self.size_promise = size_calculator_promise

    def run(self, fileStore):
        # Access resolved size value
        actual_size = self.size_promise  # Automatically resolved
        fileStore.logToMaster(f"Processing size: {actual_size}MB")
        return f"Processed {actual_size}MB of data"

class ServiceJob(Job):
    """Long-running service job."""

    def __init__(self):
        super().__init__(memory="512M", cores=1, disk="100M")

    def run(self, fileStore):
        # Start service process
        service_process = start_monitoring_service()

        # Service runs until workflow completes
        try:
            while True:
                time.sleep(10)
                if should_stop_service():
                    break
        finally:
            service_process.stop()

        return "Service completed"

# Checkpointing for fault tolerance
class CheckpointedJob(Job):
    def __init__(self, data):
        super().__init__(
            memory="1G",
            cores=1,
            disk="1G",
            checkpoint=True  # Enable checkpointing
        )
        self.data = data

    def run(self, fileStore):
        # Job can be restarted from checkpoint on failure
        checkpoint_file = fileStore.getLocalTempFile()

        # Save intermediate state
        with open(checkpoint_file, 'w') as f:
            json.dump(self.data, f)

        # Long-running computation
        result = expensive_computation(self.data)

        return result
```

### Error Handling and Debugging
{ .api }

Comprehensive error handling and debugging capabilities for workflow development.

```python
from toil.job import Job, JobException
from toil.exceptions import FailedJobsException

class RobustJob(Job):
    """Job with comprehensive error handling."""

    def run(self, fileStore):
        try:
            # Potentially failing operation
            result = risky_operation()

        except ValueError as e:
            # Log error and continue
            fileStore.logToMaster(f"Handled ValueError: {e}", level=logging.WARNING)
            result = default_result()

        except Exception as e:
            # Fatal error - job will be retried
            fileStore.logToMaster(f"Job failed: {e}", level=logging.ERROR)
            raise JobException(f"Unrecoverable error: {e}")

        return result

# Workflow-level error handling
def resilient_workflow():
    config = Config()
    config.retryCount = 5  # Retry failed jobs 5 times

    try:
        with Toil(config) as toil:
            root_job = MainJob("input")
            result = toil.start(root_job)

    except FailedJobsException as e:
        # Handle workflow failure
        print(f"Workflow failed: {e.numberOfFailedJobs} jobs failed")

        # Optionally restart from failure point
        config.restart = True
        with Toil(config) as toil:
            result = toil.restart()

    return result
```
414
415
This core workflow management system provides the foundation for building scalable, fault-tolerant computational pipelines with sophisticated resource management and flexible job scheduling patterns.