# Task and Workflow Management

Comprehensive definitions for tasks and workflows, including templates, specifications, bindings, and execution models. This module provides the core abstractions for defining computational units (tasks) and their orchestration (workflows), with support for complex composition patterns including conditional branching, parallel execution, and dynamic workflows.

## Capabilities
### Task Templates

Task templates define the complete specification of computational units, including their interface, runtime requirements, and custom configuration.

```python { .api }
class TaskTemplate:
    """Complete task definition with metadata and specifications."""
    id: Identifier
    type: str
    metadata: TaskMetadata
    interface: TypedInterface
    custom: dict[str, Any]
    container: Container
    k8s_pod: K8sPod
    sql: Sql
    security_context: SecurityContext
    extended_resources: ExtendedResources
    config: dict[str, str]

class TaskMetadata:
    """Metadata for task execution and discovery."""
    discoverable: bool
    runtime: RuntimeMetadata
    timeout: timedelta
    retries: RetryStrategy
    discovery_version: str
    deprecated_error_message: str
    interruptible: bool
    cache_serializable: bool
    generates_deck: bool
    tags: dict[str, str]
    pod_template_name: str
    cache_ignore_input_vars: list[str]

class RuntimeMetadata:
    """Runtime metadata for task execution."""
    type: RuntimeType
    version: str
    flavor: str

class RetryStrategy:
    """Retry configuration for failed task executions."""
    retries: int
```
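The caching fields above (`discoverable`, `discovery_version`, `cache_ignore_input_vars`) combine to decide when two executions may share a cached result. A minimal pure-Python sketch of how a cache key might be derived from them; this is illustrative only, not Flyte's actual key algorithm:

```python
import hashlib
import json

def cache_key(task_name, discovery_version, inputs, cache_ignore_input_vars=()):
    """Hash the task identity plus its inputs, skipping ignored variables."""
    ignored = set(cache_ignore_input_vars)
    relevant = {k: v for k, v in sorted(inputs.items()) if k not in ignored}
    payload = json.dumps([task_name, discovery_version, relevant], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

# Two runs that differ only in an ignored variable share one cache entry.
k1 = cache_key("data-processor", "1.0", {"path": "s3://x", "run_id": "a"},
               cache_ignore_input_vars=["run_id"])
k2 = cache_key("data-processor", "1.0", {"path": "s3://x", "run_id": "b"},
               cache_ignore_input_vars=["run_id"])
```

Bumping `discovery_version` changes every key, which is how a task can invalidate its own cache across releases.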
### Workflow Templates

Workflow templates define the orchestration of multiple tasks, with complex execution patterns and data-flow management.

```python { .api }
class WorkflowTemplate:
    """Complete workflow definition with nodes and connections."""
    id: Identifier
    metadata: WorkflowMetadata
    interface: TypedInterface
    nodes: list[Node]
    outputs: list[Binding]
    failure_node: Node
    metadata_defaults: NodeMetadata

class WorkflowMetadata:
    """Metadata for workflow execution and management."""
    queuing_budget: timedelta
    tags: dict[str, str]

class Node:
    """Individual node within a workflow defining execution units."""
    id: str
    metadata: NodeMetadata
    inputs: list[Binding]
    upstream_node_ids: list[str]
    output_aliases: list[Alias]
    task_node: TaskNode
    workflow_node: WorkflowNode
    branch_node: BranchNode
    gate_node: GateNode

class NodeMetadata:
    """Metadata for individual workflow nodes."""
    name: str
    timeout: timedelta
    retries: RetryStrategy
    interruptible: bool
    cacheable: bool
    cache_version: str
```
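The `metadata_defaults` field supplies workflow-level fallbacks for per-node metadata. A minimal sketch of how unset node fields might inherit those defaults; the `NodeMeta` dataclass is an illustrative stand-in for `NodeMetadata`, not the protobuf message:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class NodeMeta:
    # None means "not set on the node", so the workflow default applies.
    timeout_seconds: Optional[int] = None
    interruptible: Optional[bool] = None
    retries: Optional[int] = None

def effective_metadata(node: NodeMeta, defaults: NodeMeta) -> NodeMeta:
    """Fill each unset node field from the workflow's metadata_defaults."""
    pick = lambda a, b: a if a is not None else b
    return NodeMeta(
        timeout_seconds=pick(node.timeout_seconds, defaults.timeout_seconds),
        interruptible=pick(node.interruptible, defaults.interruptible),
        retries=pick(node.retries, defaults.retries),
    )

defaults = NodeMeta(timeout_seconds=600, interruptible=True, retries=0)
merged = effective_metadata(NodeMeta(retries=5), defaults)
```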
### Node Types

Different types of nodes supporting various execution patterns within workflows.

```python { .api }
class TaskNode:
    """Node that executes a specific task."""
    reference_id: Identifier
    overrides: TaskNodeOverrides

class WorkflowNode:
    """Node that executes a subworkflow."""
    launchplan_ref: Identifier
    sub_workflow_ref: Identifier

class BranchNode:
    """Node that provides conditional execution branching."""
    if_else: IfElseBlock

class GateNode:
    """Node that provides approval gates and human-in-the-loop control."""
    kind: GateNodeKind
    condition: GateCondition
    sleep: SleepCondition
    approve: ApproveCondition
    signal: SignalCondition

class ArrayNode:
    """Node that executes array jobs with parallelism control."""
    node: Node
    parallelism: int
    min_successes: int
    min_success_ratio: float
```
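`ArrayNode` expresses its success criteria either as an absolute count (`min_successes`) or as a fraction of subtasks (`min_success_ratio`). A small pure-Python sketch of how an executor might apply those thresholds to the subtask outcomes:

```python
def array_job_succeeded(results, min_successes=None, min_success_ratio=None):
    """Check an array job's per-subtask outcomes against its thresholds.

    results: list of booleans, one per subtask (True = subtask succeeded).
    """
    successes = sum(1 for r in results if r)
    if min_successes is not None and successes < min_successes:
        return False
    if min_success_ratio is not None and results:
        if successes / len(results) < min_success_ratio:
            return False
    return True

# 8 of 10 subtasks succeeded, which meets an absolute threshold of 8.
ok = array_job_succeeded([True] * 8 + [False] * 2, min_successes=8)
```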
### Conditional Execution

Support for conditional branching and dynamic execution paths within workflows.

```python { .api }
class IfElseBlock:
    """Conditional execution block with if/elif/else branches."""
    case: IfBlock
    other: list[IfBlock]
    else_node: Node

class IfBlock:
    """Single conditional branch: a condition and the node to run when it holds."""
    condition: BooleanExpression
    then_node: Node

class BooleanExpression:
    """Boolean expression for conditional evaluation."""
    conjunction: ConjunctionExpression
    comparison: ComparisonExpression

class ConjunctionExpression:
    """Logical conjunction of boolean expressions."""
    operator: ConjunctionOperator
    left_expression: BooleanExpression
    right_expression: BooleanExpression

class ComparisonExpression:
    """Comparison expression between operands."""
    operator: ComparisonOperator
    left_value: Operand
    right_value: Operand

class Operand:
    """Operand for expressions."""
    primitive: Primitive
    var: str
```
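A `BooleanExpression` is a small tree: each node is either a comparison leaf or a conjunction of two sub-expressions, and each operand is either a literal primitive or a reference to a workflow input. A pure-Python sketch of how such a tree might be evaluated against resolved inputs (the dict encoding here is illustrative, not the protobuf wire format):

```python
def evaluate(expr, inputs):
    """Recursively evaluate a comparison/conjunction expression tree."""
    if "comparison" in expr:
        op, left, right = expr["comparison"]
        # An operand is either a workflow variable reference or a literal.
        resolve = lambda v: inputs[v["var"]] if "var" in v else v["primitive"]
        l, r = resolve(left), resolve(right)
        return {"eq": l == r, "neq": l != r, "gt": l > r,
                "gte": l >= r, "lt": l < r, "lte": l <= r}[op]
    op, left, right = expr["conjunction"]
    if op == "and":
        return evaluate(left, inputs) and evaluate(right, inputs)
    return evaluate(left, inputs) or evaluate(right, inputs)

# (input_count > 100) and (input_count <= 1000)
expr = {"conjunction": ("and",
        {"comparison": ("gt", {"var": "input_count"}, {"primitive": 100})},
        {"comparison": ("lte", {"var": "input_count"}, {"primitive": 1000})})}
branch_taken = evaluate(expr, {"input_count": 250})
```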
### Gate Conditions

Different types of gate conditions for workflow control and approval processes.

```python { .api }
class GateCondition:
    """Base gate condition specification."""
    pass

class SleepCondition(GateCondition):
    """Sleep condition with duration specification."""
    duration: timedelta

class ApproveCondition(GateCondition):
    """Approval condition requiring human intervention."""
    signal_id: str

class SignalCondition(GateCondition):
    """Signal condition waiting for external signals."""
    signal_id: str
    type: LiteralType
    output_variable_name: str
```
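The three condition types share one runtime question: may the gate node proceed yet? A simplified pure-Python model of that check; sleep gates compare elapsed time against their duration, while approve and signal gates both unblock when their `signal_id` arrives. The function and parameter names are illustrative:

```python
from datetime import datetime, timedelta, timezone

def gate_ready(kind, started_at=None, sleep_duration=None,
               received_signals=frozenset(), signal_id=None, now=None):
    """Decide whether a gate node may proceed (simplified model)."""
    now = now or datetime.now(timezone.utc)
    if kind == "sleep":
        return now - started_at >= sleep_duration
    # "approve" and "signal" gates both wait for a named signal to arrive;
    # a signal gate additionally captures the signal's value as an output.
    return signal_id in received_signals

t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
done = gate_ready("sleep", started_at=t0,
                  sleep_duration=timedelta(minutes=5),
                  now=t0 + timedelta(minutes=6))
```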
### Container Specifications

Container runtime specifications for task execution in containerized environments.

```python { .api }
class Container:
    """Container specification for task execution."""
    image: str
    command: list[str]
    args: list[str]
    resources: Resources
    env: list[KeyValuePair]
    config: list[KeyValuePair]
    ports: list[ContainerPort]
    data_config: DataLoadingConfig

class Resources:
    """Resource requirements and limits for containers."""
    requests: list[ResourceEntry]
    limits: list[ResourceEntry]

class ResourceEntry:
    """Individual resource specification."""
    name: ResourceName
    value: str

class ResourceName:
    """Resource name enumeration."""
    UNKNOWN = 0
    CPU = 1
    GPU = 2
    MEMORY = 3
    STORAGE = 4
    EPHEMERAL_STORAGE = 5

class KeyValuePair:
    """Key-value pair for configuration."""
    key: str
    value: str

class ContainerPort:
    """Container port specification."""
    container_port: int
```
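`ResourceEntry` values use Kubernetes-style quantity strings such as `"500m"` (millicores) and `"1Gi"` (gibibytes). A small sketch that parses the suffixes used in this document and checks that each request stays within its matching limit; it deliberately covers only these common suffixes, not the full Kubernetes quantity grammar:

```python
# Multipliers for the common decimal and binary quantity suffixes.
SUFFIXES = {"m": 1e-3, "k": 1e3, "M": 1e6, "G": 1e9,
            "Ki": 2**10, "Mi": 2**20, "Gi": 2**30}

def parse_quantity(value):
    """Convert a quantity string like '500m' or '1Gi' to a plain number."""
    # Try two-character suffixes (Ki/Mi/Gi) before single-character ones.
    for suffix in sorted(SUFFIXES, key=len, reverse=True):
        if value.endswith(suffix):
            return float(value[: -len(suffix)]) * SUFFIXES[suffix]
    return float(value)

def requests_within_limits(requests, limits):
    """Each request must not exceed the matching limit, where one is set."""
    return all(parse_quantity(v) <= parse_quantity(limits[k])
               for k, v in requests.items() if k in limits)

ok = requests_within_limits({"cpu": "500m", "memory": "1Gi"},
                            {"cpu": "1", "memory": "2Gi"})
```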
### K8s Pod Specifications

Kubernetes-specific pod specifications for advanced container orchestration.

```python { .api }
class K8sPod:
    """Kubernetes pod specification for task execution."""
    metadata: K8sObjectMetadata
    pod_spec: dict[str, Any]
    data_config: DataLoadingConfig

class K8sObjectMetadata:
    """Kubernetes object metadata."""
    labels: dict[str, str]
    annotations: dict[str, str]
```
### SQL Task Specifications

SQL task specifications for database query execution.

```python { .api }
class Sql:
    """SQL task specification."""
    statement: str
    dialect: SqlDialect

class SqlDialect:
    """SQL dialect enumeration."""
    UNDEFINED = 0
    ANSI = 1
    HIVE = 2
    OTHER = 3
```
### Data Loading Configuration

Configuration for data loading and caching in task execution.

```python { .api }
class DataLoadingConfig:
    """Configuration for data loading behavior."""
    enabled: bool
    input_path: str
    output_path: str
    format: LiteralMapFormat
    io_strategy: IOStrategy

class IOStrategy:
    """I/O strategy enumeration."""
    DOWNLOAD_EAGER = 0
    DOWNLOAD_STREAM = 1
    DO_NOT_DOWNLOAD = 2
```
### Workflow Closures

Complete workflow specifications with compiled information and metadata.

```python { .api }
class WorkflowClosure:
    """Complete workflow specification with compiled data."""
    workflow: WorkflowTemplate
    tasks: list[TaskTemplate]
    sub_workflows: list[WorkflowTemplate]

class CompiledWorkflow:
    """Compiled workflow with resolved dependencies."""
    template: WorkflowTemplate
    connections: ConnectionSet

class ConnectionSet:
    """Set of connections between workflow nodes."""
    downstream: dict[str, ConnectionSet.IdList]
    upstream: dict[str, ConnectionSet.IdList]

class CompiledTask:
    """Compiled task with resolved configuration."""
    template: TaskTemplate
```
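A `ConnectionSet` is derivable from the nodes' `upstream_node_ids`, and once both directions are known the nodes can be scheduled in dependency order. A pure-Python sketch of that compilation step using Kahn's algorithm; it assumes the workflow graph is acyclic, as a compiled workflow must be:

```python
from collections import defaultdict, deque

def build_connections(upstream_ids):
    """Derive the downstream map (ConnectionSet-style) from upstream_node_ids."""
    downstream = defaultdict(list)
    for node, ups in upstream_ids.items():
        for up in ups:
            downstream[up].append(node)
    return dict(downstream)

def topological_order(upstream_ids):
    """Schedule nodes so every node runs after all of its upstream nodes."""
    indegree = {n: len(ups) for n, ups in upstream_ids.items()}
    downstream = build_connections(upstream_ids)
    ready = deque(sorted(n for n, d in indegree.items() if d == 0))
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in downstream.get(node, []):
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    return order

graph = {"process-data": [], "validate-results": ["process-data"],
         "publish": ["validate-results"]}
order = topological_order(graph)
```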
## Usage Examples
### Creating a Task Template

```python
from google.protobuf import duration_pb2
from flyteidl.core import tasks_pb2, literals_pb2, identifier_pb2, interface_pb2

# Create task identifier
task_id = identifier_pb2.Identifier(
    resource_type=identifier_pb2.ResourceType.TASK,
    project="my-project",
    domain="development",
    name="data-processor",
    version="v1.0.0"
)

# Create task metadata (timeout is a protobuf Duration;
# RetryStrategy is defined in literals_pb2)
metadata = tasks_pb2.TaskMetadata(
    discoverable=True,
    timeout=duration_pb2.Duration(seconds=1800),  # 30 minutes
    retries=literals_pb2.RetryStrategy(retries=3),
    interruptible=True
)

# Create container specification (ResourceEntry and ResourceName
# are nested under Resources)
container = tasks_pb2.Container(
    image="python:3.9-slim",
    command=["python"],
    args=["-m", "my_module"],
    resources=tasks_pb2.Resources(
        requests=[
            tasks_pb2.Resources.ResourceEntry(
                name=tasks_pb2.Resources.ResourceName.CPU,
                value="500m"
            ),
            tasks_pb2.Resources.ResourceEntry(
                name=tasks_pb2.Resources.ResourceName.MEMORY,
                value="1Gi"
            )
        ]
    )
)

# Create task template
task_template = tasks_pb2.TaskTemplate(
    id=task_id,
    type="python-task",
    metadata=metadata,
    interface=interface,  # Defined earlier
    container=container
)
```
### Creating a Workflow Template

```python
from flyteidl.core import workflow_pb2, identifier_pb2

# Create workflow nodes
process_node = workflow_pb2.Node(
    id="process-data",
    inputs=[],  # Define bindings
    task_node=workflow_pb2.TaskNode(
        reference_id=task_id  # Reference to the task above
    )
)

validate_node = workflow_pb2.Node(
    id="validate-results",
    inputs=[],  # Define bindings from process_node
    upstream_node_ids=["process-data"],
    task_node=workflow_pb2.TaskNode(
        reference_id=validator_task_id
    )
)

# Create workflow identifier
workflow_id = identifier_pb2.Identifier(
    resource_type=identifier_pb2.ResourceType.WORKFLOW,
    project="my-project",
    domain="development",
    name="data-pipeline",
    version="v1.0.0"
)

# Create workflow template
workflow_template = workflow_pb2.WorkflowTemplate(
    id=workflow_id,
    interface=workflow_interface,
    nodes=[process_node, validate_node],
    outputs=[]  # Define output bindings
)
```
### Creating Conditional Workflows

```python
from flyteidl.core import condition_pb2, literals_pb2, workflow_pb2

# Build the branching condition (the expression messages
# live in condition_pb2)
condition = condition_pb2.BooleanExpression(
    comparison=condition_pb2.ComparisonExpression(
        operator=condition_pb2.ComparisonExpression.GT,
        left_value=condition_pb2.Operand(var="input_count"),
        right_value=condition_pb2.Operand(
            primitive=literals_pb2.Primitive(integer=100)
        )
    )
)

# Create branch node; the "then" branch lives in an IfBlock (`case`)
branch_node = workflow_pb2.Node(
    id="data-size-check",
    branch_node=workflow_pb2.BranchNode(
        if_else=workflow_pb2.IfElseBlock(
            case=workflow_pb2.IfBlock(
                condition=condition,
                then_node=large_data_processor_node
            ),
            else_node=small_data_processor_node
        )
    )
)
```
### Creating Array Jobs

```python
from flyteidl.core import workflow_pb2

# Create array node for parallel processing
array_node = workflow_pb2.Node(
    id="parallel-processing",
    array_node=workflow_pb2.ArrayNode(
        node=base_processing_node,
        parallelism=10,
        min_successes=8,  # At least 8 of the 10 subtasks must succeed
        # Alternatively: min_success_ratio=0.8 — the two success
        # criteria are mutually exclusive, so set only one.
    )
)
```