# Prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python. It turns ordinary Python functions into flows and tasks, with built-in support for automatic retries, dynamic task mapping, result caching, and real-time observability of every run.

## Package Information

- **Package Name**: prefect
- **Language**: Python
- **Installation**: `pip install prefect`

## Core Imports

```python
import prefect
from prefect import flow, task, get_run_logger, get_client, State
```

Common imports for workflow development:

```python
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.deployments import deploy
from prefect.client.orchestration import get_client
from prefect.states import Completed, Failed, Running
from prefect.context import TaskRunContext
```

## Basic Usage

```python
from prefect import flow, task, get_run_logger


@task
def extract_data(url: str):
    """Extract data from a source."""
    logger = get_run_logger()
    logger.info(f"Extracting data from {url}")
    # Simulate data extraction
    return {"records": 100, "source": url}


@task
def transform_data(raw_data: dict):
    """Transform the extracted data."""
    logger = get_run_logger()
    logger.info("Transforming data")
    return {
        "processed_records": raw_data["records"],
        "status": "transformed",
    }


@task
def load_data(processed_data: dict):
    """Load data to destination."""
    logger = get_run_logger()
    logger.info(f"Loading {processed_data['processed_records']} records")
    return {"loaded": True}


@flow(name="ETL Pipeline")
def etl_flow(source_url: str):
    """Complete ETL workflow."""
    raw = extract_data(source_url)
    processed = transform_data(raw)
    result = load_data(processed)
    return result


if __name__ == "__main__":
    # Run the flow
    result = etl_flow("https://api.example.com/data")
    print(result)
```

## Architecture

Prefect's architecture is built around several key concepts:

- **Flows**: Top-level workflow containers that define the execution logic and the dependencies between tasks
- **Tasks**: Individual units of work that can be cached, retried, and run concurrently
- **States**: Immutable snapshots representing the current status of flow and task runs (Pending, Running, Completed, Failed, etc.)
- **Deployments**: Infrastructure-aware flow configurations that enable scheduled and triggered execution
- **Work Pools**: An infrastructure abstraction layer for executing flows across different environments
- **Blocks**: Reusable configuration objects for credentials, connections, and infrastructure settings

This design separates workflow logic (flows and tasks) from execution concerns (deployments, work pools, and blocks), so the same code can run with automatic error handling and observability across diverse infrastructure environments.

## Capabilities

### Core Workflows

Flow and task creation with decorators, execution control, serving deployments, and flow lifecycle management.

```python { .api }
def flow(
    fn=None,
    *,
    name: str = None,
    description: str = None,
    version: str = None,
    flow_run_name: str = None,
    task_runner: TaskRunner = None,
    timeout_seconds: Union[int, float] = None,
    validate_parameters: bool = True,
    retries: int = None,
    retry_delay_seconds: Union[int, float] = None,
    persist_result: bool = None,
    result_storage: ResultStorage = None,
    result_serializer: ResultSerializer = None,
    cache_result_in_memory: bool = True,
    on_completion: List[FlowStateHook] = None,
    on_failure: List[FlowStateHook] = None,
    on_cancellation: List[FlowStateHook] = None,
    on_crashed: List[FlowStateHook] = None,
    on_running: List[FlowStateHook] = None,
    log_prints: bool = None,
): ...

def task(
    fn=None,
    *,
    name: str = None,
    description: str = None,
    tags: Iterable[str] = None,
    version: str = None,
    cache_key_fn: Callable[[TaskRunContext, Dict[str, Any]], Optional[str]] = None,
    cache_expiration: timedelta = None,
    task_run_name: Union[str, Callable[[], str]] = None,
    retries: int = None,
    retry_delay_seconds: Union[int, float] = None,
    retry_condition_fn: RetryConditionCallable = None,
    persist_result: bool = None,
    result_storage: ResultStorage = None,
    result_serializer: ResultSerializer = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float] = None,
    log_prints: bool = None,
    refresh_cache: bool = None,
    on_completion: List[StateHookCallable] = None,
    on_failure: List[StateHookCallable] = None,
): ...

def serve(*deployments: RunnerDeployment, **kwargs) -> None: ...
async def aserve(*deployments: RunnerDeployment, **kwargs) -> None: ...
```

