Prefect integrations with the Dask execution framework for parallel and distributed computing.
```bash
npx @tessl/cli install tessl/pypi-prefect-dask@0.3.0
```

This package enables Prefect flows to execute tasks using Dask's distributed computing capabilities, providing scalable task execution through the `DaskTaskRunner` and utilities for managing Dask clients within Prefect workflows.
```bash
pip install prefect-dask
```

Package-level imports:

```python
from prefect_dask import DaskTaskRunner, PrefectDaskClient, get_dask_client, get_async_dask_client
```

Module-specific imports:

```python
from prefect_dask.task_runners import DaskTaskRunner, PrefectDaskFuture
from prefect_dask.client import PrefectDaskClient
from prefect_dask.utils import get_dask_client, get_async_dask_client
```

Basic usage, running tasks in parallel on a temporary local Dask cluster:

```python
import time
from prefect import flow, task
from prefect_dask import DaskTaskRunner
@task
def compute_task(x):
    time.sleep(1)  # Simulate work
    return x * 2

@flow(task_runner=DaskTaskRunner())
def parallel_flow():
    # Submit multiple tasks for parallel execution
    futures = []
    for i in range(5):
        futures.append(compute_task.submit(i))
    # Collect results
    results = [future.result() for future in futures]
    return results

# Execute with local Dask cluster
if __name__ == "__main__":
    results = parallel_flow()
    print(results)  # [0, 2, 4, 6, 8]
```

Using Dask clients within tasks:

```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client
@task
def process_data():
    with get_dask_client() as client:
        # Use dask operations within the task
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        result = client.compute(df.describe()).result()
    return result

@flow(task_runner=DaskTaskRunner())
def data_processing_flow():
    return process_data()
```

The `DaskTaskRunner` enables parallel and distributed execution of Prefect tasks using Dask's distributed computing framework. It supports local clusters, remote clusters, and various other Dask cluster configurations (see the configuration examples at the end of this document).

```python
class DaskTaskRunner:
    def __init__(
        self,
        cluster: Optional[distributed.deploy.cluster.Cluster] = None,
        address: Optional[str] = None,
        cluster_class: Union[str, Callable[[], distributed.deploy.cluster.Cluster], None] = None,
        cluster_kwargs: Optional[dict[str, Any]] = None,
        adapt_kwargs: Optional[dict[str, Any]] = None,
        client_kwargs: Optional[dict[str, Any]] = None,
        performance_report_path: Optional[str] = None,
    ):
        """
        A parallel task runner that submits tasks to the dask.distributed scheduler.

        Parameters:
        - cluster: A currently running Dask cluster object
        - address: Address of a currently running Dask scheduler
        - cluster_class: The cluster class to use when creating a temporary cluster
        - cluster_kwargs: Additional kwargs to pass to cluster_class
        - adapt_kwargs: Additional kwargs to pass to cluster.adapt for adaptive scaling
        - client_kwargs: Additional kwargs for distributed.Client creation
        - performance_report_path: Path where the Dask performance report will be saved
        """

    def submit(
        self,
        task: Task,
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectDaskFuture] | None = None,
        dependencies: dict[str, Set[RunInput]] | None = None,
    ) -> PrefectDaskFuture:
        """
        Submit a task for execution on the Dask cluster.

        Parameters:
        - task: The Prefect task to execute
        - parameters: Task parameters as key-value pairs
        - wait_for: Other futures to wait for before execution
        - dependencies: Task run dependencies

        Returns:
        PrefectDaskFuture: A future representing the task execution
        """

    def map(
        self,
        task: Task,
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectDaskFuture]:
        """
        Map a task over multiple parameter sets for parallel execution.

        Parameters:
        - task: The Prefect task to map
        - parameters: Dictionary with lists of parameters to map over
        - wait_for: Other futures to wait for before execution

        Returns:
        PrefectFutureList: A list of futures representing the mapped task executions
        """

    @property
    def client(self) -> PrefectDaskClient:
        """Get the Dask client for the task runner."""

    def duplicate(self) -> "DaskTaskRunner":
        """Create a new instance with the same settings."""
```
`PrefectDaskClient` extends the standard Dask `distributed.Client` to handle Prefect-specific task submission and execution patterns.

```python
class PrefectDaskClient(distributed.Client):
    def submit(
        self,
        func,
        *args,
        key=None,
        workers=None,
        resources=None,
        retries=None,
        priority=0,
        fifo_timeout="100 ms",
        allow_other_workers=False,
        actor=False,
        actors=False,
        pure=True,
        **kwargs,
    ):
        """
        Submit a function or Prefect task for execution.

        When func is a Prefect Task, automatically handles task context,
        dependencies, and return types. Otherwise behaves like the standard
        distributed.Client.submit().

        Parameters:
        - func: Function or Prefect Task to execute
        - args: Positional arguments for the function
        - key: Unique identifier for the task
        - workers: Specific workers to run on
        - resources: Resource requirements
        - retries: Number of retries on failure
        - priority: Task priority (higher = more important)
        - fifo_timeout: Time to wait in the scheduler queue
        - allow_other_workers: Allow execution on other workers
        - actor: Create as an actor
        - actors: Legacy actor parameter
        - pure: Whether the function is pure (deterministic)
        - kwargs: Additional keyword arguments

        Returns:
        distributed.Future: A future representing the execution
        """

    def map(
        self,
        func,
        *iterables,
        key=None,
        workers=None,
        retries=None,
        resources=None,
        priority=0,
        allow_other_workers=False,
        fifo_timeout="100 ms",
        actor=False,
        actors=False,
        pure=True,
        batch_size=None,
        **kwargs,
    ):
        """
        Map a function or Prefect task over iterables.

        When func is a Prefect Task, handles each mapped execution with
        the proper Prefect context. Otherwise behaves like the standard
        distributed.Client.map().

        Parameters:
        - func: Function or Prefect Task to map
        - iterables: Iterables to map over
        - key: Base key for the generated tasks
        - workers: Specific workers to run on
        - retries: Number of retries on failure
        - resources: Resource requirements
        - priority: Task priority
        - allow_other_workers: Allow execution on other workers
        - fifo_timeout: Time to wait in the scheduler queue
        - actor: Create as actors
        - actors: Legacy actor parameter
        - pure: Whether the function is pure
        - batch_size: Number of items per batch
        - kwargs: Additional keyword arguments

        Returns:
        List[distributed.Future]: A list of futures for the mapped executions
        """
```
`PrefectDaskFuture` wraps Dask `distributed.Future` objects to provide a Prefect-compatible future interface with proper state handling.

```python
class PrefectDaskFuture(PrefectWrappedFuture):
    def __init__(self, task_run_id: UUID, wrapped_future: distributed.Future):
        """
        A Prefect future that wraps a distributed.Future.

        Parameters:
        - task_run_id: Prefect task run identifier
        - wrapped_future: The underlying Dask distributed.Future
        """

    def wait(self, timeout: Optional[float] = None) -> None:
        """
        Wait for the future to complete.

        Parameters:
        - timeout: Maximum time to wait in seconds
        """

    def result(
        self,
        timeout: Optional[float] = None,
        raise_on_failure: bool = True,
    ):
        """
        Get the result of the future.

        Parameters:
        - timeout: Maximum time to wait for the result in seconds
        - raise_on_failure: Whether to raise an exception on task failure

        Returns:
        The task result

        Raises:
        - TimeoutError: If the timeout is reached before completion
        - Exception: The task execution exception if raise_on_failure=True
        """
```
Context managers for obtaining temporary Dask clients within Prefect tasks and flows, supporting both synchronous and asynchronous execution patterns.

````python
def get_dask_client(
    timeout: Optional[Union[int, float, str, timedelta]] = None,
    **client_kwargs: Dict[str, Any],
) -> Generator[distributed.Client, None, None]:
    """
    Context manager yielding a temporary synchronous Dask client.

    Automatically configures the client based on the current Prefect context
    (task or flow execution). Useful for parallelizing operations on Dask
    collections within Prefect tasks.

    Parameters:
    - timeout: Connection timeout (no effect in flow contexts)
    - client_kwargs: Additional keyword arguments for distributed.Client

    Yields:
    distributed.Client: A temporary synchronous Dask client

    Example:
    ```python
    @task
    def compute_task():
        with get_dask_client(timeout="120s") as client:
            df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
            result = client.compute(df.describe()).result()
        return result
    ```
    """

async def get_async_dask_client(
    timeout: Optional[Union[int, float, str, timedelta]] = None,
    **client_kwargs: Dict[str, Any],
) -> AsyncGenerator[distributed.Client, None]:
    """
    Async context manager yielding a temporary asynchronous Dask client.

    Automatically configures the client based on the current Prefect context
    for async task or flow execution. Useful for parallelizing operations
    on Dask collections within async Prefect tasks.

    Parameters:
    - timeout: Connection timeout (no effect in flow contexts)
    - client_kwargs: Additional keyword arguments for distributed.Client

    Yields:
    distributed.Client: A temporary asynchronous Dask client

    Example:
    ```python
    @task
    async def async_compute_task():
        async with get_async_dask_client(timeout="120s") as client:
            df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
            result = await client.compute(df.describe())
        return result
    ```
    """
````

The signatures above reference the following imports:

```python
from typing import Any, Dict, Set, Optional, Union, Callable, Iterable, Generator, AsyncGenerator
from datetime import timedelta
from uuid import UUID
import distributed
import distributed.deploy.cluster
from prefect.client.schemas.objects import RunInput, State
from prefect.futures import PrefectFuture, PrefectFutureList, PrefectWrappedFuture
from prefect.tasks import Task
from prefect.task_runners import TaskRunner
```

Configuration examples for the `DaskTaskRunner`:

```python
from prefect_dask import DaskTaskRunner

# Create task runner with custom local cluster
task_runner = DaskTaskRunner(
    cluster_kwargs={
        "n_workers": 4,
        "threads_per_worker": 2,
        "memory_limit": "2GB",
        "dashboard_address": ":8787",
    }
)
```

```python
# Connect to existing cluster by address
task_runner = DaskTaskRunner(address="192.168.1.100:8786")
# Connect to existing cluster object
import distributed
cluster = distributed.LocalCluster()
task_runner = DaskTaskRunner(cluster=cluster)
```

```python
# Using dask-cloudprovider for AWS Fargate
task_runner = DaskTaskRunner(
    cluster_class="dask_cloudprovider.FargateCluster",
    cluster_kwargs={
        "image": "prefecthq/prefect:latest",
        "n_workers": 10,
        "fargate_use_private_ip": True,
    }
)
```
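Any of these setups can also write a Dask performance report via the `performance_report_path` constructor parameter shown above; a minimal sketch (the file name is illustrative):

```python
# Save a Dask performance report for the flow run to an HTML file
task_runner = DaskTaskRunner(
    cluster_kwargs={"n_workers": 2},
    performance_report_path="dask_performance_report.html",
)
```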
```python
# Enable adaptive scaling
task_runner = DaskTaskRunner(
    cluster_kwargs={"n_workers": 2},
    adapt_kwargs={
        "minimum": 1,
        "maximum": 20,
        "target_duration": "30s",
    }
)
```
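A configured runner is attached to a flow the same way as the default one. A brief sketch tying the adaptive-scaling setup above to a flow (task and flow names are illustrative):

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

adaptive_runner = DaskTaskRunner(
    cluster_kwargs={"n_workers": 2},
    adapt_kwargs={"minimum": 1, "maximum": 20},
)

@task
def crunch(x: int) -> int:
    return x ** 2

@flow(task_runner=adaptive_runner)
def scaled_flow(n: int = 100):
    # The temporary cluster scales between 1 and 20 workers
    # based on the volume of submitted task runs.
    return crunch.map(range(n)).result()

if __name__ == "__main__":
    scaled_flow()
```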