IDL for Flyte Platform containing protobuf specifications, gRPC API definitions, and generated clients for multiple languages
—
Extensible plugin framework supporting diverse compute requirements including Apache Spark, machine learning frameworks (TensorFlow, PyTorch), distributed computing (Dask, Ray), database queries, and specialized execution patterns. The plugin system enables Flyte to integrate with various execution backends while maintaining a consistent interface.
Execute Apache Spark applications with comprehensive configuration support for different Spark application types and cluster management.
class SparkJob:
"""Apache Spark job configuration."""
# Free-form Spark properties (e.g. "spark.executor.memory"; see the usage example below).
spark_conf: dict[str, str]
# URI of the application to run (the usage example passes "s3://my-bucket/spark-app.py").
application_file: str
# Path to the executor entrypoint on the image — semantics not shown here; TODO confirm against the proto.
executor_path: str
# Main application file; overlaps with application_file — presumably one supersedes the other. TODO confirm.
main_application_file: str
# Fully-qualified entry class, relevant for JAVA/SCALA application types.
main_class: str
# Language/runtime of the application; see SparkType.
spark_type: SparkType
# Databricks-targeted settings; see DatabricksConf.
databricks_conf: DatabricksConf
# Databricks credential and workspace host (duplicated on DatabricksConf).
databricks_token: str
databricks_instance: str
class SparkType:
"""Spark application type enumeration."""
# Integer values mirror the protobuf enum definition.
PYTHON = 0
JAVA = 1
SCALA = 2
R = 3
class DatabricksConf:
"""Databricks-specific configuration."""
# Token used to authenticate against the Databricks workspace.
databricks_token: str
# Hostname of the target Databricks workspace.
databricks_instance: str
tags: dict[str, str]

Execute PyTorch distributed training jobs with support for various distributed training strategies.
class PyTorchJob:
"""PyTorch distributed training job configuration."""
# Total worker count (summary; per-group detail lives in the replica specs below).
workers: int
# Spec for the master replica group.
master_replicas: DistributedPyTorchTrainingReplicaSpec
# Spec for the worker replica group.
worker_replicas: DistributedPyTorchTrainingReplicaSpec
class DistributedPyTorchTrainingReplicaSpec:
"""Replica specification for PyTorch distributed training."""
# Number of pods in this replica group.
replicas: int
# Container image for the replicas.
image: str
# Kubernetes resource requests/limits.
resources: k8s.ResourceRequirements
# Restart behavior for failed pods; see RestartPolicy.
restart_policy: RestartPolicy
# Fields shared across training replica specs; see CommonReplicaSpec.
common: CommonReplicaSpec
class RestartPolicy:
"""Restart policy enumeration."""
# Integer values mirror the protobuf enum definition.
NEVER = 0
ON_FAILURE = 1
ALWAYS = 2
class CommonReplicaSpec:
"""Common replica specification."""
# Number of pods in the replica group.
replicas: int
# Pod template applied to each replica.
template: k8s.PodTemplateSpec
restart_policy: RestartPolicy

Execute TensorFlow distributed training jobs with parameter server and worker configuration.
class TensorFlowJob:
"""TensorFlow distributed training job configuration."""
# Total worker count (summary; per-group detail lives in the replica specs below).
workers: int
# Parameter-server replica group.
ps_replicas: DistributedTensorFlowTrainingReplicaSpec
# Chief (coordinator) replica group.
chief_replicas: DistributedTensorFlowTrainingReplicaSpec
# Worker replica group.
worker_replicas: DistributedTensorFlowTrainingReplicaSpec
# Evaluator replica group.
evaluator_replicas: DistributedTensorFlowTrainingReplicaSpec
# Job-level lifecycle policy; see RunPolicy.
run_policy: RunPolicy
class DistributedTensorFlowTrainingReplicaSpec:
"""Replica specification for TensorFlow distributed training."""
# Number of pods in this replica group.
replicas: int
# Container image for the replicas.
image: str
# Kubernetes resource requests/limits.
resources: k8s.ResourceRequirements
# Restart behavior for failed pods; see RestartPolicy.
restart_policy: RestartPolicy
# Fields shared across training replica specs; see CommonReplicaSpec.
common: CommonReplicaSpec
class RunPolicy:
"""Run policy for training jobs."""
# How pods are cleaned up after the job finishes; see CleanPodPolicy.
clean_pod_policy: CleanPodPolicy
# Seconds to retain the finished job before cleanup — presumably mirrors the Kubernetes Job field; TODO confirm.
ttl_seconds_after_finished: int
# Maximum wall-clock seconds the job may remain active.
active_deadline_seconds: int
# Number of retries before the job is marked failed.
backoff_limit: int
class CleanPodPolicy:
"""Pod cleanup policy enumeration."""
# Integer values mirror the protobuf enum definition.
NONE = 0
ALL = 1
RUNNING = 2
SUCCEEDED = 3
SUCCEEDED = 3

