IDL for Flyte Platform containing protobuf specifications, gRPC API definitions, and generated clients for multiple languages
—
Comprehensive definitions for tasks and workflows including templates, specifications, bindings, and execution models. This module provides the core abstractions for defining computational units (tasks) and their orchestration (workflows) with support for complex composition patterns including conditional branching, parallel execution, and dynamic workflows.
Task templates define the complete specification of computational units including their interface, runtime requirements, and custom configuration.
# NOTE(review): reference stubs documenting protobuf messages from flyteidl's
# tasks.proto; field types (Identifier, TypedInterface, ...) are defined
# elsewhere in the package. Indentation was stripped by extraction.
class TaskTemplate:
"""Complete task definition with metadata and specifications."""
id: Identifier
# Backend task type string (e.g. "python-task" in the example below) used to
# select an executor plugin.
type: str
metadata: TaskMetadata
interface: TypedInterface
# Plugin-specific configuration payload.
custom: dict[str, Any]
# Execution target -- presumably a protobuf oneof (container / k8s_pod / sql);
# verify against tasks.proto.
container: Container
k8s_pod: K8sPod
sql: Sql
security_context: SecurityContext
extended_resources: ExtendedResources
config: dict[str, str]
class TaskMetadata:
"""Metadata for task execution and discovery."""
# Presumably enables result caching/discovery, keyed by discovery_version --
# confirm against tasks.proto.
discoverable: bool
runtime: RuntimeMetadata
timeout: timedelta
retries: RetryStrategy
discovery_version: str
# Message surfaced when a deprecated task version is invoked.
deprecated_error_message: str
interruptible: bool
cache_serializable: bool
generates_deck: bool
tags: dict[str, str]
pod_template_name: str
# Input variable names excluded from the cache key.
cache_ignore_input_vars: list[str]
class RuntimeMetadata:
"""Runtime metadata for task execution."""
type: RuntimeType
version: str
# SDK flavor that produced the task -- presumably; verify.
flavor: str
class RetryStrategy:
"""Retry configuration for failed task executions."""
retries: int

Workflow templates define the orchestration of multiple tasks with complex execution patterns and data flow management.
class WorkflowTemplate:
"""Complete workflow definition with nodes and connections."""
id: Identifier
metadata: WorkflowMetadata
interface: TypedInterface
# Execution graph: nodes plus the output bindings define the data flow.
nodes: list[Node]
outputs: list[Binding]
# Node executed when the workflow fails -- presumably a cleanup/error handler;
# confirm against workflow.proto.
failure_node: Node
# Defaults applied to nodes that do not set their own NodeMetadata.
metadata_defaults: NodeMetadata
class WorkflowMetadata:
"""Metadata for workflow execution and management."""
queuing_budget: timedelta
tags: dict[str, str]
class Node:
"""Individual node within a workflow defining execution units."""
id: str
metadata: NodeMetadata
inputs: list[Binding]
# Explicit dependencies in addition to those implied by input bindings
# (used in the validate_node example below).
upstream_node_ids: list[str]
output_aliases: list[Alias]
# Node target -- presumably a oneof. NOTE(review): the array example below
# also sets an `array_node` field that is not listed here -- verify.
task_node: TaskNode
workflow_node: WorkflowNode
branch_node: BranchNode
gate_node: GateNode
class NodeMetadata:
"""Metadata for individual workflow nodes."""
name: str
timeout: timedelta
retries: RetryStrategy
interruptible: bool
cacheable: bool
cache_version: str

Different types of nodes supporting various execution patterns within workflows.
class TaskNode:
"""Node that executes a specific task."""
# Identifier of the task template to run.
reference_id: Identifier
overrides: TaskNodeOverrides
class WorkflowNode:
"""Node that executes a subworkflow."""
# Presumably a oneof: either a launch-plan reference or an inline
# sub-workflow reference -- confirm against workflow.proto.
launchplan_ref: Identifier
sub_workflow_ref: Identifier
class BranchNode:
"""Node that provides conditional execution branching."""
if_else: IfElseBlock
class GateNode:
"""Node that provides approval gates and human-in-the-loop control."""
kind: GateNodeKind
# Condition variants (sleep / approve / signal) -- presumably a oneof.
condition: GateCondition
sleep: SleepCondition
approve: ApproveCondition
signal: SignalCondition
class ArrayNode:
"""Node that executes array jobs with parallelism control."""
# Sub-node executed once per array element.
node: Node
# Maximum number of concurrent executions.
parallelism: int
# Absolute success threshold; a fractional one follows (min_success_ratio).
min_successes: int
min_success_ratio: float

