Workflow orchestration and management framework for building resilient data pipelines.

Prefect's deployment system enables flows to be executed on remote infrastructure through work pools, scheduled runs, and deployment management. This provides the foundation for production workflow orchestration with infrastructure abstraction and scalable execution. It lets you deploy flows to work pools for remote execution with infrastructure management and scheduling capabilities.
from typing import Any, Dict, List, Optional, Union


def deploy(
    *flows: "Flow",
    name: Optional[str] = None,
    work_pool_name: Optional[str] = None,
    image: Union[str, "DeploymentImage", None] = None,
    build: bool = True,
    push: bool = True,
    print_next_steps: bool = True,
    ignore_warnings: bool = False,
    **kwargs,
) -> List["RunnerDeployment"]:
    """Deploy flows to work pools for remote execution.

    Provides infrastructure management and scheduling capabilities for
    production workflow orchestration.

    Args:
        *flows: One or more flow objects to deploy.
        name: Name for the deployment (defaults to the flow name).
        work_pool_name: Target work pool for execution.
        image: Container image specification for the deployment.
        build: Whether to build the deployment image.
        push: Whether to push the image to a registry.
        print_next_steps: Whether to print deployment instructions.
        ignore_warnings: Whether to ignore deployment warnings.
        **kwargs: Additional deployment configuration options
            (e.g. ``schedule``, ``parameters``, ``tags``).

    Returns:
        List of created RunnerDeployment objects.

    Raises:
        ValueError: If the deployment configuration is invalid.

    Example:
        from prefect import flow, deploy
        from prefect.deployments.runner import RunnerDeployment

        @flow
        def data_pipeline():
            # ETL pipeline for data processing.
            pass

        @flow
        def monitoring_flow():
            # Monitoring and alerting workflow.
            pass

        # Basic deployment
        deployments = deploy(
            data_pipeline,
            name="production-etl",
            work_pool_name="kubernetes-pool",
        )

        # Deployment with custom image
        deployments = deploy(
            data_pipeline,
            monitoring_flow,
            name="data-workflows",
            work_pool_name="docker-pool",
            image="my-registry/prefect-workflows:v1.0.0",
            build=True,
            push=True,
        )

        # Deployment with custom configuration
        deployments = deploy(
            data_pipeline,
            name="scheduled-etl",
            work_pool_name="kubernetes-pool",
            schedule={"cron": "0 2 * * *"},  # Daily at 2 AM
            parameters={"env": "production"},
            tags=["production", "etl"],
        )
    """


# Initialize Prefect projects with configuration and deployment templates.
from typing import Optional


def initialize_project(
    name: Optional[str] = None,
    recipe: Optional[str] = None,
) -> None:
    """Initialize a new Prefect project with configuration templates.

    Args:
        name: Name for the project (defaults to the current directory name).
        recipe: Template recipe to use for initialization.

    Creates:
        - prefect.yaml: project configuration file.
        - .prefectignore: files to ignore during deployment.
        - flows/: directory for flow definitions (optional).

    Raises:
        FileExistsError: If project files already exist.

    Example:
        from prefect.deployments import initialize_project

        # Initialize basic project
        initialize_project(name="my-workflows")

        # Initialize with Docker recipe
        initialize_project(
            name="docker-workflows",
            recipe="docker",
        )

        # Initialize with Kubernetes recipe
        initialize_project(
            name="k8s-workflows",
            recipe="kubernetes",
        )
    """


# Run deployed workflows on-demand or programmatically trigger execution.
from datetime import datetime
from typing import Any, Dict, List, Optional


def run_deployment(
    name: str,
    parameters: Optional[Dict[str, Any]] = None,
    scheduled_time: Optional[datetime] = None,
    flow_run_name: Optional[str] = None,
    timeout: Optional[int] = None,
    poll_interval: int = 10,
    tags: Optional[List[str]] = None,
    idempotency_key: Optional[str] = None,
    work_queue_name: Optional[str] = None,
) -> "FlowRun":
    """Run a deployment and return the flow run result.

    Args:
        name: Name of the deployment to run.
        parameters: Parameters to pass to the flow.
        scheduled_time: When to schedule the run (defaults to now).
        flow_run_name: Custom name for the flow run.
        timeout: Maximum time to wait for completion (seconds).
        poll_interval: Polling interval for status updates (seconds).
        tags: Tags to apply to the flow run.
        idempotency_key: Key to prevent duplicate runs.
        work_queue_name: Specific work queue to use.

    Returns:
        FlowRun object with execution results.

    Raises:
        TimeoutError: If execution exceeds the timeout.
        DeploymentNotFound: If the deployment doesn't exist.

    Example:
        from prefect.deployments.flow_runs import run_deployment
        from datetime import datetime, timedelta

        # Run deployment immediately
        flow_run = run_deployment(
            "data-pipeline/production-etl",
            parameters={"source": "s3://data-bucket", "env": "prod"},
        )

        # Schedule for future execution
        future_time = datetime.now() + timedelta(hours=2)
        flow_run = run_deployment(
            "monitoring/health-check",
            scheduled_time=future_time,
            tags=["scheduled", "monitoring"],
        )

        # Run with custom configuration
        flow_run = run_deployment(
            "etl-pipeline/daily-load",
            parameters={
                "batch_size": 1000,
                "parallel_workers": 4,
            },
            flow_run_name="manual-daily-load-2024-01-15",
            timeout=3600,  # 1 hour timeout
            work_queue_name="high-priority",
        )
        print(f"Flow run completed: {flow_run.state}")
    """


