# Task and Workflow Management

Comprehensive definitions for tasks and workflows including templates, specifications, bindings, and execution models. This module provides the core abstractions for defining computational units (tasks) and their orchestration (workflows) with support for complex composition patterns including conditional branching, parallel execution, and dynamic workflows.

## Capabilities

### Task Templates

Task templates define the complete specification of computational units including their interface, runtime requirements, and custom configuration.

```python { .api }
class TaskTemplate:
    """Complete task definition with metadata and specifications."""
    id: Identifier
    type: str
    metadata: TaskMetadata
    interface: TypedInterface
    custom: dict[str, Any]
    container: Container
    k8s_pod: K8sPod
    sql: Sql
    security_context: SecurityContext
    extended_resources: ExtendedResources
    config: dict[str, str]

class TaskMetadata:
    """Metadata for task execution and discovery."""
    discoverable: bool
    runtime: RuntimeMetadata
    timeout: timedelta
    retries: RetryStrategy
    discovery_version: str
    deprecated_error_message: str
    interruptible: bool
    cache_serializable: bool
    generates_deck: bool
    tags: dict[str, str]
    pod_template_name: str
    cache_ignore_input_vars: list[str]

class RuntimeMetadata:
    """Runtime metadata for task execution."""
    type: RuntimeType
    version: str
    flavor: str

class RetryStrategy:
    """Retry configuration for failed task executions."""
    retries: int
```

### Workflow Templates

Workflow templates define the orchestration of multiple tasks with complex execution patterns and data flow management.

```python { .api }
class WorkflowTemplate:
    """Complete workflow definition with nodes and connections."""
    id: Identifier
    metadata: WorkflowMetadata
    interface: TypedInterface
    nodes: list[Node]
    outputs: list[Binding]
    failure_node: Node
    metadata_defaults: NodeMetadata

class WorkflowMetadata:
    """Metadata for workflow execution and management."""
    queuing_budget: timedelta
    tags: dict[str, str]

class Node:
    """Individual node within a workflow defining execution units."""
    id: str
    metadata: NodeMetadata
    inputs: list[Binding]
    upstream_node_ids: list[str]
    output_aliases: list[Alias]
    task_node: TaskNode
    workflow_node: WorkflowNode
    branch_node: BranchNode
    gate_node: GateNode

class NodeMetadata:
    """Metadata for individual workflow nodes."""
    name: str
    timeout: timedelta
    retries: RetryStrategy
    interruptible: bool
    cacheable: bool
    cache_version: str
```

### Node Types

Different types of nodes supporting various execution patterns within workflows.

```python { .api }
class TaskNode:
    """Node that executes a specific task."""
    reference_id: Identifier
    overrides: TaskNodeOverrides

class WorkflowNode:
    """Node that executes a subworkflow."""
    launchplan_ref: Identifier
    sub_workflow_ref: Identifier

class BranchNode:
    """Node that provides conditional execution branching."""
    if_else: IfElseBlock

class GateNode:
    """Node that provides approval gates and human-in-the-loop control."""
    kind: GateNodeKind
    condition: GateCondition
    sleep: SleepCondition
    approve: ApproveCondition
    signal: SignalCondition

class ArrayNode:
    """Node that executes array jobs with parallelism control."""
    node: Node
    parallelism: int
    min_successes: int
    min_success_ratio: float
```

### Conditional Execution

Support for conditional branching and dynamic execution paths within workflows.

```python { .api }
class IfElseBlock:
    """Conditional execution block with if/else branches."""
    condition: BooleanExpression
    then_node: Node
    else_node: Node

class IfBlock:
    """Conditional if block."""
    condition: BooleanExpression
    then_node: Node

class BooleanExpression:
    """Boolean expression for conditional evaluation."""
    conjunction: ConjunctionExpression
    comparison: ComparisonExpression

class ConjunctionExpression:
    """Logical conjunction of boolean expressions."""
    operator: ConjunctionOperator
    left_expression: BooleanExpression
    right_expression: BooleanExpression

class ComparisonExpression:
    """Comparison expression between operands."""
    operator: ComparisonOperator
    left_value: Operand
    right_value: Operand

class Operand:
    """Operand for expressions."""
    primitive: Primitive
    var: str
```

### Gate Conditions

Different types of gate conditions for workflow control and approval processes.

```python { .api }
class GateCondition:
    """Base gate condition specification."""
    pass

class SleepCondition(GateCondition):
    """Sleep condition with duration specification."""
    duration: timedelta

class ApproveCondition(GateCondition):
    """Approval condition requiring human intervention."""
    signal_id: str

class SignalCondition(GateCondition):
    """Signal condition waiting for external signals."""
    signal_id: str
    type: LiteralType
    output_variable_name: str
```

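As a rough sketch of how these conditions compose into a `GateNode` (assuming the same `workflow_pb2` module used in the usage examples below, that durations accept `timedelta` values as the API above suggests, and with illustrative node and signal ids):

```python
from datetime import timedelta

from flyteidl.core import workflow_pb2

# Gate that pauses the workflow for ten minutes before continuing.
sleep_gate = workflow_pb2.Node(
    id="cooldown",
    gate_node=workflow_pb2.GateNode(
        sleep=workflow_pb2.SleepCondition(duration=timedelta(minutes=10))
    )
)

# Gate that blocks until a human approves the "release-approval" signal.
approval_gate = workflow_pb2.Node(
    id="await-approval",
    gate_node=workflow_pb2.GateNode(
        approve=workflow_pb2.ApproveCondition(signal_id="release-approval")
    )
)
```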
### Container Specifications

Container runtime specifications for task execution in containerized environments.

```python { .api }
class Container:
    """Container specification for task execution."""
    image: str
    command: list[str]
    args: list[str]
    resources: Resources
    env: list[KeyValuePair]
    config: list[KeyValuePair]
    ports: list[ContainerPort]
    data_config: DataLoadingConfig

class Resources:
    """Resource requirements and limits for containers."""
    requests: list[ResourceEntry]
    limits: list[ResourceEntry]

class ResourceEntry:
    """Individual resource specification."""
    name: ResourceName
    value: str

class ResourceName:
    """Resource name enumeration."""
    UNKNOWN = 0
    CPU = 1
    GPU = 2
    MEMORY = 3
    STORAGE = 4
    EPHEMERAL_STORAGE = 5

class KeyValuePair:
    """Key-value pair for configuration."""
    key: str
    value: str

class ContainerPort:
    """Container port specification."""
    container_port: int
```

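Beyond the resource requests shown under Usage Examples, the same message carries environment variables, configuration pairs, and ports. A minimal sketch (the image reference and values are illustrative):

```python
from flyteidl.core import tasks_pb2

serving_container = tasks_pb2.Container(
    image="my-registry/serving:latest",  # illustrative image reference
    command=["python", "-m", "server"],
    env=[
        tasks_pb2.KeyValuePair(key="LOG_LEVEL", value="info"),
        tasks_pb2.KeyValuePair(key="WORKERS", value="4"),
    ],
    # Port the serving process listens on inside the container.
    ports=[tasks_pb2.ContainerPort(container_port=8080)],
)
```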
### K8s Pod Specifications

Kubernetes-specific pod specifications for advanced container orchestration.

```python { .api }
class K8sPod:
    """Kubernetes pod specification for task execution."""
    metadata: K8sObjectMetadata
    pod_spec: dict[str, Any]
    data_config: DataLoadingConfig

class K8sObjectMetadata:
    """Kubernetes object metadata."""
    labels: dict[str, str]
    annotations: dict[str, str]
```

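A sketch of a pod override, following the `dict[str, Any]` signature shown above for `pod_spec` (a concrete protobuf binding may instead expect a `Struct`); the labels, annotations, and pod spec content are illustrative:

```python
from flyteidl.core import tasks_pb2

pod = tasks_pb2.K8sPod(
    metadata=tasks_pb2.K8sObjectMetadata(
        labels={"team": "data-platform"},
        annotations={"sidecar.istio.io/inject": "false"},
    ),
    # Raw pod spec merged into the pod created for the task.
    pod_spec={
        "containers": [{"name": "primary", "image": "python:3.9-slim"}],
        "nodeSelector": {"gpu": "true"},
    },
)
```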
### SQL Task Specifications

SQL task specifications for database query execution.

```python { .api }
class Sql:
    """SQL task specification."""
    statement: str
    dialect: SqlDialect

class SqlDialect:
    """SQL dialect enumeration."""
    UNDEFINED = 0
    ANSI = 1
    HIVE = 2
    OTHER = 3
```

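For example, a Hive-dialect statement (the table, column, and templated input names are illustrative; enum access follows the classes as presented above):

```python
from flyteidl.core import tasks_pb2

sql_task = tasks_pb2.Sql(
    # Templated inputs are substituted at execution time.
    statement="SELECT id, score FROM events WHERE ds = '{{ .inputs.ds }}'",
    dialect=tasks_pb2.SqlDialect.HIVE,
)
```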
### Data Loading Configuration

Configuration for data loading and caching in task execution.

```python { .api }
class DataLoadingConfig:
    """Configuration for data loading behavior."""
    enabled: bool
    input_path: str
    output_path: str
    format: LiteralMapFormat
    io_strategy: IOStrategy

class LiteralMapFormat:
    """Serialization format for materialized literal maps."""
    JSON = 0
    YAML = 1
    PROTO = 2

class IOStrategy:
    """I/O strategy enumeration."""
    DOWNLOAD_EAGER = 0
    DOWNLOAD_STREAM = 1
    DO_NOT_DOWNLOAD = 2
```

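A sketch that eagerly downloads inputs to a local path before the task starts (paths are illustrative; enum access follows the flattened `IOStrategy` shown above):

```python
from flyteidl.core import tasks_pb2

data_config = tasks_pb2.DataLoadingConfig(
    enabled=True,
    input_path="/var/inputs",    # inputs materialized here before start
    output_path="/var/outputs",  # outputs collected from here on exit
    io_strategy=tasks_pb2.IOStrategy.DOWNLOAD_EAGER,
)
```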
### Workflow Closures

Complete workflow specifications with compiled information and metadata.

```python { .api }
class WorkflowClosure:
    """Complete workflow specification with compiled data."""
    workflow: WorkflowTemplate
    tasks: list[TaskTemplate]
    sub_workflows: list[WorkflowTemplate]

class CompiledWorkflow:
    """Compiled workflow with resolved dependencies."""
    template: WorkflowTemplate
    connections: ConnectionSet

class ConnectionSet:
    """Set of connections between workflow nodes."""
    downstream: dict[str, ConnectionSet.IdList]
    upstream: dict[str, ConnectionSet.IdList]

class CompiledTask:
    """Compiled task with resolved configuration."""
    template: TaskTemplate
```

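A sketch of bundling a workflow together with the tasks it references, assuming the closure types live in `flyteidl.core.workflow_closure_pb2` and reusing `workflow_template` and `task_template` from the usage examples below:

```python
from flyteidl.core import workflow_closure_pb2

closure = workflow_closure_pb2.WorkflowClosure(
    workflow=workflow_template,  # from "Creating a Workflow Template" below
    tasks=[task_template],       # every task referenced by the workflow's nodes
)
```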
## Usage Examples

### Creating a Task Template

```python
from datetime import timedelta

from flyteidl.core import tasks_pb2, identifier_pb2, interface_pb2

# Create task identifier
task_id = identifier_pb2.Identifier(
    resource_type=identifier_pb2.ResourceType.TASK,
    project="my-project",
    domain="development",
    name="data-processor",
    version="v1.0.0"
)

# Create task metadata
metadata = tasks_pb2.TaskMetadata(
    discoverable=True,
    timeout=timedelta(minutes=30),
    retries=tasks_pb2.RetryStrategy(retries=3),
    interruptible=True
)

# Create container specification
container = tasks_pb2.Container(
    image="python:3.9-slim",
    command=["python"],
    args=["-m", "my_module"],
    resources=tasks_pb2.Resources(
        requests=[
            tasks_pb2.ResourceEntry(
                name=tasks_pb2.ResourceName.CPU,
                value="500m"
            ),
            tasks_pb2.ResourceEntry(
                name=tasks_pb2.ResourceName.MEMORY,
                value="1Gi"
            )
        ]
    )
)

# Create task template
task_template = tasks_pb2.TaskTemplate(
    id=task_id,
    type="python-task",
    metadata=metadata,
    interface=interface,  # Defined earlier
    container=container
)
```

### Creating a Workflow Template

```python
from flyteidl.core import identifier_pb2, workflow_pb2

# Create workflow nodes
process_node = workflow_pb2.Node(
    id="process-data",
    inputs=[],  # Define bindings
    task_node=workflow_pb2.TaskNode(
        reference_id=task_id  # Reference to task above
    )
)

validate_node = workflow_pb2.Node(
    id="validate-results",
    inputs=[],  # Define bindings from process_node
    upstream_node_ids=["process-data"],
    task_node=workflow_pb2.TaskNode(
        reference_id=validator_task_id
    )
)

# Create workflow identifier
workflow_id = identifier_pb2.Identifier(
    resource_type=identifier_pb2.ResourceType.WORKFLOW,
    project="my-project",
    domain="development",
    name="data-pipeline",
    version="v1.0.0"
)

# Create workflow template
workflow_template = workflow_pb2.WorkflowTemplate(
    id=workflow_id,
    interface=workflow_interface,
    nodes=[process_node, validate_node],
    outputs=[]  # Define output bindings
)
```

### Creating Conditional Workflows

```python
from flyteidl.core import literals_pb2, workflow_pb2

# Create condition for branching
condition = workflow_pb2.BooleanExpression(
    comparison=workflow_pb2.ComparisonExpression(
        operator=workflow_pb2.ComparisonOperator.GT,
        left_value=workflow_pb2.Operand(var="input_count"),
        right_value=workflow_pb2.Operand(
            primitive=literals_pb2.Primitive(integer=100)
        )
    )
)

# Create branch node
branch_node = workflow_pb2.Node(
    id="data-size-check",
    branch_node=workflow_pb2.BranchNode(
        if_else=workflow_pb2.IfElseBlock(
            condition=condition,
            then_node=large_data_processor_node,
            else_node=small_data_processor_node
        )
    )
)
```

### Creating Array Jobs

```python
from flyteidl.core import workflow_pb2

# Create array node for parallel processing
array_node = workflow_pb2.Node(
    id="parallel-processing",
    array_node=workflow_pb2.ArrayNode(
        node=base_processing_node,
        parallelism=10,
        min_successes=8,  # At least 8 out of 10 must succeed
        min_success_ratio=0.8
    )
)
```