Support for conditional branching and dynamic execution paths within workflows.
class IfElseBlock:
"""Conditional execution block with if/else branches."""
condition: BooleanExpression
then_node: Node
else_node: Node
class IfBlock:
"""Conditional if block."""
condition: BooleanExpression
then_node: Node
class BooleanExpression:
"""Boolean expression for conditional evaluation."""
# Presumably a oneof: either a conjunction or a comparison.
conjunction: ConjunctionExpression
comparison: ComparisonExpression
class ConjunctionExpression:
"""Logical conjunction of boolean expressions."""
# AND/OR combination of the two sub-expressions.
operator: ConjunctionOperator
left_expression: BooleanExpression
right_expression: BooleanExpression
class ComparisonExpression:
"""Comparison expression between operands."""
# e.g. ComparisonOperator.GT as used in the branching example below.
operator: ComparisonOperator
left_value: Operand
right_value: Operand
class Operand:
"""Operand for expressions."""
# Presumably a oneof: a literal primitive or a variable reference.
primitive: Primitive
var: str

Different types of gate conditions for workflow control and approval processes.
class GateCondition:
"""Base gate condition specification."""
pass
class SleepCondition(GateCondition):
"""Sleep condition with duration specification."""
# How long the gate pauses before releasing downstream nodes.
duration: timedelta
class ApproveCondition(GateCondition):
"""Approval condition requiring human intervention."""
# Signal identifier the human approver responds to.
signal_id: str
class SignalCondition(GateCondition):
"""Signal condition waiting for external signals."""
signal_id: str
# Expected type of the signal payload.
type: LiteralType
output_variable_name: str

Container runtime specifications for task execution in containerized environments.
class Container:
"""Container specification for task execution."""
image: str
# Entrypoint command and its arguments.
command: list[str]
args: list[str]
resources: Resources
# Environment variables and additional key/value configuration.
env: list[KeyValuePair]
config: list[KeyValuePair]
ports: list[ContainerPort]
data_config: DataLoadingConfig
class Resources:
"""Resource requirements and limits for containers."""
# Kubernetes-style requests vs. limits.
requests: list[ResourceEntry]
limits: list[ResourceEntry]
class ResourceEntry:
"""Individual resource specification."""
name: ResourceName
# Quantity string (e.g. "500m", "1Gi" as in the example below).
value: str
class ResourceName:
"""Resource name enumeration."""
UNKNOWN = 0
CPU = 1
GPU = 2
MEMORY = 3
STORAGE = 4
EPHEMERAL_STORAGE = 5
class KeyValuePair:
"""Key-value pair for configuration."""
key: str
value: str
class ContainerPort:
"""Container port specification."""
container_port: int

Kubernetes-specific pod specifications for advanced container orchestration.
class K8sPod:
"""Kubernetes pod specification for task execution."""
metadata: K8sObjectMetadata
# Raw pod spec -- presumably a serialized Kubernetes PodSpec; confirm.
pod_spec: dict[str, Any]
data_config: DataLoadingConfig
class K8sObjectMetadata:
"""Kubernetes object metadata."""
labels: dict[str, str]
annotations: dict[str, str]

SQL task specifications for database query execution.
class Sql:
"""SQL task specification."""
# Query text to execute.
statement: str
dialect: SqlDialect
class SqlDialect:
"""SQL dialect enumeration."""
UNDEFINED = 0
ANSI = 1
HIVE = 2
OTHER = 3

Configuration for data loading and caching in task execution.
class DataLoadingConfig:
"""Configuration for data loading behavior."""
enabled: bool
input_path: str
output_path: str
# NOTE(review): `format` typed as IOStrategy looks like a doc error -- in
# flyteidl this field is a literal-format enum, not an I/O strategy; verify
# against tasks.proto.
format: IOStrategy
io_strategy: IOStrategy
class IOStrategy:
"""I/O strategy enumeration."""
DOWNLOAD_EAGER = 0
DOWNLOAD_STREAM = 1
DO_NOT_DOWNLOAD = 2

Complete workflow specifications with compiled information and metadata.
class WorkflowClosure:
"""Complete workflow specification with compiled data."""
workflow: WorkflowTemplate
# All task templates and sub-workflows the workflow references.
tasks: list[TaskTemplate]
sub_workflows: list[WorkflowTemplate]
class CompiledWorkflow:
"""Compiled workflow with resolved dependencies."""
template: WorkflowTemplate
connections: ConnectionSet
class ConnectionSet:
"""Set of connections between workflow nodes."""
# Adjacency maps: node id -> list of downstream/upstream node ids.
downstream: dict[str, ConnectionSet.IdList]
upstream: dict[str, ConnectionSet.IdList]
class CompiledTask:
"""Compiled task with resolved configuration."""
template: TaskTemplate

