
tessl/pypi-flyteidl

IDL for Flyte Platform containing protobuf specifications, gRPC API definitions, and generated clients for multiple languages


docs/tasks-workflows.md

Task and Workflow Management

Comprehensive definitions for tasks and workflows including templates, specifications, bindings, and execution models. This module provides the core abstractions for defining computational units (tasks) and their orchestration (workflows) with support for complex composition patterns including conditional branching, parallel execution, and dynamic workflows.

Capabilities

Task Templates

Task templates define the complete specification of computational units including their interface, runtime requirements, and custom configuration.

class TaskTemplate:
    """Complete task definition with metadata and specifications."""
    id: Identifier
    type: str
    metadata: TaskMetadata
    interface: TypedInterface
    custom: dict[str, Any]
    container: Container
    k8s_pod: K8sPod
    sql: Sql
    security_context: SecurityContext
    extended_resources: ExtendedResources
    config: dict[str, str]

class TaskMetadata:
    """Metadata for task execution and discovery."""
    discoverable: bool
    runtime: RuntimeMetadata
    timeout: timedelta
    retries: RetryStrategy
    discovery_version: str
    deprecated_error_message: str
    interruptible: bool
    cache_serializable: bool
    generates_deck: bool
    tags: dict[str, str]
    pod_template_name: str
    cache_ignore_input_vars: list[str]

class RuntimeMetadata:
    """Runtime metadata for task execution."""
    type: RuntimeType
    version: str
    flavor: str

class RetryStrategy:
    """Retry configuration for failed task executions."""
    retries: int

Workflow Templates

Workflow templates define the orchestration of multiple tasks with complex execution patterns and data flow management.

class WorkflowTemplate:
    """Complete workflow definition with nodes and connections."""
    id: Identifier
    metadata: WorkflowMetadata
    interface: TypedInterface
    nodes: list[Node]
    outputs: list[Binding]
    failure_node: Node
    metadata_defaults: NodeMetadata

class WorkflowMetadata:
    """Metadata for workflow execution and management."""
    queuing_budget: timedelta
    tags: dict[str, str]

class Node:
    """Individual node within a workflow defining execution units."""
    id: str
    metadata: NodeMetadata
    inputs: list[Binding]
    upstream_node_ids: list[str]
    output_aliases: list[Alias]
    task_node: TaskNode
    workflow_node: WorkflowNode
    branch_node: BranchNode
    gate_node: GateNode
    array_node: ArrayNode

class NodeMetadata:
    """Metadata for individual workflow nodes."""
    name: str
    timeout: timedelta
    retries: RetryStrategy
    interruptible: bool
    cacheable: bool
    cache_version: str

Node Types

Different types of nodes supporting various execution patterns within workflows.

class TaskNode:
    """Node that executes a specific task."""
    reference_id: Identifier
    overrides: TaskNodeOverrides

class WorkflowNode:
    """Node that executes a subworkflow."""
    launchplan_ref: Identifier
    sub_workflow_ref: Identifier

class BranchNode:
    """Node that provides conditional execution branching."""
    if_else: IfElseBlock

class GateNode:
    """Node that provides approval gates and human-in-the-loop control.

    Exactly one of sleep, approve, or signal is set.
    """
    sleep: SleepCondition
    approve: ApproveCondition
    signal: SignalCondition

class ArrayNode:
    """Node that executes array jobs with parallelism control."""
    node: Node
    parallelism: int
    min_successes: int
    min_success_ratio: float

Conditional Execution

Support for conditional branching and dynamic execution paths within workflows.

class IfElseBlock:
    """Conditional execution block with if/elif/else branches."""
    case: IfBlock
    other: list[IfBlock]
    else_node: Node

class IfBlock:
    """Conditional if block."""
    condition: BooleanExpression
    then_node: Node

class BooleanExpression:
    """Boolean expression for conditional evaluation."""
    conjunction: ConjunctionExpression
    comparison: ComparisonExpression

class ConjunctionExpression:
    """Logical conjunction of boolean expressions."""
    operator: ConjunctionOperator
    left_expression: BooleanExpression
    right_expression: BooleanExpression

class ComparisonExpression:
    """Comparison expression between operands."""
    operator: ComparisonOperator
    left_value: Operand
    right_value: Operand

class Operand:
    """Operand for expressions."""
    primitive: Primitive
    var: str

Gate Conditions

Different types of gate conditions for workflow control and approval processes.

class GateCondition:
    """Base gate condition specification."""
    pass

class SleepCondition(GateCondition):
    """Sleep condition with duration specification."""
    duration: timedelta

class ApproveCondition(GateCondition):
    """Approval condition requiring human intervention."""
    signal_id: str

class SignalCondition(GateCondition):
    """Signal condition waiting for external signals."""
    signal_id: str
    type: LiteralType
    output_variable_name: str

Container Specifications

Container runtime specifications for task execution in containerized environments.

class Container:
    """Container specification for task execution."""
    image: str
    command: list[str]
    args: list[str]
    resources: Resources
    env: list[KeyValuePair]
    config: list[KeyValuePair]
    ports: list[ContainerPort]
    data_config: DataLoadingConfig

class Resources:
    """Resource requirements and limits for containers."""
    requests: list[ResourceEntry]
    limits: list[ResourceEntry]

class ResourceEntry:
    """Individual resource specification."""
    name: ResourceName
    value: str

class ResourceName:
    """Resource name enumeration."""
    UNKNOWN = 0
    CPU = 1
    GPU = 2
    MEMORY = 3
    STORAGE = 4
    EPHEMERAL_STORAGE = 5

class KeyValuePair:
    """Key-value pair for configuration."""
    key: str
    value: str

class ContainerPort:
    """Container port specification."""
    container_port: int

K8s Pod Specifications

Kubernetes-specific pod specifications for advanced container orchestration.

class K8sPod:
    """Kubernetes pod specification for task execution."""
    metadata: K8sObjectMetadata
    pod_spec: dict[str, Any]
    data_config: DataLoadingConfig

class K8sObjectMetadata:
    """Kubernetes object metadata."""
    labels: dict[str, str]
    annotations: dict[str, str]

SQL Task Specifications

SQL task specifications for database query execution.

class Sql:
    """SQL task specification."""
    statement: str
    dialect: SqlDialect

class SqlDialect:
    """SQL dialect enumeration."""
    UNDEFINED = 0
    ANSI = 1
    HIVE = 2
    OTHER = 3

Data Loading Configuration

Configuration for data loading and caching in task execution.

class DataLoadingConfig:
    """Configuration for data loading behavior."""
    enabled: bool
    input_path: str
    output_path: str
    format: LiteralMapFormat
    io_strategy: IOStrategy

class IOStrategy:
    """I/O strategy enumeration."""
    DOWNLOAD_EAGER = 0
    DOWNLOAD_STREAM = 1
    DO_NOT_DOWNLOAD = 2

Workflow Closures

Complete workflow specifications with compiled information and metadata.

class WorkflowClosure:
    """Complete workflow specification with compiled data."""
    workflow: WorkflowTemplate
    tasks: list[TaskTemplate]
    sub_workflows: list[WorkflowTemplate]

class CompiledWorkflow:
    """Compiled workflow with resolved dependencies."""
    template: WorkflowTemplate
    connections: ConnectionSet

class ConnectionSet:
    """Set of connections between workflow nodes."""
    downstream: dict[str, ConnectionSet.IdList]
    upstream: dict[str, ConnectionSet.IdList]

class CompiledTask:
    """Compiled task with resolved configuration."""
    template: TaskTemplate
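To make the `ConnectionSet` structure concrete, here is a plain-Python sketch (node names are hypothetical) of how the `upstream_node_ids` wiring on each node implies both maps: the upstream map mirrors the declared dependencies, and the downstream map is its inversion.

```python
# Each node maps to the node ids it depends on (its upstream_node_ids)
nodes = {
    "process-data": [],
    "validate-results": ["process-data"],
    "publish-report": ["validate-results"],
}

def build_connections(
    nodes: dict[str, list[str]],
) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
    """Derive the upstream and downstream id lists a ConnectionSet holds."""
    # Upstream: keep the declared dependency lists (non-empty only)
    upstream = {node: list(deps) for node, deps in nodes.items() if deps}
    # Downstream: invert the edges so each node lists its dependents
    downstream: dict[str, list[str]] = {}
    for node, deps in nodes.items():
        for dep in deps:
            downstream.setdefault(dep, []).append(node)
    return upstream, downstream

upstream, downstream = build_connections(nodes)
# downstream["process-data"] == ["validate-results"]
```

The compiler materializes both directions so the execution engine can schedule a node as soon as its upstream set completes, and propagate aborts to its downstream set, without rescanning every node.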

Usage Examples

Creating a Task Template

from google.protobuf.duration_pb2 import Duration

from flyteidl.core import tasks_pb2, identifier_pb2, interface_pb2

# Create task identifier
task_id = identifier_pb2.Identifier(
    resource_type=identifier_pb2.ResourceType.TASK,
    project="my-project",
    domain="development",
    name="data-processor",
    version="v1.0.0"
)

# Create task metadata (the timeout field is a protobuf Duration)
metadata = tasks_pb2.TaskMetadata(
    discoverable=True,
    timeout=Duration(seconds=1800),  # 30 minutes
    retries=tasks_pb2.RetryStrategy(retries=3),
    interruptible=True
)

# Create container specification
# Create container specification (ResourceEntry and the ResourceName
# enum are nested under Resources in the generated code)
container = tasks_pb2.Container(
    image="python:3.9-slim",
    command=["python"],
    args=["-m", "my_module"],
    resources=tasks_pb2.Resources(
        requests=[
            tasks_pb2.Resources.ResourceEntry(
                name=tasks_pb2.Resources.ResourceName.CPU,
                value="500m"
            ),
            tasks_pb2.Resources.ResourceEntry(
                name=tasks_pb2.Resources.ResourceName.MEMORY,
                value="1Gi"
            )
        ]
    )
)

# Create task template
task_template = tasks_pb2.TaskTemplate(
    id=task_id,
    type="python-task",
    metadata=metadata,
    interface=interface,  # Defined earlier
    container=container
)

Creating a Workflow Template

from flyteidl.core import workflow_pb2

# Create workflow nodes
process_node = workflow_pb2.Node(
    id="process-data",
    inputs=[],  # Define bindings
    task_node=workflow_pb2.TaskNode(
        reference_id=task_id  # Reference to task above
    )
)

validate_node = workflow_pb2.Node(
    id="validate-results", 
    inputs=[],  # Define bindings from process_node
    upstream_node_ids=["process-data"],
    task_node=workflow_pb2.TaskNode(
        reference_id=validator_task_id
    )
)

# Create workflow identifier
workflow_id = identifier_pb2.Identifier(
    resource_type=identifier_pb2.ResourceType.WORKFLOW,
    project="my-project",
    domain="development",
    name="data-pipeline",
    version="v1.0.0"
)

# Create workflow template
workflow_template = workflow_pb2.WorkflowTemplate(
    id=workflow_id,
    interface=workflow_interface,
    nodes=[process_node, validate_node],
    outputs=[]  # Define output bindings
)

Creating Conditional Workflows

from flyteidl.core import condition_pb2, literals_pb2, workflow_pb2

# Create condition for branching (boolean expressions live in
# condition_pb2; the comparison operator enum is nested under
# ComparisonExpression)
condition = condition_pb2.BooleanExpression(
    comparison=condition_pb2.ComparisonExpression(
        operator=condition_pb2.ComparisonExpression.Operator.GT,
        left_value=condition_pb2.Operand(var="input_count"),
        right_value=condition_pb2.Operand(
            primitive=literals_pb2.Primitive(integer=100)
        )
    )
)

# Create branch node: the "then" branch is wrapped in an IfBlock
branch_node = workflow_pb2.Node(
    id="data-size-check",
    branch_node=workflow_pb2.BranchNode(
        if_else=workflow_pb2.IfElseBlock(
            case=workflow_pb2.IfBlock(
                condition=condition,
                then_node=large_data_processor_node
            ),
            else_node=small_data_processor_node
        )
    )
)

Creating Array Jobs

from flyteidl.core import workflow_pb2

# Create array node for parallel processing; min_successes and
# min_success_ratio are alternative success criteria, so set only one
array_node = workflow_pb2.Node(
    id="parallel-processing",
    array_node=workflow_pb2.ArrayNode(
        node=base_processing_node,
        parallelism=10,
        min_successes=8  # at least 8 of the 10 sub-node runs must succeed
    )
)

Install with Tessl CLI

npx tessl i tessl/pypi-flyteidl