Execute MPI (Message Passing Interface) distributed computing jobs for high-performance computing workloads.
class MpiJob:
"""MPI distributed computing job configuration."""
# Slots available to the MPI runtime (presumably per worker) — TODO confirm against the proto.
slots: int
# Total replica count (summary; per-group detail lives in the replica specs below).
replicas: int
# Launcher replica group (typically the process that invokes the MPI launcher — verify).
launcher_replicas: MpiReplicaSpec
# Worker replica group.
worker_replicas: MpiReplicaSpec
# Job-level lifecycle policy; see RunPolicy.
run_policy: RunPolicy
class MpiReplicaSpec:
"""MPI replica specification."""
# Number of pods in this replica group.
replicas: int
# Container image for the replicas.
image: str
# Kubernetes resource requests/limits.
resources: k8s.ResourceRequirements
# Restart behavior for failed pods; see RestartPolicy.
restart_policy: RestartPolicy
common: CommonReplicaSpec

Execute Ray distributed computing jobs with cluster configuration and resource management.
class RayJob:
"""Ray distributed computing job configuration."""
# Cluster topology; see RayCluster.
ray_cluster: RayCluster
# Serialized runtime environment (the usage example passes a JSON string).
runtime_env: str
# Seconds to retain the finished job before cleanup.
ttl_seconds_after_finished: int
# Whether to tear the cluster down once the job completes.
shutdown_after_job_finishes: bool
# Autoscaling toggle (also present on RayCluster — likely one supersedes the other; TODO confirm).
enable_autoscaling: bool
class RayCluster:
"""Ray cluster configuration."""
# Head node group; see HeadGroupSpec.
head_group_spec: HeadGroupSpec
# One entry per worker group; see WorkerGroupSpec.
worker_group_spec: list[WorkerGroupSpec]
# Autoscaling toggle for the cluster.
enable_autoscaling: bool
# Autoscaler tuning; see AutoscalerOptions.
autoscaler_options: AutoscalerOptions
class HeadGroupSpec:
"""Ray head node specification."""
# Named compute template identifying node resources.
compute_template: str
# Container image for the head node.
image: str
# Kubernetes service type exposing the head node.
service_type: str
# Whether to create an ingress for the head node (presumably for dashboard access — verify).
enable_ingress: bool
# Extra parameters forwarded to `ray start` (the usage example sets "dashboard-host").
ray_start_params: dict[str, str]
class WorkerGroupSpec:
"""Ray worker group specification."""
# Name identifying this worker group.
group_name: str
# Named compute template identifying node resources.
compute_template: str
# Container image for the workers.
image: str
# Desired number of workers.
replicas: int
# Autoscaling lower and upper bounds.
min_replicas: int
max_replicas: int
# Extra parameters forwarded to `ray start` on each worker.
ray_start_params: dict[str, str]
class AutoscalerOptions:
"""Ray autoscaler configuration."""
# Scaling rate multipliers (the usage example uses 1.0 up / 0.5 down) — units not shown; TODO confirm.
upscaling_speed: float
downscaling_speed: float
idle_timeout_seconds: int

Execute Dask distributed computing jobs with scheduler and worker configuration.
class DaskJob:
"""Dask distributed computing job configuration."""
# Scheduler pod configuration; see DaskScheduler.
scheduler: DaskScheduler
# Worker group configuration; see DaskWorkerGroup.
workers: DaskWorkerGroup
class DaskScheduler:
"""Dask scheduler configuration."""
# Container image for the scheduler.
image: str
# Kubernetes resource requests/limits.
resources: k8s.ResourceRequirements
# Cluster-level settings; see DaskCluster.
cluster: DaskCluster
class DaskWorkerGroup:
"""Dask worker group configuration."""
# Number of worker pods.
number_of_workers: int
# Container image for the workers.
image: str
# Kubernetes resource requests/limits.
resources: k8s.ResourceRequirements
class DaskCluster:
"""Dask cluster configuration."""
# Number of workers in the cluster (overlaps with DaskWorkerGroup.number_of_workers — TODO confirm precedence).
n_workers: int
# Threads per worker process.
threads_per_worker: int
# Resources for the scheduler (overlaps with DaskScheduler.resources — TODO confirm precedence).
scheduler_resources: k8s.ResourceRequirements
worker_resources: k8s.ResourceRequirements

