
# Workflow Language Integration

## Overview

Toil provides native support for CWL (Common Workflow Language) and WDL (Workflow Description Language), enabling seamless execution of standardized workflow specifications. The integration translates workflow descriptions into Toil's internal job graph while preserving the semantic meaning and execution requirements of the original specifications. This allows users to leverage existing workflows and tools while benefiting from Toil's distributed execution capabilities, cloud provisioning, and fault tolerance.

## Capabilities

### CWL (Common Workflow Language) Support

{ .api }

Comprehensive support for CWL v1.0+ workflows with full tool and workflow execution capabilities.

```python
from toil.cwl import cwltoil
from toil.common import Config, Toil
import os

# CWL workflow execution using command line
def run_cwl_workflow_cli():
    """Execute CWL workflow using command-line interface."""

    # Basic CWL execution
    # toil-cwl-runner workflow.cwl inputs.yml

    # With Toil-specific options
    cmd = [
        "toil-cwl-runner",
        "--jobStore", "file:cwl-jobstore",
        "--batchSystem", "local",
        "--maxCores", "4",
        "--maxMemory", "8G",
        "--logLevel", "INFO",
        "workflow.cwl",
        "inputs.yml"
    ]

    # Advanced options
    advanced_cmd = [
        "toil-cwl-runner",
        "--jobStore", "aws:us-west-2:my-bucket:cwl-run",
        "--batchSystem", "kubernetes",
        "--provisioner", "aws",
        "--nodeTypes", "m5.large,m5.xlarge",
        "--maxNodes", "10",
        "--defaultPreemptible",
        "--retryCount", "3",
        "--cleanWorkDir", "onSuccess",
        "--outdir", "/results",
        "--tmp-outdir-prefix", "/tmp/cwl-",
        "complex-workflow.cwl",
        "production-inputs.json"
    ]

# Programmatic CWL execution
def run_cwl_workflow_programmatic():
    """Execute CWL workflow programmatically."""

    # Load CWL workflow and inputs
    cwl_file = "analysis-workflow.cwl"
    inputs_file = "sample-inputs.yml"

    # Configure Toil for CWL execution
    config = Config()
    config.jobStore = "file:cwl-analysis-store"
    config.batchSystem = "local"
    config.maxCores = 8
    config.maxMemory = "16G"
    config.retryCount = 2

    # CWL-specific configuration
    config.cwl = True
    config.cwlVersion = "v1.2"
    config.cwlTmpOutDir = "/tmp/cwl-tmp"
    config.cwlCachingDir = "/cache/cwl"

    # Execute workflow
    with Toil(config) as toil:
        result = cwltoil.main([
            cwl_file,
            inputs_file,
            "--jobStore", config.jobStore,
            "--batchSystem", config.batchSystem
        ])

    print(f"CWL workflow completed with result: {result}")
    return result

def advanced_cwl_features():
    """Demonstrate advanced CWL features and configuration."""

    # Docker container support
    cwl_with_docker = """
cwlVersion: v1.2
class: CommandLineTool

requirements:
  DockerRequirement:
    dockerPull: ubuntu:20.04
  ResourceRequirement:
    coresMin: 2
    ramMin: 4096
    tmpdirMin: 1024
    outdirMin: 1024

inputs:
  input_file:
    type: File
    inputBinding:
      position: 1

outputs:
  output_file:
    type: File
    outputBinding:
      glob: "output.txt"

baseCommand: ["bash", "-c"]
arguments:
  - "cat $(inputs.input_file.path) | wc -l > output.txt"
"""

    # Execute with Docker support
    config = Config()
    config.disableChaining = True  # Required for some CWL features
    config.enableCWLDockerSupport = True
    config.dockerAppliance = "ubuntu:20.04"

    # Singularity container support (alternative to Docker)
    config.cwlUseSingularity = True
    config.singularityArgs = ["--cleanenv", "--containall"]

    # CWL caching for faster reruns
    config.cwlCaching = True
    config.cwlCachingDir = "/shared/cwl-cache"

    # Custom resource requirements mapping
    def map_cwl_resources(cwl_requirements):
        """Map CWL resource requirements to Toil resources."""

        toil_resources = {
            'memory': cwl_requirements.get('ramMin', 1024) * 1024 * 1024,  # Convert MB to bytes
            'cores': cwl_requirements.get('coresMin', 1),
            'disk': (cwl_requirements.get('tmpdirMin', 1024) +
                     cwl_requirements.get('outdirMin', 1024)) * 1024 * 1024
        }

        return toil_resources
```

