# Workflow Language Integration

## Overview

Toil provides native support for CWL (Common Workflow Language) and WDL (Workflow Description Language), enabling seamless execution of standardized workflow specifications. The integration translates workflow descriptions into Toil's internal job graph while preserving the semantic meaning and execution requirements of the original specifications. This allows users to leverage existing workflows and tools while benefiting from Toil's distributed execution capabilities, cloud provisioning, and fault tolerance.
## Capabilities

### CWL (Common Workflow Language) Support
{ .api }

Comprehensive support for CWL v1.0+ workflows with full tool and workflow execution capabilities.
```python
import os

from toil.cwl import cwltoil
from toil.common import Config, Toil

# CWL workflow execution using command line
def run_cwl_workflow_cli():
    """Build example command lines for running CWL workflows with Toil.

    The original version constructed the argument lists and then discarded
    them; they are now returned so callers can actually execute them.

    Returns:
        Tuple ``(cmd, advanced_cmd)`` of argument lists suitable for
        ``subprocess.run(args, shell=False)``.
    """
    # Basic CWL execution:
    #   toil-cwl-runner workflow.cwl inputs.yml

    # With Toil-specific options
    cmd = [
        "toil-cwl-runner",
        "--jobStore", "file:cwl-jobstore",
        "--batchSystem", "local",
        "--maxCores", "4",
        "--maxMemory", "8G",
        "--logLevel", "INFO",
        "workflow.cwl",
        "inputs.yml",
    ]

    # Advanced options: cloud job store, Kubernetes batch system, and
    # AWS autoscaling with preemptible nodes.
    advanced_cmd = [
        "toil-cwl-runner",
        "--jobStore", "aws:us-west-2:my-bucket:cwl-run",
        "--batchSystem", "kubernetes",
        "--provisioner", "aws",
        "--nodeTypes", "m5.large,m5.xlarge",
        "--maxNodes", "10",
        "--defaultPreemptible",
        "--retryCount", "3",
        "--cleanWorkDir", "onSuccess",
        "--outdir", "/results",
        "--tmp-outdir-prefix", "/tmp/cwl-",
        "complex-workflow.cwl",
        "production-inputs.json",
    ]

    return cmd, advanced_cmd
# Programmatic CWL execution
def run_cwl_workflow_programmatic():
    """Execute a CWL workflow programmatically via the cwltoil entry point.

    Returns:
        The result (exit status) returned by ``cwltoil.main``.
    """
    # Load CWL workflow and inputs
    cwl_file = "analysis-workflow.cwl"
    inputs_file = "sample-inputs.yml"

    # Configure Toil for CWL execution
    config = Config()
    config.jobStore = "file:cwl-analysis-store"
    config.batchSystem = "local"
    config.maxCores = 8
    config.maxMemory = "16G"
    config.retryCount = 2

    # CWL-specific configuration
    config.cwl = True
    config.cwlVersion = "v1.2"
    config.cwlTmpOutDir = "/tmp/cwl-tmp"
    config.cwlCachingDir = "/cache/cwl"

    # Execute workflow.
    # NOTE(review): the Toil context object is not used directly here;
    # cwltoil.main drives its own workflow — confirm the wrapper is needed.
    with Toil(config) as toil:
        result = cwltoil.main([
            cwl_file,
            inputs_file,
            "--jobStore", config.jobStore,
            "--batchSystem", config.batchSystem,
        ])

    print(f"CWL workflow completed with result: {result}")
    return result
def advanced_cwl_features():
    """Demonstrate advanced CWL features and configuration.

    Returns:
        The populated Toil ``Config`` (previously built and then discarded;
        returning it matches ``advanced_wdl_features``).
    """
    # Docker container support — illustrative CWL tool description.
    cwl_with_docker = """
cwlVersion: v1.2
class: CommandLineTool

requirements:
  DockerRequirement:
    dockerPull: ubuntu:20.04
  ResourceRequirement:
    coresMin: 2
    ramMin: 4096
    tmpdirMin: 1024
    outdirMin: 1024

inputs:
  input_file:
    type: File
    inputBinding:
      position: 1

outputs:
  output_file:
    type: File
    outputBinding:
      glob: "output.txt"

baseCommand: ["bash", "-c"]
arguments:
  - "cat $(inputs.input_file.path) | wc -l > output.txt"
"""

    # Execute with Docker support
    config = Config()
    config.disableChaining = True  # Required for some CWL features
    config.enableCWLDockerSupport = True
    config.dockerAppliance = "ubuntu:20.04"

    # Singularity container support (alternative to Docker)
    config.cwlUseSingularity = True
    config.singularityArgs = ["--cleanenv", "--containall"]

    # CWL caching for faster reruns
    config.cwlCaching = True
    config.cwlCachingDir = "/shared/cwl-cache"

    # Custom resource requirements mapping (illustrative helper).
    def map_cwl_resources(cwl_requirements):
        """Map CWL resource requirements to Toil resources."""
        toil_resources = {
            # CWL ram/tmpdir/outdir minimums are in MiB; Toil wants bytes.
            'memory': cwl_requirements.get('ramMin', 1024) * 1024 * 1024,
            'cores': cwl_requirements.get('coresMin', 1),
            'disk': (cwl_requirements.get('tmpdirMin', 1024) +
                     cwl_requirements.get('outdirMin', 1024)) * 1024 * 1024,
        }
        return toil_resources

    return config
```
### WDL (Workflow Description Language) Support
{ .api }

