# Prefect Dask

Prefect integrations with the Dask execution framework for parallel and distributed computing. This package enables Prefect flows to execute tasks using Dask's distributed computing capabilities, providing scalable task execution through the DaskTaskRunner and utilities for managing Dask clients within Prefect workflows.

## Package Information

- **Package Name**: prefect-dask
- **Language**: Python
- **Installation**: `pip install prefect-dask`

## Core Imports

```python
from prefect_dask import DaskTaskRunner, PrefectDaskClient, get_dask_client, get_async_dask_client
```

Module-specific imports:

```python
from prefect_dask.task_runners import DaskTaskRunner, PrefectDaskFuture
from prefect_dask.client import PrefectDaskClient
from prefect_dask.utils import get_dask_client, get_async_dask_client
```

## Basic Usage

```python
import time
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def compute_task(x):
    time.sleep(1)  # Simulate work
    return x * 2

@flow(task_runner=DaskTaskRunner())
def parallel_flow():
    # Submit multiple tasks for parallel execution
    futures = []
    for i in range(5):
        futures.append(compute_task.submit(i))

    # Collect results
    results = [future.result() for future in futures]
    return results

# Execute with local Dask cluster
if __name__ == "__main__":
    results = parallel_flow()
    print(results)  # [0, 2, 4, 6, 8]
```

Using Dask clients within tasks:

```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def process_data():
    with get_dask_client() as client:
        # Use dask operations within the task
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        result = client.compute(df.describe()).result()
    return result

@flow(task_runner=DaskTaskRunner())
def data_processing_flow():
    return process_data()
```

## Capabilities

### Task Runner

The DaskTaskRunner enables parallel and distributed execution of Prefect tasks using Dask's distributed computing framework. It supports local clusters, remote clusters, and various Dask cluster configurations.

```python { .api }
class DaskTaskRunner:
    def __init__(
        self,
        cluster: Optional[distributed.deploy.cluster.Cluster] = None,
        address: Optional[str] = None,
        cluster_class: Union[str, Callable[[], distributed.deploy.cluster.Cluster], None] = None,
        cluster_kwargs: Optional[dict[str, Any]] = None,
        adapt_kwargs: Optional[dict[str, Any]] = None,
        client_kwargs: Optional[dict[str, Any]] = None,
        performance_report_path: Optional[str] = None,
    ):
        """
        A parallel task runner that submits tasks to the dask.distributed scheduler.

        Parameters:
        - cluster: Currently running dask cluster object
        - address: Address of currently running dask scheduler
        - cluster_class: The cluster class to use when creating temporary cluster
        - cluster_kwargs: Additional kwargs to pass to cluster_class
        - adapt_kwargs: Additional kwargs to pass to cluster.adapt for adaptive scaling
        - client_kwargs: Additional kwargs for distributed.Client creation
        - performance_report_path: Path where Dask performance report will be saved
        """

    def submit(
        self,
        task: Task,
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectDaskFuture] | None = None,
        dependencies: dict[str, Set[RunInput]] | None = None,
    ) -> PrefectDaskFuture:
        """
        Submit a task for execution on the Dask cluster.

        Parameters:
        - task: The Prefect task to execute
        - parameters: Task parameters as key-value pairs
        - wait_for: Other futures to wait for before execution
        - dependencies: Task run dependencies

        Returns:
        PrefectDaskFuture: Future representing the task execution
        """

    def map(
        self,
        task: Task,
        parameters: dict[str, Any],
        wait_for: Iterable[PrefectFuture[Any]] | None = None,
    ) -> PrefectFutureList[PrefectDaskFuture]:
        """
        Map a task over multiple parameter sets for parallel execution.

        Parameters:
        - task: The Prefect task to map
        - parameters: Dictionary with lists of parameters to map over
        - wait_for: Other futures to wait for before execution

        Returns:
        PrefectFutureList: List of futures representing mapped task executions
        """

    @property
    def client(self) -> PrefectDaskClient:
        """Get the Dask client for the task runner."""

    def duplicate(self) -> DaskTaskRunner:
        """Create a new instance with the same settings."""
```
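
The `parameters` argument of `map` is described as a dictionary with lists of parameters to map over. As a conceptual illustration only, the plain-Python sketch below shows how such a dictionary can be unrolled into one parameter set per mapped call. It is not Prefect's implementation: `expand_parameters` is a hypothetical helper, and the sketch assumes every list has the same length.

```python
from typing import Any


def expand_parameters(parameters: dict[str, list[Any]]) -> list[dict[str, Any]]:
    """Turn {"x": [1, 2], "y": [3, 4]} into one dict per mapped call."""
    lengths = {len(values) for values in parameters.values()}
    if len(lengths) > 1:
        raise ValueError("all mapped parameter lists must have the same length")
    keys = list(parameters)
    # zip(*values) walks the lists in lockstep, producing one tuple per call.
    return [dict(zip(keys, combo)) for combo in zip(*parameters.values())]


calls = expand_parameters({"x": [1, 2, 3], "y": [10, 20, 30]})
for call in calls:
    print(call)
```

Each resulting dictionary corresponds to the `parameters` of a single submitted task run.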

### Dask Client Integration

PrefectDaskClient extends the standard Dask distributed.Client to handle Prefect-specific task submission and execution patterns.

```python { .api }
class PrefectDaskClient(distributed.Client):
    def submit(
        self,
        func,
        *args,
        key=None,
        workers=None,
        resources=None,
        retries=None,
        priority=0,
        fifo_timeout="100 ms",
        allow_other_workers=False,
        actor=False,
        actors=False,
        pure=True,
        **kwargs,
    ):
        """
        Submit a function or Prefect task for execution.

        When func is a Prefect Task, automatically handles task context,
        dependencies, and return types. Otherwise behaves like standard
        distributed.Client.submit().

        Parameters:
        - func: Function or Prefect Task to execute
        - args: Positional arguments for the function
        - key: Unique identifier for the task
        - workers: Specific workers to run on
        - resources: Resource requirements
        - retries: Number of retries on failure
        - priority: Task priority (higher = more important)
        - fifo_timeout: Time to wait in scheduler queue
        - allow_other_workers: Allow execution on other workers
        - actor: Create as an actor
        - actors: Legacy actor parameter
        - pure: Whether function is pure (deterministic)
        - kwargs: Additional keyword arguments

        Returns:
        distributed.Future: Future representing the execution
        """

    def map(
        self,
        func,
        *iterables,
        key=None,
        workers=None,
        retries=None,
        resources=None,
        priority=0,
        allow_other_workers=False,
        fifo_timeout="100 ms",
        actor=False,
        actors=False,
        pure=True,
        batch_size=None,
        **kwargs,
    ):
        """
        Map a function or Prefect task over iterables.

        When func is a Prefect Task, handles each mapped execution with
        proper Prefect context. Otherwise behaves like standard
        distributed.Client.map().

        Parameters:
        - func: Function or Prefect Task to map
        - iterables: Iterables to map over
        - key: Base key for generated tasks
        - workers: Specific workers to run on
        - retries: Number of retries on failure
        - resources: Resource requirements
        - priority: Task priority
        - allow_other_workers: Allow execution on other workers
        - fifo_timeout: Time to wait in scheduler queue
        - actor: Create as actors
        - actors: Legacy actor parameter
        - pure: Whether function is pure
        - batch_size: Number of items per batch
        - kwargs: Additional keyword arguments

        Returns:
        List[distributed.Future]: List of futures for mapped executions
        """
```
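
The docstrings above describe a two-way dispatch: Prefect tasks get special handling, and anything else falls through to the parent client. The sketch below illustrates only that dispatch pattern with standard-library code. `FakeTask`, `BaseClient`, and `TaskAwareClient` are hypothetical stand-ins, not the classes from `prefect` or `distributed`, and the tagged tuples exist purely to make the chosen branch visible.

```python
from typing import Any, Callable


class FakeTask:
    """Hypothetical stand-in for a Prefect Task; it only wraps a function."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn


class BaseClient:
    """Hypothetical stand-in for distributed.Client; runs work eagerly."""

    def submit(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        return func(*args, **kwargs)


class TaskAwareClient(BaseClient):
    """Overrides submit() so task objects take a different path."""

    def submit(self, func: Any, *args: Any, **kwargs: Any) -> Any:
        if isinstance(func, FakeTask):
            # Task path: unwrap the task before handing it to the parent.
            return ("task", super().submit(func.fn, *args, **kwargs))
        # Plain callable: pass it to the parent implementation unchanged.
        return ("plain", super().submit(func, *args, **kwargs))


client = TaskAwareClient()
task_result = client.submit(FakeTask(lambda x: x + 1), 1)
plain_result = client.submit(lambda x: x * 2, 5)
print(task_result, plain_result)
```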

### Future Handling

PrefectDaskFuture wraps Dask distributed.Future objects to provide a Prefect-compatible future interface with proper state handling.

```python { .api }
class PrefectDaskFuture(PrefectWrappedFuture):
    def __init__(self, task_run_id: UUID, wrapped_future: distributed.Future):
        """
        A Prefect future that wraps a distributed.Future.

        Parameters:
        - task_run_id: Prefect task run identifier
        - wrapped_future: The underlying Dask distributed.Future
        """

    def wait(self, timeout: Optional[float] = None) -> None:
        """
        Wait for the future to complete.

        Parameters:
        - timeout: Maximum time to wait in seconds
        """

    def result(
        self,
        timeout: Optional[float] = None,
        raise_on_failure: bool = True,
    ):
        """
        Get the result of the future.

        Parameters:
        - timeout: Maximum time to wait for result in seconds
        - raise_on_failure: Whether to raise exception on task failure

        Returns:
        The task result

        Raises:
        - TimeoutError: If timeout is reached before completion
        - Exception: Task execution exception if raise_on_failure=True
        """
```
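
The wrapping idea behind `wait` and `result` can be tried out without a Dask cluster. The following is a rough analogy using only the standard library: `WrappedFuture` is a hypothetical class, `concurrent.futures.Future` stands in for `distributed.Future`, and returning the exception object when `raise_on_failure=False` is an assumption of this sketch rather than documented behavior of `PrefectDaskFuture`.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import wait as futures_wait
from typing import Any, Optional


class WrappedFuture:
    """Hypothetical wrapper exposing wait() and result() over another future."""

    def __init__(self, wrapped_future: Future) -> None:
        self.wrapped_future = wrapped_future

    def wait(self, timeout: Optional[float] = None) -> None:
        # Block until the wrapped future finishes or the timeout elapses.
        futures_wait([self.wrapped_future], timeout=timeout)

    def result(self, timeout: Optional[float] = None, raise_on_failure: bool = True) -> Any:
        # exception() blocks like result() but hands back the error instead of raising it.
        error = self.wrapped_future.exception(timeout=timeout)
        if error is None:
            return self.wrapped_future.result()
        if raise_on_failure:
            raise error
        return error


def divide(a: float, b: float) -> float:
    return a / b


with ThreadPoolExecutor(max_workers=2) as pool:
    ok = WrappedFuture(pool.submit(divide, 6, 3))
    bad = WrappedFuture(pool.submit(divide, 1, 0))
    ok.wait()
    ok_value = ok.result()
    bad_value = bad.result(raise_on_failure=False)
```

With `raise_on_failure=True`, the second call would raise the `ZeroDivisionError` instead of returning it.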

### Client Utilities

Context managers for obtaining temporary Dask clients within Prefect tasks and flows, supporting both synchronous and asynchronous execution patterns.

```python { .api }
def get_dask_client(
    timeout: Optional[Union[int, float, str, timedelta]] = None,
    **client_kwargs: Dict[str, Any],
) -> Generator[distributed.Client, None, None]:
    """
    Context manager yielding a temporary synchronous dask client.

    Automatically configures client based on the current Prefect context
    (task or flow execution). Useful for parallelizing operations on dask
    collections within Prefect tasks.

    Parameters:
    - timeout: Connection timeout (no effect in flow contexts)
    - client_kwargs: Additional keyword arguments for distributed.Client

    Yields:
    distributed.Client: Temporary synchronous dask client

    Example:
        ```python
        @task
        def compute_task():
            with get_dask_client(timeout="120s") as client:
                df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
                result = client.compute(df.describe()).result()
            return result
        ```
    """

async def get_async_dask_client(
    timeout: Optional[Union[int, float, str, timedelta]] = None,
    **client_kwargs: Dict[str, Any],
) -> AsyncGenerator[distributed.Client, None]:
    """
    Async context manager yielding a temporary asynchronous dask client.

    Automatically configures client based on the current Prefect context
    for async task or flow execution. Useful for parallelizing operations
    on dask collections within async Prefect tasks.

    Parameters:
    - timeout: Connection timeout (no effect in flow contexts)
    - client_kwargs: Additional keyword arguments for distributed.Client

    Yields:
    distributed.Client: Temporary asynchronous dask client

    Example:
        ```python
        @task
        async def async_compute_task():
            async with get_async_dask_client(timeout="120s") as client:
                df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
                result = await client.compute(df.describe())
            return result
        ```
    """
```
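
Both utilities are annotated as generators yet used with `with` and `async with`. That combination is what the `contextlib` decorators produce, so the sketch below shows the pattern in isolation. `FakeClient`, `get_fake_client`, and `get_async_fake_client` are hypothetical names used for illustration, and the cleanup in `finally` is an assumption about what a temporary client would need, not a copy of the library's code.

```python
import asyncio
from contextlib import asynccontextmanager, contextmanager
from typing import AsyncGenerator, Generator


class FakeClient:
    """Hypothetical stand-in for distributed.Client."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


@contextmanager
def get_fake_client() -> Generator[FakeClient, None, None]:
    client = FakeClient()
    try:
        yield client  # the body of the `with` block runs here
    finally:
        client.close()  # runs even if the body raised


@asynccontextmanager
async def get_async_fake_client() -> AsyncGenerator[FakeClient, None]:
    client = FakeClient()
    try:
        yield client
    finally:
        client.close()


with get_fake_client() as sync_client:
    open_inside_with = not sync_client.closed


async def main() -> FakeClient:
    async with get_async_fake_client() as client:
        assert not client.closed
    return client


async_client = asyncio.run(main())
```

The client is only valid inside the block; after exit it has been closed, which is why the examples above call `.result()` or `await` before leaving the `with` statement.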

## Types

```python { .api }
from typing import Any, Dict, Set, Optional, Union, Callable, Iterable, Generator, AsyncGenerator
from datetime import timedelta
from uuid import UUID
import distributed
import distributed.deploy.cluster
from prefect.client.schemas.objects import RunInput, State
from prefect.futures import PrefectFuture, PrefectFutureList, PrefectWrappedFuture
from prefect.tasks import Task
from prefect.task_runners import TaskRunner
```

## Configuration Examples

### Local Cluster with Custom Settings

```python
from prefect_dask import DaskTaskRunner

# Create task runner with custom local cluster
task_runner = DaskTaskRunner(
    cluster_kwargs={
        "n_workers": 4,
        "threads_per_worker": 2,
        "memory_limit": "2GB",
        "dashboard_address": ":8787"
    }
)
```

### Remote Cluster Connection

```python
import distributed
from prefect_dask import DaskTaskRunner

# Connect to existing cluster by address
task_runner = DaskTaskRunner(address="192.168.1.100:8786")

# Connect to existing cluster object
cluster = distributed.LocalCluster()
task_runner = DaskTaskRunner(cluster=cluster)
```
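
A common need is to use a remote scheduler in production and a temporary local cluster in development. One possible arrangement, sketched below, picks the `DaskTaskRunner` keyword arguments from an environment variable. The variable name `MYAPP_DASK_ADDRESS` and both helper functions are hypothetical, and the fallback cluster size is an arbitrary choice for illustration.

```python
import os
from typing import Any, Mapping


def runner_kwargs_from_env(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Pick DaskTaskRunner keyword arguments from the environment."""
    address = env.get("MYAPP_DASK_ADDRESS")  # hypothetical variable name
    if address:
        # Remote mode: connect to an already running scheduler.
        return {"address": address}
    # Local mode: let the runner create a small temporary cluster.
    return {"cluster_kwargs": {"n_workers": 2, "threads_per_worker": 1}}


def build_task_runner(env: Mapping[str, str] = os.environ):
    # Imported lazily so this module loads even where prefect-dask is absent.
    from prefect_dask import DaskTaskRunner

    return DaskTaskRunner(**runner_kwargs_from_env(env))


remote = runner_kwargs_from_env({"MYAPP_DASK_ADDRESS": "192.168.1.100:8786"})
local = runner_kwargs_from_env({})
print(remote, local)
```

Keeping the two modes in separate branches avoids passing `address` together with cluster creation options.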

### Cloud Provider Integration

```python
from prefect_dask import DaskTaskRunner

# Using dask-cloudprovider for AWS Fargate
# (recent dask-cloudprovider releases expose FargateCluster under the aws subpackage)
task_runner = DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "prefecthq/prefect:latest",
        "n_workers": 10,
        "fargate_use_private_ip": True
    }
)
```
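
The `cluster_class` argument accepts a string as well as a callable, so a string has to be turned into a class by importing it at some point. A minimal sketch of that kind of lookup with `importlib` follows. `resolve_qualified_name` is a hypothetical helper written for this illustration, not a function from `prefect-dask`, and it is demonstrated on a standard-library class so that it runs without any cloud dependencies.

```python
import importlib
from typing import Any


def resolve_qualified_name(name: str) -> Any:
    """Import and return the object named by a dotted path like 'pkg.mod.Class'."""
    module_path, _, attribute = name.rpartition(".")
    if not module_path:
        raise ValueError(f"expected a dotted path, got {name!r}")
    module = importlib.import_module(module_path)
    return getattr(module, attribute)


# Demonstrated with a standard-library class so the sketch runs anywhere.
ordered_dict_class = resolve_qualified_name("collections.OrderedDict")
print(ordered_dict_class)
```

One practical consequence is that a misspelled or outdated dotted path only fails when the import is attempted, so it is worth checking the path against the installed package version.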

### Adaptive Scaling

```python
from prefect_dask import DaskTaskRunner

# Enable adaptive scaling
task_runner = DaskTaskRunner(
    cluster_kwargs={"n_workers": 2},
    adapt_kwargs={
        "minimum": 1,
        "maximum": 20,
        "target_duration": "30s"
    }
)
```