[Core Workflows](./core-workflows.md)

### State Management

State creation functions, state utilities, and flow run lifecycle control.

```python { .api }
def Completed(cls: type = State, **kwargs: Any) -> State: ...

def Failed(cls: type = State, **kwargs: Any) -> State: ...

def Running(cls: type = State, **kwargs: Any) -> State: ...

def Pending(cls: type = State, **kwargs: Any) -> State: ...

def Cancelled(cls: type = State, **kwargs: Any) -> State: ...

def Crashed(cls: type = State, **kwargs: Any) -> State: ...

def Scheduled(
    cls: type = State,
    scheduled_time: Optional[datetime] = None,
    **kwargs: Any,
) -> State: ...

def Paused(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    reschedule: bool = False,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State: ...

def Suspended(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State: ...

def AwaitingRetry(
    cls: type = State,
    scheduled_time: Optional[datetime] = None,
    **kwargs: Any,
) -> State: ...

def Retrying(cls: type = State, **kwargs: Any) -> State: ...

def pause_flow_run(
    wait_for_input: Optional[type] = None,
    timeout: int = 3600,
    poll_interval: int = 10,
    key: Optional[str] = None,
) -> Any: ...

def resume_flow_run(
    flow_run_id: UUID,
    run_input: Optional[Dict[str, Any]] = None,
) -> None: ...
```

[State Management](./state-management.md)

### Deployments

Deploy flows to work pools, manage deployments, and run deployed workflows.

```python { .api }
def deploy(
    *deployments: RunnerDeployment,
    work_pool_name: Optional[str] = None,
    image: Optional[Union[str, DockerImage]] = None,
    build: bool = True,
    push: bool = True,
    print_next_steps_message: bool = True,
    ignore_warnings: bool = False,
) -> List[UUID]: ...

def initialize_project(
    name: str = None,
    recipe: str = None,
) -> None: ...

def run_deployment(
    name: str,
    parameters: Dict[str, Any] = None,
    scheduled_time: datetime = None,
    flow_run_name: str = None,
    timeout: int = None,
    poll_interval: int = 10,
    tags: List[str] = None,
    idempotency_key: str = None,
    work_queue_name: str = None,
) -> FlowRun: ...
```

[Deployments](./deployments.md)

### Client API

HTTP clients for interacting with the Prefect API server and cloud services.

```python { .api }
def get_client(
    httpx_settings: dict = None,
    sync_client: bool = None,
) -> Union[PrefectClient, SyncPrefectClient]: ...

class PrefectClient:
    def __init__(
        self,
        api: str = None,
        api_key: str = None,
        api_version: str = None,
        httpx_settings: dict = None,
    ): ...

    async def create_flow_run(
        self,
        flow: Flow,
        name: str = None,
        parameters: Dict[str, Any] = None,
        context: Dict[str, Any] = None,
        tags: List[str] = None,
        parent_task_run_id: UUID = None,
        state: State = None,
    ) -> FlowRun: ...
```

[Client API](./client-api.md)

### Context & Utilities

Context management, logging, annotations, and transaction support.

```python { .api }
def get_run_logger(
    context: Optional[RunContext] = None,
    **kwargs: str,
) -> Union[logging.Logger, logging.LoggerAdapter]: ...

def tags(*tags: str, **kwargs) -> ContextManager: ...

class unmapped:
    def __init__(self, value: Any): ...

class allow_failure:
    def __init__(self, value: Any): ...

class Transaction:
    def __init__(
        self,
        key: Optional[str] = None,
        timeout: Optional[float] = None,
    ): ...

    def __enter__(self) -> "Transaction": ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
```

[Context & Utilities](./context-utilities.md)

### Runtime Context Access

Access current execution context including deployment parameters, flow run metadata, and task run information during workflow execution.