Native WDL workflow execution with support for WDL 1.0+ specifications.
```python
from toil.wdl import wdltoil
from toil.common import Config, Toil

# WDL workflow execution using command line
def run_wdl_workflow_cli():
    """Build example command lines for running WDL workflows with Toil.

    The original version constructed the argument lists and then discarded
    them; they are now returned so callers can actually execute them.

    Returns:
        Tuple ``(cmd, cloud_cmd)`` of argument lists suitable for
        ``subprocess.run(args, shell=False)``.
    """
    # Basic WDL execution:
    #   toil-wdl-runner workflow.wdl inputs.json

    # With comprehensive options (Slurm cluster execution).
    cmd = [
        "toil-wdl-runner",
        "--jobStore", "file:wdl-jobstore",
        "--batchSystem", "slurm",
        "--maxCores", "16",
        "--maxMemory", "64G",
        "--defaultDisk", "10G",
        "--retryCount", "3",
        "--logLevel", "DEBUG",
        "pipeline.wdl",
        "pipeline_inputs.json",
    ]

    # Cloud execution with auto-scaling of (partly preemptible) nodes.
    cloud_cmd = [
        "toil-wdl-runner",
        "--jobStore", "aws:us-east-1:wdl-bucket:run-001",
        "--batchSystem", "mesos",
        "--provisioner", "aws",
        "--nodeTypes", "c5.large:0.50,c5.xlarge:0.75,c5.2xlarge",
        "--maxNodes", "50",
        "--defaultPreemptible",
        "--preemptibleCompensation", "1.5",
        "large-scale-analysis.wdl",
        "production-inputs.json",
    ]

    return cmd, cloud_cmd
# Programmatic WDL execution
def run_wdl_workflow_programmatic():
    """Execute a WDL workflow programmatically via the wdltoil entry point.

    Returns:
        The result (exit status) returned by ``wdltoil.main``.
    """
    wdl_file = "genomics-pipeline.wdl"
    inputs_file = "sample-cohort-inputs.json"

    # Configure for WDL execution
    config = Config()
    config.jobStore = "file:genomics-run"
    config.batchSystem = "kubernetes"
    config.maxCores = 32
    config.maxMemory = "128G"
    config.defaultDisk = "100G"

    # WDL-specific configuration
    config.wdl = True
    config.wdlVersion = "1.0"
    config.wdlCallCaching = True
    config.wdlCacheDir = "/cache/wdl-calls"

    # Execute WDL workflow.
    # NOTE(review): wdltoil.main drives its own workflow; confirm the
    # enclosing Toil context is actually required.
    with Toil(config) as toil:
        result = wdltoil.main([
            wdl_file,
            inputs_file,
            "--jobStore", config.jobStore,
            "--batchSystem", config.batchSystem,
        ])

    return result
def advanced_wdl_features():
    """Advanced WDL workflow features and configuration.

    Returns:
        A Toil ``Config`` populated with WDL-oriented settings.
    """
    # Illustrative WDL workflow with scatter/gather, runtime blocks, and
    # Docker images (not executed here).
    wdl_workflow = """
version 1.0

workflow GenomicsAnalysis {
    input {
        File reference_genome
        Array[File] sample_fastqs
        String output_prefix
        Int? cpu_count = 4
        String? memory_gb = "8G"
    }

    # Scatter processing over samples
    scatter (fastq in sample_fastqs) {
        call AlignReads {
            input:
                reference = reference_genome,
                fastq_file = fastq,
                cpu = cpu_count,
                memory = memory_gb
        }
    }

    # Merge results
    call MergeAlignments {
        input:
            alignments = AlignReads.output_bam,
            output_name = output_prefix
    }

    output {
        File merged_bam = MergeAlignments.merged_bam
        Array[File] individual_bams = AlignReads.output_bam
    }
}

task AlignReads {
    input {
        File reference
        File fastq_file
        Int cpu = 2
        String memory = "4G"
    }

    command <<<
        bwa mem -t ${cpu} ${reference} ${fastq_file} | samtools sort -o output.bam
    >>>

    runtime {
        docker: "biocontainers/bwa:v0.7.17_cv1"
        cpu: cpu
        memory: memory
        disk: "20 GB"
    }

    output {
        File output_bam = "output.bam"
    }
}
"""

    # Configure advanced WDL features
    config = Config()

    # Call caching for expensive operations
    config.wdlCallCaching = True
    config.wdlCallCachingBackend = "file"  # or "database"

    # Docker/container support
    config.enableWDLDockerSupport = True
    config.dockerAppliance = "ubuntu:20.04"

    # Resource optimization
    config.wdlOptimizeResources = True
    config.wdlResourceProfile = "high-throughput"

    # Localization strategies
    config.wdlLocalizationStrategy = "copy"  # or "symlink", "hardlink"
    config.wdlTmpDir = "/fast-tmp"

    return config
```
### Workflow Conversion and Translation
{ .api }