### WDL (Workflow Description Language) Support

{ .api }

Native WDL workflow execution with support for WDL 1.0+ specifications.

```python
from toil.wdl import wdltoil
from toil.common import Config, Toil

# WDL workflow execution using command line
def run_wdl_workflow_cli():
    """Execute WDL workflow using command-line interface."""

    # Basic WDL execution
    # toil-wdl-runner workflow.wdl inputs.json

    # With comprehensive options
    cmd = [
        "toil-wdl-runner",
        "--jobStore", "file:wdl-jobstore",
        "--batchSystem", "slurm",
        "--maxCores", "16",
        "--maxMemory", "64G",
        "--defaultDisk", "10G",
        "--retryCount", "3",
        "--logLevel", "DEBUG",
        "pipeline.wdl",
        "pipeline_inputs.json"
    ]

    # Cloud execution with auto-scaling
    cloud_cmd = [
        "toil-wdl-runner",
        "--jobStore", "aws:us-east-1:wdl-bucket:run-001",
        "--batchSystem", "mesos",
        "--provisioner", "aws",
        "--nodeTypes", "c5.large:0.50,c5.xlarge:0.75,c5.2xlarge",
        "--maxNodes", "50",
        "--defaultPreemptible",
        "--preemptibleCompensation", "1.5",
        "large-scale-analysis.wdl",
        "production-inputs.json"
    ]

# Programmatic WDL execution
def run_wdl_workflow_programmatic():
    """Execute WDL workflow programmatically."""

    wdl_file = "genomics-pipeline.wdl"
    inputs_file = "sample-cohort-inputs.json"

    # Configure for WDL execution
    config = Config()
    config.jobStore = "file:genomics-run"
    config.batchSystem = "kubernetes"
    config.maxCores = 32
    config.maxMemory = "128G"
    config.defaultDisk = "100G"

    # WDL-specific configuration
    config.wdl = True
    config.wdlVersion = "1.0"
    config.wdlCallCaching = True
    config.wdlCacheDir = "/cache/wdl-calls"

    # Execute WDL workflow
    with Toil(config) as toil:
        result = wdltoil.main([
            wdl_file,
            inputs_file,
            "--jobStore", config.jobStore,
            "--batchSystem", config.batchSystem
        ])

    return result

def advanced_wdl_features():
    """Advanced WDL workflow features and configuration."""

    # WDL workflow with complex features
    wdl_workflow = """
version 1.0

workflow GenomicsAnalysis {
    input {
        File reference_genome
        Array[File] sample_fastqs
        String output_prefix
        Int? cpu_count = 4
        String? memory_gb = "8G"
    }

    # Scatter processing over samples
    scatter (fastq in sample_fastqs) {
        call AlignReads {
            input:
                reference = reference_genome,
                fastq_file = fastq,
                cpu = cpu_count,
                memory = memory_gb
        }
    }

    # Merge results
    call MergeAlignments {
        input:
            alignments = AlignReads.output_bam,
            output_name = output_prefix
    }

    output {
        File merged_bam = MergeAlignments.merged_bam
        Array[File] individual_bams = AlignReads.output_bam
    }
}

task AlignReads {
    input {
        File reference
        File fastq_file
        Int cpu = 2
        String memory = "4G"
    }

    command <<<
        bwa mem -t ${cpu} ${reference} ${fastq_file} | samtools sort -o output.bam
    >>>

    runtime {
        docker: "biocontainers/bwa:v0.7.17_cv1"
        cpu: cpu
        memory: memory
        disk: "20 GB"
    }

    output {
        File output_bam = "output.bam"
    }
}
"""

    # Configure advanced WDL features
    config = Config()

    # Call caching for expensive operations
    config.wdlCallCaching = True
    config.wdlCallCachingBackend = "file"  # or "database"

    # Docker/container support
    config.enableWDLDockerSupport = True
    config.dockerAppliance = "ubuntu:20.04"

    # Resource optimization
    config.wdlOptimizeResources = True
    config.wdlResourceProfile = "high-throughput"

    # Localization strategies
    config.wdlLocalizationStrategy = "copy"  # or "symlink", "hardlink"
    config.wdlTmpDir = "/fast-tmp"

    return config
```

### Workflow Conversion and Translation

{ .api }

Tools for converting between workflow languages and translating to Toil's internal representation.