from datetime import timedelta

from flyteidl.core import tasks_pb2, identifier_pb2, interface_pb2
# NOTE(review): illustrative snippet -- assumes `timedelta` is imported and an
# `interface` (TypedInterface message) was constructed earlier; neither is
# shown here.
# Create task identifier
task_id = identifier_pb2.Identifier(
resource_type=identifier_pb2.ResourceType.TASK,
project="my-project",
domain="development",
name="data-processor",
version="v1.0.0"
)
# Create task metadata
# NOTE(review): passing a timedelta for the proto Duration `timeout` field may
# require Duration.FromTimedelta depending on protobuf version -- verify.
metadata = tasks_pb2.TaskMetadata(
discoverable=True,
timeout=timedelta(minutes=30),
retries=tasks_pb2.RetryStrategy(retries=3),
interruptible=True
)
# Create container specification
container = tasks_pb2.Container(
image="python:3.9-slim",
command=["python"],
args=["-m", "my_module"],
resources=tasks_pb2.Resources(
# CPU/memory requests only; no limits set in this example.
requests=[
tasks_pb2.ResourceEntry(
name=tasks_pb2.ResourceName.CPU,
value="500m"
),
tasks_pb2.ResourceEntry(
name=tasks_pb2.ResourceName.MEMORY,
value="1Gi"
)
]
)
)
# Create task template
task_template = tasks_pb2.TaskTemplate(
id=task_id,
type="python-task",
metadata=metadata,
interface=interface, # Defined earlier
container=container
)

from flyteidl.core import workflow_pb2
# NOTE(review): illustrative snippet -- `identifier_pb2`, `task_id`,
# `validator_task_id` and `workflow_interface` are assumed from earlier
# snippets/context; the import shown above brings in workflow_pb2 only.
# Create workflow nodes
process_node = workflow_pb2.Node(
id="process-data",
inputs=[], # Define bindings
task_node=workflow_pb2.TaskNode(
reference_id=task_id # Reference to task above
)
)
validate_node = workflow_pb2.Node(
id="validate-results",
inputs=[], # Define bindings from process_node
# Explicit dependency: run only after process_node completes.
upstream_node_ids=["process-data"],
task_node=workflow_pb2.TaskNode(
reference_id=validator_task_id
)
)
# Create workflow identifier
workflow_id = identifier_pb2.Identifier(
resource_type=identifier_pb2.ResourceType.WORKFLOW,
project="my-project",
domain="development",
name="data-pipeline",
version="v1.0.0"
)
# Create workflow template
workflow_template = workflow_pb2.WorkflowTemplate(
id=workflow_id,
interface=workflow_interface,
nodes=[process_node, validate_node],
outputs=[] # Define output bindings
)

from flyteidl.core import workflow_pb2, literals_pb2
# NOTE(review): illustrative snippet -- `literals_pb2` is used below but not
# imported by the original import line; `large_data_processor_node` and
# `small_data_processor_node` are assumed defined earlier.
# Create condition for branching
# Evaluates: input_count > 100
condition = workflow_pb2.BooleanExpression(
comparison=workflow_pb2.ComparisonExpression(
operator=workflow_pb2.ComparisonOperator.GT,
left_value=workflow_pb2.Operand(var="input_count"),
right_value=workflow_pb2.Operand(
primitive=literals_pb2.Primitive(integer=100)
)
)
)
# Create branch node
branch_node = workflow_pb2.Node(
id="data-size-check",
branch_node=workflow_pb2.BranchNode(
if_else=workflow_pb2.IfElseBlock(
condition=condition,
then_node=large_data_processor_node,
else_node=small_data_processor_node
)
)
)

# Create array node for parallel processing
# NOTE(review): illustrative snippet -- `base_processing_node` is assumed
# defined earlier, and `array_node` is set on Node although the Node stub
# above does not list that field; verify against workflow.proto.
array_node = workflow_pb2.Node(
id="parallel-processing",
array_node=workflow_pb2.ArrayNode(
node=base_processing_node,
parallelism=10,
min_successes=8, # At least 8 out of 10 must succeed
min_success_ratio=0.8
)
)

Install with Tessl CLI
npx tessl i tessl/pypi-flyteidl