Tools for converting between workflow languages and translating to Toil's internal representation.
```python
from toil.cwl.cwltoil import CWLWorkflow
from toil.wdl.wdltoil import WDLWorkflow
def workflow_introspection():
    """Inspect and analyze CWL and WDL workflow specifications.

    Prints a structural summary and aggregated minimum resource
    requirements for each workflow; returns nothing.
    """
    # Load CWL workflow
    cwl_workflow = CWLWorkflow.load("analysis.cwl")

    # Inspect CWL structure
    print("CWL Workflow Analysis:")
    print(f"Version: {cwl_workflow.version}")
    print(f"Class: {cwl_workflow.class_}")
    print(f"Tools: {len(cwl_workflow.steps)}")

    # Aggregate minimum resource requirements across all steps.
    total_cpu = 0
    total_memory = 0
    for step in cwl_workflow.steps:
        if hasattr(step, 'requirements'):
            for req in step.requirements:
                if req.class_ == 'ResourceRequirement':
                    total_cpu += req.get('coresMin', 1)
                    total_memory += req.get('ramMin', 1024)

    print(f"Total CPU cores needed: {total_cpu}")
    print(f"Total memory needed: {total_memory} MB")

    # Load WDL workflow
    wdl_workflow = WDLWorkflow.load("pipeline.wdl")

    # Inspect WDL structure
    print("\nWDL Workflow Analysis:")
    print(f"Version: {wdl_workflow.version}")
    print(f"Workflow name: {wdl_workflow.name}")
    print(f"Tasks: {len(wdl_workflow.tasks)}")

    # Per-task runtime summary.
    for task in wdl_workflow.tasks:
        runtime = task.runtime
        print(f"Task {task.name}:")
        print(f"  CPU: {runtime.get('cpu', 'default')}")
        print(f"  Memory: {runtime.get('memory', 'default')}")
        print(f"  Disk: {runtime.get('disk', 'default')}")
        if 'docker' in runtime:
            print(f"  Docker: {runtime['docker']}")
def workflow_validation_and_linting():
    """Validate CWL and WDL workflow specifications for common issues.

    Returns:
        Dict with keys ``'cwl'`` and ``'wdl'``, each mapping to a dict of
        ``'errors'`` and ``'warnings'`` lists.
    """

    def validate_cwl_workflow(cwl_file: str):
        """Validate a CWL workflow specification."""
        errors = []
        warnings = []

        try:
            workflow = CWLWorkflow.load(cwl_file)

            # Check for common issues
            if not hasattr(workflow, 'requirements'):
                warnings.append("No workflow-level requirements specified")

            for step in workflow.steps:
                # Missing resource requirements → engine defaults apply.
                if not hasattr(step, 'requirements'):
                    warnings.append(f"Step {step.id} has no resource requirements")

                # Every step input should be wired to a source.
                for inp in step.in_:
                    if not inp.source:
                        errors.append(f"Step {step.id} input {inp.id} has no source")

                # A step without a tool reference cannot run.
                if not step.run:
                    errors.append(f"Step {step.id} has no tool reference")

        except Exception as e:
            # Report load/parse failures instead of propagating, so callers
            # always receive a complete report.
            errors.append(f"Failed to parse CWL: {str(e)}")

        return {'errors': errors, 'warnings': warnings}

    def validate_wdl_workflow(wdl_file: str):
        """Validate a WDL workflow specification."""
        errors = []
        warnings = []

        try:
            workflow = WDLWorkflow.load(wdl_file)

            # Check WDL syntax and structure
            if not workflow.version:
                errors.append("WDL version not specified")

            # Validate task definitions
            for task in workflow.tasks:
                if not task.command:
                    errors.append(f"Task {task.name} has no command")

                if not hasattr(task, 'runtime'):
                    warnings.append(f"Task {task.name} has no runtime requirements")

                if not task.outputs:
                    warnings.append(f"Task {task.name} has no outputs")

            # Every call must reference a task defined in this workflow.
            for call in workflow.calls:
                if call.task_name not in [t.name for t in workflow.tasks]:
                    errors.append(f"Call references undefined task: {call.task_name}")

        except Exception as e:
            errors.append(f"Failed to parse WDL: {str(e)}")

        return {'errors': errors, 'warnings': warnings}

    # Example usage
    cwl_result = validate_cwl_workflow("workflow.cwl")
    wdl_result = validate_wdl_workflow("pipeline.wdl")

    return {'cwl': cwl_result, 'wdl': wdl_result}
```
### Workflow Execution Monitoring
{ .api }