# The RunnerDeployment class provides programmatic deployment management
# with full configuration control.
from typing import Any, Dict, List, Optional, Union
from uuid import UUID


class RunnerDeployment:
    """Deployment configuration for runner-based execution.

    Attributes:
        name: Deployment name.
        flow: Associated flow object.
        schedule: Scheduling configuration.
        parameters: Default flow parameters.
        tags: Deployment tags.
        description: Deployment description.
        version: Deployment version.
        work_pool_name: Target work pool.
        work_queue_name: Target work queue.
        job_variables: Job-specific variables.

    Example:
        from prefect import flow
        from prefect.deployments.runner import RunnerDeployment
        from prefect.client.schemas.schedules import CronSchedule

        @flow
        def etl_pipeline():
            pass  # ETL logic here

        # Create deployment programmatically
        deployment = RunnerDeployment(
            name="etl-deployment",
            flow=etl_pipeline,
            schedule=CronSchedule(cron="0 2 * * *"),  # Daily at 2 AM
            parameters={"env": "production"},
            tags=["etl", "production"],
            description="Daily ETL pipeline",
            version="1.0.0",
            work_pool_name="kubernetes-pool",
        )

        # Deploy to server
        deployment_id = deployment.deploy()

        # Serve for local execution
        deployment.serve(limit=5)
    """

    def __init__(
        self,
        name: str,
        flow: "Flow",
        schedule: Union["CronSchedule", "IntervalSchedule", None] = None,
        parameters: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        description: Optional[str] = None,
        version: Optional[str] = None,
        work_pool_name: Optional[str] = None,
        work_queue_name: Optional[str] = None,
        job_variables: Optional[Dict[str, Any]] = None,
        enforce_parameter_schema: Optional[bool] = None,
    ):
        """Initialize a runner deployment."""

    def serve(
        self,
        pause_on_shutdown: bool = True,
        print_starting_message: bool = True,
        limit: Optional[int] = None,
        **kwargs,
    ) -> None:
        """Serve the deployment for remote execution.

        Args:
            pause_on_shutdown: Whether to pause on shutdown.
            print_starting_message: Whether to print the startup message.
            limit: Maximum concurrent runs.
        """

    @classmethod
    def from_flow(
        cls,
        flow: "Flow",
        name: Optional[str] = None,
        **kwargs,
    ) -> "RunnerDeployment":
        """Create a deployment from a flow object."""

    def deploy(
        self,
        work_pool_name: Optional[str] = None,
        image: Union[str, "DeploymentImage", None] = None,
        **kwargs,
    ) -> UUID:
        """Deploy to the Prefect server.

        Args:
            work_pool_name: Target work pool.
            image: Container image specification.

        Returns:
            UUID of the created deployment.
        """


# Functions for managing existing deployments, including updates and deletion.
from typing import Any, Dict, List, Optional, Union


async def get_deployment(
    name: str,
    client: Optional["PrefectClient"] = None,
) -> "Deployment":
    """Retrieve a deployment by name.

    Args:
        name: Name of the deployment.
        client: Prefect client (defaults to the current client).

    Returns:
        Deployment object.
    """


async def update_deployment(
    name: str,
    schedule: Union["CronSchedule", "IntervalSchedule", None] = None,
    parameters: Optional[Dict[str, Any]] = None,
    tags: Optional[List[str]] = None,
    description: Optional[str] = None,
    version: Optional[str] = None,
    work_pool_name: Optional[str] = None,
    client: Optional["PrefectClient"] = None,
) -> "Deployment":
    """Update an existing deployment.

    Args:
        name: Name of the deployment to update.
        schedule: New scheduling configuration.
        parameters: New default parameters.
        tags: New tags.
        description: New description.
        version: New version.
        work_pool_name: New work pool.
        client: Prefect client (defaults to the current client).

    Returns:
        Updated deployment object.
    """


