# Prefect Dask

`prefect-dask` integrates Prefect with the Dask execution framework for parallel and distributed computing. It lets Prefect flows run their tasks on Dask through the `DaskTaskRunner`, and it provides utilities for managing Dask clients within Prefect workflows.

## Package Information

- **Package Name**: prefect-dask
- **Language**: Python
- **Installation**: `pip install prefect-dask`

## Core Imports

```python
from prefect_dask import DaskTaskRunner, PrefectDaskClient, get_dask_client, get_async_dask_client
```

Module-specific imports:

```python
from prefect_dask.task_runners import DaskTaskRunner, PrefectDaskFuture
from prefect_dask.client import PrefectDaskClient
from prefect_dask.utils import get_dask_client, get_async_dask_client
```

## Basic Usage

```python
import time
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def compute_task(x):
    time.sleep(1)  # Simulate work
    return x * 2

@flow(task_runner=DaskTaskRunner())
def parallel_flow():
    # Submit multiple tasks for parallel execution
    futures = []
    for i in range(5):
        futures.append(compute_task.submit(i))

    # Collect results
    results = [future.result() for future in futures]
    return results

# Execute with local Dask cluster
if __name__ == "__main__":
    results = parallel_flow()
    print(results)  # [0, 2, 4, 6, 8]
```

Using Dask clients within tasks:

```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def process_data():
    with get_dask_client() as client:
        # Use dask operations within the task
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        result = client.compute(df.describe()).result()
    return result

@flow(task_runner=DaskTaskRunner())
def data_processing_flow():
    # Submit the task so that it runs through the Dask task runner
    return process_data.submit().result()
```

## Capabilities

### Task Runner

`DaskTaskRunner` runs Prefect tasks in parallel, on one machine or across several, by submitting them to Dask's distributed scheduler. It can start a temporary cluster, connect to an already running cluster by address or cluster object, or create a cluster from a custom cluster class.

```python { .api }
class DaskTaskRunner:
    def __init__(
        self,
        cluster: Optional[distributed.deploy.cluster.Cluster] = None,
        address: Optional[str] = None,
        cluster_class: Union[str, Callable[[], distributed.deploy.cluster.Cluster], None] = None,
        cluster_kwargs: Optional[dict[str, Any]] = None,
        adapt_kwargs: Optional[dict[str, Any]] = None,
        client_kwargs: Optional[dict[str, Any]] = None,
        performance_report_path: Optional[str] = None,
    ):
        """
        A parallel task runner that submits tasks to the dask.distributed scheduler.

        Parameters:
        - cluster: Currently running dask cluster object
        - address: Address of currently running dask scheduler
        - cluster_class: The cluster class to use when creating temporary cluster
        - cluster_kwargs: Additional kwargs to pass to cluster_class
        - adapt_kwargs: Additional kwargs to pass to cluster.adapt for adaptive scaling
        - client_kwargs: Additional kwargs for distributed.Client creation
        - performance_report_path: Path where Dask performance report will be saved
        """

    def submit(
        self,
        task: Task,
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectDaskFuture] | None = None,
        dependencies: dict[str, Set[RunInput]] | None = None,
    ) -> PrefectDaskFuture:
        """
        Submit a task for execution on the Dask cluster.

        Parameters:
        - task: The Prefect task to execute
        - parameters: Task parameters as key-value pairs
        - wait_for: Other futures to wait for before execution
        - dependencies: Task run dependencies

        Returns:
        PrefectDaskFuture: Future representing the task execution
        """

    def map(
        self,
        task: Task,
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectDaskFuture]:
        """
        Map a task over multiple parameter sets for parallel execution.

        Parameters:
        - task: The Prefect task to map
        - parameters: Dictionary with lists of parameters to map over
        - wait_for: Other futures to wait for before execution

        Returns:
        PrefectFutureList: List of futures representing mapped task executions
        """

    @property
    def client(self) -> PrefectDaskClient:
        """Get the Dask client for the task runner."""

    def duplicate(self) -> "DaskTaskRunner":
        """Create a new instance with the same settings."""
```
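
The `parameters` argument of `map` is described above as a dictionary with lists of parameters to map over. As a rough illustration of that shape only, the sketch below expands such a dictionary into one parameter set per mapped call. The helper name `expand_mapped_parameters` is hypothetical and not part of prefect-dask, and the rules it applies (list values are combined element-wise and must have equal length, all other values are reused for every call) are simplifying assumptions rather than the library's exact behavior.

```python
from typing import Any


def expand_mapped_parameters(parameters: dict[str, Any]) -> list[dict[str, Any]]:
    """Hypothetical helper: turn a dict of lists into one dict per mapped call."""
    # Assumption: list values are mapped over; anything else is held constant.
    mapped = {k: v for k, v in parameters.items() if isinstance(v, list)}
    static = {k: v for k, v in parameters.items() if not isinstance(v, list)}

    lengths = {len(v) for v in mapped.values()}
    if len(lengths) > 1:
        raise ValueError(f"Mapped parameters differ in length: {sorted(lengths)}")

    n_calls = lengths.pop() if lengths else 1
    return [
        {**static, **{k: v[i] for k, v in mapped.items()}}
        for i in range(n_calls)
    ]


calls = expand_mapped_parameters({"x": [1, 2, 3], "y": [10, 20, 30], "scale": 2})
for call in calls:
    print(call)
```

Each resulting dictionary corresponds to one task run, which is why `map` returns a list of futures rather than a single future.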
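
### Dask Client Integration

`PrefectDaskClient` extends the standard Dask `distributed.Client` so that `submit` and `map` also accept Prefect tasks, handling the Prefect-specific parts of submission and execution. Plain functions are passed through to the standard client behavior.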

```python { .api }
class PrefectDaskClient(distributed.Client):
    def submit(
        self,
        func,
        *args,
        key=None,
        workers=None,
        resources=None,
        retries=None,
        priority=0,
        fifo_timeout="100 ms",
        allow_other_workers=False,
        actor=False,
        actors=False,
        pure=True,
        **kwargs,
    ):
        """
        Submit a function or Prefect task for execution.

        When func is a Prefect Task, automatically handles task context,
        dependencies, and return types. Otherwise behaves like standard
        distributed.Client.submit().

        Parameters:
        - func: Function or Prefect Task to execute
        - args: Positional arguments for the function
        - key: Unique identifier for the task
        - workers: Specific workers to run on
        - resources: Resource requirements
        - retries: Number of retries on failure
        - priority: Task priority (higher = more important)
        - fifo_timeout: Time to wait in scheduler queue
        - allow_other_workers: Allow execution on other workers
        - actor: Create as an actor
        - actors: Legacy actor parameter
        - pure: Whether function is pure (deterministic)
        - kwargs: Additional keyword arguments

        Returns:
        distributed.Future: Future representing the execution
        """

    def map(
        self,
        func,
        *iterables,
        key=None,
        workers=None,
        retries=None,
        resources=None,
        priority=0,
        allow_other_workers=False,
        fifo_timeout="100 ms",
        actor=False,
        actors=False,
        pure=True,
        batch_size=None,
        **kwargs,
    ):
        """
        Map a function or Prefect task over iterables.

        When func is a Prefect Task, handles each mapped execution with
        proper Prefect context. Otherwise behaves like standard
        distributed.Client.map().

        Parameters:
        - func: Function or Prefect Task to map
        - iterables: Iterables to map over
        - key: Base key for generated tasks
        - workers: Specific workers to run on
        - retries: Number of retries on failure
        - resources: Resource requirements
        - priority: Task priority
        - allow_other_workers: Allow execution on other workers
        - fifo_timeout: Time to wait in scheduler queue
        - actor: Create as actors
        - actors: Legacy actor parameter
        - pure: Whether function is pure
        - batch_size: Number of items per batch
        - kwargs: Additional keyword arguments

        Returns:
        List[distributed.Future]: List of futures for mapped executions
        """
```
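
The dispatch described in the `submit` docstring (special handling when `func` is a Prefect Task, standard behavior otherwise) can be pictured as a subclass that overrides `submit`. The sketch below uses the standard library's `ThreadPoolExecutor` as a stand-in for `distributed.Client`, and `ToyTask` and `TaskAwareExecutor` are hypothetical names. It shows the pattern only and is not how `PrefectDaskClient` is implemented.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


class ToyTask:
    """Hypothetical stand-in for a Prefect Task: a named wrapper around a function."""

    def __init__(self, fn: Callable[..., Any], name: str):
        self.fn = fn
        self.name = name


class TaskAwareExecutor(ThreadPoolExecutor):
    """Stand-in for a client subclass that special-cases task objects."""

    def submit(self, func, /, *args, **kwargs) -> Future:
        if isinstance(func, ToyTask):
            # Task objects get extra handling before reaching the base class;
            # in this sketch that is just tagging the result with the task name.
            task = func

            def run_task(*a, **kw):
                return {"task": task.name, "value": task.fn(*a, **kw)}

            return super().submit(run_task, *args, **kwargs)
        # Plain callables fall through to the unmodified base behavior.
        return super().submit(func, *args, **kwargs)


with TaskAwareExecutor(max_workers=2) as executor:
    plain = executor.submit(pow, 2, 5).result()
    wrapped = executor.submit(ToyTask(lambda x: x * 2, "double"), 21).result()

print(plain)    # 32
print(wrapped)  # {'task': 'double', 'value': 42}
```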

### Future Handling

`PrefectDaskFuture` wraps a Dask `distributed.Future` to provide a Prefect-compatible future interface with proper state handling.

```python { .api }
class PrefectDaskFuture(PrefectWrappedFuture):
    def __init__(self, task_run_id: UUID, wrapped_future: distributed.Future):
        """
        A Prefect future that wraps a distributed.Future.

        Parameters:
        - task_run_id: Prefect task run identifier
        - wrapped_future: The underlying Dask distributed.Future
        """

    def wait(self, timeout: Optional[float] = None) -> None:
        """
        Wait for the future to complete.

        Parameters:
        - timeout: Maximum time to wait in seconds
        """

    def result(
        self,
        timeout: Optional[float] = None,
        raise_on_failure: bool = True,
    ):
        """
        Get the result of the future.

        Parameters:
        - timeout: Maximum time to wait for result in seconds
        - raise_on_failure: Whether to raise exception on task failure

        Returns:
        The task result

        Raises:
        - TimeoutError: If timeout is reached before completion
        - Exception: Task execution exception if raise_on_failure=True
        """
```
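
To make the `wait` and `result` semantics above more concrete, here is a minimal sketch of a wrapper around a future. It wraps `concurrent.futures.Future` from the standard library rather than `distributed.Future`, and `ToyWrappedFuture` is a hypothetical class. Returning the exception object when `raise_on_failure=False` is an assumption made for this sketch, not confirmed behavior of `PrefectDaskFuture`.

```python
import concurrent.futures
from typing import Any, Optional


class ToyWrappedFuture:
    """Hypothetical wrapper exposing wait() and result() over a stdlib future."""

    def __init__(self, wrapped_future: concurrent.futures.Future):
        self.wrapped_future = wrapped_future

    def wait(self, timeout: Optional[float] = None) -> None:
        # Block until the future is done or the timeout elapses; does not raise.
        concurrent.futures.wait([self.wrapped_future], timeout=timeout)

    def result(self, timeout: Optional[float] = None, raise_on_failure: bool = True) -> Any:
        try:
            return self.wrapped_future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            raise  # a timeout propagates regardless of raise_on_failure
        except Exception as exc:
            if raise_on_failure:
                raise
            return exc  # assumption: hand the failure back instead of raising


def fail() -> None:
    raise ValueError("boom")


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    ok = ToyWrappedFuture(pool.submit(lambda: 21 * 2))
    bad = ToyWrappedFuture(pool.submit(fail))
    ok.wait(timeout=5)
    ok_value = ok.result()
    bad_value = bad.result(raise_on_failure=False)

print(ok_value)         # 42
print(repr(bad_value))  # ValueError('boom')
```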

### Client Utilities

`get_dask_client` and `get_async_dask_client` are context managers that yield a temporary Dask client inside Prefect tasks and flows. The first is for synchronous code, the second for asynchronous code.

```python { .api }
def get_dask_client(
    timeout: Optional[Union[int, float, str, timedelta]] = None,
    **client_kwargs: Dict[str, Any],
) -> Generator[distributed.Client, None, None]:
    """
    Context manager yielding a temporary synchronous dask client.

    Automatically configures client based on the current Prefect context
    (task or flow execution). Useful for parallelizing operations on dask
    collections within Prefect tasks.

    Parameters:
    - timeout: Connection timeout (no effect in flow contexts)
    - client_kwargs: Additional keyword arguments for distributed.Client

    Yields:
    distributed.Client: Temporary synchronous dask client

    Example:
    ```python
    @task
    def compute_task():
        with get_dask_client(timeout="120s") as client:
            df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
            result = client.compute(df.describe()).result()
        return result
    ```
    """

async def get_async_dask_client(
    timeout: Optional[Union[int, float, str, timedelta]] = None,
    **client_kwargs: Dict[str, Any],
) -> AsyncGenerator[distributed.Client, None]:
    """
    Async context manager yielding a temporary asynchronous dask client.

    Automatically configures client based on the current Prefect context
    for async task or flow execution. Useful for parallelizing operations
    on dask collections within async Prefect tasks.

    Parameters:
    - timeout: Connection timeout (no effect in flow contexts)
    - client_kwargs: Additional keyword arguments for distributed.Client

    Yields:
    distributed.Client: Temporary asynchronous dask client

    Example:
    ```python
    @task
    async def async_compute_task():
        async with get_async_dask_client(timeout="120s") as client:
            df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
            result = await client.compute(df.describe())
        return result
    ```
    """
```
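
Both helpers yield a temporary client, so the client is meant to be used only inside the `with` block. The sketch below illustrates that lifecycle with `contextlib.contextmanager` and a hypothetical `ToyClient` class. It assumes the temporary client is closed when the block exits, and it does not reproduce how prefect-dask builds or configures a real `distributed.Client`.

```python
from contextlib import contextmanager
from typing import Any, Iterator


class ToyClient:
    """Hypothetical stand-in for a client that holds a connection."""

    def __init__(self, **client_kwargs: Any):
        self.client_kwargs = client_kwargs
        self.closed = False

    def close(self) -> None:
        self.closed = True


@contextmanager
def get_toy_client(**client_kwargs: Any) -> Iterator[ToyClient]:
    """Yield a temporary client and close it when the block exits."""
    client = ToyClient(**client_kwargs)
    try:
        yield client
    finally:
        # Runs on normal exit and also when the block raises.
        client.close()


with get_toy_client(timeout="120s") as client:
    open_inside = not client.closed

print(open_inside)    # True
print(client.closed)  # True
```

This is why the examples in the docstrings compute the result inside the block and return it afterwards: the computed value outlives the client, while the client itself does not.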

## Types

```python { .api }
from typing import Any, Dict, Set, Optional, Union, Callable, Iterable, Generator, AsyncGenerator
from datetime import timedelta
from uuid import UUID
import distributed
import distributed.deploy.cluster
from prefect.client.schemas.objects import RunInput, State
from prefect.futures import PrefectFuture, PrefectFutureList, PrefectWrappedFuture
from prefect.tasks import Task
from prefect.task_runners import TaskRunner
```

## Configuration Examples

### Local Cluster with Custom Settings

```python
from prefect_dask import DaskTaskRunner

# Create task runner with custom local cluster
task_runner = DaskTaskRunner(
    cluster_kwargs={
        "n_workers": 4,
        "threads_per_worker": 2,
        "memory_limit": "2GB",
        "dashboard_address": ":8787"
    }
)
```

### Remote Cluster Connection

```python
import distributed
from prefect_dask import DaskTaskRunner

# Connect to existing cluster by address
task_runner = DaskTaskRunner(address="192.168.1.100:8786")

# Connect to existing cluster object
cluster = distributed.LocalCluster()
task_runner = DaskTaskRunner(cluster=cluster)
```

### Cloud Provider Integration

```python
from prefect_dask import DaskTaskRunner

# Using dask-cloudprovider for AWS Fargate
# (recent dask-cloudprovider releases expose the class under the `aws` submodule)
task_runner = DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "prefecthq/prefect:latest",
        "n_workers": 10,
        "fargate_use_private_ip": True
    }
)
```
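
`cluster_class` accepts either a callable or a string, as in the example above. A string has to be resolved to a class before a cluster can be created. The sketch below shows one common way to do that with `importlib`. The helper `resolve_dotted_path` is hypothetical and is not necessarily how prefect-dask performs the lookup; it is demonstrated on a standard-library class so that it runs without Dask installed.

```python
import importlib
from typing import Any


def resolve_dotted_path(path: str) -> Any:
    """Hypothetical helper: import 'package.module.Name' and return Name."""
    module_name, _, attr_name = path.rpartition(".")
    if not module_name:
        raise ValueError(f"Expected a dotted path like 'pkg.mod.Name', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


ordered_dict_cls = resolve_dotted_path("collections.OrderedDict")
# Keyword arguments are forwarded to the resolved class, much as
# cluster_kwargs are forwarded to the cluster class.
instance = ordered_dict_cls(n_workers=4)
print(ordered_dict_cls.__name__, dict(instance))
```

A practical consequence of passing a string is that the package providing the class only needs to be importable where the cluster is created, not where the task runner is configured.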

### Adaptive Scaling

```python
from prefect_dask import DaskTaskRunner

# Enable adaptive scaling
task_runner = DaskTaskRunner(
    cluster_kwargs={"n_workers": 2},
    adapt_kwargs={
        "minimum": 1,
        "maximum": 20,
        "target_duration": "30s"
    }
)
```