```python
from toil.cwl.cwltoil import CWLWorkflow
from toil.wdl.wdltoil import WDLWorkflow

def workflow_introspection():
    """Inspect and analyze workflow specifications."""

    # Load CWL workflow
    cwl_workflow = CWLWorkflow.load("analysis.cwl")

    # Inspect CWL structure
    print("CWL Workflow Analysis:")
    print(f"Version: {cwl_workflow.version}")
    print(f"Class: {cwl_workflow.class_}")
    print(f"Tools: {len(cwl_workflow.steps)}")

    # Analyze resource requirements
    total_cpu = 0
    total_memory = 0

    for step in cwl_workflow.steps:
        if hasattr(step, 'requirements'):
            for req in step.requirements:
                if req.class_ == 'ResourceRequirement':
                    total_cpu += req.get('coresMin', 1)
                    total_memory += req.get('ramMin', 1024)

    print(f"Total CPU cores needed: {total_cpu}")
    print(f"Total memory needed: {total_memory} MB")

    # Load WDL workflow
    wdl_workflow = WDLWorkflow.load("pipeline.wdl")

    # Inspect WDL structure
    print("\nWDL Workflow Analysis:")
    print(f"Version: {wdl_workflow.version}")
    print(f"Workflow name: {wdl_workflow.name}")
    print(f"Tasks: {len(wdl_workflow.tasks)}")

    # Analyze WDL tasks
    for task in wdl_workflow.tasks:
        runtime = task.runtime
        print(f"Task {task.name}:")
        print(f"  CPU: {runtime.get('cpu', 'default')}")
        print(f"  Memory: {runtime.get('memory', 'default')}")
        print(f"  Disk: {runtime.get('disk', 'default')}")
        if 'docker' in runtime:
            print(f"  Docker: {runtime['docker']}")

def workflow_validation_and_linting():
    """Validate workflow specifications for common issues."""

    def validate_cwl_workflow(cwl_file: str):
        """Validate CWL workflow specification."""

        errors = []
        warnings = []

        try:
            workflow = CWLWorkflow.load(cwl_file)

            # Check for common issues
            if not hasattr(workflow, 'requirements'):
                warnings.append("No workflow-level requirements specified")

            for step in workflow.steps:
                # Check resource requirements
                if not hasattr(step, 'requirements'):
                    warnings.append(f"Step {step.id} has no resource requirements")

                # Check input/output connections
                for inp in step.in_:
                    if not inp.source:
                        errors.append(f"Step {step.id} input {inp.id} has no source")

                # Validate tool references
                if not step.run:
                    errors.append(f"Step {step.id} has no tool reference")

        except Exception as e:
            errors.append(f"Failed to parse CWL: {str(e)}")

        return {'errors': errors, 'warnings': warnings}

    def validate_wdl_workflow(wdl_file: str):
        """Validate WDL workflow specification."""

        errors = []
        warnings = []

        try:
            workflow = WDLWorkflow.load(wdl_file)

            # Check WDL syntax and structure
            if not workflow.version:
                errors.append("WDL version not specified")

            # Validate task definitions
            for task in workflow.tasks:
                if not task.command:
                    errors.append(f"Task {task.name} has no command")

                # Check runtime requirements
                if not hasattr(task, 'runtime'):
                    warnings.append(f"Task {task.name} has no runtime requirements")

                # Validate output specifications
                if not task.outputs:
                    warnings.append(f"Task {task.name} has no outputs")

            # Check workflow calls and connections
            for call in workflow.calls:
                if call.task_name not in [t.name for t in workflow.tasks]:
                    errors.append(f"Call references undefined task: {call.task_name}")

        except Exception as e:
            errors.append(f"Failed to parse WDL: {str(e)}")

        return {'errors': errors, 'warnings': warnings}

    # Example usage
    cwl_result = validate_cwl_workflow("workflow.cwl")
    wdl_result = validate_wdl_workflow("pipeline.wdl")

    return {'cwl': cwl_result, 'wdl': wdl_result}
```

### Workflow Execution Monitoring

{ .api }

Monitoring and debugging capabilities for workflow language execution.