```python { .api }
# Runtime context modules for accessing execution information
import prefect.runtime.deployment
import prefect.runtime.flow_run
import prefect.runtime.task_run

# Deployment context attributes
prefect.runtime.deployment.id: str
prefect.runtime.deployment.name: str
prefect.runtime.deployment.parameters: Dict[str, Any]
prefect.runtime.deployment.version: str

# Flow run context attributes
prefect.runtime.flow_run.id: str
prefect.runtime.flow_run.name: str
prefect.runtime.flow_run.parameters: Dict[str, Any]
prefect.runtime.flow_run.tags: List[str]

# Task run context attributes
prefect.runtime.task_run.id: str
prefect.runtime.task_run.name: str
prefect.runtime.task_run.task_key: str
```

[Runtime Context Access](./runtime-context.md)

### Variables Management

Named, mutable JSON values that can be shared across tasks, flows, and deployments for configuration and data storage.

```python { .api }
from prefect.variables import Variable

class Variable(BaseModel):
    name: str
    value: StrictVariableValue
    tags: Optional[List[str]]

    @classmethod
    def get(
        cls,
        name: str,
        default: StrictVariableValue = None,
    ) -> StrictVariableValue: ...

    @classmethod
    async def aget(
        cls,
        name: str,
        default: StrictVariableValue = None,
    ) -> StrictVariableValue: ...

    @classmethod
    def set(
        cls,
        name: str,
        value: StrictVariableValue,
        tags: Optional[List[str]] = None,
        overwrite: bool = False,
    ) -> "Variable": ...

    @classmethod
    async def aset(
        cls,
        name: str,
        value: StrictVariableValue,
        tags: Optional[List[str]] = None,
        overwrite: bool = False,
    ) -> "Variable": ...
```

[Variables Management](./variables.md)

### Configuration

Settings, blocks, and configuration management for Prefect infrastructure and credentials.

```python { .api }
class Block(BaseModel):
    def save(
        self,
        name: str,
        overwrite: bool = False,
    ) -> UUID: ...

    @classmethod
    def load(cls, name: str) -> "Block": ...

def get_settings_context() -> SettingsContext: ...
```

[Configuration](./configuration.md)

## Types

Core types used throughout the Prefect API:

```python { .api }
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Union, TypeVar, Generic, ParamSpec
from datetime import datetime, timedelta
from uuid import UUID
import logging

# Generic type parameters
P = ParamSpec("P")  # Parameters
R = TypeVar("R")  # Return type

class Flow(Generic[P, R]):
    name: str
    fn: Callable[P, R]
    description: Optional[str]
    version: Optional[str]

class Task(Generic[P, R]):
    name: str
    fn: Callable[P, R]
    description: Optional[str]
    version: Optional[str]

class State:
    type: StateType
    name: Optional[str]
    message: Optional[str]
    data: Any
    timestamp: datetime

class FlowRun:
    id: UUID
    name: str
    flow_id: UUID
    state: State
    parameters: Dict[str, Any]

class TaskRun:
    id: UUID
    name: str
    task_key: str
    flow_run_id: UUID
    state: State

# Enums and Constants
class StateType(str, Enum):
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    CRASHED = "CRASHED"
    PAUSED = "PAUSED"
    CANCELLING = "CANCELLING"
    # Suspended, AwaitingRetry, and Retrying are state names, not state types:
    # they use the PAUSED, SCHEDULED, and RUNNING types respectively.

# Context types
class TaskRunContext:
    """Context information available during task execution."""
    pass

# Callable types
FlowStateHook = Callable[[Flow, FlowRun, State], None]
StateHookCallable = Callable[[Task, TaskRun, State], None]
RetryConditionCallable = Callable[[Task, TaskRun, State], bool]

# Configuration types
class TaskRunner:
    pass

class ResultStorage:
    pass

class ResultSerializer:
    pass

# Deployment types
class RunnerDeployment:
    """Deployment configuration for flows."""
    pass

class DockerImage:
    """Docker image configuration for deployments."""
    pass

class DeploymentImage:
    """Deployment image configuration."""
    pass
```