Ctrl+K
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-prefect

Workflow orchestration and management framework for building resilient data pipelines.

Pending
Overview
Eval results
Files

docs/deployments.md

Deployments

Prefect's deployment system enables flows to be executed on remote infrastructure through work pools, scheduled runs, and deployment management. This provides the foundation for production workflow orchestration with infrastructure abstraction and scalable execution.

Capabilities

Flow Deployment

Deploy flows to work pools for remote execution with infrastructure management and scheduling capabilities.

def deploy(
    *flows: Flow,
    name: Optional[str] = None,
    work_pool_name: Optional[str] = None,
    image: Optional[Union[str, DeploymentImage]] = None,
    build: bool = True,
    push: bool = True,
    print_next_steps: bool = True,
    ignore_warnings: bool = False,
    **kwargs,
) -> List[RunnerDeployment]:
    """
    Deploy flows to work pools for remote execution.

    Parameters:
    - flows: One or more flow objects to deploy
    - name: Name for the deployment (defaults to flow name)
    - work_pool_name: Target work pool for execution
    - image: Container image specification for deployment
    - build: Whether to build the deployment image
    - push: Whether to push the image to a registry
    - print_next_steps: Whether to print deployment instructions
    - ignore_warnings: Whether to ignore deployment warnings
    - **kwargs: Additional deployment configuration options
      (e.g. schedule, parameters, tags — forwarded to each deployment)

    Returns:
    List of created RunnerDeployment objects

    Raises:
    ValueError: If deployment configuration is invalid
    """

Usage Examples

# Example: deploying flows to work pools with prefect.deploy().
from prefect import flow, deploy
from prefect.deployments.runner import RunnerDeployment

@flow
def data_pipeline():
    """ETL pipeline for data processing."""
    # Pipeline logic here
    pass

@flow  
def monitoring_flow():
    """Monitoring and alerting workflow."""
    # Monitoring logic here
    pass

# Basic deployment
deployments = deploy(
    data_pipeline,
    name="production-etl",
    work_pool_name="kubernetes-pool"
)

# Deployment with custom image
deployments = deploy(
    data_pipeline,
    monitoring_flow,
    name="data-workflows",
    work_pool_name="docker-pool",
    image="my-registry/prefect-workflows:v1.0.0",
    build=True,
    push=True
)

# Deployment with custom configuration
# NOTE(review): schedule/parameters/tags are not named parameters of deploy();
# they are forwarded via **kwargs — confirm against the deploy() signature above.
deployments = deploy(
    data_pipeline,
    name="scheduled-etl",
    work_pool_name="kubernetes-pool",
    schedule={"cron": "0 2 * * *"},  # Daily at 2 AM
    parameters={"env": "production"},
    tags=["production", "etl"]
)

Project Initialization

Initialize Prefect projects with configuration and deployment templates.

def initialize_project(
    name: Optional[str] = None,
    recipe: Optional[str] = None,
) -> None:
    """
    Initialize a new Prefect project with configuration templates.

    Parameters:
    - name: Name for the project (defaults to current directory name)
    - recipe: Template recipe to use for initialization

    Creates:
    - prefect.yaml: Project configuration file
    - .prefectignore: Files to ignore during deployment
    - flows/: Directory for flow definitions (optional)

    Raises:
    FileExistsError: If project files already exist
    """

Usage Examples

# Example: initializing Prefect projects, optionally from a named recipe
# (recipes select infrastructure-specific prefect.yaml templates).
from prefect.deployments import initialize_project

# Initialize basic project
initialize_project(name="my-workflows")

# Initialize with Docker recipe
initialize_project(
    name="docker-workflows", 
    recipe="docker"
)

# Initialize with Kubernetes recipe
initialize_project(
    name="k8s-workflows",
    recipe="kubernetes"
)

Deployment Execution

Run deployed workflows on-demand or programmatically trigger execution.

def run_deployment(
    name: str,
    parameters: Optional[Dict[str, Any]] = None,
    scheduled_time: Optional[datetime] = None,
    flow_run_name: Optional[str] = None,
    timeout: Optional[int] = None,
    poll_interval: int = 10,
    tags: Optional[List[str]] = None,
    idempotency_key: Optional[str] = None,
    work_queue_name: Optional[str] = None,
) -> FlowRun:
    """
    Run a deployment and return the flow run result.

    Parameters:
    - name: Name of the deployment to run ("flow-name/deployment-name")
    - parameters: Parameters to pass to the flow
    - scheduled_time: When to schedule the run (defaults to now)
    - flow_run_name: Custom name for the flow run
    - timeout: Maximum time to wait for completion (seconds)
    - poll_interval: Polling interval for status updates (seconds)
    - tags: Tags to apply to the flow run
    - idempotency_key: Key to prevent duplicate runs
    - work_queue_name: Specific work queue to use

    Returns:
    FlowRun object with execution results

    Raises:
    TimeoutError: If execution exceeds timeout
    DeploymentNotFound: If deployment doesn't exist
    """

Usage Examples