Execute array jobs with parallel task execution and success criteria configuration.
class ArrayJob:
"""Array job configuration for parallel execution."""
# Maximum number of subjobs running concurrently (see the usage example).
parallelism: int
# Total number of subjobs.
size: int
# Minimum successful subjobs for the array job to succeed.
min_successes: int
# Minimum success fraction (alternative expression of the same criterion).
min_success_ratio: float
# Success criteria; see SuccessPolicy.
success_policy: SuccessPolicy
class SuccessPolicy:
"""Success policy for array jobs."""
# Minimum successful subjobs required.
min_successes: int
min_success_ratio: float

Execute SQL queries against various database systems with dialect-specific support.
class PrestoJob:
"""Presto SQL query job configuration."""
# SQL statement to execute.
statement: str
# Query properties (the usage example sets "query.max-memory" limits).
query_properties: dict[str, str]
# Routing group for the query — presumably selects a Presto cluster/queue; TODO confirm.
routing_group: str
# Catalog and schema the statement runs against.
catalog: str
schema: str
class QuboleJob:
"""Qubole data platform job configuration."""
# Tags attached to the Qubole command.
tags: list[str]
# Label selecting the Qubole cluster to run on.
cluster_label: str
sdk_version: str

Tasks that wait for external conditions or resources before proceeding.
class WaitableInterface:
"""Interface for tasks that wait for external conditions."""
# Policy controlling when a waiting task is woken; see WakeupPolicy.
wakeup_policy: WakeupPolicy
# Policy controlling how the task sleeps while waiting; see SleepPolicy.
sleep_policy: SleepPolicy
class WakeupPolicy:
"""Policy for waking up waiting tasks."""
# No fields are defined in this stub.
pass
class SleepPolicy:
"""Policy for sleeping tasks."""
# No fields are defined in this stub (body continues on the next line).
pass

Deep integration with Kubeflow for machine learning workflow orchestration.
# Kubeflow TensorFlow Job
# NOTE: distinct from the TensorFlowJob defined earlier in this document; this
# variant belongs to the flyteidl.plugins.kubeflow package (see the
# tensorflow_pb2 import in the usage examples).
class TensorFlowJob:
"""Kubeflow TensorFlow training job."""
# Total worker count (summary).
workers: int
# Parameter-server, chief, worker and evaluator replica groups.
ps_replicas: DistributedTensorFlowTrainingReplicaSpec
chief_replicas: DistributedTensorFlowTrainingReplicaSpec
worker_replicas: DistributedTensorFlowTrainingReplicaSpec
evaluator_replicas: DistributedTensorFlowTrainingReplicaSpec
# Job-level lifecycle policy; see RunPolicy.
run_policy: RunPolicy
# Kubeflow PyTorch Job
# NOTE: distinct from the PyTorchJob defined earlier in this document; this
# variant belongs to the flyteidl.plugins.kubeflow package (see the
# pytorch_pb2 import in the usage examples).
class PyTorchJob:
"""Kubeflow PyTorch training job."""
# Total worker count (summary).
workers: int
# Master and worker replica groups.
master_replicas: DistributedPyTorchTrainingReplicaSpec
worker_replicas: DistributedPyTorchTrainingReplicaSpec
# Kubeflow MPI Job
# NOTE: distinct from the MpiJob defined earlier in this document; this
# variant belongs to the flyteidl.plugins.kubeflow package.
class MpiJob:
"""Kubeflow MPI distributed computing job."""
# Slots available to the MPI runtime (presumably per worker) — TODO confirm.
slots: int
# Total replica count (summary).
replicas: int
# Launcher and worker replica groups.
launcher_replicas: MpiReplicaSpec
worker_replicas: MpiReplicaSpec
run_policy: RunPolicy

from flyteidl.plugins import spark_pb2
# Configure Spark application
spark_job = spark_pb2.SparkJob(
spark_type=spark_pb2.SparkType.PYTHON,
application_file="s3://my-bucket/spark-app.py",
# Key/value pairs map directly onto SparkJob.spark_conf.
spark_conf={
"spark.executor.memory": "4g",
"spark.executor.cores": "2",
"spark.executor.instances": "10",
"spark.driver.memory": "2g",
"spark.driver.cores": "1",
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true"
}
)
# Use in task template custom field
# MessageToDict serializes the protobuf message into the task template's
# free-form `custom` field for the backend plugin to consume.
task_template = TaskTemplate(
id=task_id,
type="spark",
interface=interface,
custom=MessageToDict(spark_job)
)

