# Deployments

Prefect's deployment system enables flows to run on remote infrastructure through work pools, scheduled runs, and deployment management. It provides the foundation for production workflow orchestration, with infrastructure abstraction and scalable execution.

## Capabilities

### Flow Deployment

Deploy flows to work pools for remote execution with infrastructure management and scheduling capabilities.

```python { .api }
def deploy(
    *flows: Flow,
    name: Optional[str] = None,
    work_pool_name: Optional[str] = None,
    image: Optional[Union[str, DeploymentImage]] = None,
    build: bool = True,
    push: bool = True,
    print_next_steps: bool = True,
    ignore_warnings: bool = False,
    **kwargs,
) -> List[RunnerDeployment]:
    """
    Deploy flows to work pools for remote execution.

    Parameters:
    - flows: One or more flow objects to deploy
    - name: Name for the deployment (defaults to the flow name)
    - work_pool_name: Target work pool for execution
    - image: Container image specification for the deployment
    - build: Whether to build the deployment image
    - push: Whether to push the image to a registry
    - print_next_steps: Whether to print deployment instructions
    - ignore_warnings: Whether to ignore deployment warnings
    - **kwargs: Additional deployment configuration options

    Returns:
    List of created RunnerDeployment objects

    Raises:
    ValueError: If the deployment configuration is invalid
    """
```

#### Usage Examples

```python
from prefect import flow, deploy

@flow
def data_pipeline():
    """ETL pipeline for data processing."""
    # Pipeline logic here
    pass

@flow
def monitoring_flow():
    """Monitoring and alerting workflow."""
    # Monitoring logic here
    pass

# Basic deployment
deployments = deploy(
    data_pipeline,
    name="production-etl",
    work_pool_name="kubernetes-pool"
)

# Deployment with a custom image
deployments = deploy(
    data_pipeline,
    monitoring_flow,
    name="data-workflows",
    work_pool_name="docker-pool",
    image="my-registry/prefect-workflows:v1.0.0",
    build=True,
    push=True
)

# Deployment with custom configuration
deployments = deploy(
    data_pipeline,
    name="scheduled-etl",
    work_pool_name="kubernetes-pool",
    schedule={"cron": "0 2 * * *"},  # Daily at 2 AM
    parameters={"env": "production"},
    tags=["production", "etl"]
)
```

### Project Initialization

Initialize Prefect projects with configuration and deployment templates.

```python { .api }
def initialize_project(
    name: Optional[str] = None,
    recipe: Optional[str] = None,
) -> None:
    """
    Initialize a new Prefect project with configuration templates.

    Parameters:
    - name: Name for the project (defaults to the current directory name)
    - recipe: Template recipe to use for initialization

    Creates:
    - prefect.yaml: Project configuration file
    - .prefectignore: Files to ignore during deployment
    - flows/: Directory for flow definitions (optional)

    Raises:
    FileExistsError: If project files already exist
    """
```

#### Usage Examples

```python
from prefect.deployments import initialize_project

# Initialize a basic project
initialize_project(name="my-workflows")

# Initialize with the Docker recipe
initialize_project(
    name="docker-workflows",
    recipe="docker"
)

# Initialize with the Kubernetes recipe
initialize_project(
    name="k8s-workflows",
    recipe="kubernetes"
)
```

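The generated `prefect.yaml` ties initialized projects to the deployment system above. The following fragment is an illustrative sketch of its shape; the specific step names, paths, and deployment entries are examples, not output guaranteed by any particular recipe:

```yaml
# prefect.yaml (illustrative sketch)
name: my-workflows
prefect-version: 2.x

# Build/push steps are populated by recipes such as "docker"; null means skipped
build: null
push: null

# Steps run on the worker before flow execution
pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /opt/prefect/my-workflows

# Deployment definitions managed by this project
deployments:
  - name: production-etl
    entrypoint: flows/pipeline.py:data_pipeline
    work_pool:
      name: kubernetes-pool
```
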
### Deployment Execution

Run deployed workflows on demand, or trigger execution programmatically.

```python { .api }
def run_deployment(
    name: str,
    parameters: Optional[Dict[str, Any]] = None,
    scheduled_time: Optional[datetime] = None,
    flow_run_name: Optional[str] = None,
    timeout: Optional[int] = None,
    poll_interval: int = 10,
    tags: Optional[List[str]] = None,
    idempotency_key: Optional[str] = None,
    work_queue_name: Optional[str] = None,
) -> FlowRun:
    """
    Run a deployment and return the resulting flow run.

    Parameters:
    - name: Name of the deployment to run
    - parameters: Parameters to pass to the flow
    - scheduled_time: When to schedule the run (defaults to now)
    - flow_run_name: Custom name for the flow run
    - timeout: Maximum time to wait for completion (seconds)
    - poll_interval: Polling interval for status updates (seconds)
    - tags: Tags to apply to the flow run
    - idempotency_key: Key to prevent duplicate runs
    - work_queue_name: Specific work queue to use

    Returns:
    FlowRun object with execution results

    Raises:
    TimeoutError: If execution exceeds the timeout
    DeploymentNotFound: If the deployment doesn't exist
    """
```

#### Usage Examples

```python
from prefect.deployments.flow_runs import run_deployment
from datetime import datetime, timedelta

# Run a deployment immediately
flow_run = run_deployment(
    "data-pipeline/production-etl",
    parameters={"source": "s3://data-bucket", "env": "prod"}
)

# Schedule a run for future execution
future_time = datetime.now() + timedelta(hours=2)
flow_run = run_deployment(
    "monitoring/health-check",
    scheduled_time=future_time,
    tags=["scheduled", "monitoring"]
)

# Run with custom configuration
flow_run = run_deployment(
    "etl-pipeline/daily-load",
    parameters={
        "batch_size": 1000,
        "parallel_workers": 4
    },
    flow_run_name="manual-daily-load-2024-01-15",
    timeout=3600,  # 1 hour timeout
    work_queue_name="high-priority"
)

print(f"Flow run completed: {flow_run.state}")
```

### Runner Deployment Class

The `RunnerDeployment` class provides programmatic deployment management with full configuration control.

```python { .api }
class RunnerDeployment:
    """
    Deployment configuration for runner-based execution.

    Attributes:
    - name: Deployment name
    - flow: Associated flow object
    - schedule: Scheduling configuration
    - parameters: Default flow parameters
    - tags: Deployment tags
    - description: Deployment description
    - version: Deployment version
    - work_pool_name: Target work pool
    - work_queue_name: Target work queue
    - job_variables: Job-specific variables
    """

    def __init__(
        self,
        name: str,
        flow: Flow,
        schedule: Optional[Union[CronSchedule, IntervalSchedule]] = None,
        parameters: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        description: Optional[str] = None,
        version: Optional[str] = None,
        work_pool_name: Optional[str] = None,
        work_queue_name: Optional[str] = None,
        job_variables: Optional[Dict[str, Any]] = None,
        enforce_parameter_schema: Optional[bool] = None,
    ):
        """Initialize a runner deployment."""

    def serve(
        self,
        pause_on_shutdown: bool = True,
        print_starting_message: bool = True,
        limit: Optional[int] = None,
        **kwargs
    ) -> None:
        """
        Serve the deployment for remote execution.

        Parameters:
        - pause_on_shutdown: Whether to pause the deployment on shutdown
        - print_starting_message: Whether to print a startup message
        - limit: Maximum number of concurrent runs
        """

    @classmethod
    def from_flow(
        cls,
        flow: Flow,
        name: Optional[str] = None,
        **kwargs
    ) -> "RunnerDeployment":
        """Create a deployment from a flow object."""

    def deploy(
        self,
        work_pool_name: Optional[str] = None,
        image: Optional[Union[str, DeploymentImage]] = None,
        **kwargs
    ) -> UUID:
        """
        Deploy to the Prefect server.

        Parameters:
        - work_pool_name: Target work pool
        - image: Container image specification

        Returns:
        UUID of the created deployment
        """
```

#### Usage Examples

```python
from prefect import flow
from prefect.deployments.runner import RunnerDeployment
from prefect.client.schemas.schedules import CronSchedule

@flow
def etl_pipeline():
    # ETL logic here
    pass

# Create a deployment programmatically
deployment = RunnerDeployment(
    name="etl-deployment",
    flow=etl_pipeline,
    schedule=CronSchedule(cron="0 2 * * *"),  # Daily at 2 AM
    parameters={"env": "production"},
    tags=["etl", "production"],
    description="Daily ETL pipeline",
    version="1.0.0",
    work_pool_name="kubernetes-pool"
)

# Deploy to the server
deployment_id = deployment.deploy()

# Or serve the deployment for local execution
deployment.serve(limit=5)
```

### Deployment Management

Functions for managing existing deployments, including updates and deletion.

```python { .api }
async def get_deployment(
    name: str,
    client: Optional[PrefectClient] = None,
) -> Deployment:
    """
    Retrieve a deployment by name.

    Parameters:
    - name: Name of the deployment
    - client: Prefect client (defaults to the current client)

    Returns:
    Deployment object
    """

async def update_deployment(
    name: str,
    schedule: Optional[Union[CronSchedule, IntervalSchedule]] = None,
    parameters: Optional[Dict[str, Any]] = None,
    tags: Optional[List[str]] = None,
    description: Optional[str] = None,
    version: Optional[str] = None,
    work_pool_name: Optional[str] = None,
    client: Optional[PrefectClient] = None,
) -> Deployment:
    """
    Update an existing deployment.

    Parameters:
    - name: Name of the deployment to update
    - schedule: New scheduling configuration
    - parameters: New default parameters
    - tags: New tags
    - description: New description
    - version: New version
    - work_pool_name: New work pool
    - client: Prefect client (defaults to the current client)

    Returns:
    Updated Deployment object
    """

async def delete_deployment(
    name: str,
    client: Optional[PrefectClient] = None,
) -> bool:
    """
    Delete a deployment.

    Parameters:
    - name: Name of the deployment to delete
    - client: Prefect client (defaults to the current client)

    Returns:
    True if deletion was successful
    """
```

#### Usage Examples

```python
from prefect.deployments.base import get_deployment, update_deployment, delete_deployment
from prefect.client.schemas.schedules import IntervalSchedule
from datetime import timedelta

# These functions are async and must be awaited inside an async context

# Get an existing deployment
deployment = await get_deployment("data-pipeline/production-etl")

# Update the deployment's schedule and defaults
updated_deployment = await update_deployment(
    "data-pipeline/production-etl",
    schedule=IntervalSchedule(interval=timedelta(hours=6)),
    parameters={"batch_size": 2000},
    tags=["production", "etl", "updated"]
)

# Delete a deployment
success = await delete_deployment("old-deployment")
```

### Work Pool Integration

Integration with work pools for infrastructure-aware deployment execution.

```python { .api }
class WorkPoolJobConfiguration:
    """Configuration for jobs running in work pools."""

    def __init__(
        self,
        command: Optional[List[str]] = None,
        env: Optional[Dict[str, str]] = None,
        labels: Optional[Dict[str, str]] = None,
        name: Optional[str] = None,
        **kwargs
    ):
        """Initialize a work pool job configuration."""

async def get_work_pool(
    work_pool_name: str,
    client: Optional[PrefectClient] = None,
) -> WorkPool:
    """
    Retrieve work pool information.

    Parameters:
    - work_pool_name: Name of the work pool
    - client: Prefect client (defaults to the current client)

    Returns:
    WorkPool object with configuration details
    """

async def create_work_queue(
    work_pool_name: str,
    work_queue_name: str,
    description: Optional[str] = None,
    is_paused: bool = False,
    concurrency_limit: Optional[int] = None,
    priority: Optional[int] = None,
    client: Optional[PrefectClient] = None,
) -> WorkQueue:
    """
    Create a work queue within a work pool.

    Parameters:
    - work_pool_name: Target work pool name
    - work_queue_name: Name for the new work queue
    - description: Queue description
    - is_paused: Whether to start the queue paused
    - concurrency_limit: Maximum concurrent jobs
    - priority: Queue priority for job assignment
    - client: Prefect client (defaults to the current client)

    Returns:
    Created WorkQueue object
    """
```

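#### Usage Examples

A sketch of inspecting a work pool and creating a queue inside it, assuming the functions follow the signatures documented above. The import path is illustrative, and the calls are async, so they must run inside an async context:

```python
from prefect.deployments.base import get_work_pool, create_work_queue  # illustrative path

# Inspect a work pool before targeting it with deployments
work_pool = await get_work_pool("kubernetes-pool")
print(f"Pool type: {work_pool.type}, paused: {work_pool.is_paused}")

# Create a dedicated high-priority queue within the pool
queue = await create_work_queue(
    work_pool_name="kubernetes-pool",
    work_queue_name="high-priority",
    description="Queue for urgent runs",
    concurrency_limit=5,
    priority=1,
)
```

Deployments and `run_deployment` calls can then target the new queue via `work_queue_name="high-priority"`.
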
## Types

Types related to deployment functionality:

```python { .api }
from typing import Any, Dict, List, Optional, Union
from datetime import datetime, timedelta
from uuid import UUID

class Deployment:
    """Deployment configuration object."""
    id: UUID
    name: str
    flow_id: UUID
    schedule: Optional[Union[CronSchedule, IntervalSchedule]]
    parameters: Dict[str, Any]
    tags: List[str]
    description: Optional[str]
    version: Optional[str]
    work_pool_name: Optional[str]
    work_queue_name: Optional[str]
    created: datetime
    updated: datetime

class FlowRun:
    """Flow run result object."""
    id: UUID
    name: str
    flow_id: UUID
    deployment_id: Optional[UUID]
    state: State
    parameters: Dict[str, Any]
    tags: List[str]
    created: datetime
    expected_start_time: datetime
    start_time: Optional[datetime]
    end_time: Optional[datetime]

class DeploymentImage:
    """Container image specification for deployments."""
    name: str
    tag: str
    dockerfile: Optional[str]
    buildargs: Optional[Dict[str, str]]

class WorkPool:
    """Work pool configuration."""
    name: str
    type: str
    description: Optional[str]
    is_paused: bool
    concurrency_limit: Optional[int]
    default_queue_id: UUID

class WorkQueue:
    """Work queue within a work pool."""
    id: UUID
    name: str
    description: Optional[str]
    is_paused: bool
    concurrency_limit: Optional[int]
    priority: int
    work_pool_id: UUID

# Schedule types
class CronSchedule:
    """Cron-based scheduling."""
    cron: str
    timezone: Optional[str]
    day_or: bool

class IntervalSchedule:
    """Interval-based scheduling."""
    interval: timedelta
    anchor_date: Optional[datetime]
    timezone: Optional[str]

class RRuleSchedule:
    """RRule-based scheduling."""
    rrule: str
    timezone: Optional[str]
```
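
An `IntervalSchedule` fires at `anchor_date` plus whole multiples of `interval`. To make those semantics concrete, here is a minimal pure-Python sketch of the next-run computation; `next_interval_run` is a hypothetical helper for illustration, not part of Prefect's API:

```python
import math
from datetime import datetime, timedelta

def next_interval_run(anchor_date: datetime, interval: timedelta, now: datetime) -> datetime:
    """Return the first anchor_date + k * interval strictly after `now` (hypothetical helper)."""
    if now < anchor_date:
        # The schedule has not started yet; the first run is the anchor itself
        return anchor_date
    # How many whole intervals have elapsed since the anchor (float division of timedeltas)
    elapsed = (now - anchor_date) / interval
    k = math.floor(elapsed) + 1  # next whole multiple after `now`
    return anchor_date + k * interval

anchor = datetime(2024, 1, 1, 2, 0)  # daily runs anchored at 02:00
run = next_interval_run(anchor, timedelta(days=1), datetime(2024, 1, 15, 9, 30))
print(run)  # 2024-01-16 02:00:00
```

The same arithmetic explains why changing `anchor_date` shifts every subsequent run time while leaving the cadence intact.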