async def delete_deployment(
    name: str,
    client: Optional["PrefectClient"] = None,
) -> bool:
    """Delete a deployment.

    Args:
        name: Name of the deployment to delete.
        client: Prefect client (defaults to the current client).

    Returns:
        True if deletion was successful.
    """


# Example:
#     from prefect.deployments.base import (
#         get_deployment, update_deployment, delete_deployment,
#     )
#     from prefect.client.schemas.schedules import IntervalSchedule
#     from datetime import timedelta
#
#     # Get existing deployment
#     deployment = await get_deployment("data-pipeline/production-etl")
#
#     # Update deployment schedule
#     updated_deployment = await update_deployment(
#         "data-pipeline/production-etl",
#         schedule=IntervalSchedule(interval=timedelta(hours=6)),
#         parameters={"batch_size": 2000},
#         tags=["production", "etl", "updated"],
#     )
#
#     # Delete deployment
#     success = await delete_deployment("old-deployment")

# Integration with work pools for infrastructure-aware deployment execution.
from typing import Dict, List, Optional


class WorkPoolJobConfiguration:
    """Configuration for jobs running in work pools."""

    def __init__(
        self,
        command: Optional[List[str]] = None,
        env: Optional[Dict[str, str]] = None,
        labels: Optional[Dict[str, str]] = None,
        name: Optional[str] = None,
        **kwargs,
    ):
        """Initialize work pool job configuration."""


async def get_work_pool(
    work_pool_name: str,
    client: Optional["PrefectClient"] = None,
) -> "WorkPool":
    """Retrieve work pool information.

    Args:
        work_pool_name: Name of the work pool.
        client: Prefect client (defaults to the current client).

    Returns:
        WorkPool object with configuration details.
    """


async def create_work_queue(
    work_pool_name: str,
    work_queue_name: str,
    description: Optional[str] = None,
    is_paused: bool = False,
    concurrency_limit: Optional[int] = None,
    priority: Optional[int] = None,
    client: Optional["PrefectClient"] = None,
) -> "WorkQueue":
    """Create a work queue within a work pool.

    Args:
        work_pool_name: Target work pool name.
        work_queue_name: Name for the new work queue.
        description: Queue description.
        is_paused: Whether to start the queue paused.
        concurrency_limit: Maximum concurrent jobs.
        priority: Queue priority for job assignment.
        client: Prefect client (defaults to the current client).

    Returns:
        Created WorkQueue object.
    """


# Types related to deployment functionality are declared below.
from typing import Any, Dict, List, Optional, Union
from datetime import datetime, timedelta
from uuid import UUID
from enum import Enum


class Deployment:
    """Deployment configuration object."""
    id: UUID
    name: str
    flow_id: UUID
    # Schedule classes are declared later in this file, so quote the refs.
    schedule: Optional[Union["CronSchedule", "IntervalSchedule"]]
    parameters: Dict[str, Any]
    tags: List[str]
    description: Optional[str]
    version: Optional[str]
    work_pool_name: Optional[str]
    work_queue_name: Optional[str]
    created: datetime
    updated: datetime


class FlowRun:
    """Flow run result object."""
    id: UUID
    name: str
    flow_id: UUID
    deployment_id: Optional[UUID]
    state: "State"  # NOTE(review): State is declared elsewhere in the package
    parameters: Dict[str, Any]
    tags: List[str]
    created: datetime
    expected_start_time: datetime
    start_time: Optional[datetime]
    end_time: Optional[datetime]


class DeploymentImage:
    """Container image specification for deployments."""
    name: str
    tag: str
    dockerfile: Optional[str]
    buildargs: Optional[Dict[str, str]]


class WorkPool:
    """Work pool configuration."""
    name: str
    type: str
    description: Optional[str]
    is_paused: bool
    concurrency_limit: Optional[int]
    default_queue_id: UUID


class WorkQueue:
    """Work queue within a work pool."""
    id: UUID
    name: str
    description: Optional[str]
    is_paused: bool
    concurrency_limit: Optional[int]
    priority: int
    work_pool_id: UUID


# Schedule types
class CronSchedule:
    """Cron-based scheduling."""
    cron: str
    timezone: Optional[str]
    day_or: bool


class IntervalSchedule:
    """Interval-based scheduling."""
    interval: timedelta
    anchor_date: Optional[datetime]
    timezone: Optional[str]


class RRuleSchedule:
    """RRule-based scheduling."""
    rrule: str
    timezone: Optional[str]


# Install with the Tessl CLI:
#   npx tessl i tessl/pypi-prefect