or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-systems.md, core-workflow.md, file-management.md, index.md, job-stores.md, provisioning.md, utilities.md, workflow-languages.md

docs/core-workflow.md

0

# Core Workflow Management

1

2

## Overview

3

4

Toil's core workflow management provides the fundamental building blocks for creating, scheduling, and executing computational pipelines. The system centers around three key concepts: Jobs (units of work), Promises (result handling), and the Workflow Context (execution environment). This enables creation of complex DAG (Directed Acyclic Graph) workflows with sophisticated resource management and error handling.

5

6

## Capabilities

7

8

### Job Definition and Execution

9

{ .api }

10

11

The `Job` class is the fundamental unit of work in Toil workflows. Jobs can be defined as classes or using function decorators.

12

13

```python

14

from toil.job import Job

15

from toil.fileStores import AbstractFileStore

16

17

# Class-based job definition

18

class ProcessingJob(Job):

19

def __init__(self, input_data, memory=None, cores=None, disk=None,

20

accelerators=None, preemptible=True, checkpoint=False,

21

displayName=None):

22

# Resource requirements: memory/disk in bytes, cores as int/float

23

super().__init__(

24

memory=memory or 1024*1024*1024, # 1GB default

25

cores=cores or 1, # 1 core default

26

disk=disk or 1024*1024*1024, # 1GB default

27

accelerators=accelerators or [], # No accelerators default

28

preemptible=preemptible, # Allow preemption

29

checkpoint=checkpoint, # Checkpointing disabled

30

displayName=displayName # Job display name

31

)

32

self.input_data = input_data

33

34

def run(self, fileStore: AbstractFileStore) -> str:

35

"""Execute job logic. Must return serializable result."""

36

fileStore.logToMaster(f"Processing: {self.input_data}")

37

# Perform actual work

38

result = self.input_data.upper()

39

return result

40

41

# Function-based job definition

42

@Job.wrapJobFn

43

def simple_task(job, data, multiplier=2):

44

"""Function automatically wrapped as job with default resources."""

45

job.fileStore.logToMaster(f"Task processing: {data}")

46

return data * multiplier

47

48

# Custom resource function job

49

@Job.wrapJobFn(memory="2G", cores=2, disk="500M")

50

def resource_intensive_task(job, large_dataset):

51

"""Function with explicit resource requirements."""

52

return process_large_data(large_dataset)

53

```

54

55

### Resource Requirements and Accelerators

56

{ .api }

57

58

Toil supports detailed resource specifications including GPU/accelerator requirements.

59

60

```python

61

from toil.job import Job, AcceleratorRequirement, parse_accelerator

62

63

# GPU-enabled job

64

class MLTrainingJob(Job):

65

def __init__(self, model_config):

66

# Define GPU requirements

67

gpu_requirement: AcceleratorRequirement = {

68

"count": 2, # Number of GPUs

69

"kind": "gpu", # Accelerator type

70

"model": "Tesla V100", # Specific GPU model (optional)

71

"brand": "nvidia", # GPU brand (optional)

72

"api": "cuda" # API interface (optional)

73

}

74

75

super().__init__(

76

memory=16*1024*1024*1024, # 16GB RAM

77

cores=8, # 8 CPU cores

78

disk=100*1024*1024*1024, # 100GB disk

79

accelerators=[gpu_requirement],

80

preemptible=False # Don't preempt GPU jobs

81

)

82

self.model_config = model_config

83

84

def run(self, fileStore):

85

# Access GPU resources for training

86

return train_model(self.model_config)

87

88

# Parse accelerator from string specification

89

gpu_spec = parse_accelerator("2:nvidia:tesla_v100:cuda")

90

# Returns: {"count": 2, "kind": "gpu", "brand": "nvidia", "model": "tesla_v100", "api": "cuda"}

91

```

92

93

### Job Scheduling and Dependencies

94

{ .api }

95

96

Toil provides flexible job scheduling patterns including sequential, parallel, and conditional execution.

97

98