# Example: triggering deployment runs programmatically.
# NOTE(review): these calls appear to block until the run finishes (polling at
# poll_interval, up to timeout) — confirm against run_deployment's behavior.
from prefect.deployments.flow_runs import run_deployment
from datetime import datetime, timedelta

# Run deployment immediately
flow_run = run_deployment(
    "data-pipeline/production-etl",
    parameters={"source": "s3://data-bucket", "env": "prod"}
)

# Schedule for future execution
future_time = datetime.now() + timedelta(hours=2)
flow_run = run_deployment(
    "monitoring/health-check",
    scheduled_time=future_time,
    tags=["scheduled", "monitoring"]
)

# Run with custom configuration
flow_run = run_deployment(
    "etl-pipeline/daily-load",
    parameters={
        "batch_size": 1000,
        "parallel_workers": 4
    },
    flow_run_name="manual-daily-load-2024-01-15",
    timeout=3600,  # 1 hour timeout
    work_queue_name="high-priority"
)

print(f"Flow run completed: {flow_run.state}")

Runner Deployment Class

The RunnerDeployment class provides programmatic deployment management with full configuration control.

class RunnerDeployment:
    """
    Deployment configuration for runner-based execution.

    Attributes:
    - name: Deployment name
    - flow: Associated flow object
    - schedule: Scheduling configuration
    - parameters: Default flow parameters
    - tags: Deployment tags
    - description: Deployment description
    - version: Deployment version
    - work_pool_name: Target work pool
    - work_queue_name: Target work queue
    - job_variables: Job-specific variables
    """

    def __init__(
        self,
        name: str,
        flow: Flow,
        schedule: Optional[Union[CronSchedule, IntervalSchedule]] = None,
        parameters: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        description: Optional[str] = None,
        version: Optional[str] = None,
        work_pool_name: Optional[str] = None,
        work_queue_name: Optional[str] = None,
        job_variables: Optional[Dict[str, Any]] = None,
        enforce_parameter_schema: Optional[bool] = None,
    ):
        """Initialize a runner deployment."""

    def serve(
        self,
        pause_on_shutdown: bool = True,
        print_starting_message: bool = True,
        limit: Optional[int] = None,
        **kwargs
    ) -> None:
        """
        Serve the deployment for remote execution.

        Parameters:
        - pause_on_shutdown: Whether to pause on shutdown
        - print_starting_message: Whether to print startup message
        - limit: Maximum concurrent runs
        """

    @classmethod
    def from_flow(
        cls,
        flow: Flow,
        name: Optional[str] = None,
        **kwargs
    ) -> "RunnerDeployment":
        """Create a deployment from a flow object."""

    def deploy(
        self,
        work_pool_name: Optional[str] = None,
        image: Optional[Union[str, DeploymentImage]] = None,
        **kwargs
    ) -> UUID:
        """
        Deploy to the Prefect server.

        Parameters:
        - work_pool_name: Target work pool (defaults to the instance's)
        - image: Container image specification

        Returns:
        UUID of the created deployment
        """

Usage Examples

# Example: programmatic deployment management with RunnerDeployment.
from prefect import flow
from prefect.deployments.runner import RunnerDeployment
from prefect.client.schemas.schedules import CronSchedule

@flow
def etl_pipeline():
    # ETL logic here
    pass

# Create deployment programmatically
deployment = RunnerDeployment(
    name="etl-deployment",
    flow=etl_pipeline,
    schedule=CronSchedule(cron="0 2 * * *"),  # Daily at 2 AM
    parameters={"env": "production"},
    tags=["etl", "production"],
    description="Daily ETL pipeline",
    version="1.0.0",
    work_pool_name="kubernetes-pool"
)

# Deploy to server (uses work_pool_name set on the instance)
deployment_id = deployment.deploy()

# Serve for local execution
# NOTE(review): serve() presumably blocks until interrupted — confirm.
deployment.serve(limit=5)

Deployment Management

Functions for managing existing deployments, including updates and deletion.

async def get_deployment(
    name: str,
    client: Optional[PrefectClient] = None,
) -> Deployment:
    """
    Retrieve a deployment by name.

    Parameters:
    - name: Name of the deployment
    - client: Prefect client (defaults to current client)

    Returns:
    Deployment object
    """

async def update_deployment(
    name: str,
    schedule: Optional[Union[CronSchedule, IntervalSchedule]] = None,
    parameters: Optional[Dict[str, Any]] = None,
    tags: Optional[List[str]] = None,
    description: Optional[str] = None,
    version: Optional[str] = None,
    work_pool_name: Optional[str] = None,
    client: Optional[PrefectClient] = None,
) -> Deployment:
    """
    Update an existing deployment.

    Parameters:
    - name: Name of the deployment to update
    - schedule: New scheduling configuration
    - parameters: New default parameters
    - tags: New tags
    - description: New description
    - version: New version
    - work_pool_name: New work pool
    - client: Prefect client (defaults to current client)

    Returns:
    Updated deployment object
    """