from flyteidl.plugins.kubeflow import pytorch_pb2
# Configure PyTorch distributed training
# One master plus three workers, each replica requesting one GPU.
pytorch_job = pytorch_pb2.PyTorchJob(
workers=4,
master_replicas=pytorch_pb2.DistributedPyTorchTrainingReplicaSpec(
replicas=1,
image="pytorch/pytorch:1.12.0-cuda11.3-cudnn8-runtime",
resources=k8s_pb2.ResourceRequirements(
requests={
"nvidia.com/gpu": "1",
"cpu": "2",
"memory": "8Gi"
}
)
),
worker_replicas=pytorch_pb2.DistributedPyTorchTrainingReplicaSpec(
replicas=3,
image="pytorch/pytorch:1.12.0-cuda11.3-cudnn8-runtime",
resources=k8s_pb2.ResourceRequirements(
requests={
"nvidia.com/gpu": "1",
"cpu": "2",
"memory": "8Gi"
}
)
)
)
)

from flyteidl.plugins import ray_pb2
# Configure Ray cluster
ray_job = ray_pb2.RayJob(
ray_cluster=ray_pb2.RayCluster(
head_group_spec=ray_pb2.HeadGroupSpec(
compute_template="head-template",
image="rayproject/ray:2.0.0",
# Parameters forwarded to `ray start` on the head node.
ray_start_params={
"dashboard-host": "0.0.0.0",
"metrics-export-port": "8080"
}
),
worker_group_spec=[
ray_pb2.WorkerGroupSpec(
group_name="workers",
compute_template="worker-template",
image="rayproject/ray:2.0.0",
replicas=5,
min_replicas=2,
max_replicas=10,
ray_start_params={
"metrics-export-port": "8080"
}
)
],
enable_autoscaling=True,
autoscaler_options=ray_pb2.AutoscalerOptions(
upscaling_speed=1.0,
downscaling_speed=0.5,
idle_timeout_seconds=60
)
),
runtime_env='{"pip": ["pandas", "numpy"]}',
# NOTE: enable_autoscaling is set on both RayJob and the nested RayCluster here.
enable_autoscaling=True,
shutdown_after_job_finishes=True
)

from flyteidl.plugins import array_job_pb2
# Configure array job for parallel processing
array_job = array_job_pb2.ArrayJob(
parallelism=10, # Maximum concurrent executions
size=100, # Total number of subjobs
min_successes=90, # Minimum successful completions
min_success_ratio=0.9, # Minimum success ratio
# success_policy restates the thresholds in the nested SuccessPolicy message.
success_policy=array_job_pb2.SuccessPolicy(
min_successes=90,
min_success_ratio=0.9
)
)

from flyteidl.plugins import presto_pb2
# Configure Presto query
presto_job = presto_pb2.PrestoJob(
statement="""
SELECT customer_id, COUNT(*) as order_count
FROM orders
WHERE order_date >= DATE '2023-01-01'
GROUP BY customer_id
ORDER BY order_count DESC
LIMIT 100
""",
# Per-query memory limits passed through query_properties.
query_properties={
"query.max-memory": "10GB",
"query.max-memory-per-node": "2GB"
},
routing_group="batch",
catalog="hive",
schema="analytics"
)

from flyteidl.plugins import dask_pb2
# Configure Dask job
dask_job = dask_pb2.DaskJob(
scheduler=dask_pb2.DaskScheduler(
image="daskdev/dask:latest",
resources=k8s_pb2.ResourceRequirements(
requests={
"cpu": "1",
"memory": "2Gi"
}
)
),
# Five workers, each requesting 2 CPUs and 4Gi of memory.
workers=dask_pb2.DaskWorkerGroup(
number_of_workers=5,
image="daskdev/dask:latest",
resources=k8s_pb2.ResourceRequirements(
requests={
"cpu": "2",
"memory": "4Gi"
}
)
)
)

from flyteidl.plugins.kubeflow import tensorflow_pb2
# Configure TensorFlow distributed training
# Two CPU parameter servers, one GPU chief and three GPU workers.
tensorflow_job = tensorflow_pb2.TensorFlowJob(
workers=4,
ps_replicas=tensorflow_pb2.DistributedTensorFlowTrainingReplicaSpec(
replicas=2,
image="tensorflow/tensorflow:2.8.0-gpu",
resources=k8s_pb2.ResourceRequirements(
requests={
"cpu": "2",
"memory": "4Gi"
}
)
),
chief_replicas=tensorflow_pb2.DistributedTensorFlowTrainingReplicaSpec(
replicas=1,
image="tensorflow/tensorflow:2.8.0-gpu",
resources=k8s_pb2.ResourceRequirements(
requests={
"nvidia.com/gpu": "1",
"cpu": "4",
"memory": "8Gi"
}
)
),
worker_replicas=tensorflow_pb2.DistributedTensorFlowTrainingReplicaSpec(
replicas=3,
image="tensorflow/tensorflow:2.8.0-gpu",
resources=k8s_pb2.ResourceRequirements(
requests={
"nvidia.com/gpu": "1",
"cpu": "4",
"memory": "8Gi"
}
)
)
)

Install with Tessl CLI
npx tessl i tessl/pypi-flyteidl