```python

99

from toil.job import Job

100

101

class WorkflowController(Job):

102

def run(self, fileStore):

103

# Parallel execution - children run concurrently

104

child1 = DataPreprocessingJob("dataset1")

105

child2 = DataPreprocessingJob("dataset2")

106

child3 = DataPreprocessingJob("dataset3")

107

108

self.addChild(child1)

109

self.addChild(child2)

110

self.addChild(child3)

111

112

# Sequential execution - follow-on runs after children complete

113

merge_job = MergeDataJob()

114

self.addFollowOn(merge_job)

115

116

# Analysis runs after merge completes

117

analysis_job = AnalysisJob()

118

merge_job.addFollowOn(analysis_job)

119

120

# Service job - runs alongside other jobs

121

monitoring_service = MonitoringService()

122

self.addService(monitoring_service)

123

124

return "Workflow initiated"

125

126

# Conditional job execution based on results

127

@Job.wrapJobFn

128

def conditional_processor(job, input_file):

129

# Check input characteristics

130

if needs_preprocessing(input_file):

131

preprocess_job = PreprocessingJob(input_file)

132

job.addChild(preprocess_job)

133

return preprocess_job.rv() # Return preprocessed result

134

else:

135

return input_file # Return original file

136

```

137

138

### Promise-Based Result Handling

139

{ .api }

140

141

Promises enable jobs to reference results from other jobs before they complete execution.

142

143

```python

144

from toil.job import Job, Promise

145

146

class PipelineJob(Job):

147

def run(self, fileStore):

148

# Create processing jobs

149

step1 = ProcessStep1Job("input_data")

150

step2 = ProcessStep2Job()

151

step3 = ProcessStep3Job()

152

153

# Chain jobs using promises

154

self.addChild(step1)

155

156

# step2 will receive result of step1 when it completes

157

step2_with_input = Job.wrapJobFn(step2.run, step1.rv())

158

step1.addFollowOn(step2_with_input)

159

160

# step3 receives results from both step1 and step2

161

step3_with_inputs = Job.wrapJobFn(

162

step3.run,

163

step1.rv(), # Promise for step1 result

164

step2_with_input.rv() # Promise for step2 result

165

)

166

step2_with_input.addFollowOn(step3_with_inputs)

167

168

return step3_with_inputs.rv() # Return final result promise

169

170

# Function using multiple promise results

171

@Job.wrapJobFn

172

def combine_results(job, result1_promise: Promise, result2_promise: Promise, result3_promise: Promise):

173

"""Function receives resolved promise values as arguments."""

174

# Promises are automatically resolved to actual values

175

combined = f"{result1_promise} + {result2_promise} + {result3_promise}"

176

job.fileStore.logToMaster(f"Combined results: {combined}")

177

return combined

178

```

179

180

### Workflow Configuration

181

{ .api }

182

183

The `Config` class provides comprehensive workflow configuration options.

184

185

```python

186

from toil.common import Config

187

from toil.lib.conversions import human2bytes

188

189

# Create and configure workflow

190

config = Config()

191

192

# Job store configuration

193

config.jobStore = "file:/tmp/my-job-store" # Local file store

194

# config.jobStore = "aws:us-west-2:my-toil-bucket" # AWS S3 store

195

196

# Batch system configuration

197

config.batchSystem = "local" # Local execution

198

# config.batchSystem = "slurm" # Slurm cluster

199

# config.batchSystem = "kubernetes" # Kubernetes cluster

200

201

# Resource defaults

202

config.defaultMemory = human2bytes("2G") # Default job memory

203

config.defaultCores = 1 # Default CPU cores

204

config.defaultDisk = human2bytes("1G") # Default disk space

205

206

# Resource limits

207

config.maxCores = 32 # Maximum cores per job

208

config.maxMemory = human2bytes("64G") # Maximum memory per job

209

config.maxDisk = human2bytes("1T") # Maximum disk per job

210

211

# Error handling

212

config.retryCount = 3 # Job retry attempts

213

config.rescueJobsFrequency = 60 # Rescue job check interval

214

215

# Logging and monitoring

216

config.logLevel = "INFO" # Log verbosity

217

config.stats = True # Enable statistics collection

218

219

# Cleanup configuration

220

config.clean = "onSuccess" # Clean on workflow success

221

# config.clean = "always" # Always clean

222

# config.clean = "never" # Never clean

223

224

# Working directory

225

config.workDir = "/tmp/toil-work" # Temporary file location

226

227

# Preemptible job configuration

228

config.preemptibleWorkerTimeout = 1800 # Preemptible timeout (seconds)

229

config.defaultPreemptible = True # Jobs preemptible by default

230

```

231

232

### Workflow Execution Context

233

{ .api }

234

235

The `Toil` context manager handles workflow lifecycle and provides execution environment.

236

237

