or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-api.mdconfiguration.mdcontext-utilities.mdcore-workflows.mddeployments.mdindex.mdruntime-context.mdstate-management.mdvariables.md

deployments.mddocs/

0

# Deployments

1

2

Prefect's deployment system enables flows to be executed on remote infrastructure through work pools, scheduled runs, and deployment management. This provides the foundation for production workflow orchestration with infrastructure abstraction and scalable execution.

3

4

## Capabilities

5

6

### Flow Deployment

7

8

Deploy flows to work pools for remote execution with infrastructure management and scheduling capabilities.

9

10

```python { .api }

11

def deploy(

12

*flows: Flow,

13

name: str = None,

14

work_pool_name: str = None,

15

image: Union[str, DeploymentImage] = None,

16

build: bool = True,

17

push: bool = True,

18

print_next_steps: bool = True,

19

ignore_warnings: bool = False,

20

**kwargs,

21

) -> List[RunnerDeployment]:

22

"""

23

Deploy flows to work pools for remote execution.

24

25

Parameters:

26

- flows: One or more flow objects to deploy

27

- name: Name for the deployment (defaults to flow name)

28

- work_pool_name: Target work pool for execution

29

- image: Container image specification for deployment

30

- build: Whether to build the deployment image

31

- push: Whether to push the image to a registry

32

- print_next_steps: Whether to print deployment instructions

33

- ignore_warnings: Whether to ignore deployment warnings

34

- **kwargs: Additional deployment configuration options

35

36

Returns:

37

List of created RunnerDeployment objects

38

39

Raises:

40

ValueError: If deployment configuration is invalid

41

"""

42

```

43

44

#### Usage Examples

45

46

```python

47

from prefect import flow, deploy

48

from prefect.deployments.runner import RunnerDeployment

49

50

@flow

51

def data_pipeline():

52

"""ETL pipeline for data processing."""

53

# Pipeline logic here

54

pass

55

56

@flow

57

def monitoring_flow():

58

"""Monitoring and alerting workflow."""

59

# Monitoring logic here

60

pass

61

62

# Basic deployment

63

deployments = deploy(

64

data_pipeline,

65

name="production-etl",

66

work_pool_name="kubernetes-pool"

67

)

68

69

# Deployment with custom image

70

deployments = deploy(

71

data_pipeline,

72

monitoring_flow,

73

name="data-workflows",

74

work_pool_name="docker-pool",

75

image="my-registry/prefect-workflows:v1.0.0",

76

build=True,

77

push=True

78

)

79

80

# Deployment with custom configuration

81

deployments = deploy(

82

data_pipeline,

83

name="scheduled-etl",

84

work_pool_name="kubernetes-pool",

85

schedule={"cron": "0 2 * * *"}, # Daily at 2 AM

86

parameters={"env": "production"},

87

tags=["production", "etl"]

88

)

89

```

90

91

### Project Initialization

92

93

Initialize Prefect projects with configuration and deployment templates.

94

95

```python { .api }

96

def initialize_project(

97

name: str = None,

98

recipe: str = None,

99

) -> None:

100

"""

101

Initialize a new Prefect project with configuration templates.

102

103

Parameters:

104

- name: Name for the project (defaults to current directory name)

105

- recipe: Template recipe to use for initialization

106

107

Creates:

108

- prefect.yaml: Project configuration file

109

- .prefectignore: Files to ignore during deployment

110

- flows/: Directory for flow definitions (optional)

111

112

Raises:

113

FileExistsError: If project files already exist

114

"""

115

```

116

117

#### Usage Examples

118

119

```python

120

from prefect.deployments import initialize_project

121

122

# Initialize basic project

123

initialize_project(name="my-workflows")

124

125

# Initialize with Docker recipe

126

initialize_project(

127

name="docker-workflows",

128

recipe="docker"

129

)

130

131

# Initialize with Kubernetes recipe

132

initialize_project(

133

name="k8s-workflows",

134

recipe="kubernetes"

135

)

136

```

137

138

### Deployment Execution

139

140

Run deployed workflows on-demand or programmatically trigger execution.

141

142

```python { .api }

143

def run_deployment(

144

name: str,

145

parameters: Dict[str, Any] = None,

146

scheduled_time: datetime = None,

147

flow_run_name: str = None,

148

timeout: int = None,

149

poll_interval: int = 10,

150

tags: List[str] = None,

151

idempotency_key: str = None,

152

work_queue_name: str = None,

153

) -> FlowRun:

154

"""

155

Run a deployment and return the flow run result.

156

157

Parameters:

158

- name: Name of the deployment to run

159

- parameters: Parameters to pass to the flow

160

- scheduled_time: When to schedule the run (defaults to now)

161

- flow_run_name: Custom name for the flow run

162

- timeout: Maximum time to wait for completion (seconds)

163

- poll_interval: Polling interval for status updates (seconds)

164

- tags: Tags to apply to the flow run

165

- idempotency_key: Key to prevent duplicate runs

166

- work_queue_name: Specific work queue to use

167

168

Returns:

169

FlowRun object with execution results

170

171

Raises:

172

TimeoutError: If execution exceeds timeout

173

DeploymentNotFound: If deployment doesn't exist

174

"""

175

```

