tessl/pypi-prefect-dask

Prefect integrations with the Dask execution framework for parallel and distributed computing.

  • Workspace: tessl
  • Visibility: Public
  • Describes: pypipkg:pypi/prefect-dask@0.3.x

To install, run

npx @tessl/cli install tessl/pypi-prefect-dask@0.3.0


Prefect Dask

Prefect integrations with the Dask execution framework for parallel and distributed computing. This package lets Prefect flows execute tasks on Dask's distributed scheduler, providing scalable parallel execution through the DaskTaskRunner along with utilities for working with Dask clients inside Prefect workflows.

Package Information

  • Package Name: prefect-dask
  • Language: Python
  • Installation: pip install prefect-dask

Core Imports

from prefect_dask import DaskTaskRunner, PrefectDaskClient, get_dask_client, get_async_dask_client

Module-specific imports:

from prefect_dask.task_runners import DaskTaskRunner, PrefectDaskFuture
from prefect_dask.client import PrefectDaskClient
from prefect_dask.utils import get_dask_client, get_async_dask_client

Basic Usage

import time
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def compute_task(x):
    time.sleep(1)  # Simulate work
    return x * 2

@flow(task_runner=DaskTaskRunner())
def parallel_flow():
    # Submit multiple tasks for parallel execution
    futures = []
    for i in range(5):
        futures.append(compute_task.submit(i))
    
    # Collect results
    results = [future.result() for future in futures]
    return results

# Execute with local Dask cluster
if __name__ == "__main__":
    results = parallel_flow()
    print(results)  # [0, 2, 4, 6, 8]

Using Dask clients within tasks:

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def process_data():
    with get_dask_client() as client:
        # Use dask operations within the task
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        result = client.compute(df.describe()).result()
    return result

@flow(task_runner=DaskTaskRunner())
def data_processing_flow():
    return process_data()

Capabilities

Task Runner

The DaskTaskRunner enables parallel and distributed execution of Prefect tasks on Dask's distributed computing framework. It can create a temporary local cluster, connect to an existing cluster by address or cluster object, or build a cluster from a custom cluster class such as those in dask-cloudprovider.

class DaskTaskRunner:
    def __init__(
        self,
        cluster: Optional[distributed.deploy.cluster.Cluster] = None,
        address: Optional[str] = None,
        cluster_class: Union[str, Callable[[], distributed.deploy.cluster.Cluster], None] = None,
        cluster_kwargs: Optional[dict[str, Any]] = None,
        adapt_kwargs: Optional[dict[str, Any]] = None,
        client_kwargs: Optional[dict[str, Any]] = None,
        performance_report_path: Optional[str] = None,
    ):
        """
        A parallel task runner that submits tasks to the dask.distributed scheduler.

        Parameters:
        - cluster: A currently running Dask cluster object
        - address: Address of a currently running Dask scheduler
        - cluster_class: The cluster class to use when creating a temporary cluster
        - cluster_kwargs: Additional kwargs to pass to cluster_class when creating a temporary cluster
        - adapt_kwargs: Additional kwargs to pass to cluster.adapt for adaptive scaling
        - client_kwargs: Additional kwargs to use when creating the distributed.Client
        - performance_report_path: Path where the Dask performance report will be saved
        """

    def submit(
        self,
        task: Task,
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectDaskFuture] | None = None,
        dependencies: dict[str, Set[TaskRunInput]] | None = None,
    ) -> PrefectDaskFuture:
        """
        Submit a task for execution on the Dask cluster.

        Parameters:
        - task: The Prefect task to execute
        - parameters: Task parameters as key-value pairs
        - wait_for: Other futures to wait for before execution
        - dependencies: Task run dependencies

        Returns:
        PrefectDaskFuture: Future representing the task execution
        """

    def map(
        self,
        task: Task,
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectDaskFuture]:
        """
        Map a task over multiple parameter sets for parallel execution.

        Parameters:
        - task: The Prefect task to map
        - parameters: Dictionary with lists of parameters to map over
        - wait_for: Other futures to wait for before execution

        Returns:
        PrefectFutureList: List of futures representing mapped task executions
        """

    @property
    def client(self) -> PrefectDaskClient:
        """Get the Dask client for the task runner."""

    def duplicate(self) -> DaskTaskRunner:
        """Create a new instance with the same settings."""

Dask Client Integration

PrefectDaskClient extends the standard Dask distributed.Client to handle Prefect-specific task submission and execution patterns.

class PrefectDaskClient(distributed.Client):
    def submit(
        self,
        func,
        *args,
        key=None,
        workers=None,
        resources=None,
        retries=None,
        priority=0,
        fifo_timeout="100 ms",
        allow_other_workers=False,
        actor=False,
        actors=False,
        pure=True,
        **kwargs,
    ):
        """
        Submit a function or Prefect task for execution.

        When func is a Prefect Task, automatically handles task context,
        dependencies, and return types. Otherwise behaves like standard
        distributed.Client.submit().

        Parameters:
        - func: Function or Prefect Task to execute
        - args: Positional arguments for the function
        - key: Unique identifier for the task
        - workers: Specific workers to run on
        - resources: Resource requirements
        - retries: Number of retries on failure
        - priority: Task priority (higher = more important)
        - fifo_timeout: Time to wait in scheduler queue
        - allow_other_workers: Allow execution on other workers
        - actor: Create as an actor
        - actors: Legacy actor parameter
        - pure: Whether function is pure (deterministic)
        - kwargs: Additional keyword arguments

        Returns:
        distributed.Future: Future representing the execution
        """

    def map(
        self,
        func,
        *iterables,
        key=None,
        workers=None,
        retries=None,
        resources=None,
        priority=0,
        allow_other_workers=False,
        fifo_timeout="100 ms",
        actor=False,
        actors=False,
        pure=True,
        batch_size=None,
        **kwargs,
    ):
        """
        Map a function or Prefect task over iterables.

        When func is a Prefect Task, handles each mapped execution with
        proper Prefect context. Otherwise behaves like standard
        distributed.Client.map().

        Parameters:
        - func: Function or Prefect Task to map
        - iterables: Iterables to map over
        - key: Base key for generated tasks
        - workers: Specific workers to run on
        - retries: Number of retries on failure
        - resources: Resource requirements
        - priority: Task priority
        - allow_other_workers: Allow execution on other workers
        - fifo_timeout: Time to wait in scheduler queue
        - actor: Create as actors
        - actors: Legacy actor parameter
        - pure: Whether function is pure
        - batch_size: Number of items per batch
        - kwargs: Additional keyword arguments

        Returns:
        List[distributed.Future]: List of futures for mapped executions
        """

Future Handling

PrefectDaskFuture wraps Dask distributed.Future objects to provide a Prefect-compatible future interface with proper state handling.

class PrefectDaskFuture(PrefectWrappedFuture):
    def __init__(self, task_run_id: UUID, wrapped_future: distributed.Future):
        """
        A Prefect future that wraps a distributed.Future.

        Parameters:
        - task_run_id: Prefect task run identifier
        - wrapped_future: The underlying Dask distributed.Future
        """

    def wait(self, timeout: Optional[float] = None) -> None:
        """
        Wait for the future to complete.

        Parameters:
        - timeout: Maximum time to wait in seconds
        """

    def result(
        self,
        timeout: Optional[float] = None,
        raise_on_failure: bool = True,
    ):
        """
        Get the result of the future.

        Parameters:
        - timeout: Maximum time to wait for result in seconds
        - raise_on_failure: Whether to raise exception on task failure

        Returns:
        The task result

        Raises:
        - TimeoutError: If timeout is reached before completion
        - Exception: Task execution exception if raise_on_failure=True
        """

Client Utilities

Context managers for obtaining temporary Dask clients within Prefect tasks and flows, supporting both synchronous and asynchronous execution patterns.

def get_dask_client(
    timeout: Optional[Union[int, float, str, timedelta]] = None,
    **client_kwargs: Dict[str, Any],
) -> Generator[distributed.Client, None, None]:
    """
    Context manager yielding a temporary synchronous dask client.

    Automatically configures client based on the current Prefect context
    (task or flow execution). Useful for parallelizing operations on dask
    collections within Prefect tasks.

    Parameters:
    - timeout: Connection timeout (no effect in flow contexts)
    - client_kwargs: Additional keyword arguments for distributed.Client

    Yields:
    distributed.Client: Temporary synchronous dask client

    Example:
    ```python
    @task
    def compute_task():
        with get_dask_client(timeout="120s") as client:
            df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
            result = client.compute(df.describe()).result()
        return result
    ```
    """

async def get_async_dask_client(
    timeout: Optional[Union[int, float, str, timedelta]] = None,
    **client_kwargs: Dict[str, Any],
) -> AsyncGenerator[distributed.Client, None]:
    """
    Async context manager yielding a temporary asynchronous dask client.

    Automatically configures client based on the current Prefect context
    for async task or flow execution. Useful for parallelizing operations
    on dask collections within async Prefect tasks.

    Parameters:
    - timeout: Connection timeout (no effect in flow contexts)  
    - client_kwargs: Additional keyword arguments for distributed.Client

    Yields:
    distributed.Client: Temporary asynchronous dask client

    Example:
    ```python
    @task
    async def async_compute_task():
        async with get_async_dask_client(timeout="120s") as client:
            df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
            result = await client.compute(df.describe())
        return result
    ```
    """

Types

from typing import Any, Dict, Set, Optional, Union, Callable, Iterable, Generator, AsyncGenerator
from datetime import timedelta
from uuid import UUID
import distributed
import distributed.deploy.cluster
from prefect.client.schemas.objects import TaskRunInput, State
from prefect.futures import PrefectFuture, PrefectFutureList, PrefectWrappedFuture
from prefect.tasks import Task
from prefect.task_runners import TaskRunner

Configuration Examples

Local Cluster with Custom Settings

from prefect_dask import DaskTaskRunner

# Create task runner with custom local cluster
task_runner = DaskTaskRunner(
    cluster_kwargs={
        "n_workers": 4,
        "threads_per_worker": 2,
        "memory_limit": "2GB",
        "dashboard_address": ":8787"
    }
)

Remote Cluster Connection

# Connect to an existing cluster by scheduler address
task_runner = DaskTaskRunner(address="192.168.1.100:8786")

# Or pass an existing cluster object
import distributed
cluster = distributed.LocalCluster()
task_runner = DaskTaskRunner(cluster=cluster)

Cloud Provider Integration

# Using dask-cloudprovider for AWS Fargate
task_runner = DaskTaskRunner(
    cluster_class="dask_cloudprovider.FargateCluster",
    cluster_kwargs={
        "image": "prefecthq/prefect:latest",
        "n_workers": 10,
        "fargate_use_private_ip": True
    }
)

Adaptive Scaling

# Enable adaptive scaling
task_runner = DaskTaskRunner(
    cluster_kwargs={"n_workers": 2},
    adapt_kwargs={
        "minimum": 1,
        "maximum": 20,
        "target_duration": "30s"
    }
)
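
To see adaptation in action, attach the runner configured above to a flow and submit a burst of work; with adapt_kwargs set, the cluster scales between the configured bounds. A sketch continuing from the previous block (the task and flow names are illustrative):

from prefect import flow, task

@task
def crunch(i):
    return i ** 2

@flow(task_runner=task_runner)
def bursty_flow():
    # A large mapped batch gives the adaptive cluster a reason to scale
    # toward the configured maximum of 20 workers.
    return crunch.map(range(200)).result()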