```python

238

from toil.common import Toil, Config

239

from toil.exceptions import FailedJobsException

240

241

def run_workflow():

242

config = Config()

243

config.jobStore = "file:/tmp/workflow-store"

244

config.batchSystem = "local"

245

246

try:

247

with Toil(config) as toil:

248

# Create root job

249

root_job = MainWorkflowJob("input_parameters")

250

251

# Start fresh workflow

252

if not toil.config.restart:

253

result = toil.start(root_job)

254

print(f"Workflow completed: {result}")

255

else:

256

# Restart failed workflow

257

result = toil.restart()

258

print(f"Workflow restarted: {result}")

259

260

return result

261

262

except FailedJobsException as e:

263

print(f"Workflow failed with {e.numberOfFailedJobs} failed jobs")

264

print(f"Job store: {e.jobStoreLocator}")

265

return None

266

267

# Alternative: manual context management

268

def manual_workflow_execution():

269

config = Config()

270

config.jobStore = "file:/tmp/manual-store"

271

272

toil = Toil(config)

273

try:

274

# Initialize workflow

275

toil.__enter__()

276

277

# Execute workflow

278

root_job = SimpleJob("data")

279

result = toil.start(root_job)

280

281

return result

282

finally:

283

# Cleanup

284

toil.__exit__(None, None, None)

285

```

286

287

### Advanced Job Patterns

288

{ .api }

289

290

Sophisticated job patterns for complex workflow requirements.

291

292

```python

293

from toil.job import Job, PromisedRequirement

294

295

class DynamicResourceJob(Job):

296

"""Job with resource requirements determined at runtime."""

297

298

def __init__(self, size_calculator_promise):

299

# Use promise to determine resources dynamically

300

dynamic_memory = PromisedRequirement(

301

lambda size: size * 1024 * 1024, # Convert MB to bytes

302

size_calculator_promise

303

)

304

305

super().__init__(

306

memory=dynamic_memory,

307

cores=1,

308

disk="1G"

309

)

310

self.size_promise = size_calculator_promise

311

312

def run(self, fileStore):

313

# Access resolved size value

314

actual_size = self.size_promise # Automatically resolved

315

fileStore.logToMaster(f"Processing size: {actual_size}MB")

316

return f"Processed {actual_size}MB of data"

317

318

class ServiceJob(Job):

319

"""Long-running service job."""

320

321

def __init__(self):

322

super().__init__(memory="512M", cores=1, disk="100M")

323

324

def run(self, fileStore):

325

# Start service process

326

service_process = start_monitoring_service()

327

328

# Service runs until workflow completes

329

try:

330

while True:

331

time.sleep(10)

332

if should_stop_service():

333

break

334

finally:

335

service_process.stop()

336

337

return "Service completed"

338

339

# Checkpointing for fault tolerance

340

class CheckpointedJob(Job):

341

def __init__(self, data):

342

super().__init__(

343

memory="1G",

344

cores=1,

345

disk="1G",

346

checkpoint=True # Enable checkpointing

347

)

348

self.data = data

349

350

def run(self, fileStore):

351

# Job can be restarted from checkpoint on failure

352

checkpoint_file = fileStore.getLocalTempFile()

353

354

# Save intermediate state

355

with open(checkpoint_file, 'w') as f:

356

json.dump(self.data, f)

357

358

# Long-running computation

359

result = expensive_computation(self.data)

360

361

return result

362

```

363

364

### Error Handling and Debugging

365

{ .api }

366

367

Comprehensive error handling and debugging capabilities for workflow development.

368

369

```python

370

from toil.job import Job, JobException

371

from toil.exceptions import FailedJobsException

372

373

class RobustJob(Job):

374

"""Job with comprehensive error handling."""

375

376

def run(self, fileStore):

377

try:

378

# Potentially failing operation

379

result = risky_operation()

380

381

except ValueError as e:

382

# Log error and continue

383

fileStore.logToMaster(f"Handled ValueError: {e}", level=logging.WARNING)

384

result = default_result()

385

386

except Exception as e:

387

# Fatal error - job will be retried

388

fileStore.logToMaster(f"Job failed: {e}", level=logging.ERROR)

389

raise JobException(f"Unrecoverable error: {e}")

390

391

return result

392

393

# Workflow-level error handling

394

def resilient_workflow():

395

config = Config()

396

config.retryCount = 5 # Retry failed jobs 5 times

397

398

try:

399

with Toil(config) as toil:

400

root_job = MainJob("input")

401

result = toil.start(root_job)

402

403

except FailedJobsException as e:

404

# Handle workflow failure

405

print(f"Workflow failed: {e.numberOfFailedJobs} jobs failed")

406

407

# Optionally restart from failure point

408

config.restart = True

409

with Toil(config) as toil:

410

result = toil.restart()

411

412

return result

413

```

414

415

This core workflow management system provides the foundation for building scalable, fault-tolerant computational pipelines with sophisticated resource management and flexible job scheduling patterns.