tessl/pypi-flyteidl

IDL for Flyte Platform containing protobuf specifications, gRPC API definitions, and generated clients for multiple languages

Plugin System

The plugin system is an extensible framework of task configuration types covering Apache Spark, machine learning frameworks (TensorFlow, PyTorch), distributed computing (MPI, Dask, Ray), database queries, and specialized execution patterns such as array jobs and waitable tasks. It lets Flyte integrate with different execution backends while keeping a consistent interface.

Capabilities

Apache Spark Jobs

Execute Apache Spark applications written in Python, Java, Scala, or R, with Spark settings passed as string key-value pairs and optional Databricks configuration.

class SparkJob:
    """Apache Spark job configuration."""
    spark_conf: dict[str, str]
    application_file: str
    executor_path: str
    main_application_file: str
    main_class: str
    spark_type: SparkType
    databricks_conf: DatabricksConf
    databricks_token: str
    databricks_instance: str

class SparkType:
    """Spark application type enumeration."""
    PYTHON = 0
    JAVA = 1
    SCALA = 2
    R = 3

class DatabricksConf:
    """Databricks-specific configuration."""
    databricks_token: str
    databricks_instance: str
    tags: dict[str, str]
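
Because `spark_conf` is typed `dict[str, str]`, numeric and boolean settings have to be supplied as strings. The following is a minimal sketch of a conversion helper, assuming settings are collected as ordinary Python values first; `to_spark_conf` is a hypothetical name and not part of flyteidl.

```python
def to_spark_conf(settings: dict) -> dict[str, str]:
    """Convert Python values to the string form a dict[str, str] field expects."""
    conf = {}
    for key, value in settings.items():
        if isinstance(value, bool):
            # Boolean Spark settings are conventionally written in lowercase.
            conf[key] = "true" if value else "false"
        else:
            conf[key] = str(value)
    return conf


spark_conf = to_spark_conf({
    "spark.executor.memory": "4g",
    "spark.executor.cores": 2,
    "spark.sql.adaptive.enabled": True,
})
```

The resulting dictionary can be passed wherever a `spark_conf` value is expected.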

PyTorch Distributed Training

Execute PyTorch distributed training jobs, configured as a master replica group plus a worker replica group.

class PyTorchJob:
    """PyTorch distributed training job configuration."""
    workers: int
    master_replicas: DistributedPyTorchTrainingReplicaSpec
    worker_replicas: DistributedPyTorchTrainingReplicaSpec

class DistributedPyTorchTrainingReplicaSpec:
    """Replica specification for PyTorch distributed training."""
    replicas: int
    image: str
    resources: k8s.ResourceRequirements
    restart_policy: RestartPolicy
    common: CommonReplicaSpec

class RestartPolicy:
    """Restart policy enumeration."""
    NEVER = 0
    ON_FAILURE = 1
    ALWAYS = 2

class CommonReplicaSpec:
    """Common replica specification."""
    replicas: int
    template: k8s.PodTemplateSpec
    restart_policy: RestartPolicy

TensorFlow Distributed Training

Execute TensorFlow distributed training jobs with parameter server and worker configuration.

class TensorFlowJob:
    """TensorFlow distributed training job configuration."""
    workers: int
    ps_replicas: DistributedTensorFlowTrainingReplicaSpec
    chief_replicas: DistributedTensorFlowTrainingReplicaSpec
    worker_replicas: DistributedTensorFlowTrainingReplicaSpec
    evaluator_replicas: DistributedTensorFlowTrainingReplicaSpec
    run_policy: RunPolicy

class DistributedTensorFlowTrainingReplicaSpec:
    """Replica specification for TensorFlow distributed training."""
    replicas: int
    image: str
    resources: k8s.ResourceRequirements
    restart_policy: RestartPolicy
    common: CommonReplicaSpec

class RunPolicy:
    """Run policy for training jobs."""
    clean_pod_policy: CleanPodPolicy
    ttl_seconds_after_finished: int
    active_deadline_seconds: int
    backoff_limit: int

class CleanPodPolicy:
    """Pod cleanup policy enumeration."""
    NONE = 0
    ALL = 1
    RUNNING = 2
    SUCCEEDED = 3
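
The enumerations in this section are integer-valued, so a policy may appear as a bare number when a job is inspected or logged. The sketch below uses a standard-library `IntEnum` as a stand-in (it is not the generated flyteidl type) to show the mapping between the numbers and names listed above.

```python
from enum import IntEnum


class CleanPodPolicy(IntEnum):
    """Plain-Python stand-in mirroring the values listed above."""
    NONE = 0
    ALL = 1
    RUNNING = 2
    SUCCEEDED = 3


policy = CleanPodPolicy.RUNNING
number = int(policy)                      # name -> number
name = CleanPodPolicy(3).name             # number -> name
by_name = CleanPodPolicy["ALL"].value     # lookup by string name
```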

MPI Jobs

Execute MPI (Message Passing Interface) distributed computing jobs for high-performance computing workloads.

class MpiJob:
    """MPI distributed computing job configuration."""
    slots: int
    replicas: int
    launcher_replicas: MpiReplicaSpec
    worker_replicas: MpiReplicaSpec
    run_policy: RunPolicy

class MpiReplicaSpec:
    """MPI replica specification."""
    replicas: int
    image: str
    resources: k8s.ResourceRequirements
    restart_policy: RestartPolicy
    common: CommonReplicaSpec

Ray Distributed Computing

Execute Ray distributed computing jobs with cluster configuration and resource management.

class RayJob:
    """Ray distributed computing job configuration."""
    ray_cluster: RayCluster
    runtime_env: str
    ttl_seconds_after_finished: int
    shutdown_after_job_finishes: bool
    enable_autoscaling: bool

class RayCluster:
    """Ray cluster configuration."""
    head_group_spec: HeadGroupSpec
    worker_group_spec: list[WorkerGroupSpec]
    enable_autoscaling: bool
    autoscaler_options: AutoscalerOptions

class HeadGroupSpec:
    """Ray head node specification."""
    compute_template: str
    image: str
    service_type: str
    enable_ingress: bool
    ray_start_params: dict[str, str]

class WorkerGroupSpec:
    """Ray worker group specification."""
    group_name: str
    compute_template: str
    image: str
    replicas: int
    min_replicas: int
    max_replicas: int
    ray_start_params: dict[str, str]

class AutoscalerOptions:
    """Ray autoscaler configuration."""
    upscaling_speed: float
    downscaling_speed: float
    idle_timeout_seconds: int
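
A worker group carries three related counts: `replicas`, `min_replicas`, and `max_replicas`. A minimal sketch of a pre-submission sanity check follows, assuming the intended relationship is `min_replicas <= replicas <= max_replicas`; this page does not state whether the backend enforces that ordering. `WorkerGroupBounds` and `check_bounds` are hypothetical names that mirror only the replica fields.

```python
from dataclasses import dataclass


@dataclass
class WorkerGroupBounds:
    """Hypothetical stand-in for the replica fields of WorkerGroupSpec."""
    group_name: str
    replicas: int
    min_replicas: int
    max_replicas: int


def check_bounds(group: WorkerGroupBounds) -> list[str]:
    """Return a list of problems; an empty list means the bounds are consistent."""
    problems = []
    if group.min_replicas < 0:
        problems.append(f"{group.group_name}: min_replicas must not be negative")
    if group.min_replicas > group.max_replicas:
        problems.append(f"{group.group_name}: min_replicas exceeds max_replicas")
    elif not group.min_replicas <= group.replicas <= group.max_replicas:
        problems.append(
            f"{group.group_name}: replicas is outside [min_replicas, max_replicas]"
        )
    return problems


ok = check_bounds(WorkerGroupBounds("workers", replicas=5, min_replicas=2, max_replicas=10))
bad = check_bounds(WorkerGroupBounds("workers", replicas=12, min_replicas=2, max_replicas=10))
```

Returning a list of messages rather than raising on the first problem lets a caller report every inconsistent group at once.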

Dask Distributed Computing

Execute Dask distributed computing jobs with scheduler and worker configuration.

class DaskJob:
    """Dask distributed computing job configuration."""
    scheduler: DaskScheduler
    workers: DaskWorkerGroup

class DaskScheduler:
    """Dask scheduler configuration."""
    image: str
    resources: k8s.ResourceRequirements
    cluster: DaskCluster

class DaskWorkerGroup:
    """Dask worker group configuration."""
    number_of_workers: int
    image: str
    resources: k8s.ResourceRequirements

class DaskCluster:
    """Dask cluster configuration."""
    n_workers: int
    threads_per_worker: int
    scheduler_resources: k8s.ResourceRequirements
    worker_resources: k8s.ResourceRequirements

Array Jobs

Execute array jobs with parallel task execution and success criteria configuration.

class ArrayJob:
    """Array job configuration for parallel execution."""
    parallelism: int
    size: int
    min_successes: int
    min_success_ratio: float
    success_policy: SuccessPolicy

class SuccessPolicy:
    """Success policy for array jobs."""
    min_successes: int
    min_success_ratio: float
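
The success criteria can be read as a threshold on the number of subjobs that finish successfully. The sketch below shows one interpretation, assuming an absolute `min_successes` takes precedence over `min_success_ratio` and that a ratio is rounded up to a whole subjob; how Flyte evaluates these fields is not stated on this page, and both function names are hypothetical.

```python
import math


def required_successes(size, min_successes=None, min_success_ratio=None):
    """Number of subjobs that must succeed under this sketch's interpretation."""
    if min_successes is not None:
        return min_successes
    if min_success_ratio is not None:
        # Round before taking the ceiling so floating-point noise in the
        # product cannot push the threshold up by one subjob.
        return math.ceil(round(min_success_ratio * size, 9))
    # No criterion given: assume every subjob must succeed.
    return size


def is_successful(succeeded, size, min_successes=None, min_success_ratio=None):
    """True when enough subjobs succeeded to meet the threshold."""
    return succeeded >= required_successes(size, min_successes, min_success_ratio)
```

With `size=100` and `min_success_ratio=0.9`, the threshold under these assumptions is 90 subjobs, which matches an absolute `min_successes=90`.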

Database Query Jobs

Execute SQL queries through Presto or the Qubole data platform.

class PrestoJob:
    """Presto SQL query job configuration."""
    statement: str
    query_properties: dict[str, str]
    routing_group: str
    catalog: str
    schema: str

class QuboleJob:
    """Qubole data platform job configuration."""
    tags: list[str]
    cluster_label: str
    sdk_version: str

Waitable Tasks

Tasks that wait for external conditions or resources before proceeding.

class WaitableInterface:
    """Interface for tasks that wait for external conditions."""
    wakeup_policy: WakeupPolicy
    sleep_policy: SleepPolicy

class WakeupPolicy:
    """Policy for waking up waiting tasks."""
    pass

class SleepPolicy:
    """Policy for sleeping tasks."""
    pass

Kubeflow Integration

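The TensorFlow, PyTorch, and MPI job types are also exposed as Kubeflow training jobs. The usage examples import the TensorFlow and PyTorch variants from flyteidl.plugins.kubeflow.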

# Kubeflow TensorFlow Job
class TensorFlowJob:
    """Kubeflow TensorFlow training job."""
    workers: int
    ps_replicas: DistributedTensorFlowTrainingReplicaSpec
    chief_replicas: DistributedTensorFlowTrainingReplicaSpec
    worker_replicas: DistributedTensorFlowTrainingReplicaSpec
    evaluator_replicas: DistributedTensorFlowTrainingReplicaSpec
    run_policy: RunPolicy

# Kubeflow PyTorch Job
class PyTorchJob:
    """Kubeflow PyTorch training job."""
    workers: int
    master_replicas: DistributedPyTorchTrainingReplicaSpec
    worker_replicas: DistributedPyTorchTrainingReplicaSpec

# Kubeflow MPI Job
class MpiJob:
    """Kubeflow MPI distributed computing job."""
    slots: int
    replicas: int
    launcher_replicas: MpiReplicaSpec
    worker_replicas: MpiReplicaSpec
    run_policy: RunPolicy

Usage Examples

Spark Job Configuration

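from flyteidl.core.tasks_pb2 import TaskTemplate
from flyteidl.plugins import spark_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.struct_pb2 import Struct

# Configure the Spark application
spark_job = spark_pb2.SparkJob(
    spark_type=spark_pb2.SparkType.PYTHON,
    application_file="s3://my-bucket/spark-app.py",
    spark_conf={
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2",
        "spark.executor.instances": "10",
        "spark.driver.memory": "2g",
        "spark.driver.cores": "1",
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true"
    }
)

# TaskTemplate.custom is a protobuf Struct, so convert the job to a dict
# and load that dict into a Struct explicitly
custom = Struct()
custom.update(MessageToDict(spark_job))

# Use in the task template's custom field.
# task_id and interface are assumed to be defined elsewhere.
task_template = TaskTemplate(
    id=task_id,
    type="spark",
    interface=interface,
    custom=custom
)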

PyTorch Distributed Training

from flyteidl.plugins.kubeflow import pytorch_pb2
# Assumes k8s_pb2 (the module that defines ResourceRequirements) is already imported

# Configure PyTorch distributed training
pytorch_job = pytorch_pb2.PyTorchJob(
    workers=4,
    master_replicas=pytorch_pb2.DistributedPyTorchTrainingReplicaSpec(
        replicas=1,
        image="pytorch/pytorch:1.12.0-cuda11.3-cudnn8-runtime",
        resources=k8s_pb2.ResourceRequirements(
            requests={
                "nvidia.com/gpu": "1",
                "cpu": "2",
                "memory": "8Gi"
            }
        )
    ),
    worker_replicas=pytorch_pb2.DistributedPyTorchTrainingReplicaSpec(
        replicas=3,
        image="pytorch/pytorch:1.12.0-cuda11.3-cudnn8-runtime",
        resources=k8s_pb2.ResourceRequirements(
            requests={
                "nvidia.com/gpu": "1", 
                "cpu": "2",
                "memory": "8Gi"
            }
        )
    )
)

Ray Cluster Configuration

from flyteidl.plugins import ray_pb2

# Configure Ray cluster
ray_job = ray_pb2.RayJob(
    ray_cluster=ray_pb2.RayCluster(
        head_group_spec=ray_pb2.HeadGroupSpec(
            compute_template="head-template",
            image="rayproject/ray:2.0.0",
            ray_start_params={
                "dashboard-host": "0.0.0.0",
                "metrics-export-port": "8080"
            }
        ),
        worker_group_spec=[
            ray_pb2.WorkerGroupSpec(
                group_name="workers",
                compute_template="worker-template", 
                image="rayproject/ray:2.0.0",
                replicas=5,
                min_replicas=2,
                max_replicas=10,
                ray_start_params={
                    "metrics-export-port": "8080"
                }
            )
        ],
        enable_autoscaling=True,
        autoscaler_options=ray_pb2.AutoscalerOptions(
            upscaling_speed=1.0,
            downscaling_speed=0.5,
            idle_timeout_seconds=60
        )
    ),
    runtime_env='{"pip": ["pandas", "numpy"]}',
    enable_autoscaling=True,
    shutdown_after_job_finishes=True
)
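
The `runtime_env` field is a string that holds JSON. Rather than writing the JSON by hand, it can be built from a Python dictionary with the standard-library `json` module, which avoids quoting mistakes. A minimal sketch:

```python
import json

# Describe the runtime environment as ordinary Python data.
env = {"pip": ["pandas", "numpy"]}

# Serialize it to the JSON string form that a str-typed field expects.
runtime_env = json.dumps(env)
```

The resulting string is identical to the literal used in the example above and can be parsed back with `json.loads` for inspection.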

Array Job Configuration

from flyteidl.plugins import array_job_pb2

# Configure array job for parallel processing
array_job = array_job_pb2.ArrayJob(
    parallelism=10,  # Maximum concurrent executions
    size=100,        # Total number of subjobs
    min_successes=90,  # Minimum successful completions
    min_success_ratio=0.9,  # Minimum success ratio
    success_policy=array_job_pb2.SuccessPolicy(
        min_successes=90,
        min_success_ratio=0.9
    )
)

SQL Query Jobs

from flyteidl.plugins import presto_pb2

# Configure Presto query
presto_job = presto_pb2.PrestoJob(
    statement="""
        SELECT customer_id, COUNT(*) as order_count
        FROM orders
        WHERE order_date >= DATE '2023-01-01'
        GROUP BY customer_id
        ORDER BY order_count DESC
        LIMIT 100
    """,
    query_properties={
        "query.max-memory": "10GB",
        "query.max-memory-per-node": "2GB"
    },
    routing_group="batch",
    catalog="hive",
    schema="analytics"
)

Dask Distributed Computing

from flyteidl.plugins import dask_pb2
# Assumes k8s_pb2 (the module that defines ResourceRequirements) is already imported

# Configure Dask job
dask_job = dask_pb2.DaskJob(
    scheduler=dask_pb2.DaskScheduler(
        image="daskdev/dask:latest",
        resources=k8s_pb2.ResourceRequirements(
            requests={
                "cpu": "1",
                "memory": "2Gi"
            }
        )
    ),
    workers=dask_pb2.DaskWorkerGroup(
        number_of_workers=5,
        image="daskdev/dask:latest", 
        resources=k8s_pb2.ResourceRequirements(
            requests={
                "cpu": "2",
                "memory": "4Gi"
            }
        )
    )
)

TensorFlow Distributed Training

from flyteidl.plugins.kubeflow import tensorflow_pb2
# Assumes k8s_pb2 (the module that defines ResourceRequirements) is already imported

# Configure TensorFlow distributed training
tensorflow_job = tensorflow_pb2.TensorFlowJob(
    workers=4,
    ps_replicas=tensorflow_pb2.DistributedTensorFlowTrainingReplicaSpec(
        replicas=2,
        image="tensorflow/tensorflow:2.8.0-gpu",
        resources=k8s_pb2.ResourceRequirements(
            requests={
                "cpu": "2",
                "memory": "4Gi"
            }
        )
    ),
    chief_replicas=tensorflow_pb2.DistributedTensorFlowTrainingReplicaSpec(
        replicas=1,
        image="tensorflow/tensorflow:2.8.0-gpu",
        resources=k8s_pb2.ResourceRequirements(
            requests={
                "nvidia.com/gpu": "1",
                "cpu": "4",
                "memory": "8Gi"
            }
        )
    ),
    worker_replicas=tensorflow_pb2.DistributedTensorFlowTrainingReplicaSpec(
        replicas=3,
        image="tensorflow/tensorflow:2.8.0-gpu",
        resources=k8s_pb2.ResourceRequirements(
            requests={
                "nvidia.com/gpu": "1",
                "cpu": "4", 
                "memory": "8Gi"
            }
        )
    )
)

Install with Tessl CLI

npx tessl i tessl/pypi-flyteidl