176

177

#### Usage Examples

178

179

```python

180

from prefect.deployments.flow_runs import run_deployment

181

from datetime import datetime, timedelta

182

183

# Run deployment immediately

184

flow_run = run_deployment(

185

"data-pipeline/production-etl",

186

parameters={"source": "s3://data-bucket", "env": "prod"}

187

)

188

189

# Schedule for future execution

190

future_time = datetime.now() + timedelta(hours=2)

191

flow_run = run_deployment(

192

"monitoring/health-check",

193

scheduled_time=future_time,

194

tags=["scheduled", "monitoring"]

195

)

196

197

# Run with custom configuration

198

flow_run = run_deployment(

199

"etl-pipeline/daily-load",

200

parameters={

201

"batch_size": 1000,

202

"parallel_workers": 4

203

},

204

flow_run_name="manual-daily-load-2024-01-15",

205

timeout=3600, # 1 hour timeout

206

work_queue_name="high-priority"

207

)

208

209

print(f"Flow run completed: {flow_run.state}")

210

```

211

212

### Runner Deployment Class

213

214

The RunnerDeployment class provides programmatic deployment management with full configuration control.

215

216

```python { .api }

217

class RunnerDeployment:

218

"""

219

Deployment configuration for runner-based execution.

220

221

Attributes:

222

- name: Deployment name

223

- flow: Associated flow object

224

- schedule: Scheduling configuration

225

- parameters: Default flow parameters

226

- tags: Deployment tags

227

- description: Deployment description

228

- version: Deployment version

229

- work_pool_name: Target work pool

230

- work_queue_name: Target work queue

231

- job_variables: Job-specific variables

232

"""

233

234

def __init__(

235

self,

236

name: str,

237

flow: Flow,

238

schedule: Union[CronSchedule, IntervalSchedule] = None,

239

parameters: Dict[str, Any] = None,

240

tags: List[str] = None,

241

description: str = None,

242

version: str = None,

243

work_pool_name: str = None,

244

work_queue_name: str = None,

245

job_variables: Dict[str, Any] = None,

246

enforce_parameter_schema: bool = None,

247

):

248

"""Initialize a runner deployment."""

249

250

def serve(

251

self,

252

pause_on_shutdown: bool = True,

253

print_starting_message: bool = True,

254

limit: int = None,

255

**kwargs

256

) -> None:

257

"""

258

Serve the deployment for remote execution.

259

260

Parameters:

261

- pause_on_shutdown: Whether to pause on shutdown

262

- print_starting_message: Whether to print startup message

263

- limit: Maximum concurrent runs

264

"""

265

266

@classmethod

267

def from_flow(

268

cls,

269

flow: Flow,

270

name: str = None,

271

**kwargs

272

) -> "RunnerDeployment":

273

"""Create a deployment from a flow object."""

274

275

def deploy(

276

self,

277

work_pool_name: str = None,

278

image: Union[str, DeploymentImage] = None,

279

**kwargs

280

) -> UUID:

281

"""

282

Deploy to the Prefect server.

283

284

Parameters:

285

- work_pool_name: Target work pool

286

- image: Container image specification

287

288

Returns:

289

UUID of the created deployment

290

"""

291

```

292

293

#### Usage Examples

294

295

```python

296

from prefect import flow

297

from prefect.deployments.runner import RunnerDeployment

298

from prefect.client.schemas.schedules import CronSchedule

299

300

@flow

301

def etl_pipeline():

302

# ETL logic here

303

pass

304

305

# Create deployment programmatically

306

deployment = RunnerDeployment(

307

name="etl-deployment",

308

flow=etl_pipeline,

309

schedule=CronSchedule(cron="0 2 * * *"), # Daily at 2 AM

310

parameters={"env": "production"},

311

tags=["etl", "production"],

312

description="Daily ETL pipeline",

313

version="1.0.0",

314

work_pool_name="kubernetes-pool"

315

)

316

317

# Deploy to server

318

deployment_id = deployment.deploy()

319

320

# Serve for local execution

321

deployment.serve(limit=5)

322

```

323

324

### Deployment Management

325

326

Functions for managing existing deployments, including updates and deletion.

327

328

```python { .api }

329

async def get_deployment(

330

name: str,

331

client: PrefectClient = None,

332

) -> Deployment:

333

"""

334

Retrieve a deployment by name.

335

336

Parameters:

337

- name: Name of the deployment

338

- client: Prefect client (defaults to current client)

339

340

Returns:

341

Deployment object

342

"""

343

344

async def update_deployment(

345

name: str,

346

schedule: Union[CronSchedule, IntervalSchedule] = None,

347

parameters: Dict[str, Any] = None,

348

tags: List[str] = None,

349

description: str = None,

350

version: str = None,

351

work_pool_name: str = None,

352

client: PrefectClient = None,

353

) -> Deployment:

354

"""

355

Update an existing deployment.

356

357

Parameters:

358

- name: Name of the deployment to update

359

- schedule: New scheduling configuration

360

- parameters: New default parameters

361

- tags: New tags

362

- description: New description

363

- version: New version

364

- work_pool_name: New work pool

365

- client: Prefect client (defaults to current client)

366

367

Returns:

368

Updated deployment object

369

"""

370

371

async def delete_deployment(

372

name: str,

373

client: PrefectClient = None,

374

) -> bool:

375

"""

376

Delete a deployment.

377

378

Parameters:

379

- name: Name of the deployment to delete

380

- client: Prefect client (defaults to current client)

381

382

Returns:

383

True if deletion was successful

384

"""

385

```