```python
from toil.cwl.utils import CWLLogger
from toil.wdl.utils import WDLLogger
import logging

def setup_workflow_monitoring():
    """Setup comprehensive monitoring for workflow execution."""

    # Configure CWL-specific logging
    cwl_logger = CWLLogger()
    cwl_logger.setLevel(logging.DEBUG)

    # Add handlers for different log types
    file_handler = logging.FileHandler('cwl_execution.log')
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    cwl_logger.addHandler(file_handler)

    # Configure WDL-specific logging
    wdl_logger = WDLLogger()
    wdl_logger.setLevel(logging.INFO)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(
        logging.Formatter('WDL: %(levelname)s - %(message)s')
    )
    wdl_logger.addHandler(console_handler)

    return cwl_logger, wdl_logger

def workflow_progress_tracking():
    """Track workflow execution progress and performance."""

    class WorkflowTracker:
        def __init__(self):
            self.start_time = None
            self.step_times = {}
            self.step_status = {}
            self.resource_usage = {}

        def start_workflow(self):
            """Mark workflow start."""
            import time
            self.start_time = time.time()
            print("Workflow execution started")

        def step_started(self, step_id: str):
            """Mark step start."""
            import time
            self.step_times[step_id] = {'start': time.time()}
            self.step_status[step_id] = 'running'
            print(f"Step started: {step_id}")

        def step_completed(self, step_id: str, resources_used: dict = None):
            """Mark step completion."""
            import time
            end_time = time.time()

            if step_id in self.step_times:
                self.step_times[step_id]['end'] = end_time
                duration = end_time - self.step_times[step_id]['start']
                self.step_times[step_id]['duration'] = duration

                print(f"Step completed: {step_id} ({duration:.2f}s)")

            self.step_status[step_id] = 'completed'

            if resources_used:
                self.resource_usage[step_id] = resources_used

        def workflow_completed(self):
            """Mark workflow completion and report summary."""
            import time

            if self.start_time:
                total_time = time.time() - self.start_time
                print(f"Workflow completed in {total_time:.2f}s")

            # Report step durations
            print("\nStep execution times:")
            for step_id, times in self.step_times.items():
                if 'duration' in times:
                    print(f"  {step_id}: {times['duration']:.2f}s")

            # Report resource usage
            if self.resource_usage:
                print("\nResource usage summary:")
                total_cpu_hours = 0
                total_memory_gb_hours = 0

                for step_id, resources in self.resource_usage.items():
                    duration_hours = self.step_times[step_id].get('duration', 0) / 3600
                    cpu_hours = resources.get('cpu', 0) * duration_hours
                    memory_gb_hours = resources.get('memory_gb', 0) * duration_hours

                    total_cpu_hours += cpu_hours
                    total_memory_gb_hours += memory_gb_hours

                    print(f"  {step_id}: {cpu_hours:.2f} CPU-hours, {memory_gb_hours:.2f} GB-hours")

                print(f"\nTotal: {total_cpu_hours:.2f} CPU-hours, {total_memory_gb_hours:.2f} GB-hours")

    return WorkflowTracker()

def debug_workflow_execution():
    """Debug workflow execution issues."""

    def debug_cwl_step_failure(step_id: str, exit_code: int, stderr_log: str):
        """Debug CWL step failure."""

        print(f"CWL Step Failed: {step_id}")
        print(f"Exit code: {exit_code}")

        # Analyze common failure patterns
        if exit_code == 127:
            print("Issue: Command not found")
            print("Check: Docker image contains required tools")
        elif exit_code == 137:
            print("Issue: Process killed (likely OOM)")
            print("Check: Increase memory requirements")
        elif exit_code == 139:
            print("Issue: Segmentation fault")
            print("Check: Input data format or tool version")

        # Parse stderr for specific errors
        if "No space left on device" in stderr_log:
            print("Issue: Insufficient disk space")
            print("Check: Increase disk requirements or clean temp files")
        elif "Permission denied" in stderr_log:
            print("Issue: File permission problems")
            print("Check: File ownership and Docker volume mounts")

        # Suggest debugging steps
        print("\nDebugging suggestions:")
        print("1. Check input file formats and sizes")
        print("2. Verify resource requirements (CPU, memory, disk)")
        print("3. Test command manually with sample data")
        print("4. Check Docker image and tool versions")

    def debug_wdl_task_failure(task_name: str, error_msg: str):
        """Debug WDL task failure."""

        print(f"WDL Task Failed: {task_name}")
        print(f"Error: {error_msg}")

        # Common WDL error patterns
        if "localization" in error_msg.lower():
            print("Issue: File localization failure")
            print("Check: Input file paths and access permissions")
        elif "runtime" in error_msg.lower():
            print("Issue: Runtime requirement problem")
            print("Check: Resource specifications in task runtime block")
        elif "output" in error_msg.lower():
            print("Issue: Output file collection failure")
            print("Check: Output glob patterns and file generation")

        print("\nDebugging steps:")
        print("1. Verify all input files exist and are accessible")
        print("2. Check runtime resource specifications")
        print("3. Validate output glob patterns")
        print("4. Test task command independently")

    return debug_cwl_step_failure, debug_wdl_task_failure
```