Monitoring and debugging capabilities for workflow language execution.
```python
import logging

from toil.cwl.utils import CWLLogger
from toil.wdl.utils import WDLLogger
def setup_workflow_monitoring():
    """Set up logging for CWL and WDL workflow execution.

    Returns:
        Tuple ``(cwl_logger, wdl_logger)``.
    """
    # CWL logging: verbose, written to a file.
    cwl_logger = CWLLogger()
    cwl_logger.setLevel(logging.DEBUG)

    file_handler = logging.FileHandler('cwl_execution.log')
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    cwl_logger.addHandler(file_handler)

    # WDL logging: terser, emitted to the console.
    wdl_logger = WDLLogger()
    wdl_logger.setLevel(logging.INFO)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(
        logging.Formatter('WDL: %(levelname)s - %(message)s')
    )
    wdl_logger.addHandler(console_handler)

    return cwl_logger, wdl_logger
def workflow_progress_tracking():
    """Create a tracker that records workflow step timing and resource usage.

    Returns:
        A fresh ``WorkflowTracker`` instance.
    """
    import time  # hoisted: previously re-imported inside every method

    class WorkflowTracker:
        """Accumulates per-step timings, statuses, and resource usage."""

        def __init__(self):
            self.start_time = None      # wall-clock start of the whole workflow
            self.step_times = {}        # step_id -> {'start', 'end', 'duration'}
            self.step_status = {}       # step_id -> 'running' | 'completed'
            self.resource_usage = {}    # step_id -> resources dict

        def start_workflow(self):
            """Mark workflow start."""
            self.start_time = time.time()
            print("Workflow execution started")

        def step_started(self, step_id: str):
            """Mark step start."""
            self.step_times[step_id] = {'start': time.time()}
            self.step_status[step_id] = 'running'
            print(f"Step started: {step_id}")

        def step_completed(self, step_id: str, resources_used=None):
            """Mark step completion, recording duration and optional resources.

            Args:
                step_id: Identifier previously passed to ``step_started``.
                resources_used: Optional dict, e.g. ``{'cpu': 4, 'memory_gb': 8}``.
            """
            end_time = time.time()

            if step_id in self.step_times:
                self.step_times[step_id]['end'] = end_time
                duration = end_time - self.step_times[step_id]['start']
                self.step_times[step_id]['duration'] = duration
                print(f"Step completed: {step_id} ({duration:.2f}s)")

            self.step_status[step_id] = 'completed'

            if resources_used:
                self.resource_usage[step_id] = resources_used

        def workflow_completed(self):
            """Report total runtime, per-step durations, and resource totals."""
            if self.start_time:
                total_time = time.time() - self.start_time
                print(f"Workflow completed in {total_time:.2f}s")

            # Report step durations
            print("\nStep execution times:")
            for step_id, times in self.step_times.items():
                if 'duration' in times:
                    print(f"  {step_id}: {times['duration']:.2f}s")

            # Report resource usage
            if self.resource_usage:
                print("\nResource usage summary:")
                total_cpu_hours = 0
                total_memory_gb_hours = 0

                for step_id, resources in self.resource_usage.items():
                    duration_hours = self.step_times[step_id].get('duration', 0) / 3600
                    cpu_hours = resources.get('cpu', 0) * duration_hours
                    memory_gb_hours = resources.get('memory_gb', 0) * duration_hours

                    total_cpu_hours += cpu_hours
                    total_memory_gb_hours += memory_gb_hours

                    print(f"  {step_id}: {cpu_hours:.2f} CPU-hours, {memory_gb_hours:.2f} GB-hours")

                print(f"\nTotal: {total_cpu_hours:.2f} CPU-hours, {total_memory_gb_hours:.2f} GB-hours")

    return WorkflowTracker()
def debug_workflow_execution():
    """Build helpers for diagnosing workflow execution failures.

    Returns:
        Tuple ``(debug_cwl_step_failure, debug_wdl_task_failure)``.
    """

    def debug_cwl_step_failure(step_id: str, exit_code: int, stderr_log: str):
        """Print diagnostics and suggestions for a failed CWL step."""
        print(f"CWL Step Failed: {step_id}")
        print(f"Exit code: {exit_code}")

        # Well-known process exit codes.
        if exit_code == 127:
            print("Issue: Command not found")
            print("Check: Docker image contains required tools")
        elif exit_code == 137:
            print("Issue: Process killed (likely OOM)")
            print("Check: Increase memory requirements")
        elif exit_code == 139:
            print("Issue: Segmentation fault")
            print("Check: Input data format or tool version")

        # Pattern-match stderr for common environmental failures.
        if "No space left on device" in stderr_log:
            print("Issue: Insufficient disk space")
            print("Check: Increase disk requirements or clean temp files")
        elif "Permission denied" in stderr_log:
            print("Issue: File permission problems")
            print("Check: File ownership and Docker volume mounts")

        # Suggest debugging steps
        print("\nDebugging suggestions:")
        print("1. Check input file formats and sizes")
        print("2. Verify resource requirements (CPU, memory, disk)")
        print("3. Test command manually with sample data")
        print("4. Check Docker image and tool versions")

    def debug_wdl_task_failure(task_name: str, error_msg: str):
        """Print diagnostics and suggestions for a failed WDL task."""
        print(f"WDL Task Failed: {task_name}")
        print(f"Error: {error_msg}")

        # Common WDL error patterns (case-insensitive match).
        lowered = error_msg.lower()
        if "localization" in lowered:
            print("Issue: File localization failure")
            print("Check: Input file paths and access permissions")
        elif "runtime" in lowered:
            print("Issue: Runtime requirement problem")
            print("Check: Resource specifications in task runtime block")
        elif "output" in lowered:
            print("Issue: Output file collection failure")
            print("Check: Output glob patterns and file generation")

        print("\nDebugging steps:")
        print("1. Verify all input files exist and are accessible")
        print("2. Check runtime resource specifications")
        print("3. Validate output glob patterns")
        print("4. Test task command independently")

    return debug_cwl_step_failure, debug_wdl_task_failure
```
### Cross-Platform Workflow Compatibility
{ .api }