386

387

#### Usage Examples

388

389

```python

390

from prefect.deployments.base import get_deployment, update_deployment, delete_deployment

391

from prefect.client.schemas.schedules import IntervalSchedule

392

from datetime import timedelta

393

394

# Get existing deployment

395

deployment = await get_deployment("data-pipeline/production-etl")

396

397

# Update deployment schedule

398

updated_deployment = await update_deployment(

399

"data-pipeline/production-etl",

400

schedule=IntervalSchedule(interval=timedelta(hours=6)),

401

parameters={"batch_size": 2000},

402

tags=["production", "etl", "updated"]

403

)

404

405

# Delete deployment

406

success = await delete_deployment("old-deployment")

407

```

408

409

### Work Pool Integration

410

411

Integration with work pools for infrastructure-aware deployment execution.

412

413

```python { .api }

414

class WorkPoolJobConfiguration:

415

"""Configuration for jobs running in work pools."""

416

417

def __init__(

418

self,

419

command: List[str] = None,

420

env: Dict[str, str] = None,

421

labels: Dict[str, str] = None,

422

name: str = None,

423

**kwargs

424

):

425

"""Initialize work pool job configuration."""

426

427

async def get_work_pool(

428

work_pool_name: str,

429

client: PrefectClient = None,

430

) -> WorkPool:

431

"""

432

Retrieve work pool information.

433

434

Parameters:

435

- work_pool_name: Name of the work pool

436

- client: Prefect client (defaults to current client)

437

438

Returns:

439

WorkPool object with configuration details

440

"""

441

442

async def create_work_queue(

443

work_pool_name: str,

444

work_queue_name: str,

445

description: str = None,

446

is_paused: bool = False,

447

concurrency_limit: int = None,

448

priority: int = None,

449

client: PrefectClient = None,

450

) -> WorkQueue:

451

"""

452

Create a work queue within a work pool.

453

454

Parameters:

455

- work_pool_name: Target work pool name

456

- work_queue_name: Name for the new work queue

457

- description: Queue description

458

- is_paused: Whether to start the queue paused

459

- concurrency_limit: Maximum concurrent jobs

460

- priority: Queue priority for job assignment

461

- client: Prefect client (defaults to current client)

462

463

Returns:

464

Created WorkQueue object

465

"""

466

```

467

468

## Types

469

470

Types related to deployment functionality:

471

472

```python { .api }

473

from typing import Any, Dict, List, Optional, Union

474

from datetime import datetime

475

from uuid import UUID

476

from enum import Enum

477

478

class Deployment:

479

"""Deployment configuration object."""

480

id: UUID

481

name: str

482

flow_id: UUID

483

schedule: Optional[Union[CronSchedule, IntervalSchedule]]

484

parameters: Dict[str, Any]

485

tags: List[str]

486

description: Optional[str]

487

version: Optional[str]

488

work_pool_name: Optional[str]

489

work_queue_name: Optional[str]

490

created: datetime

491

updated: datetime

492

493

class FlowRun:

494

"""Flow run result object."""

495

id: UUID

496

name: str

497

flow_id: UUID

498

deployment_id: Optional[UUID]

499

state: State

500

parameters: Dict[str, Any]

501

tags: List[str]

502

created: datetime

503

expected_start_time: datetime

504

start_time: Optional[datetime]

505

end_time: Optional[datetime]

506

507

class DeploymentImage:

508

"""Container image specification for deployments."""

509

name: str

510

tag: str

511

dockerfile: Optional[str]

512

buildargs: Optional[Dict[str, str]]

513

514

class WorkPool:

515

"""Work pool configuration."""

516

name: str

517

type: str

518

description: Optional[str]

519

is_paused: bool

520

concurrency_limit: Optional[int]

521

default_queue_id: UUID

522

523

class WorkQueue:

524

"""Work queue within a work pool."""

525

id: UUID

526

name: str

527

description: Optional[str]

528

is_paused: bool

529

concurrency_limit: Optional[int]

530

priority: int

531

work_pool_id: UUID

532

533

# Schedule types

534

class CronSchedule:

535

"""Cron-based scheduling."""

536

cron: str

537

timezone: Optional[str]

538

day_or: bool

539

540

class IntervalSchedule:

541

"""Interval-based scheduling."""

542

interval: timedelta

543

anchor_date: Optional[datetime]

544

timezone: Optional[str]

545

546

class RRuleSchedule:

547

"""RRule-based scheduling."""

548

rrule: str

549

timezone: Optional[str]

550

```