### Cross-Platform Workflow Compatibility

{ .api }

Tools and utilities for ensuring workflow compatibility across different execution environments.

```python
def ensure_workflow_portability():
    """Ensure workflows are portable across different environments."""

    def normalize_cwl_for_portability(cwl_workflow):
        """Modify CWL workflow for cross-platform compatibility."""

        # Use standard Docker images
        standard_images = {
            'ubuntu': 'ubuntu:20.04',
            'python': 'python:3.9-slim',
            'r-base': 'r-base:4.1.0',
            'bioconductor': 'bioconductor/bioconductor_docker:RELEASE_3_13'
        }

        # Replace custom images with standard ones where possible
        for step in cwl_workflow.steps:
            if hasattr(step.run, 'requirements'):
                for req in step.run.requirements:
                    if req.class_ == 'DockerRequirement':
                        image = req.dockerPull
                        for key, standard_image in standard_images.items():
                            if key in image.lower():
                                req.dockerPull = standard_image
                                break

        # Add software requirements as hints
        for step in cwl_workflow.steps:
            if not hasattr(step.run, 'hints'):
                step.run.hints = []

            # Add software requirements hint
            software_hint = {
                'class': 'SoftwareRequirement',
                'packages': [
                    {'package': 'bash', 'version': ['>=4.0']},
                    {'package': 'coreutils', 'version': ['>=8.0']}
                ]
            }
            step.run.hints.append(software_hint)

        return cwl_workflow

    def validate_wdl_portability(wdl_workflow):
        """Validate WDL workflow for portability issues."""

        portability_issues = []

        for task in wdl_workflow.tasks:
            # Check for hardcoded paths
            command = task.command
            if '/usr/local' in command or '/opt/' in command:
                portability_issues.append(
                    f"Task {task.name}: Hardcoded paths in command"
                )

            # Check Docker image availability
            runtime = task.runtime
            if 'docker' in runtime:
                docker_image = runtime['docker']
                if 'localhost' in docker_image or 'private-registry' in docker_image:
                    portability_issues.append(
                        f"Task {task.name}: Uses private Docker registry"
                    )

            # Check for platform-specific commands
            platform_commands = ['sudo', 'yum', 'apt-get', 'brew']
            for cmd in platform_commands:
                if cmd in command:
                    portability_issues.append(
                        f"Task {task.name}: Uses platform-specific command '{cmd}'"
                    )

        return portability_issues

    def create_portable_workflow_template():
        """Create template for portable workflow development."""

        cwl_template = """
cwlVersion: v1.2
class: Workflow

requirements:
  - class: ScatterFeatureRequirement
  - class: MultipleInputFeatureRequirement
  - class: StepInputExpressionRequirement

hints:
  - class: ResourceRequirement
    coresMin: 1
    ramMin: 1024
    tmpdirMin: 1024
    outdirMin: 1024
  - class: DockerRequirement
    dockerPull: ubuntu:20.04

inputs:
  input_files:
    type: File[]
    doc: "Array of input files to process"

outputs:
  processed_files:
    type: File[]
    outputSource: process_step/output_files

steps:
  process_step:
    run: process_tool.cwl
    in:
      inputs: input_files
    out: [output_files]
    scatter: inputs
"""

        wdl_template = """
version 1.0

workflow PortableWorkflow {
    input {
        Array[File] input_files
        String output_prefix = "processed"
    }

    scatter (input_file in input_files) {
        call ProcessFile {
            input:
                input_file = input_file,
                prefix = output_prefix
        }
    }

    output {
        Array[File] processed_files = ProcessFile.output_file
    }
}

task ProcessFile {
    input {
        File input_file
        String prefix
        Int cpu = 1
        String memory = "2 GB"
        String disk = "10 GB"
    }

    command <<<
        # Use standard POSIX commands only
        basename_file=$(basename ~{input_file})
        cp ~{input_file} ~{prefix}_${basename_file}
    >>>

    runtime {
        docker: "ubuntu:20.04"
        cpu: cpu
        memory: memory
        disks: "local-disk " + disk + " SSD"
    }

    output {
        File output_file = "~{prefix}_*"
    }
}
"""

        return {'cwl': cwl_template, 'wdl': wdl_template}
```

This workflow language integration provides comprehensive support for executing standardized workflows with full compatibility across different computing environments while leveraging Toil's advanced execution and scaling capabilities.