Tools and utilities for ensuring workflow compatibility across different execution environments.
```python
622
def ensure_workflow_portability():
    """Build helpers for making workflows portable across environments.

    Returns:
        Dict exposing the helpers under keys ``'normalize_cwl'``,
        ``'validate_wdl'``, and ``'create_template'``. (Previously the
        helpers were defined but never returned, so callers could not
        reach them.)
    """

    def normalize_cwl_for_portability(cwl_workflow):
        """Modify a CWL workflow in place for cross-platform compatibility."""
        # Prefer well-known public Docker images over custom ones.
        standard_images = {
            'ubuntu': 'ubuntu:20.04',
            'python': 'python:3.9-slim',
            'r-base': 'r-base:4.1.0',
            'bioconductor': 'bioconductor/bioconductor_docker:RELEASE_3_13',
        }

        # Replace custom images with standard ones where possible.
        for step in cwl_workflow.steps:
            if hasattr(step.run, 'requirements'):
                for req in step.run.requirements:
                    if req.class_ == 'DockerRequirement':
                        image = req.dockerPull
                        for key, standard_image in standard_images.items():
                            if key in image.lower():
                                req.dockerPull = standard_image
                                break

        # Advertise software needs as hints so any engine can satisfy them.
        for step in cwl_workflow.steps:
            if not hasattr(step.run, 'hints'):
                step.run.hints = []

            software_hint = {
                'class': 'SoftwareRequirement',
                'packages': [
                    {'package': 'bash', 'version': ['>=4.0']},
                    {'package': 'coreutils', 'version': ['>=8.0']},
                ],
            }
            step.run.hints.append(software_hint)

        return cwl_workflow

    def validate_wdl_portability(wdl_workflow):
        """Return a list of portability issues found in a WDL workflow."""
        portability_issues = []

        for task in wdl_workflow.tasks:
            command = task.command

            # Hardcoded install paths won't exist on other hosts/images.
            if '/usr/local' in command or '/opt/' in command:
                portability_issues.append(
                    f"Task {task.name}: Hardcoded paths in command"
                )

            # Private registries are unreachable from other environments.
            runtime = task.runtime
            if 'docker' in runtime:
                docker_image = runtime['docker']
                if 'localhost' in docker_image or 'private-registry' in docker_image:
                    portability_issues.append(
                        f"Task {task.name}: Uses private Docker registry"
                    )

            # Package managers / sudo assume a specific platform.
            platform_commands = ['sudo', 'yum', 'apt-get', 'brew']
            for cmd in platform_commands:
                if cmd in command:
                    portability_issues.append(
                        f"Task {task.name}: Uses platform-specific command '{cmd}'"
                    )

        return portability_issues

    def create_portable_workflow_template():
        """Return starter CWL and WDL templates for portable workflows."""
        cwl_template = """
cwlVersion: v1.2
class: Workflow

requirements:
  - class: ScatterFeatureRequirement
  - class: MultipleInputFeatureRequirement
  - class: StepInputExpressionRequirement

hints:
  - class: ResourceRequirement
    coresMin: 1
    ramMin: 1024
    tmpdirMin: 1024
    outdirMin: 1024
  - class: DockerRequirement
    dockerPull: ubuntu:20.04

inputs:
  input_files:
    type: File[]
    doc: "Array of input files to process"

outputs:
  processed_files:
    type: File[]
    outputSource: process_step/output_files

steps:
  process_step:
    run: process_tool.cwl
    in:
      inputs: input_files
    out: [output_files]
    scatter: inputs
"""

        wdl_template = """
version 1.0

workflow PortableWorkflow {
    input {
        Array[File] input_files
        String output_prefix = "processed"
    }

    scatter (input_file in input_files) {
        call ProcessFile {
            input:
                input_file = input_file,
                prefix = output_prefix
        }
    }

    output {
        Array[File] processed_files = ProcessFile.output_file
    }
}

task ProcessFile {
    input {
        File input_file
        String prefix
        Int cpu = 1
        String memory = "2 GB"
        String disk = "10 GB"
    }

    command <<<
        # Use standard POSIX commands only
        basename_file=$(basename ~{input_file})
        cp ~{input_file} ~{prefix}_${basename_file}
    >>>

    runtime {
        docker: "ubuntu:20.04"
        cpu: cpu
        memory: memory
        disks: "local-disk " + disk + " SSD"
    }

    output {
        File output_file = "~{prefix}_*"
    }
}
"""

        return {'cwl': cwl_template, 'wdl': wdl_template}

    # Expose the helpers so callers can actually use them.
    return {
        'normalize_cwl': normalize_cwl_for_portability,
        'validate_wdl': validate_wdl_portability,
        'create_template': create_portable_workflow_template,
    }
```
This workflow language integration provides comprehensive support for executing standardized workflows with full compatibility across different computing environments while leveraging Toil's advanced execution and scaling capabilities.