async def delete_deployment(
    name: str,
    client: Optional[PrefectClient] = None,
) -> bool:
    """
    Delete a deployment.

    Parameters:
    - name: Name of the deployment to delete
    - client: Prefect client (defaults to current client)

    Returns:
    True if deletion was successful
    """

Usage Examples

# Example: managing existing deployments. These functions are async, so the
# `await` calls below must run inside an async function (or a REPL/runtime
# that supports top-level await).
from prefect.deployments.base import get_deployment, update_deployment, delete_deployment
from prefect.client.schemas.schedules import IntervalSchedule
from datetime import timedelta

# Get existing deployment
deployment = await get_deployment("data-pipeline/production-etl")

# Update deployment schedule
updated_deployment = await update_deployment(
    "data-pipeline/production-etl",
    schedule=IntervalSchedule(interval=timedelta(hours=6)),
    parameters={"batch_size": 2000},
    tags=["production", "etl", "updated"]
)

# Delete deployment
success = await delete_deployment("old-deployment")

Work Pool Integration

Integration with work pools for infrastructure-aware deployment execution.

class WorkPoolJobConfiguration:
    """Configuration for jobs running in work pools."""

    def __init__(
        self,
        command: Optional[List[str]] = None,
        env: Optional[Dict[str, str]] = None,
        labels: Optional[Dict[str, str]] = None,
        name: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize work pool job configuration.

        Parameters:
        - command: Command (argv list) the job executes
        - env: Environment variables for the job
        - labels: Labels attached to the job
        - name: Job name
        - **kwargs: Additional worker-specific job options
        """

async def get_work_pool(
    work_pool_name: str,
    client: Optional[PrefectClient] = None,
) -> WorkPool:
    """
    Retrieve work pool information.

    Parameters:
    - work_pool_name: Name of the work pool
    - client: Prefect client (defaults to current client)

    Returns:
    WorkPool object with configuration details
    """

async def create_work_queue(
    work_pool_name: str,
    work_queue_name: str,
    description: Optional[str] = None,
    is_paused: bool = False,
    concurrency_limit: Optional[int] = None,
    priority: Optional[int] = None,
    client: Optional[PrefectClient] = None,
) -> WorkQueue:
    """
    Create a work queue within a work pool.

    Parameters:
    - work_pool_name: Target work pool name
    - work_queue_name: Name for the new work queue
    - description: Queue description
    - is_paused: Whether to start the queue paused
    - concurrency_limit: Maximum concurrent jobs
    - priority: Queue priority for job assignment
    - client: Prefect client (defaults to current client)

    Returns:
    Created WorkQueue object
    """

Types

Types related to deployment functionality:

from typing import Any, Dict, List, Optional, Union
from datetime import datetime, timedelta  # timedelta is required by IntervalSchedule below
from uuid import UUID
from enum import Enum

# Schedule types are declared before the classes that reference them, so the
# class-body annotations below resolve without a NameError when executed.
class CronSchedule:
    """Cron-based scheduling."""
    cron: str  # cron expression, e.g. "0 2 * * *"
    timezone: Optional[str]
    day_or: bool  # whether day-of-month and day-of-week combine with OR

class IntervalSchedule:
    """Interval-based scheduling."""
    interval: timedelta
    anchor_date: Optional[datetime]
    timezone: Optional[str]

class RRuleSchedule:
    """RRule-based scheduling."""
    rrule: str  # iCalendar RRULE string
    timezone: Optional[str]

class Deployment:
    """Deployment configuration object."""
    id: UUID
    name: str
    flow_id: UUID
    schedule: Optional[Union[CronSchedule, IntervalSchedule]]
    parameters: Dict[str, Any]
    tags: List[str]
    description: Optional[str]
    version: Optional[str]
    work_pool_name: Optional[str]
    work_queue_name: Optional[str]
    created: datetime
    updated: datetime

class FlowRun:
    """Flow run result object."""
    id: UUID
    name: str
    flow_id: UUID
    deployment_id: Optional[UUID]
    state: "State"  # forward reference; State is not defined in this snippet
    parameters: Dict[str, Any]
    tags: List[str]
    created: datetime
    expected_start_time: datetime
    start_time: Optional[datetime]
    end_time: Optional[datetime]

class DeploymentImage:
    """Container image specification for deployments."""
    name: str
    tag: str
    dockerfile: Optional[str]
    buildargs: Optional[Dict[str, str]]

class WorkPool:
    """Work pool configuration."""
    name: str
    type: str
    description: Optional[str]
    is_paused: bool
    concurrency_limit: Optional[int]
    default_queue_id: UUID

class WorkQueue:
    """Work queue within a work pool."""
    id: UUID
    name: str
    description: Optional[str]
    is_paused: bool
    concurrency_limit: Optional[int]
    priority: int  # NOTE(review): presumably lower value = higher priority — confirm
    work_pool_id: UUID

Install with Tessl CLI

npx tessl i tessl/pypi-prefect

docs

client-api.md

configuration.md

context-utilities.md

core-workflows.md

deployments.md

index.md

runtime-context.md

state-management.md

variables.md

tile.json